バッチ処理とトランザクション
再試行なしの単純なバッチ処理
再試行のないネストされたバッチの次の簡単な例を検討してください。これは、バッチ処理の一般的なシナリオを示しています。入力ソースは使い果たされるまで処理され、処理の「チャンク」の最後に定期的にコミットされます。
1 | REPEAT(until=exhausted) { | 2 | TX { 3 | REPEAT(size=5) { 3.1 | input; 3.2 | output; | } | } | | }
入力操作 (3.1) は、メッセージベースの受信 (JMS などから) またはファイルベースの読み取りのいずれかになりますが、回復してジョブ全体を完了する可能性がある処理を続行するには、トランザクションである必要があります。3.2 での運用も同様です。トランザクション対応またはべき等である必要があります。
3.2 でのデータベース例外のために REPEAT
(3) のチャンクが失敗した場合、TX
(2) はチャンク全体をロールバックする必要があります。
単純なステートレス再試行
次の例に示すように、Web サービスやその他の リモートリソースへの呼び出しなど、トランザクションではない操作に再試行を使用することも役立ちます。
0 | TX { 1 | input; 1.1 | output; 2 | RETRY { 2.1 | remote access; | } | }
リモート呼び出しはデータベース更新よりも失敗する可能性が高く、再試行可能であるため、これは実際には再試行の最も有用なアプリケーションの 1 つです。リモートアクセス (2.1) が最終的に成功する限り、トランザクション TX
(0) はコミットされます。リモートアクセス (2.1) が最終的に失敗した場合、トランザクション TX
(0) はロールバックすることが保証されます。
典型的な繰り返し再試行パターン
最も一般的なバッチ処理パターンは、次の例に示すように、チャンクの内部ブロックに再試行を追加することです。
1 | REPEAT(until=exhausted, exception=not critical) { | 2 | TX { 3 | REPEAT(size=5) { | 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { 5.1 | output; 6 | } SKIP and RECOVER { | notify; | } | | } | } | | }
内側の RETRY
(4) ブロックは「ステートフル」としてマークされます。ステートフルな再試行の説明については、一般的な使用例を参照してください。これは、再試行 PROCESS
(5) ブロックが失敗した場合、RETRY
(4) の動作が次のようになることを意味します。
例外をスローし、チャンクレベルでトランザクション
TX
(2) をロールバックし、アイテムを入力キューに再提示できるようにします。項目が再表示されると、設定されている再試行ポリシーに応じて再試行され、
PROCESS
(5) が再度実行されます。2 回目以降の試行は再び失敗し、例外が再スローされる可能性があります。最終的に、アイテムは最後に再表示されます。再試行ポリシーは別の試行を許可しないため、
PROCESS
(5) は実行されません。この場合、RECOVER
(6) パスに従い、受信して処理中のアイテムを効果的に「スキップ」します。
計画の RETRY
(4) に使用される表記は、入力ステップ (4.1) が再試行の一部であることを明示的に示していることに注意してください。また、処理には 2 つの代替パスがあることも明らかになります。PROCESS
(5) で示される通常のケースと、RECOVER
(6) で別のブロックに示される回復パスです。2 つの代替パスは完全に異なります。通常の状況で取得されるのは 1 つだけです。
特別なケース (特別な TranscationValidException
型など) では、再試行ポリシーは、項目が失敗するのを待つ代わりに、PROCESS
(5) が失敗した後の最後の試行で RECOVER
(6) パスを取ることができると判断できる場合があります。再提示されます。これは、PROCESS
(5) ブロック内で何が起こったのかについての詳細な知識が必要になるため、デフォルトの動作ではありませんが、通常は利用できません。例: エラーの前に出力に書き込みアクセスが含まれていた場合、トランザクションの整合性を確保するために例外を再スローする必要があります。
外側の REPEAT
(1) の完了ポリシーは、計画の成功にとって重要です。出力 (5.1) が失敗した場合、例外がスローされる可能性があります (説明されているように、通常はスローされます)。この場合、トランザクション TX
(2) が失敗し、例外が外側のバッチ REPEAT
(1) を介して伝播する可能性があります。RETRY
(4) は再試行しても成功する可能性があるため、バッチ全体を停止させたくありません。そのため、外側の REPEAT
(1) に exception=not critical
を追加します。
ただし、TX
(2) が失敗し、外完了政策のおかげで、次の内部 REPEAT
(3) で処理されている項目で、もう一度試してない場合だけ失敗したことを 1 であることが保証されていないこと。そうかもしれませんが、入力の実装(4.1)に依存します。新しい項目または古い項目のいずれかで出力(5.1)が再び失敗する可能性があります。バッチのクライアントは、各 RETRY
(4) 試行が最後に失敗したアイテムと同じアイテムを処理しようとしていると想定してはなりません。例: REPEAT
(1) の終了ポリシーが 10 回の試行後に失敗する場合、10 回連続して失敗しますが、必ずしも同じアイテムで失敗するわけではありません。これは、全体的な再試行戦略と一致しています。内側の RETRY
(4) は各アイテムの履歴を認識しており、別の試行を行うかどうかを決定できます。
非同期チャンク処理
典型的な例の内側のバッチまたはチャンクは、AsyncTaskExecutor
を使用するように外側のバッチを構成することで同時に実行できます。外側のバッチは、完了する前にすべてのチャンクが完了するのを待ちます。次の例は、非同期チャンク処理を示しています。
1 | REPEAT(until=exhausted, concurrent, exception=not critical) { | 2 | TX { 3 | REPEAT(size=5) { | 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { | output; 6 | } RECOVER { | recover; | } | | } | } | | }
非同期アイテム処理
典型的な例のチャンク内の個々のアイテムも、原則として同時に処理できます。この場合、トランザクション境界は、次の例に示すように、各トランザクションが単一のスレッド上にあるように、個々の項目のレベルに移動する必要があります。
1 | REPEAT(until=exhausted, exception=not critical) { | 2 | REPEAT(size=5, concurrent) { | 3 | TX { 4 | RETRY(stateful, exception=deadlock loser) { 4.1 | input; 5 | } PROCESS { | output; 6 | } RECOVER { | recover; | } | } | | } | | }
この計画では、すべてのトランザクションリソースをチャンク化するという単純な計画にあった最適化の利点が犠牲になります。処理のコスト (5) がトランザクション管理のコスト (3) よりもはるかに高い場合にのみ役立ちます。
バッチ処理とトランザクション伝播の相互作用
バッチ再試行とトランザクション管理の間には、理想的に望んでいるよりも密接な結合があります。特に、NESTED 伝播をサポートしていないトランザクションマネージャーを使用して、ステートレス再試行を使用してデータベース操作を再試行することはできません。
次の例では、繰り返しのない再試行を使用しています。
1 | TX { | 1.1 | input; 2.2 | database access; 2 | RETRY { 3 | TX { 3.1 | database access; | } | } | | }
再び、同じ理由で、RETRY
(2) が最終的に成功した場合でも、内部トランザクション TX
(3) により、外部トランザクション TX
(1) が失敗する可能性があります。
残念ながら、次の例に示すように、同じ効果が再試行ブロックから周囲の繰り返しバッチ (存在する場合) まで浸透します。
1 | TX { | 2 | REPEAT(size=5) { 2.1 | input; 2.2 | database access; 3 | RETRY { 4 | TX { 4.1 | database access; | } | } | } | | }
現在、TX(3) がロールバックする場合、TX(1) でバッチ全体を汚染し、最後に強制的にロールバックできます。
デフォルト以外の伝播はどうですか?
前の例では、
TX
(3) のPROPAGATION_REQUIRES_NEW
は、両方のトランザクションが最終的に成功した場合に、外側のTX
(1) が汚染されるのを防ぎます。しかし、TX
(3) がコミットし、TX
(1) がロールバックすると、TX
(3) はコミットされたままになるため、TX
(1) のトランザクション契約に違反します。TX
(3) がロールバックする場合、TX
(1) は必ずしもロールバックするとは限りません (ただし、再試行によってロールバック例外がスローされるため、実際にはロールバックする可能性があります)。TX
(3) のPROPAGATION_NESTED
は、再試行の場合(およびスキップのあるバッチの場合)に必要なように機能します。TX
(3) はコミットできますが、その後、外部トランザクションTX
(1) によってロールバックされます。TX
(3) がロールバックする場合、TX
(1) は実際にロールバックします。このオプションは、Hibernate または JTA を含まない一部のプラットフォームでのみ使用できますが、一貫して機能する唯一のオプションです。
再試行ブロックにデータベースアクセスが含まれる場合、NESTED
パターンが最適です。
特別なケース: 直交リソースを使用したトランザクション
ネストされたデータベーストランザクションがない単純なケースでは、デフォルトの伝播は常に問題ありません。SESSION
と TX
がグローバル XA
リソースではないため、リソースが直交している次の例を検討してください。
0 | SESSION { 1 | input; 2 | RETRY { 3 | TX { 3.1 | database access; | } | } | }
ここにトランザクションメッセージ SESSION
(0) がありますが、これは PlatformTransactionManager
を使用する他のトランザクションには参加しないため、TX
(3) の開始時に伝播しません。RETRY
(2) ブロック外のデータベースアクセスはありません。TX
(3) が失敗し、再試行で最終的に成功した場合、SESSION
(0) は (TX
ブロックとは関係なく) コミットできます。これは、バニラの「ベストエフォート 1 フェーズコミット」シナリオに似ています。起こりうる最悪の事態は、RETRY
(2) が成功し、SESSION
(0) がコミットできない場合 (たとえば、メッセージシステムが利用できないため) にメッセージが重複することです。
回復できないステートレス再試行
前に示した典型的な例のステートレス再試行とステートフル再試行の違いは重要です。実際、最終的に区別を強制するのはトランザクションの制約であり、この制約により、区別が存在する理由も明らかになります。
トランザクションでアイテム処理をラップしない限り、失敗したアイテムをスキップして残りのチャンクを正常にコミットする方法がないという観察から始めます。一般的なバッチ実行計画を次のように簡素化します。
0 | REPEAT(until=exhausted) { | 1 | TX { 2 | REPEAT(size=5) { | 3 | RETRY(stateless) { 4 | TX { 4.1 | input; 4.2 | database access; | } 5 | } RECOVER { 5.1 | skip; | } | | } | } | | }
前の例は、最後の試行が失敗した後に起動する RECOVER
(5) パスを持つステートレス RETRY
(3) を示しています。stateless
ラベルは、ある制限まで例外を再スローすることなく、ブロックが繰り返されることを意味します。これは、トランザクション TX
(4) で伝播がネストされている場合にのみ機能します。
内側の TX
(4) にデフォルトの伝播プロパティがあり、ロールバックする場合、外側の TX
(1) を汚染します。内部トランザクションは、トランザクションマネージャーによってトランザクションリソースが破損したと見なされるため、再度使用することはできません。
ネストされた伝播のサポートは非常にまれであるため、現在のバージョンの Spring Batch ではステートレスな再試行による回復をサポートしないことを選択しています。前に示した典型的なパターンを使用すると、常に同じ効果を得ることができます (より多くの処理を繰り返すという犠牲を払って)。