07.バッチ処理の実際のサンプル
(チャンク処理:DB読み込み)

概要

ここでは具体的にどのようにバッチ処理を記述するか、サンプル見てみます。

DB読み込み編です。

目標

まず、以下のサンプルの目標(ゴール)を示します。

目標は、テーブルからデータを読み込んで、他のテーブルへ値をinsertすることとします。

ItemReaderでデータを読み込んで、ItemWriterでinsertする予定です。

本当は、ItemProcessorを使用してデータを変換した後にItemWriterに渡すサンプルの方が実際の使い方に近いのですが、

サンプルコードが大きくなりそうですので、省略してそのままItemWriterに渡すことにします。

そういう意味では、1つのSQL文で表現できる内容ですが、チャンク処理でDBを扱うサンプルとして

ご覧いただければと思います。

また、ここではIBatisを例題にしますが、他のO/Rマッピングでも考え方は同じはずです。

まずは考え方だけ把握していただければと思います。

使用サンプル

<バッチ処理設定サンプル: /job-context.xml>

<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:p="http://www.springframework.org/schema/p"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xsi:schemaLocation="http://www.springframework.org/schema/batch

http://www.springframework.org/schema/batch/spring-batch-2.1.xsd

http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-2.5.xsd

">

<!-- SpringBatchの基盤部分に関する設定のインポート -->

<import resource="classpath:/back-context.xml" />

<!-- ジョブの設定 -->

<job id="job1" xmlns="http://www.springframework.org/schema/batch"

incrementer="jobParametersIncrementer">

<step id="step1" parent="simpleStep">

<tasklet>

<chunk reader="itemReader" writer="itemWriter" />

</tasklet>

</step>

</job>

<!-- enables the functionality of JobOperator.startNextInstance(jobName) -->

<bean id="jobParametersIncrementer"

class="org.springframework.batch.core.launch.support.RunIdIncrementer" />

<bean id="simpleStep"

class="org.springframework.batch.core.step.item.SimpleStepFactoryBean" abstract="true">

<property name="jobRepository" ref="jobRepository" />

<property name="commitInterval" value="1000" />

</bean>

<!-- iBatisの設定 -->

<bean id="sqlMapClient" class="org.springframework.orm.ibatis.SqlMapClientFactoryBean">

<property name="dataSource" ref="dataSource"/>

<property name="configLocation" value="classpath:/dao/SqlMapConfig.xml"/>

</bean>

<!-- DBのItemReaderの設定 -->

<bean id="itemReader" class="org.springframework.batch.item.database.IbatisPagingItemReader" >

<property name="sqlMapClient" ref="sqlMapClient"/>

<property name="queryId" value="selectPagingMember"/>

<property name="pageSize" value="10000"/>

</bean>

<!-- DBのItemWriterの設定 -->

<bean id="itemWriter" class="org.springframework.batch.item.database.IbatisBatchItemWriter">

<property name="sqlMapClient" ref="sqlMapClient" />

<property name="statementId" value="insertMember2" />

</bean>

</beans>

【sqlMapClientについて】

このクラスは、iBatisを使用するためのものです。

特にSpringBatch専用のクラスというわけではありませんので、説明は省略します。

他のO/Rマッピングを使用する場合は、また他のクラス設定があるかと思います。

【IbatisPagingItemReaderについて】

単なるItemReaderではなく、「Paging」という単語がついています。

ページングというのは、WEBの検索結果をイメージした言葉かと思います。

上記の設定の場合、1ページ10000レコードを、SQL文でselectして、抽出した10000レコードをメモリに展開します。

その上で、1レコードずつをitemとして、ItemWriterに渡していきます。

ItemWriterでは、commitIntervalで設定した数分だけListで受け取って処理を実行し、コミットします。

これは、通常のチャンクの動作のとおりです。

ItemReaderは、この後、ItemWriterに渡すデータ(item)が無くなると、次のページを読み込もうとします。

このようにして、データが返ってこなくなるまで次のページの読み込みを続けます。

IBatsi以外のO/Rマッピングの場合も、使用するItemReaderクラスこそ違いますが、ほぼ同じようなやり方になります。

【IbatisBatchItemWriterについて】

指定のクエリIDのSQLを実行します。

このクラスにはBatchという単語がついています。

この意味は、jdbcのバッチ処理をすることを意味します。

つまり、毎回prepareするようなことをせずに、次々にSQLにパラメタを渡していくために、処理が早くなります。

詳しくはjdbcのバッチ更新などの一般的な情報を参照いただければと思います。

ItemWriterについても、IBatis以外のO/Rマッピングについても同様かと思います。

【commitIntervalの補足】

commitIntervalは、simpleStepで、デフォルト値を設定でき、

chunkタグのcommitInterval属性で個別設定することもできます。

<バッチ基盤に関する設定ファイル: /back-context.xml>

以下の記事の「Spring設定ファイルの記述例」の設定ファイルをコピーしてください。

使用するための準備

※DBの設定については、同じページの「オンメモリのリポジトリを使用する方法」の項を参考に書き換えてください。

<IBatisの設定ファイルサンプル: /dao/SqlMapConfig.xml>

<?xml version="1.0" encoding="UTF-8" ?>

<!DOCTYPE sqlMapConfig

PUBLIC "-//ibatis.apache.org//DTD SQL Map Config 2.0//EN"

"http://ibatis.apache.org/dtd/sql-map-config-2.dtd">

<sqlMapConfig>

<!-- sqlMapファイル参照する -->

<sqlMap resource="/dao/SqlMap.xml"/>

</sqlMapConfig>

このファイルは、IBatis独特の設定ファイルです。

ここではそれほど注目しなくて大丈夫です。

<IBatisのSQL設定ファイルサンプル: /dao/SqlMap.xml>

<?xml version="1.0" encoding="UTF-8" ?>

<!DOCTYPE sqlMap

PUBLIC "-//ibatis.apache.org//DTD SQL Map 2.0//EN"

"http://ibatis.apache.org/dtd/sql-map-2.dtd">

<sqlMap>

<select id="selectPagingMember" resultClass="test.Member">

select id, name

from (

select

row_number() over(order by id) rn,

member.*

from member

)

where

<![CDATA[

rn > (#_page# * #_pagesize#)

and rn <= ( (#_page# + 1) * #_pagesize# )

]]>

order by id

</select>

<insert id="insertMember2" parameterClass="test.Member">

insert into member2(id, name) values(#id#, #name#)

</insert>

</sqlMap>

【ページングのselect文について】

IbatisPagingItemReaderクラスは、ページ情報をIBatisに渡します。

ページ情報とは具体的には、_page、_pagesize、_skiprowsの3つのパラメタです。

上記ではそのうちの2つの情報を利用して、指定のページの情報を取得しています。

※上記のサンプルSQLは、Oracleの場合の記述例です。

Postgresなどは、以下のように_skiprowsを使用する方がSQL文を書きやすいです。以下は記述例です。

select * from member order by id asc offset #_skiprows# limit #_pagesize#

※ページングのselect文には注意点があります。このページの「最後に」に記述していますので必ずお読みください。

<サンプルPOJOクラス: test.Member.java>

public class Member {

private int id;

private String name;

public int getId() {

return id;

}

public void setId(int id) {

this.id = id;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

}

itemクラスです。

<実行方法>

ジョブの実行方法は、他の記事に記述していますので、そちらを参照ください。

参考: ジョブの起動方法

最後に

ここでは、IBatisを例題にしてDBアクセスの方法をみました。

他のO/Rマッピングでも考え方は同じのようですので、この記事を参考にItemReader、ItemWriterのjavadocを見てみてください。

サブクラスを見ていくと、おそらく使いたいO/Rマッピングに対応するItemReader、ItemWriterクラスが見つかると思います。

【ページングのselect文の注意点】

注意点として、いつでも同じ順番にselectされるようにする必要があることを補足しておきます。

なぜなら、毎回並び順が違うと、次のページに遷移したときに同じレコードをselectしてしまうからです。

そうすると、前のページで既に処理したことがあるレコードを、再度処理してしまいます。

ですので、細心の注意を払ってください。

とはいえ、たいていのテーブルにはプライマリキーがあるはずですので、それをorder byの条件にすれば良いと思います。

また、バッチ処理の途中でテーブルが更新される可能性があれば、それも考慮に入れておくべきだと思います。

例えば、会員情報の登録がいつでも起きる可能性があれば、登録日の古い順にorder byしておくべきだと思います。

そうすれば、最新の登録は最後に並ぶことになり、リスタートしたとしても、読み飛ばされる可能性がなくなります。

いづれにしても状況に合わせて設計を工夫しなければいけませんのでご注意ください。

順番が不定なSQLの問題は、よく陥りがちなSQLのトラブルでもありますのでgoogleなどで調べてみると面白いかもしれません。

Created Date: 2010/01/12