multiprocessing

multiprocessingはpython標準の並列計算モジュールである.関数をn番目のprocessで実行することができる.もっともよくあるデータ解析での利用は,複数のアンサンブルやモデルの出力に同じ処理をすることであろう.この場合はそれぞれの計算結果は,ファイルに書き出すだけでよく,process間の通信を行う必要がない.

# The following example applies a time filter to 101 ensemble members.
# The results of each ensemble member are stored in a separate directory,
# so the work is embarrassingly parallel: no inter-process communication
# is needed, each worker just writes its own output files.
import multiprocessing
import subprocess
import time


def filter_in_dirs(dir_names, nth_p):
    """Process the files contained in each directory of *dir_names*.

    Parameters
    ----------
    dir_names : list of str
        Directories whose files this worker should operate on.
    nth_p : int
        Index of this worker process (useful for logging/diagnostics).
    """
    for dir_name in dir_names:
        pass  # ... work on the files contained in directory dir_name


if __name__ == '__main__':
    # List every directory below the current one.
    dir_names = subprocess.check_output(
        'find ./ -type d', shell=True).decode('ascii').split('\n')
    dir_names.sort()
    dir_names = dir_names[2:]  # The first two items, i.e. '' and './', are removed.

    no_processes = 10  # how many processes to split the work into
    no_ttl_dir = len(dir_names)  # total number of directories
    # Ceiling division so that every directory is assigned to some process
    # (the original hard-coded stride only worked for ~101 directories).
    no_dirs_per_process = -(-no_ttl_dir // no_processes)
    print('total dir number=%d no_dirs_per_process=%d'
          % (no_ttl_dir, no_dirs_per_process))

    time1 = time.time()
    jobs = []
    for nth_p in range(no_processes):  # nth_p stands for nth process
        ndir_bgn = no_dirs_per_process * nth_p
        ndir_end = min(no_dirs_per_process * (nth_p + 1), no_ttl_dir)
        print('process=%d, ndir=%d %d' % (nth_p, ndir_bgn, ndir_end))
        job = multiprocessing.Process(
            target=filter_in_dirs,
            args=(dir_names[ndir_bgn:ndir_end], nth_p))
        jobs.append(job)
        job.start()

    for job in jobs:  # wait for all workers to finish
        job.join()
    time2 = time.time()
    print('All process ends. Elapsed time=%d sec' % (time2 - time1))


# ---------------------------------------------------------------------------
# The following example applies a time filter to 101 ensemble members.
# The results of each ensemble member are stored in a separate directory.
import multiprocessing
import subprocess
import time


def filter_in_dirs(dir_names, nth_p):
    """Process the files contained in each directory of *dir_names*.

    Parameters
    ----------
    dir_names : list of str
        Directories whose files this worker should operate on.
    nth_p : int
        Index of this worker process (useful for logging/diagnostics).
    """
    for dir_name in dir_names:
        pass  # ... work on the files contained in directory dir_name


if __name__ == '__main__':
    # List every directory below the current one.
    dir_names = subprocess.check_output(
        'find ./ -type d', shell=True).decode('ascii').split('\n')
    dir_names.sort()
    dir_names = dir_names[2:]  # The first two items, i.e. '' and './', are removed.

    no_processes = 10  # how many processes to split the work into
    no_ttl_dir = len(dir_names)  # total number of directories
    # Ceiling division so that every directory is assigned to some process
    # (the original hard-coded stride only worked for ~101 directories).
    no_dirs_per_process = -(-no_ttl_dir // no_processes)
    print('total dir number=%d no_dirs_per_process=%d'
          % (no_ttl_dir, no_dirs_per_process))

    time1 = time.time()
    jobs = []
    for nth_p in range(no_processes):  # nth_p stands for nth process
        ndir_bgn = no_dirs_per_process * nth_p
        ndir_end = min(no_dirs_per_process * (nth_p + 1), no_ttl_dir)
        print('process=%d, ndir=%d %d' % (nth_p, ndir_bgn, ndir_end))
        job = multiprocessing.Process(
            target=filter_in_dirs,
            args=(dir_names[ndir_bgn:ndir_end], nth_p))
        jobs.append(job)
        job.start()

    for job in jobs:  # wait for all workers to finish
        job.join()
    time2 = time.time()
    print('All process ends. Elapsed time=%d sec' % (time2 - time1))


# multiprocessing is a standard Python module for parallel computation.
# It allows the same function to be executed in the n-th process.
# The most common application in parallel data analysis is to apply the
# same operation to different data. One example, shown in the sample
# script below, is applying time filtering to a large number of ensemble
# members. In this case each operation is completely independent; the
# results of different ensemble members are stored in different
# directories.
import multiprocessing
import subprocess
import time


def filter_in_dirs(dir_names, nth_p):
    """Process the files contained in each directory of *dir_names*.

    Parameters
    ----------
    dir_names : list of str
        Directories whose files this worker should operate on.
    nth_p : int
        Index of this worker process (useful for logging/diagnostics).
    """
    for dir_name in dir_names:
        pass  # ... work on the files contained in directory dir_name


if __name__ == '__main__':
    # List every directory below the current one.
    dir_names = subprocess.check_output(
        'find ./ -type d', shell=True).decode('ascii').split('\n')
    dir_names.sort()
    dir_names = dir_names[2:]  # The first two items, i.e. '' and './', are removed.

    no_processes = 10  # how many processes to split the work into
    no_ttl_dir = len(dir_names)  # total number of directories
    # Ceiling division so that every directory is assigned to some process
    # (the original hard-coded stride only worked for ~101 directories).
    no_dirs_per_process = -(-no_ttl_dir // no_processes)
    print('total dir number=%d no_dirs_per_process=%d'
          % (no_ttl_dir, no_dirs_per_process))

    time1 = time.time()
    jobs = []
    for nth_p in range(no_processes):  # nth_p stands for nth process
        ndir_bgn = no_dirs_per_process * nth_p
        ndir_end = min(no_dirs_per_process * (nth_p + 1), no_ttl_dir)
        print('process=%d, ndir=%d %d' % (nth_p, ndir_bgn, ndir_end))
        job = multiprocessing.Process(
            target=filter_in_dirs,
            args=(dir_names[ndir_bgn:ndir_end], nth_p))
        jobs.append(job)
        job.start()

    for job in jobs:  # wait for all workers to finish
        job.join()
    time2 = time.time()
    print('All process ends. Elapsed time=%d sec' % (time2 - time1))