14.restart可能な部品の仕組みと、その自作方法

概要

SpringBatchで用意されているチャンク処理の部品(ItemReader、ItemWriterなど)は、通常restart可能になっています。

restartは、オプション(-restart)を指定したジョブの起動を指しますが、

具体的には

「前回異常終了した位置(ステップ、チャンク)から処理を開始する」

ものです。

しかし、自作の部品を何の考慮もなく作成すれば、正しくrestart処理ができるとは限りません。

ここでは、どのようにしたらrestart可能な部品になるのかを見て行きます。

この記事を読む前にExecutionContextの記事を必ず読んでください。

04.Spring Batchの基本概念(ExecutionContext)

SpringBatchのItemReaderは何故restart可能なのか?

SpringBatchのItemReaderは、書き込みがされる度にitemが書き込まれた位置をExecutionContextに保管しています。

そのためいつ異常終了しても、ItemReaderには終了位置が分かるようになっています。

ItemReaderは、最後に書き込まれた次の位置から読み込みを開始するので正常なrestart処理ができるのです。

ItemWriterは特にrestart処理を気にしておらず、渡されたitemのチャンクをたんたんと処理するだけです。

なかなかうまくできてますよね?

具体的にどのように終了位置を保管し、どのように読み込みを開始するのかは、自作のItemReaderのサンプルで見ていくことにします。

restart可能なItemReaderを自作する

目標

まずはどのようなItemReaderを作るか?を示しておきます。

作成するのは、restart可能なListItemReaderです。

ListItemReader自体はSpringBatchが用意しているクラスですが、実はテスト用でrestart可能ではありません。

そこで、restart可能なものを作ろうというわけです。

ListItemReaderの動作も念のため簡単に述べておきます。

ListItemReaderは、コンストラクタにListを渡すと、Listの要素1つをitemとして扱い、順番に読み込んでいくクラスです。

サンプル

public class RestartableListItemReader<T> implements ItemStreamReader<T> {

private static final String KEY = "RestartableListItemReader.";

private int index = 0;

private int listSize = 0;

private List<T> iterateData;

/**

* Listを設定するコンストラクタ。

* @param list

*/

public RestartableListItemReader(List<T> list) {

this.iterateData = list;

this.listSize = list.size();

}

@Override

public T read() throws Exception, UnexpectedInputException, ParseException {

int n = this.index;

++this.index;

if(this.index > listSize) return null;

return this.iterateData.get(n);

}

@Override

public void close() throws ItemStreamException {

}

@Override

public void open(ExecutionContext context) throws ItemStreamException {

//開始位置を取得

this.index = context.getInt(KEY + "index", 0);

}

@Override

public void update(ExecutionContext context)

throws ItemStreamException {

context.putInt(KEY + "index", this.index);

}

}

implementsしているインターフェースは確認して置いてください。

ItemStreamインターフェースがimplementsされたクラスは、

ステップ処理開始時に、open()メソッド、

コミット時にupdate()メソッド、

ステップ処理終了時に、close()メソッド、

が呼ばれます。

終了位置の保存方法

まずは終了位置の保存方法を見てみます。

update()メソッドを見てみてください。

context.putInt(KEY + "index", this.index);

update()メソッドは、ItemWriterで、writeメソッドが成功したときに呼ばれます。

つまり、上記のupdate()メソッドは、ExecutionContextに書き込まれたitemの数を保存しているのです。

このようにして書き込まれたitemの数(=終了位置)を保存します。

開始位置の取得方法

open()メソッドを見てください。

this.index = context.getInt(KEY + "index", 0);

ExecutionContextから、保管した終了位置を取得しています。

getInt()の2番目の引数の0は、初めてのステップ処理のときに要素0から開始されるようにするためです。

そして、read()メソッドも確認して置いてください。

初めてread()が呼ばれる前に、1度だけopen()メソッドが呼ばれることを念頭に置きながら見てみてください。

初めてread()が呼ばれたときthis.indexが次の読み込み位置になっているので、上記のthis.indexの初期化が行われた後にread()が呼ばれれば自然に終了位置から読み込み始めます。

(今回の場合はこれでOKです。)

通常は開始位置までitemの位置を進めます。

open()メソッドの中で、以下のようなメソッドを呼び出すイメージになります。

int startPos = context.getInt(KEY + "index", 0);

for(int i=0; i<startPos; ++i) read();

上記のように、終了位置の保管と、終了位置までitemを読み進める、ということをすれば基本的にはrestart可能になります。

最後に

なんとなくrestart可能にする方法が分かりましたでしょうか?

ここではチャンク処理を例にサンプルを作りましたが、taskletの処理を自作するときも同じです。

自分で開始位置を保管して、restartしたときにその位置まで処理を進めることです。

もしくは、処理開始時に処理途中のごみデータをすべて削除して、最初からデータ処理をするようにします。

そうするとこで、restart時もデータの不整合を起こすことなく正しく処理されると思います。

いづれにしても自作する場合はrestartのための設計を自分で考える必要があります。

ちゃんと設計しておくように気をつけましょう。

もしrestart不可にするならば、設定ファイルのjobタグの属性にrestartable="false"を指定すると良いかと思います。

-restartオプションを指定して起動しようとするとエラーになるはずです。

Created Date: 2011/09/05