12.並行処理の応用(並列処理のサンプル)

概要

Spring Batchでは、partitionerやsplitを使用し、Spring Remoteと併用すれば、簡単に並列処理ができることが分かるかと思います。

ここでは、他のサーバに処理を任せることで、並列処理をする例を見てみようと思います。

正確に内容を把握したい場合は、SpringRemoteの記事も読んでください。

補足:

通常、複数のサーバを利用して処理分散する場合、Springのサブプロジェクトの1つであるSpring Integrationフレームワークを使用することを推奨しているようです。

このサンプルではSpring Integrationを使用しませんが、興味のある方は調べてみてください。

目標

まず、以下のサンプルの目標(ゴール)を示します。

【RMIサーバ側】

AccountServiceというインターフェースを公開し、update()というメソッドを公開します。

このメソッドは、引数の値をSystem.out.println()するだけの単純な機能です。

【クライアント側】

SpringBatchは、partitionerで並行処理をします。

並行処理では、それぞれのスレッドがRMI接続し、AccountService#update()メソッドを呼び出します。

ここではupdate()メソッドを呼び出すのは、NumberExec クラスが仲介して呼び出します。

こうすることで並列処理を実装します。

ここでは、スレッド1つしか生成していなかったり、各サーバへの負荷分散などはしていませんが

参考にはなるかと思います。

※ただし、PCを複数用意するのは面倒ですので、ここでは、1つのPC上で実行するサンプルとします。

本当は、複数のPCを用意して実行することになります。

使用サンプル

Spring設定ファイル(remoteTest-Context.xml)

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi ="http://www.w3.org/2001/XMLSchema-instance"

xmlns:util="http://www.springframework.org/schema/util"

xmlns:aop ="http://www.springframework.org/schema/aop"

xmlns:tx ="http://www.springframework.org/schema/tx"

xmlns:context="http://www.springframework.org/schema/context"

xsi:schemaLocation="

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-2.5.xsd

http://www.springframework.org/schema/util

http://www.springframework.org/schema/util/spring-util-2.5.xsd

http://www.springframework.org/schema/aop

http://www.springframework.org/schema/aop/spring-aop-2.5.xsd

http://www.springframework.org/schema/tx

http://www.springframework.org/schema/tx/spring-tx-2.5.xsd

http://www.springframework.org/schema/context

http://www.springframework.org/schema/context/spring-context-2.5.xsd

http://www.springframework.org/schema/batch

http://www.springframework.org/schema/batch/spring-batch-2.1.xsd">

<import resource="classpath:/remotetest/back-context.xml"/>

<!-- サーバ側の設定 =======================================

-->

<bean id="accService" class="remotetest.AccountServiceImpl" />

<bean class="org.springframework.remoting.rmi.RmiServiceExporter">

<property name="serviceName" value="AccountService" />

<property name="service" ref="accService"/>

<property name="serviceInterface" value="remotetest.AccountService" />

<property name="registryPort" value="41199" />

</bean>

<!-- クライアント側の設定 ===========================================

-->

<bean id="remoteAccService" class="org.springframework.remoting.rmi.RmiProxyFactoryBean">

<property name="serviceUrl" value="rmi://localhost:41199/AccountService" />

<property name="serviceInterface" value="remotetest.AccountService" />

</bean>

<!-- ジョブの設定 ===========================================

-->

<job id="remoteJob" xmlns="http://www.springframework.org/schema/batch"

incrementer="jobParametersIncrementer">

<step id="step">

<partition step="remoteStep" partitioner="partitioner" >

<handler grid-size="2" task-executor="taskExecutor" />

</partition>

</step>

</job>

<!-- enables the functionality of JobOperator.startNextInstance(jobName) -->

<bean id="jobParametersIncrementer" class="org.springframework.batch.core.launch.support.RunIdIncrementer" />

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

<!-- パーティショナー -->

<bean id="partitioner" class="remotetest.NumberPartitioner" />

<step id="remoteStep" xmlns="http://www.springframework.org/schema/batch">

<tasklet ref="remoteTasklet"/>

</step>

<!-- リモートのメソッドを実行するTasklet -->

<bean id="remoteTasklet" class="org.springframework.batch.core.step.tasklet.MethodInvokingTaskletAdapter" scope="step">

<property name="targetObject" ref="exec"/>

<property name="targetMethod" value="exec"/>

<property name="arguments" value="${stepExecutionContext[num]}"/>

</bean>

<bean id="exec" class="remotetest.NumberExec">

<property name="accountService" ref="remoteAccService"/>

</bean>

</beans>

行っていることは、RMIでサービスを公開し、クライアント側で、Taskletの中で利用するだけです。

partitionerは自作しています。

RMIの設定については、SpringRemoteの記事を参照ください。

パーティショナーの実装サンプル(/remotetest/NumberPartitioner.java)

public class NumberPartitioner implements Partitioner {

@Override

public Map<String, ExecutionContext> partition(int gridsize) {

Map<String, ExecutionContext> map = new HashMap<String, ExecutionContext>();

for(int i=0; i<1; ++i){

ExecutionContext ex = new ExecutionContext();

ex.putInt("num", i);

map.put("partition" + i, ex);

}

return map;

}

}

単純にStepExecutionContextの、numに数字を設定しているだけです。

特に意味はないことをしています。

実際には、処理対象などを現す数字や文字列などを設定すると良いと思います。

そして、それを、遅延バインディングで、実行するbeanに設定します。

実行クラスの実装サンプル(/remotetest/NumberExec.java)

public class NumberExec {

private AccountService accountService;

public void exec(String num){

//リモートサーバを振り分けたりする

//

this.accountService.update(num);

}

public void setAccountService(AccountService accountService) {

this.accountService = accountService;

}

}

単純に、リモートのオブジェクトであるAccountServiceのメソッドを呼び出すだけです。

実際には、リモート先のサーバに命令が複数行かないように制御すると良いと思います。

1つのサーバに1つの実行の方が効率が良いのではないかと思いますので、ここでそれを制御します。

(面倒なので、ここでは単純にリモート先を呼び出すだけになっています)

サーバ側の公開インターフェースの実装サンプル

【インターフェースサンプル (remotetest.AccountService.java)】

public interface AccountService {

public void update(int num);

}

【実装サンプル】

public class AccountServiceImpl implements AccountService{

public void update(int num) {

System.out.println("num=" + num);

}

}

ここでは、引数のnameプロパティを標準出力にprintするだけの機能にしました。

これで準備完了です。

あとは実際に実行すればよいだけです。

最後に

SpringRemoteの使い方が分かれば、特に難しくないかと思います。

Partitionerを自作して処理を振り分けられるようにするだけです。

少し、自作する箇所があるので多少面倒かもしれません。

そこでsoracaneでは、リモート先のstepを実行する部品を用意しています。

この部品は、負荷分散もしてくれますので使いやすいのではないかと思います。

ユーザガイドやjavadocを参照してみてください。

Created Date: 2010/01/12