並列化関係‎ > ‎

snowパッケージで並列化(R)

Rのsnowパッケージを使って、逐次処理を並列処理にします。snowはMPIを使って、マルチプロセスで並列化をしてくれます。

クラスタのタイプは4つ選べるようになっており、ソケット以外は各々パッケージ、及びRパッケージを別途ダウンロードしないといけないです。

・ソケット:特に何も必要無し
・MPI(Massage Passing Interface):MPI + Rmpi
・PVM(Parallel Virtual Machine):PVM + rpvm
・NWS(Python Network Spaces):?

使い分けは、ローカルのマシンで気軽に並列化したい場合はソケットで、環境構築に苦労してもとにかくできるだけ速くしたい場合はMPI(この場合OpenMPI、MPICHとかをインストールする必要がある)、大型のLinuxサーバでやる場合はMPIですかね。PVM、NWSはよくわからないです。。。
環境構築の手順は別ページでまとめます。

snow独自の関数の一覧(マニュアルの訳)




まずは基本

makeCluster( ) クラスター生成。クラスター数を設定できる。typeはMPI,SOCK(ソケット),PVMから選べる。length(cl)で幾つクラスタ数を確保したかを後で、確認する事もできる。
    ex) cl <- makeCluster(4,type="SOCK")

stopCluster( ) : クラスターを閉じる。makeCluster()したら最後はこれを書く。
    ex) stopCluster(cl)

clusterCall( ) : ユーザが設定した関数が出す返り値が、各ノードでどのような値を出しているかを見る。初期値も与える。
    ex) f <- function(y){ x + y }
          clusterCall(cl, f, 2)

clusterEvalQ( ) : 各ノードの評価をする。
    ex) #XMLライブラリを各ノードにブートする。
           clusterEvalQ(cl,library("XML"))
           #各ノードのホスト名を調べる。
           clusterEvalQ(cl,Sys.getenv("HOST"))

clusterExport( ) : 変数を各クラスタで利用できるように渡す。
    ex) clusterExport(cl,parameter1,parameter2)

clusterSplit( ) : ベクトルが各クラスタで連続して割り当てられるようにする。下の例だとクラスタが4つの場合、4:4:5:4で割り当てられる。
    ex) clusterSplit(cl,1:17)

clusterApply( ) : クラスタに引数を割当て、定義した関数を基に計算を実行させる。下の例でクラスタがもし4つの場合だと、ベクトルは8個あるから、2回以上計算しているクラスタが出てくる(リサイクル)。
    ex) clusterApply(cl, 1:8, get("+"), 3)

clusterApplyLB( ) : LBはload balancingの意。リサイクルは、ジョブが終わったノードが次のジョブをするが、clusterApplyはよりバランス良くジョブを各ノードに割り当てる。ただプロセス間通信が増えると遅くなる。クラスタタイプがSOCKの場合は使えない。
    ex) clusterApplyLB(cl,1:8,x,get("+"), 3)

clusterMap( ) : ?
    ex) clusterMap(cl,fun,...,MoreArgs = NULL,RECYCLE = TRUE)



実行例
#ソケットクラスタを2個生成
cl <- makeSOCKcluster(c("localhost","localhost"))
#各クラスタに3を用意し、get関数を基に各々1と2を足す
#[[1]],[[2]]のように各ノードには番号が割り振られている
clusterApply(cl, 1:2, get("+"), 3)



#出力結果
#[[1]]
#[1] 4
#
#[[2]]
#[1] 5



#boot(ブートストラップ推定のパッケージ)を各ノードで使えるようにする。最初から入っているやつが8個ある。
clusterEvalQ(cl, library(boot))



#出力結果
#[[1]]
#[1] "boot"      "snow"      "methods"   "stats"    
#[5] "graphics"  "grDevices" "utils"     "datasets" 
#[9] "base"     
#
#[[2]]
#[1] "boot"      "snow"      "methods"   "stats"    
#[5] "graphics"  "grDevices" "utils"     "datasets" 
#[9] "base"     



#xに1を代入
x<-1
#各クラスタにxを引き渡す
clusterExport(cl, "x")
#各クラスタに関数を基に計算させた時の(初期値2)結果を表示
clusterCall(cl, function(y){ x + y}, 2)



#出力結果
#[[1]]
#[1] 3
#
#[[2]]
#[1] 3





並列バージョンのapply()関数
(実質ここらへんが並列化の要)

Rにはapply( )ファミリーというものがあって、apply(), lapply(), sapply(), mapply()などがある。簡単な例でいうと、Aという行列の各要素に1を足すのは、Rの場合だと A <- A + 1 でいい。これと同じように、ベクトル、行列、リストに同じ処理を一括でできるようにする関数をapply()ファミリーという。以下はそれを並列化させる関数。

parApply( ) : ベクトル、行列のデータを扱う。
MARGIN=1なら行に関して、MARGIN = 2なら列に関して、MARGIN = c(1,2)なら各要素に関して(1,2)、関数を適用し、結果を各々返す。
    ex) parApply(cl, fun, MARGIN, GUN, ...)

parLapply( ) : リストのデータを扱う。
結果がリストで返ってくる。
    ex) parLapply(cl, x, fun,...)

parSapply( ) :リストのデータを扱う。
 結果がベクトルか行列で返ってくる。
    ex) parSapply(cl,x,fun,...)

parRapply( ) :行列のデータを扱う。
 行列xの行に対して並列化する。
    ex) parRapply(cl, x, fun,...)

parCapply( ) :行列のデータを扱う。
 行列xの列に対して並列化する。
    ex) parCapply(cl, x, fun,...)

parMM( ) :行列のデータを扱う。
 行列と行列のかけ算。
    ex) parMM(cl, A, B)



実行例2
#ソケットクラスタを2個生成
cl <- makeSOCKcluster(c("localhost","localhost"))
#各クラスタに、関数getを基に初期値3に1〜20を各々足していった時の結果を出力
parSapply(cl,1:20,get("+"),3)



#出力結果
# [1]  4  5  6  7  8  9 10 11 12 13 14 15 16 17 18 19 20 21
#[19] 22 23




各クラスタで乱数を発生させる

clusterSetupRNG( ) : Rのrlecuyerライブラリを使って各ノードで乱数を発生させる
   ex) clusterSetupRNG(cl, type ="RNGstream" , ....)

clusterSetupRNGstream( )
   ex) clusterSetupRNGstream(cl, seed=rep(12345,6), ....)

clusterSetupSPRNG ( ) : SPRNGというライブラリを使って各ノードで乱数を発生させる。Rwiki曰くSPRNGは最近あまり人気が無いので、使わない方が良い。
   ex)  clusterSetupSPRNG(cl, seed="round(2^32 * runif(1)),prngkind = "default", para = 0, ...)



実行例3
clusterSetupSPRNG(cl)
clusterSetupSPRNG(cl,seed=1234)
clulsterSetupRNG(cl,seed=rep(1,6))



#始め方&終わらせ方
#MPI,PVM,NWSは環境が構築されていないとできない
setDefaultClusterOptions() : オプションはいっぱいあるらしい。デフォルトでは、Rmpiが検索で引っかかったら、
type="MPI"
それ以外でrpvmがインストールされていたら、
type="PVM"
それ以外でRmpiがインストールされていたら、
type="MPI"
上の2つがなかったら
type="SOCK"
になるように設定されている。
  ex)  setDefaultClusterOptions(...)

makeSOCKcluster( ) : ソケットクラスタを生成。
   ex)  makeSOCKcluster(names, ... , options = defaultClusterOptions)

makePVMcluster( ) : PVMクラスタを生成。
   ex)  makePVMcluster(count, ... , options = defaultClusterOptions)

makeMPIcluster( ) : MPIクラスタを生成。
   ex)  makeMPIcluster(count, ... , options = defaultClusterOptions)

makeNWScluster( ) : NWSクラスタを生成。
   ex) makeNWScluster(names, ... , options = defaultClusterOptions)

SOCK,NWSはname
PVM,MPIはcount
を指定する。

getMPIcluster( ) : MPIクラスタの状況を見る。
   ex) getMPIcluster()



実行例4

#Rのスクリプトとsnowの場所を指定(Macの場合)

macOptions <- list(host = "owasso",rscript ="/Library/Frameworks/R.framework/Resources/bin/Rscript",

snowlib = "/Library/Frameworks/R.framework/Resources/library/snow")

#Rのスクリプトとsnowの場所を指定(Linuxの場合)

lnxOptions <- list(host = "itasca",

rscript = "/usr/lib64/R/bin/Rscript",

snowlib = "/home/luke/tmp/lib")

#Rのスクリプトとsnowの場所を指定(Windowsの場合)

winOptions <- list(host="192.168.1.168",

rscript="C:/Program Files/R/R-2.7.1/bin/Rscript.exe",

snowlib="C:/Rlibs")






タイミング

snow.time( ) :各クラスタの実行時間を計算 
   ex) x <- snow.time(parApply(cl,x,fun))

print( ) : 各クラスタの実行時間を出力
   ex) print(x)

plot( ) : パッケージxpvmでガンチャートを出力
   ex)plot(x)



実行例5

#ソケットクラスタを2個生成

cl <- makeCluster(2,type="SOCK")

#標準正規分布にもとづき百万個の数を生成

x <- rnorm(1000000)

#xの和を計算する事を100回繰り返した時の各クラスタの実行時間を計算

tm <- snow.time(clusterCall(cl, function(x) for (i in 1:100) sum(x), x))

#結果出力

print(tm)

#ガンチャート出力

plot(tm)

#クラスタを閉じる

stopCluster(cl)





使っていてわかったsnow関数の変なクセ

#Apply関数の読み込み(ベクトルの場合)
m <- c(1:10)
#転置しないとエラーを吐き出す
m <- t(m)
parApply(cl,m,mean,MARGIN=1)
→転置したベクトルしか読み込めない?(行列だとこんな事はない)



参考文献
・snowパッケージのマニュアル
・snowの関数
・RWiki(snow+Rmpi)
・RWiki(snow)
・RWiki(いろいろな並列化の仕方)
・RWiki(大規模データ)
 ・SlideShare(岡田先生)
・ryamada先生
・apply()ファミリー
Comments