14.restart可能な部品の仕組みと、その自作方法
概要
SpringBatchで用意されているチャンク処理の部品(ItemReader、ItemWriterなど)は、通常restart可能になっています。
restartは、オプション(-restart)を指定したジョブの起動を指しますが、
具体的には
「前回異常終了した位置(ステップ、チャンク)から処理を開始する」
ものです。
しかし、自作の部品を何の考慮もなく作成すれば、正しくrestart処理ができるとは限りません。
ここでは、どのようにしたらrestart可能な部品になるのかを見て行きます。
この記事を読む前にExecutionContextの記事を必ず読んでください。
04.Spring Batchの基本概念(ExecutionContext)
SpringBatchのItemReaderは何故restart可能なのか?
SpringBatchのItemReaderは、書き込みがされる度にitemが書き込まれた位置をExecutionContextに保管しています。
そのためいつ異常終了しても、ItemReaderには終了位置が分かるようになっています。
ItemReaderは、最後に書き込まれた次の位置から読み込みを開始するので正常なrestart処理ができるのです。
ItemWriterは特にrestart処理を気にしておらず、渡されたitemのチャンクをたんたんと処理するだけです。
なかなかうまくできてますよね?
具体的にどのように終了位置を保管し、どのように読み込みを開始するのかは、自作のItemReaderのサンプルで見ていくことにします。
restart可能なItemReaderを自作する
目標
まずはどのようなItemReaderを作るか?を示しておきます。
作成するのは、restart可能なListItemReaderです。
ListItemReader自体はSpringBatchが用意しているクラスですが、実はテスト用でrestart可能ではありません。
そこで、restart可能なものを作ろうというわけです。
ListItemReaderの動作も念のため簡単に述べておきます。
ListItemReaderは、コンストラクタにListを渡すと、Listの要素1つをitemとして扱い、順番に読み込んでいくクラスです。
サンプル
public class RestartableListItemReader<T> implements ItemStreamReader<T> {
private static final String KEY = "RestartableListItemReader.";
private int index = 0;
private int listSize = 0;
private List<T> iterateData;
/**
* Listを設定するコンストラクタ。
* @param list
*/
public RestartableListItemReader(List<T> list) {
this.iterateData = list;
this.listSize = list.size();
}
@Override
public T read() throws Exception, UnexpectedInputException, ParseException {
int n = this.index;
++this.index;
if(this.index > listSize) return null;
return this.iterateData.get(n);
}
@Override
public void close() throws ItemStreamException {
}
@Override
public void open(ExecutionContext context) throws ItemStreamException {
//開始位置を取得
this.index = context.getInt(KEY + "index", 0);
}
@Override
public void update(ExecutionContext context)
throws ItemStreamException {
context.putInt(KEY + "index", this.index);
}
}
implementsしているインターフェースは確認して置いてください。
ItemStreamインターフェースがimplementsされたクラスは、
ステップ処理開始時に、open()メソッド、
コミット時にupdate()メソッド、
ステップ処理終了時に、close()メソッド、
が呼ばれます。
終了位置の保存方法
まずは終了位置の保存方法を見てみます。
update()メソッドを見てみてください。
context.putInt(KEY + "index", this.index);
update()メソッドは、ItemWriterで、writeメソッドが成功したときに呼ばれます。
つまり、上記のupdate()メソッドは、ExecutionContextに書き込まれたitemの数を保存しているのです。
このようにして書き込まれたitemの数(=終了位置)を保存します。
開始位置の取得方法
open()メソッドを見てください。
this.index = context.getInt(KEY + "index", 0);
ExecutionContextから、保管した終了位置を取得しています。
getInt()の2番目の引数の0は、初めてのステップ処理のときに要素0から開始されるようにするためです。
そして、read()メソッドも確認して置いてください。
初めてread()が呼ばれる前に、1度だけopen()メソッドが呼ばれることを念頭に置きながら見てみてください。
初めてread()が呼ばれたときthis.indexが次の読み込み位置になっているので、上記のthis.indexの初期化が行われた後にread()が呼ばれれば自然に終了位置から読み込み始めます。
(今回の場合はこれでOKです。)
通常は開始位置までitemの位置を進めます。
open()メソッドの中で、以下のようなメソッドを呼び出すイメージになります。
int startPos = context.getInt(KEY + "index", 0);
for(int i=0; i<startPos; ++i) read();
上記のように、終了位置の保管と、終了位置までitemを読み進める、ということをすれば基本的にはrestart可能になります。
最後に
なんとなくrestart可能にする方法が分かりましたでしょうか?
ここではチャンク処理を例にサンプルを作りましたが、taskletの処理を自作するときも同じです。
自分で開始位置を保管して、restartしたときにその位置まで処理を進めることです。
もしくは、処理開始時に処理途中のごみデータをすべて削除して、最初からデータ処理をするようにします。
そうするとこで、restart時もデータの不整合を起こすことなく正しく処理されると思います。
いづれにしても自作する場合はrestartのための設計を自分で考える必要があります。
ちゃんと設計しておくように気をつけましょう。
もしrestart不可にするならば、設定ファイルのjobタグの属性にrestartable="false"を指定すると良いかと思います。
-restartオプションを指定して起動しようとするとエラーになるはずです。
Created Date: 2011/09/05