Step 実行のインターセプト

Job の場合と同様に、Step の実行中には、ユーザーが何らかの機能を実行する必要がある可能性のある多くのイベントがあります。例: フッターを必要とするフラットファイルに書き出すには、フッターを書き込めるように、Step が完了したときに ItemWriter に通知する必要があります。これは、多くの Step スコープのリスナーの 1 つを使用して実現できます。

StepListener の拡張機能の 1 つを実装する任意のクラスを適用できます (ただし、インターフェース自体は空なので、そのインターフェース自体は実装できません) listeners 要素を介してステップに適用できます。listeners 要素は、ステップ、タスクレット、チャンク宣言内で有効です。機能が適用されるレベルでリスナーを宣言するか、多機能 ( StepExecutionListener や ItemReadListener など) の場合は、適用される最も細かいレベルで宣言することをお勧めします。

  • Java

  • XML

次の例は、Java のチャンクレベルで適用されるリスナーを示しています。

Java 構成
@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

次の例は、XML のチャンクレベルで適用されるリスナーを示しています。

XML 構成
<step id="step1">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"/>
        <listeners>
            <listener ref="chunkListener"/>
        </listeners>
    </tasklet>
</step>

StepListener インターフェースの 1 つを実装する ItemReaderItemWriter、または ItemProcessor は、名前空間 <step> 要素または *StepFactoryBean ファクトリの 1 つを使用する場合、自動的に Step に登録されます。これは、Step に直接注入されたコンポーネントにのみ適用されます。リスナーが別のコンポーネント内にネストされている場合は、明示的に登録する必要があります (前述の ItemStream を Step に登録するで説明したように)。

StepListener インターフェースに加えて、同じ問題に対処するためのアノテーションが提供されています。プレーンな古い Java オブジェクトは、対応する StepListener 型に変換されるこれらのアノテーションを持つメソッドを持つことができます。ItemReader または ItemWriter または Tasklet などのチャンクコンポーネントのカスタム実装にアノテーションを付けることも一般的です。アノテーションは、<listener/> 要素の XML パーサーによって分析されるだけでなく、ビルダーの listener メソッドに登録されるため、XML 名前空間またはビルダーを使用してリスナーをステップに登録するだけです。

StepExecutionListener

StepExecutionListener は、Step 実行の最も一般的なリスナーを表します。次の例に示すように、Step の開始前と終了後に、正常に終了したか失敗したかを通知できます。

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus には afterStep の戻り型があり、リスナーは Step の補完時に返される終了コードを変更できます。

このインターフェースに対応するアノテーションは次のとおりです。

  • @BeforeStep

  • @AfterStep

ChunkListener

「チャンク」は、トランザクションの範囲内で処理されるアイテムとして定義されます。各コミット間隔でトランザクションをコミットすると、チャンクがコミットされます。次のインターフェース定義が示すように、チャンクが処理を開始する前、またはチャンクが正常に完了した後に、ChunkListener を使用してロジックを実行できます。

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

beforeChunk メソッドは、トランザクションが開始された後、ItemReader で読み取りが開始される前に呼び出されます。逆に、afterChunk は、チャンクがコミットされた後に呼び出されます (または、ロールバックがある場合はまったく呼び出されません)。

このインターフェースに対応するアノテーションは次のとおりです。

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

チャンク宣言がない場合、ChunkListener を適用できます。TaskletStep は ChunkListener の呼び出しを担当するため、非アイテム指向のタスクレットにも適用されます (タスクレットの前後に呼び出されます)。

ItemReadListener

前にスキップロジックについて説明したとき、後で処理できるように、スキップされたレコードをログに記録することが有益である可能性があると述べました。読み取りエラーの場合、次のインターフェース定義が示すように、これは ItemReaderListener で実行できます。

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead メソッドは、各呼び出しの前に ItemReader を読み取るために呼び出されます。afterRead メソッドは、読み取りの呼び出しが成功するたびに呼び出され、読み取られたアイテムを渡します。読み取り中にエラーが発生した場合、onReadError メソッドが呼び出されます。発生した例外はログに記録できるように提供されます。

このインターフェースに対応するアノテーションは次のとおりです。

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

ItemReadListener と同様に、次のインターフェース定義が示すように、アイテムの処理を「リッスン」できます。

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess メソッドは、ItemProcessor の process の前に呼び出され、処理されるアイテムに渡されます。afterProcess メソッドは、アイテムが正常に処理された後に呼び出されます。処理中にエラーが発生した場合、onProcessError メソッドが呼び出されます。発生した例外と処理が試行されたアイテムが提供されるため、ログに記録できます。

このインターフェースに対応するアノテーションは次のとおりです。

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

次のインターフェース定義が示すように、ItemWriteListener を使用してアイテムの書き込みを「聞く」ことができます。

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite メソッドは、ItemWriter の write の前に呼び出され、書き込まれた項目のリストが渡されます。afterWrite メソッドは、項目が正常に書き込まれた後、チャンクの処理に関連付けられたトランザクションをコミットする前に呼び出されます。書き込み中にエラーが発生した場合は、onWriteError メソッドが呼び出されます。発生した例外と書き込みが試行された項目が提供され、ログに記録されます。

このインターフェースに対応するアノテーションは次のとおりです。

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener はすべて、エラーを通知するメカニズムを提供しますが、レコードが実際にスキップされたことを通知するものはありません。たとえば、onWriteError は、アイテムが再試行されて成功した場合でも呼び出されます。このため、次のインターフェース定義に示すように、スキップされたアイテムを追跡するための別のインターフェースがあります。

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInRead は、読み取り中にアイテムがスキップされるたびに呼び出されます。ロールバックにより、同じアイテムが複数回スキップされて登録される可能性があることに注意してください。onSkipInWrite は、書き込み中にアイテムがスキップされると呼び出されます。項目は正常に読み取られた(スキップされなかった)ため、引数として項目自体も提供されます。

このインターフェースに対応するアノテーションは次のとおりです。

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

SkipListeners とトランザクション

SkipListener の最も一般的な使用例の 1 つは、スキップされたアイテムをログアウトすることです。これにより、別のバッチプロセスまたは人間のプロセスを使用して、スキップにつながる課題を評価および修正できます。元のトランザクションがロールバックされるケースが多いため、Spring Batch は次の 2 つの保証を行います。

  • 適切な skip メソッド(エラーがいつ発生したかによる)は、アイテムごとに 1 回だけ呼び出されます。

  • SkipListener は、トランザクションがコミットされる直前に常に呼び出されます。これは、リスナーによって呼び出されるトランザクションリソースが ItemWriter 内の障害によってロールバックされないようにするためです。