11.並行処理(partition)のサンプル

概要

ここでは、partitionによる並行処理を見て行きます。

自由度が高いのは、splitによる並行処理ですが、スケーリングを自動で行ってくれるのがpartitionの特徴です。

スケーリングとは、処理の量や種類などによって、スレッド数を多くしたり、少なくしたりすることです。

こうすることで、リソースをうまく使うことができます。

詳しくは、前の章で記述しています、並行処理の概要の記事を読んでみてください。

目標

まず、以下のサンプルの目標(ゴール)を示します。

複数のファイルを同時に処理することを考えます。

処理内容は同じなのですが、ファイルが複数に分かれているようなケースを想像いただければと思います。

複数に分かれたファイルを順番に処理すると時間がかかります。

最近のPCはデュアルコアになっていたり、同時処理に向いたものが多くなってきています。

そのため、シングルに処理をするよりも、同時に複数処理をするほうが早くなるケースが多くなってきていると思います。

さて、ここでは、Spring Batchに載っていたサンプルを基にします。

【処理内容】

3つのCSVファイルを3つのスレッドを立てて、同時に処理します。

内容は、元のファイルを読み込んで、フィールドの順番を替えて出力することとします。

使用サンプル

<バッチ処理設定サンプル: /job-context.xml>

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd

http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd

http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

<import resource="classpath:/remotetest/back-context.xml"/>

<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch" incrementer="jobParametersIncrementer">

<step id="step">

<partition step="step01" partitioner="partitioner">

<handler grid-size="3" task-executor="taskExecutor" />

</partition>

</step>

</job>

<!-- enables the functionality of JobOperator.startNextInstance(jobName) -->

<bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />

<!-- ファイルを読み込むパーティショナー -->

<bean id="partitioner" class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">

<property name="resources" value="classpath:/remotetest/delimited*.txt" />

</bean>

<!-- 非同期タスクExecutor -->

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

<!-- 各スレッドで実行するステップ -->

<step id="step01" xmlns="http://www.springframework.org/schema/batch">

<tasklet transaction-manager="transactionManager">

<chunk writer="itemWriter" reader="itemReader" commit-interval="1" />

<listeners>

<listener ref="outputFileListener"/>

</listeners>

</tasklet>

</step>

<!-- 出力ファイル名をStepExecutionContextに設定するリスナー -->

<bean id="outputFileListener" class="remotetest.OutputFileListener"/>

<bean id="itemReader" scope="step" autowire-candidate="false" parent="itemReaderParent">

<property name="resource" value="#{stepExecutionContext[fileName]}" />

</bean>

<bean id="itemReaderParent" class="org.springframework.batch.item.file.FlatFileItemReader" abstract="true">

<property name="lineMapper">

<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">

<property name="lineTokenizer">

<bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">

<property name="delimiter" value="," />

<property name="names" value="name,credit,k" />

</bean>

</property>

<property name="fieldSetMapper">

<bean class="org.springframework.batch.item.file.mapping.PassThroughFieldSetMapper" />

</property>

</bean>

</property>

</bean>

<bean id="itemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">

<property name="resource" value="file:c:/temp/#{stepExecutionContext[outFile]}" />

<property name="lineAggregator">

<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">

<property name="delimiter" value="," />

<property name="fieldExtractor">

<bean class="org.springframework.batch.item.file.transform.PassThroughFieldExtractor" />

</property>

</bean>

</property>

</bean>

</beans>

【タグについて】

partitionerタグは、内部にhandlerタグを持ちます。

handlerは、PartitionHandlerクラスに相当するものです。

handler内のgrid-sizeとは、生成するスレッドの最大数を設定します。

今回は3つにしますので、3と設定しています。

【Partionerについて】

さて、partitionerに設定しているPartitionerクラスを見てみてください。

自作しても良いのですが、既にSpringBatchが用意してくれているMultiResourcePartitionerクラスを使用しました。

MultiResourcePartitionerクラスは、*でファイルパスを指定できますので、複数のファイルを取得します。

ファイル名を取得したら、それをStepExecutionContextをファイルの数だけ生成し、

"fileName"というキー名でファイル名を登録します。

これにより、itemRaderのresourceに、#{stepExecutionContext[fileName]}という遅延バインディングでファイル名を設定できるように

なります。

出力ファイル名は、partitionerでは生成されませんので、outputFileListenerリスナーで設定しています。

これは既存では用意されていないので、自作することになります。

以下でそのサンプルも見てみることします。

【Partitionerが生成したStepExecutionContextがgrid-sizeより少ない場合】

Partitionerは、grid-sizeの3ファイルより少ない数(例えば2ファイル)を取得するかもしれません。

その場合は、PartitionHandlerが、スレッド数は2に自動で縮退します。

<出力ファイル名作成リスナーサンプル: /remotetest/OutputFileListener.java>

public class OutputFileListener implements StepExecutionListener {

@Override

public ExitStatus afterStep(StepExecution arg0) {

return null;

}

@Override

public void beforeStep(StepExecution se) {

String path = se.getExecutionContext().getString("fileName");

String file = StringUtils.getFilename(path);

se.getExecutionContext().put("outFile", file);

}

}

このリスナーは、partitionerが作った読み込みファイルパスから、ファイル名のみを取り出して、

StepExecutionContextの、"outFile"に設定します。

あとは、writerのリソースで、遅延バインディングで取り出します。

<入力ファイルサンプル: /remotetest/delemeted1.txt>

12,www,dddjjd

11,hhh,eee

00,fff,sss

上記と同様なカンマ区切りファイルをあと2ファイル用意して下さい。

用意できれば、後はジョブを実行していただければ、動作を確認できるはずです。

最後に

partitionerを使用すると、簡単にスケーリングを含めた処理を実現できます。

ただし、既存で用意されているクラスは少ないので、基本的には、上記で見たサンプルのように自作することが多そうです。

soracaneでは、Partitionerも提供予定です。

ユーザガイドやjavadocをご覧ください。

Created Date: 2010/01/12