11.並行処理(partition)のサンプル
概要
ここでは、partitionによる並行処理を見て行きます。
自由度が高いのは、splitによる並行処理ですが、スケーリングを自動で行ってくれるのがpartitionの特徴です。
スケーリングとは、処理の量や種類などによって、スレッド数を多くしたり、少なくしたりすることです。
こうすることで、リソースをうまく使うことができます。
詳しくは、前の章で記述しています、並行処理の概要の記事を読んでみてください。
目標
まず、以下のサンプルの目標(ゴール)を示します。
複数のファイルを同時に処理することを考えます。
処理内容は同じなのですが、ファイルが複数に分かれているようなケースを想像いただければと思います。
複数に分かれたファイルを順番に処理すると時間がかかります。
最近のPCはデュアルコアになっていたり、同時処理に向いたものが多くなってきています。
そのため、シングルに処理をするよりも、同時に複数処理をするほうが早くなるケースが多くなってきていると思います。
さて、ここでは、Spring Batchに載っていたサンプルを基にします。
【処理内容】
3つのCSVファイルを3つのスレッドを立てて、同時に処理します。
内容は、元のファイルを読み込んで、フィールドの順番を替えて出力することとします。
使用サンプル
<バッチ処理設定サンプル: /job-context.xml>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<import resource="classpath:/remotetest/back-context.xml"/>
<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch" incrementer="jobParametersIncrementer">
<step id="step">
<partition step="step01" partitioner="partitioner">
<handler grid-size="3" task-executor="taskExecutor" />
</partition>
</step>
</job>
<!-- enables the functionality of JobOperator.startNextInstance(jobName) -->
<bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />
<!-- ファイルを読み込むパーティショナー -->
<bean id="partitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
<property name="resources" value="classpath:/remotetest/delimited*.txt" />
</bean>
<!-- 非同期タスクExecutor -->
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<!-- 各スレッドで実行するステップ -->
<step id="step01" xmlns="http://www.springframework.org/schema/batch">
<tasklet transaction-manager="transactionManager">
<chunk writer="itemWriter" reader="itemReader" commit-interval="1" />
<listeners>
<listener ref="outputFileListener"/>
</listeners>
</tasklet>
</step>
<!-- 出力ファイル名をStepExecutionContextに設定するリスナー -->
<bean id="outputFileListener" class="remotetest.OutputFileListener"/>
<bean id="itemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">
<property name="resource" value="#{stepExecutionContext[fileName]}" />
</bean>
<bean id="itemReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" abstract="true">
<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<property name="lineTokenizer">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="delimiter" value="," />
<property name="names" value="name,credit,k" />
</bean>
</property>
<property name="fieldSetMapper">
<bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" />
</property>
</bean>
</property>
</bean>
<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
<property name="resource" value="file:c:/temp/#{stepExecutionContext[outFile]}" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="," />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.PassThroughFieldExtractor" />
</property>
</bean>
</property>
</bean>
</beans>
【タグについて】
partitionerタグは、内部にhandlerタグを持ちます。
handlerは、PartitionHandlerクラスに相当するものです。
handler内のgrid-sizeとは、生成するスレッドの最大数を設定します。
今回は3つにしますので、3と設定しています。
【Partionerについて】
さて、partitionerに設定しているPartitionerクラスを見てみてください。
自作しても良いのですが、既にSpringBatchが用意してくれているMultiResourcePartitionerクラスを使用しました。
MultiResourcePartitionerクラスは、*でファイルパスを指定できますので、複数のファイルを取得します。
ファイル名を取得したら、それをStepExecutionContextをファイルの数だけ生成し、
"fileName"というキー名でファイル名を登録します。
これにより、itemRaderのresourceに、#{stepExecutionContext[fileName]}という遅延バインディングでファイル名を設定できるように
なります。
出力ファイル名は、partitionerでは生成されませんので、outputFileListenerリスナーで設定しています。
これは既存では用意されていないので、自作することになります。
以下でそのサンプルも見てみることします。
【Partitionerが生成したStepExecutionContextがgrid-sizeより少ない場合】
Partitionerは、grid-sizeの3ファイルより少ない数(例えば2ファイル)を取得するかもしれません。
その場合は、PartitionHandlerが、スレッド数は2に自動で縮退します。
<出力ファイル名作成リスナーサンプル: /remotetest/OutputFileListener.java>
public class OutputFileListener implements StepExecutionListener {
@Override
public ExitStatus afterStep(StepExecution arg0) {
return null;
}
@Override
public void beforeStep(StepExecution se) {
String path = se.getExecutionContext().getString("fileName");
String file = StringUtils.getFilename(path);
se.getExecutionContext().put("outFile", file);
}
}
このリスナーは、partitionerが作った読み込みファイルパスから、ファイル名のみを取り出して、
StepExecutionContextの、"outFile"に設定します。
あとは、writerのリソースで、遅延バインディングで取り出します。
<入力ファイルサンプル: /remotetest/delemeted1.txt>
12,www,dddjjd
11,hhh,eee
00,fff,sss
上記と同様なカンマ区切りファイルをあと2ファイル用意して下さい。
用意できれば、後はジョブを実行していただければ、動作を確認できるはずです。
最後に
partitionerを使用すると、簡単にスケーリングを含めた処理を実現できます。
ただし、既存で用意されているクラスは少ないので、基本的には、上記で見たサンプルのように自作することが多そうです。
soracaneでは、Partitionerも提供予定です。
ユーザガイドやjavadocをご覧ください。
Created Date: 2010/01/12