13.チャンク処理における異常処理の対処
(Skip、Retry、Recovery処理)
概要
いくつかのサンプルにて、通常のチャンク処理を見てきました。
05.バッチ処理の実際のサンプル(チャンク処理:ファイル読み込み)
06.バッチ処理の実際のサンプル(チャンク処理:DB読み込み)
ここではもう少し突っ込んで、異常発生時に関する処理を見て行きます。
ここでいう異常発生時の処理とは以下のようなものを指しています。
・チャンク処理中に例外が発生したが、無視して処理を進めたい
・もしくは、例外対処をして次のitem処理に進めたい
・例外が発生したときにリトライしたい
など
Skip、Retry処理とは
Skip、Retry処理はチャンクに対する例外処理です。
そしてSpringBatchではどちらか1つしか設定することはできません。
以下ではそれぞれの処理の概要を記述します。
【Skip処理とは】
ItemReader、ItemProcessor、ItemWriterで、あるitemが例外を起こしたときに無視して次のitemの処理をするための機構です。
具体的には、スキップしてよい例外を設定しておくと、それを無視します。
また、SkipListenerを使用するとスキップしたitemを拾い、どこかに退避することもできます。
【Retry処理とは】
ItemReader、ItemProcessor、ItemWriterで、あるitemが例外を起こしたときに、同じ処理を再実行するための機構です。
例えばItemWriterで例外が発生した場合、例外を起こしたitemが再度ItemWriterに渡ってきます。
リトライの使用ケースとしては、DBのロックなどのように時間が経つと解決するような例外に対処する場合になります。
また、リトライ処理の中にはリカバリ処理も含まれていますので、これを利用すると解決できるものもあるかもしれません。
概要で記述すると上記のような感じになります。
しかし、処理内容はもう少し複雑です。
正確に理解しておいた方が良いので、以下でもう少し処理手順を見てみることにします。
Skip、Retryにおける処理手順概要
ここでは、チャンク処理のデータであるitemがどのような手順でItemReader、ItemProcessor、ItemWriterに渡っていくかを見て行きます。
まず、通常のチャンクの処理手順を示します。
以前の章で一度チャンクの流れは見ていますが、省略した部分を含め、処理手順として再度、詳細な動作を見ていきます。
・処理の概要
まず、ItemReaderはコミット数(commit-interval)の数だけ呼び出されます。
そこで溜め込んだデータ(item)を1個ずつItemProcessorに渡していきます。
ItemProcessorが返したitemがコミット数分溜まるまでこの処理を続け、溜まったらチャンクとしてItemWtierに渡します。
ですので、もしコミット数が2の場合は、ItemReader、ItemProcessor、ItemWriterそれぞれが呼び出される順番は以下のようになります。
1.ItemReader
2.ItemReader
3.ItemProcessor
4.ItemProcessor
5.ItemWriter
次に例外発生時にスキップ、リトライをするように設定した場合の動作を見て行きます。
・処理の概要
上記は、ItemProcessorもしくはItemWriterで例外が発生した場合のリトライ・スキップ処理です。
このときItemWriterなどではチャンクの中のitemを既にいくつか処理している可能性がありますが、
ロールバックにより、チャンク処理開始前の状態になっている必要があります。(SpringBatchが用意しているクラスでDB系のものはそうなっているはずです)
まず、既に溜め込んでおいたitemを1個ItemProcessorに渡します。
次にItemProcessorが返した1個のitemをチャンクとしてItemWriterに渡します。
ItemWriterの処理が終了しましたら、再度、倉庫から次のitem1個を取り出して、ItemProcessor~ItemWriterの処理を続けます。
これは倉庫のitemがなくなるまで続きます。
ですので、もしコミット数が2の場合は、ItemReader、ItemProcessor、ItemWriterそれぞれが呼び出される順番は以下のようになります。
1.ItemReader
2.ItemReader
3.ItemProcessor
4.ItemProcessor
5.ItemWriter ⇒ここで例外発生したします
6.ItemProcessor
7.ItemWriter
8.ItemProcessor
9.ItemWriter
10.ItemReader
:
※指定したスキップやリトライの条件に達するまで繰り返します。スキップ・リトライ時に例外が発生した場合は異常終了します。
※補足
1つのitemに対して何故2回もItemProcessor、ItemWriterが呼ばれるのか?(上記3.と6.は同じitemに対する処理。)
何故コミット数が1個になってしまうのか?
そんな疑問をもたれた方もいらっしゃるかも知れません。自分も最初はそう思いましたが、処理が理解できた今では正しい動作と思っています。
なぜかというと、ItemWriterがJdbcのBatch処理のような処理のとき、まとめて複数のSQLが実行されるため、
どのitemでエラーが発生したかを知ることはできません。
また、エラーが発生したitemは読み飛ばし、それ以外はコミットしたいですが、そういうこともできません。
ですのでItemWriterが何であれ、いつでも例外が発生したitemが何なのかを知るには1つずつitemを処理しなければならないのです。
スキップ・リトライの使用に当たって知っておいた方が良いこと
【ItemProcessorの作り方】
もし、ItemProcessorを自作する場合の注意点です。
上記の図を見て分かるように、itemは倉庫に保管され、スキップやリトライ処理で再度利用されます。
ですので、ItemProcessorでitemを直接いじってしまうと倉庫の中のitemが変わってしまい、
スキップやリトライ処理のときにそのitemが倉庫からItemProcessorに渡ってきます。ItemProcessorは再度変換をかけることになります。
必ず他のインスタンスをnewするか、コピーしたitemを処理するようにしましょう。
例: itemがMapのとき
public Map process(Map item) throws Exception {
int n = (Integer)item.get("pay") + 1000;
Map newItem = new Map(item); //itemをいじらないように、コピーする
newItem.put("pay", n);
return newItem;
}
【使用時の注意点について】
上記の図を見ても分かるとは思いますが、念のため記述させてください。
スキップ・リトライを使用し、例外が発生すると、コミット数が1になります。
つまり、もともとのコミット数が1000の処理では、1000回のコミットをすることになります。
コミットに時間が掛かるような処理ではかなり処理が遅くなると思います。
ですので、スキップなどの機構を使うのが正しいのか?、他の方法が良いのか?、を良く考えて使ってください。
例えば、例外が発生するitemの条件が分かっているなら、ItemProcessorで取り除いたり、他の処理をすることができます。
なるべく他の手立てで設計し、どうしても例外処理しなければならない場合に利用するのが良いのではと個人的には思います。
【skip、retryなどチャンク処理で活躍するStep】
チャンク処理で例外処理をしたい場合、
Stepは、SimpleStepFactoryBeanを使用するよりも、FaultTolerantStepFactoryBeanを使用する方が良いと思います。
skip, retryなどの処理を設定するためのプロパティがいくつか存在しています。
実際にサンプルに記述していますので以下のサンプルを見てみてください。
他のリスナーとスキップ用・リトライ用リスナーとの違い
スキップ処理やリトライ処理においてリスナーを使用するには、SkipListenerやRetryListenerを使用します。
【他のリスナーとの違い】
エラーを補足するリスナーは他にも、ItemReadListener, ItemProcessListener, ItemWriteListnerなど様々あります。
まず、これらのリスナーとSkipListenerとの違いを簡単に述べておきます。
ItemReadListenerなどは例外発生時にonError()が呼ばれますが、スキップされたかどうかは無関係です。
例えば、たとえあるitemがリトライやスキップされ、成功したとしてもonError()は呼ばれます。
しかし、SkipListenerはスキップしたときにだけ呼ばれます。
ですので、明確にこれらのリスナーの間には違いがあります。
【SkipListener】
スキップリスナーは、スキップ処理中(コミット数=1)に例外発生し、スキップされたitemを受け取れます。
例外を起こしたitemを知る、唯一の方法ではないかと思います。
【RetryListener】
リトライリスナーは、ItemProcessor、ItemWriterが呼ばれるたびに呼び出されます。
呼び出される前にopen()メソッドが、呼び出された後はclose()メソッドが呼ばれます。
そして、処理中に例外が発生した場合には、onError()メソッドが呼ばれます。
リトライリスナーですることは恐らく、状態のログをとったり、リトライを止めるくらいだと思います。
例えば、open()メソッドではbooleanを返却しますが、これがfalseになるとリトライが止まります。
異常処理の対処のサンプル
以下に、異常処理の対処のサンプルを、SpringBatchの公式HPの5章を元にして見て行くことにします。
基本的には英訳したものを記述しているだけです。
(処理内容は上記の説明記事を読んでおかないと、おそらく勘違いされるかと思います。お手数ですが先立ってお読みいただければと思います。)
<例外発生時にスキップするサンプル>
<step id="step1">
<tasklet>
<chunk reader="flatFileItemReader" writer="itemWriter" skip-limit="10" >
<skippable-exception-classes>
<include class="java.lang.Exception"/>
<exclude class="java.io.FileNotFoundException"/>
</skippable-exception-classes>
</chunk>
</tasklet>
</step>
上記のように設定することで指定した例外で処理を止めずに、次のitemに処理を移すことができます。
skippable-exception-classesタグ:
このタグのincludeで指定された例外が発生した場合は、処理がスキップされ、次のitemに処理を移します。
上記の場合は、Exceptionとその派生クラスがすべて対象になることを示しています。
excludeで指定された例外は、includeで対象となった例外でもスキップ対象から除外されます。
上記の場合は、FileNotFoundExceptionだけがスキップされずにFAILすることになります。
includeだけ指定することもできます。
skip-limit属性:
スキップ対象となったitemの数が、ここで指定された数を超えた場合、FAILすることになります。
上記の場合は、10個までは処理を継続しますが、11個目のitemがスキップされたときにエラーで落ちます。
エラーは、SkipListenerFailedException例外が発生します。
skip-policyタグ:
上記のサンプルでは記述はありません。
このタグで自作したSkipPolicyを指定すると、自作したクラス内で、発生した例外に応じて自由にスキップするかどうかを指定できます。
<スキップしたitemに対して何か処理を行いたい場合>
スキップするだけなく何か処理を行いたいときがあります。
そんなときは、SkipListenerをimplimentsしたクラスを作成し、listenersタグに設定します。
通常のリスナーと同じように作成し、設定すればよいです。
リスナークラスには、発生した例外と、例外発生時のitemが渡ってきます。
【SkipListenerのサンプル】
例えば、書き込みでスキップされたitemを溜めておいて、ステップが正常終了したときに何か別の処理をするような場合は、
以下のようなリスナーになるかと思います。 もし、例外発生する毎に処理するならばStepExecutionListener は不要です。
public class SkipSampleListener
implements SkipListener<Member, Member>, StepExecutionListener {
public final String KEY_NAME = "SkipSampleListener.list";
private StepExecution stepExecution;
@Override
public void onSkipInProcess(Member item, Throwable e) {
}
@Override
public void onSkipInRead(Throwable e) {
}
@Override
public void onSkipInWrite(Member item, Throwable e) {
//リスタート可能にしたいなら、ExecutionContextに保存したほうがよいと思います。
ExecutionContext context = this.stepExecution.getExecutionContext();
List<String> list = (List<String>)context.get(KEY_NAME);
String id = item.getId();
list.add(id);
}
@Override
public ExitStatus afterStep(StepExecution se) {
List<String> list = (List<String>)se.getExecutionContext().get(KEY_NAME);
if(se.getStatus() == BatchStatus.COMPLETED && !list.isEmpty()){
for(String id : list){
//何か処理する....
}
}
return null;
}
@Override
public void beforeStep(StepExecution se) {
this.stepExecution = se;
se.getExecutionContext().put(KEY_NAME, new ArrayList<String>());
}
}
※注意
ExecutionContextの使い方は他の記事を参照ください。
ExecutionContextは最終的にはDBに保存されます。カラムのサイズ以上は保存できないため、あまりに多くの容量を使用するオブジェクトを保存すると
正しく動作しません。
上記の例では、MemberオブジェクトをExecutionContextに保存するのではなく、キーとなるIDだけを保存したのは容量オーバーを防ぐためです。
また、Skipする数が多くなるとListの要素が増えるので、SpringBatchのskip-limit属性を使用して最大数を制限しておいた方がよいでしょう。
<例外発生時にリトライするサンプル>
<job id="retryTest" xmlns="http://www.springframework.org/schema/batch" incrementer="jobParametersIncrementer" >
<step id="step1" parent="tolerantStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2" retry-limit="2">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
</job>
<!-- backoffPolicyはFaultTolerantStepFactoryBeanでしか設定できない -->
<bean id="tolerantStep" class="org.springframework.batch.core.step.item.FaultTolerantStepFactoryBean" abstract="true">
<property name="jobRepository" ref="jobRepository" />
<property name="startLimit" value="100" />
<property name="commitInterval" value="1" />
<property name="backOffPolicy" ref="backoffPolicy"/>
</bean>
<!-- 1000ミリ秒待ってからリトライを行うためのbean -->
<bean id="backoffPolicy" class="org.springframework.batch.retry.backoff.FixedBackOffPolicy">
<property name="backOffPeriod" value="1000"/>
</bean>
上記のようにすることで、例外が発生したときに再度、同じチャンクの処理をしようとします。
この設定の利用ケースは次のような場合が考えられます。
DeadlockLoserDataAccessExceptionのような例外が発生した場合、誰かがDBのロックをしているために発生しているので、もう一度同じ処理をしたときに
ロックが解除されていて、成功する可能性があります。
このようなケースでは上記のようなリトライ処理が有効です。
使用できるポリシーは主に2つあります。
・RetryPolicyはリトライするときの制限のようなものを指定するためのポリシーです。
例えば、最大3回までリトライする、最大3秒経つまでリトライするなどです。
・BackOffPolicyはリトライするときの停止時間などを指定するためのポリシーです。
例えば、10秒経ったらリトライする、などです。
FixedBackOffPolicyなどをうまく使えば、10秒おきにリトライするなど、かなり自由に処理ができるかと思います。
BackOffPolicyは他と違い、xml上に<listener>のような設定タグが用意されていないようです。
ですので、FaultTolerantStepFactoryBeanのbackOffPolicyプロパティで設定することになります。
(フォーラムにその手の質問が挙がっていたので、将来追加されるかもしれません)
リトライは、あくまで繰り返しをするための機構です。
もし、データを変更して処理を続行できるようにしたい場合などは、RecoveryCallbackを使用するようです。
<例外発生時にリカバリするサンプル(十分な検証をしていません)>
リカバリするサンプルは英語でも資料が少ないため、試していないものを記載することをご了承ください。
設定ファイル:
<job id="retryTest" xmlns="http://www.springframework.org/schema/batch" incrementer="jobParametersIncrementer" >
<step id="step1" parent="tolerantStep">
<tasklet>
<chunk reader="itemReader" writer="itemWriter" commit-interval="2" retry-limit="2">
<retryable-exception-classes>
<include class="org.springframework.dao.DeadlockLoserDataAccessException"/>
</retryable-exception-classes>
</chunk>
</tasklet>
</step>
</job>
<!-- step -->
<bean id="tolerantStep" class="org.springframework.batch.core.step.item.FaultTolerantStepFactoryBean" abstract="true">
<property name="jobRepository" ref="jobRepository" />
<property name="startLimit" value="100" />
<property name="commitInterval" value="1" />
</bean>
<!-- リトライのためのインターセプター。今回のメインのbean -->
<bean id="retryInterceptor" class="org.springframework.batch.retry.interceptor.StatefulRetryOperationsInterceptor">
<property name="recoverer">
<bean class="test.retry.TestMethodInvocationRecoverer" />
</property>
</bean>
<!--
AOPの設定です(SpringBatchのドキュメントから引用したもの)。
下記の方法ですと同じクラスであればすべて織り込まれてしまいます。
実際には、NameMatchMethodPointcutAdvisorを使用する方が実践向きかも知れません。
-->
<aop:config>
<aop:pointcut id="transactional"
expression="execution(* org.springframework.batch.item.database.JdbcBatchItemWriter.write(..))" />
<aop:advisor pointcut-ref="transactional" advice-ref="retryInterceptor" order="-1"/>
</aop:config>
AOPで織り込むインターセプター:
public class TestMethodInvocationRecoverer<T> implements MethodInvocationRecoverer<T> {
private static Log log = LogFactory.getLog(TestMethodInvocationRecoverer.class);
@Override
public T recover(Object[] args, Throwable paramThrowable) {
//ItemWriter.write()の引数
List<T> items = (List<T>)args[0];
log.info("Invocation.recover :" + items);
return null;
}
}
上記のようにAOPすればリカバリが起動するはずです。
どうも、リカバリのために用意されている機構はAOPで織り込むインターセプタだけのようです。
ドキュメントの読み込みが足りないだけでしたらすみません。
また、ビジネスロジックに織り込むような書き方だった気がするので、自由な箇所にリカバリの処理を入れられるのかも知れません。
しかしここでは、ItemWriterに織り込むように書いています。
recover()メソッド:
上記の場合、ログを出しているだけです。
特に例外もthrowしていません。この場合、問題になったチャンクは何も処理されないのに(リカバリされないのに)、FAILすることなく次のチャンクの処理に進んでしまいます。
ですので、何も処理をしなかった場合や、リカバリが失敗したときには例外を投げるべきかと思います。
※注意点
リカバリの処理は、リトライが指定の回数以上失敗したときに呼ばれます。
ですので、上記のリカバリの設定をしてもリトライの処理はされます。
また、リトライ処理ではコミット数=1と同様の動作でしたが、recover()メソッドに渡されるitemはコミット数分あります。
それと、上記サンプルを動作したとき、retry-limitを、commit-intervalの数より大きくしないとrecover()メソッドが呼ばれませんでした。
まだまだ完全には動作を追い切れていません。
申し訳ないです。
Created Date: 2011/09/02