一般的なバッチパターン

一部のバッチジョブは、Spring Batch の既製のコンポーネントから純粋に組み立てることができます。たとえば、ItemReader および ItemWriter 実装は、さまざまなシナリオをカバーするように構成できます。ただし、ほとんどの場合、カスタムコードを記述する必要があります。アプリケーション開発者向けの主要な API エントリポイントは、TaskletItemReaderItemWriter、さまざまなリスナーインターフェースです。ほとんどの単純なバッチジョブでは、Spring Batch ItemReader からの既製の入力を使用できますが、開発者が ItemWriter または ItemProcessor を実装する必要がある処理および記述にカスタムの懸念がある場合がよくあります。

この章では、カスタムビジネスロジックの一般的なパターンの例をいくつか示します。これらの例は、主にリスナーインターフェースを特徴としています。ItemReader または ItemWriter は、必要に応じてリスナーインターフェースも実装できることに注意してください。

アイテムの処理と失敗のログ

一般的な使用例は、ステップごとのエラーをアイテムごとに特別に処理する必要があることです。おそらく、特別なチャネルにログを記録するか、データベースにレコードを挿入します。チャンク指向の Step (ステップファクトリ Bean から作成)を使用すると、ユーザーは read のエラー用の単純な ItemReadListener と write のエラー用の ItemWriteListener を使用してこのユースケースを実装できます。次のコードスニペットは、読み取りと書き込みの両方の失敗を記録するリスナーを示しています。

public class ItemFailureLoggerListener extends ItemListenerSupport {

    private static Log logger = LogFactory.getLog("item.error");

    public void onReadError(Exception ex) {
        logger.error("Encountered error on read", e);
    }

    public void onWriteError(Exception ex, List<? extends Object> items) {
        logger.error("Encountered error on write", ex);
    }
}

このリスナーを実装したら、ステップに登録する必要があります。

  • Java

  • XML

次の例は、リスナーをステップ Java に登録する方法を示しています。

Java 構成
@Bean
public Step simpleStep(JobRepository jobRepository) {
	return new StepBuilder("simpleStep", jobRepository)
				...
				.listener(new ItemFailureLoggerListener())
				.build();
}

次の例は、XML のステップにリスナーを登録する方法を示しています。

XML 構成
<step id="simpleStep">
...
<listeners>
    <listener>
        <bean class="org.example...ItemFailureLoggerListener"/>
    </listener>
</listeners>
</step>
リスナーが onError() メソッドで何かを行う場合、ロールバックされるトランザクション内になければなりません。onError() メソッド内でデータベースなどのトランザクションリソースを使用する必要がある場合は、そのメソッドに宣言トランザクションを追加し(詳細については Spring コアリファレンスガイドを参照)、その伝播属性に REQUIRES_NEW の値を与えることを検討してください。

ビジネス上の理由でジョブを手動で停止する

Spring Batch は JobOperator インターフェースを介して stop() メソッドを提供しますが、これは実際にはアプリケーションプログラマーではなくオペレーターが使用するためのものです。場合によっては、ビジネスロジック内からジョブの実行を停止する方が便利な場合や意味がある場合があります。

最も簡単なことは、RuntimeException (無期限に再試行もスキップもされないもの)をスローすることです。例: 次の例に示すように、カスタム例外型を使用できます。

public class PoisonPillItemProcessor<T> implements ItemProcessor<T, T> {

    @Override
    public T process(T item) throws Exception {
        if (isPoisonPill(item)) {
            throw new PoisonPillException("Poison pill detected: " + item);
        }
        return item;
    }
}

次の例に示すように、ステップの実行を停止するもう 1 つの簡単な方法は、ItemReader から null を返すことです。

public class EarlyCompletionItemReader implements ItemReader<T> {

    private ItemReader<T> delegate;

    public void setDelegate(ItemReader<T> delegate) { ... }

    public T read() throws Exception {
        T item = delegate.read();
        if (isEndItem(item)) {
            return null; // end the step here
        }
        return item;
    }

}

前の例は、処理されるアイテムが null である場合にバッチ全体を通知する CompletionPolicy 戦略のデフォルト実装があるという事実に実際に依存しています。より洗練された完了ポリシーを実装し、SimpleStepFactoryBean を介して Step に挿入できます。

  • Java

  • XML

次の例は、Java のステップに完了ポリシーを挿入する方法を示しています。

Java 構成
@Bean
public Step simpleStep(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("simpleStep", jobRepository)
				.<String, String>chunk(new SpecialCompletionPolicy(), transactionManager)
				.reader(reader())
				.writer(writer())
				.build();
}

次の例は、XML のステップに完了ポリシーを挿入する方法を示しています。

XML 構成
<step id="simpleStep">
    <tasklet>
        <chunk reader="reader" writer="writer" commit-interval="10"
               chunk-completion-policy="completionPolicy"/>
    </tasklet>
</step>

<bean id="completionPolicy" class="org.example...SpecialCompletionPolicy"/>

代替方法は、StepExecution にフラグを設定することです。これは、アイテム処理の間にフレームワークの Step 実装によってチェックされます。この代替を実装するには、現在の StepExecution にアクセスする必要があり、これは StepListener を実装して Step に登録することで実現できます。次の例は、フラグを設定するリスナーを示しています。

public class CustomItemWriter extends ItemListenerSupport implements StepListener {

    private StepExecution stepExecution;

    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    public void afterRead(Object item) {
        if (isPoisonPill(item)) {
            stepExecution.setTerminateOnly();
       }
    }

}

フラグが設定されている場合、デフォルトの動作では、ステップが JobInterruptedException をスローします。この動作は、StepInterruptionPolicy を介して制御できます。ただし、唯一の選択肢は例外をスローするかしないかであるため、これは常にジョブの異常終了です。

フッターレコードを追加する

多くの場合、フラットファイルに書き込む場合、すべての処理が完了した後、「フッター」レコードをファイルの最後に追加する必要があります。これは、Spring Batch が提供する FlatFileFooterCallback インターフェースを使用して実現できます。FlatFileFooterCallback (およびそ版である FlatFileHeaderCallback)は FlatFileItemWriter のオプションのプロパティであり、アイテムライターに追加できます。

  • Java

  • XML

次の例は、Java で FlatFileHeaderCallback および FlatFileFooterCallback を使用する方法を示しています。

Java 構成
@Bean
public FlatFileItemWriter<String> itemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.headerCallback(headerCallback())
			.footerCallback(footerCallback())
			.build();
}

次の例は、XML で FlatFileHeaderCallback および FlatFileFooterCallback を使用する方法を示しています。

XML 構成
<bean id="itemWriter" class="org.spr...FlatFileItemWriter">
    <property name="resource" ref="outputResource" />
    <property name="lineAggregator" ref="lineAggregator"/>
    <property name="headerCallback" ref="headerCallback" />
    <property name="footerCallback" ref="footerCallback" />
</bean>

フッターコールバックインターフェースには、次のインターフェース定義に示すように、フッターを記述する必要があるときに呼び出されるメソッドが 1 つだけあります。

public interface FlatFileFooterCallback {

    void writeFooter(Writer writer) throws IOException;

}

サマリーフッターの作成

フッターレコードに関連する一般的な要件は、出力プロセス中に情報を集約し、この情報をファイルの最後に追加することです。このフッターは、多くの場合、ファイルの要約として機能するか、チェックサムを提供します。

例: バッチジョブが Trade レコードをフラットファイルに書き込み、すべての Trades からの合計金額をフッターに配置する必要がある場合、次の ItemWriter 実装を使用できます。

public class TradeItemWriter implements ItemWriter<Trade>,
                                        FlatFileFooterCallback {

    private ItemWriter<Trade> delegate;

    private BigDecimal totalAmount = BigDecimal.ZERO;

    public void write(Chunk<? extends Trade> items) throws Exception {
        BigDecimal chunkTotal = BigDecimal.ZERO;
        for (Trade trade : items) {
            chunkTotal = chunkTotal.add(trade.getAmount());
        }

        delegate.write(items);

        // After successfully writing all items
        totalAmount = totalAmount.add(chunkTotal);
    }

    public void writeFooter(Writer writer) throws IOException {
        writer.write("Total Amount Processed: " + totalAmount);
    }

    public void setDelegate(ItemWriter delegate) {...}
}

この TradeItemWriter は、書き込まれた各 Trade 項目から amount とともに増加する totalAmount 値を保管します。最後の Trade が処理された後、フレームワークは writeFooter を呼び出し、totalAmount をファイルに入れます。write メソッドは、チャンクに Trade の合計量を格納する一時変数 chunkTotal を使用することに注意してください。これは、write メソッドでスキップが発生した場合に、totalAmount が変更されないようにするために行われます。write メソッドの最後にのみ、例外がスローされないことが保証されたら、totalAmount を更新します。

writeFooter メソッドを呼び出すには、TradeItemWriter (FlatFileFooterCallback を実装)を footerCallback として FlatFileItemWriter に接続する必要があります。

  • Java

  • XML

次の例は、Java で TradeItemWriter をワイヤリングする方法を示しています。

Java 構成
@Bean
public TradeItemWriter tradeItemWriter() {
	TradeItemWriter itemWriter = new TradeItemWriter();

	itemWriter.setDelegate(flatFileItemWriter(null));

	return itemWriter;
}

@Bean
public FlatFileItemWriter<String> flatFileItemWriter(Resource outputResource) {
	return new FlatFileItemWriterBuilder<String>()
			.name("itemWriter")
			.resource(outputResource)
			.lineAggregator(lineAggregator())
			.footerCallback(tradeItemWriter())
			.build();
}

次の例は、TradeItemWriter を XML でワイヤリングする方法を示しています。

XML 構成
<bean id="tradeItemWriter" class="..TradeItemWriter">
    <property name="delegate" ref="flatFileItemWriter" />
</bean>

<bean id="flatFileItemWriter" class="org.spr...FlatFileItemWriter">
   <property name="resource" ref="outputResource" />
   <property name="lineAggregator" ref="lineAggregator"/>
   <property name="footerCallback" ref="tradeItemWriter" />
</bean>

TradeItemWriter のこれまでの記述方法は、Step が再始動可能でない場合にのみ正しく機能します。これは、クラスがステートフル(totalAmount を格納するため)であるが、totalAmount はデータベースに永続化されないためです。再起動のイベントでは取得できません。このクラスを再起動可能にするには、次の例に示すように、ItemStream インターフェースをメソッド open および update とともに実装する必要があります。

public void open(ExecutionContext executionContext) {
    if (executionContext.containsKey("total.amount") {
        totalAmount = (BigDecimal) executionContext.get("total.amount");
    }
}

public void update(ExecutionContext executionContext) {
    executionContext.put("total.amount", totalAmount);
}

update メソッドは、オブジェクトがデータベースに永続化される直前に、totalAmount の最新バージョンを ExecutionContext に保存します。open メソッドは、既存の totalAmount を ExecutionContext から取得し、それを処理の開始点として使用して、TradeItemWriter が再始動時に Step が実行された前回の中断から再開できるようにします。

クエリベースの ItemReader の駆動

リーダーとライターに関する章では、ページングを使用したデータベース入力について説明しました。DB2 などの多くのデータベースベンダーは、読み取り中のテーブルをオンラインアプリケーションの他の部分でも使用する必要がある場合に課題を引き起こす可能性がある非常に悲観的なロック戦略を持っています。さらに、非常に大きなデータセット上でカーソルを開くと、特定のベンダーのデータベースで問題が発生する可能性があります。多くのプロジェクトでは、データの読み取りに「駆動クエリ」アプローチを使用することを好みます。このアプローチは、次の図に示すように、返される必要のあるオブジェクト全体ではなく、キーを反復処理することで機能します。

Driving Query Job
図 1: クエリジョブの実行

ご覧のとおり、前のイメージに示されている例では、カーソルベースの例で使用されたものと同じ 'FOO' テーブルが使用されています。ただし、行全体を選択するのではなく、SQL ステートメントで ID のみが選択されました。FOO オブジェクトが read から返されるのではなく、Integer が返されます。この番号は、次のイメージに示すように、完全な Foo オブジェクトである「詳細」の照会に使用できます。

Driving Query Example
図 2: 駆動クエリの例

ItemProcessor を使用して、駆動クエリから取得したキーを完全な Foo オブジェクトに変換する必要があります。既存の DAO を使用して、キーに基づいてオブジェクト全体を照会できます。

複数行レコード

通常、フラットファイルの場合、各レコードは 1 行に限定されますが、ファイルには複数の形式の複数の行にわたるレコードが含まれることがよくあります。ファイルからの次の抜粋は、そのような配置の例を示しています。

HEA;0013100345;2007-02-15
NCU;Smith;Peter;;T;20014539;F
BAD;;Oak Street 31/A;;Small Town;00235;IL;US
FOT;2;2;267.34

"HEA" で始まる行と "FOT" で始まる行の間のすべてが 1 つのレコードと見なされます。この状況を正しく処理するには、いくつかの考慮事項があります。

  • ItemReader は、一度に 1 つのレコードを読み取る代わりに、複数行レコードのすべての行をグループとして読み取って、ItemWriter にそのまま渡すことができるようにする必要があります。

  • 各回線型は、異なるトークン化が必要になる場合があります。

1 つのレコードが複数の行にまたがっており、行数がわからない場合があるため、ItemReader は常にレコード全体を読み取るように注意する必要があります。これを行うには、カスタム ItemReader を FlatFileItemReader のラッパーとして実装する必要があります。

  • Java

  • XML

次の例は、Java でカスタム ItemReader を実装する方法を示しています。

Java 構成
@Bean
public MultiLineTradeItemReader itemReader() {
	MultiLineTradeItemReader itemReader = new MultiLineTradeItemReader();

	itemReader.setDelegate(flatFileItemReader());

	return itemReader;
}

@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Trade> reader = new FlatFileItemReaderBuilder<>()
			.name("flatFileItemReader")
			.resource(new ClassPathResource("data/iosample/input/multiLine.txt"))
			.lineTokenizer(orderFileTokenizer())
			.fieldSetMapper(orderFieldSetMapper())
			.build();
	return reader;
}

次の例は、カスタム ItemReader を XML で実装する方法を示しています。

XML 構成
<bean id="itemReader" class="org.spr...MultiLineTradeItemReader">
    <property name="delegate">
        <bean class="org.springframework.batch.item.file.FlatFileItemReader">
            <property name="resource" value="data/iosample/input/multiLine.txt" />
            <property name="lineMapper">
                <bean class="org.spr...DefaultLineMapper">
                    <property name="lineTokenizer" ref="orderFileTokenizer"/>
                    <property name="fieldSetMapper" ref="orderFieldSetMapper"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>

各行が適切にトークン化されることを保証するために、固定長入力では特に重要ですが、デリゲート FlatFileItemReader で PatternMatchingCompositeLineTokenizer を使用できます。詳細については、リーダーとライターの章の FlatFileItemReader  を参照してください。次に、デリゲートリーダーは PassThroughFieldSetMapper を使用して、各行の FieldSet を折り返し ItemReader に戻します。

  • Java

  • XML

次の例は、Java で各行が適切にトークン化されるようにする方法を示しています。

Java コンテンツ
@Bean
public PatternMatchingCompositeLineTokenizer orderFileTokenizer() {
	PatternMatchingCompositeLineTokenizer tokenizer =
			new PatternMatchingCompositeLineTokenizer();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(4);

	tokenizers.put("HEA*", headerRecordTokenizer());
	tokenizers.put("FOT*", footerRecordTokenizer());
	tokenizers.put("NCU*", customerLineTokenizer());
	tokenizers.put("BAD*", billingAddressLineTokenizer());

	tokenizer.setTokenizers(tokenizers);

	return tokenizer;
}

次の例は、各行が XML で適切にトークン化されていることを確認する方法を示しています。

XML コンテンツ
<bean id="orderFileTokenizer" class="org.spr...PatternMatchingCompositeLineTokenizer">
    <property name="tokenizers">
        <map>
            <entry key="HEA*" value-ref="headerRecordTokenizer" />
            <entry key="FOT*" value-ref="footerRecordTokenizer" />
            <entry key="NCU*" value-ref="customerLineTokenizer" />
            <entry key="BAD*" value-ref="billingAddressLineTokenizer" />
        </map>
    </property>
</bean>

このラッパーは、レコードの終わりを認識できる必要があります。これにより、最後に到達するまで、デリゲートで read() を継続的に呼び出すことができます。読み取られる行ごとに、ラッパーは返されるアイテムを構築する必要があります。フッターに到達すると、次の例に示すように、アイテムを ItemProcessor および ItemWriter に配信するために返すことができます。

private FlatFileItemReader<FieldSet> delegate;

public Trade read() throws Exception {
    Trade t = null;

    for (FieldSet line = null; (line = this.delegate.read()) != null;) {
        String prefix = line.readString(0);
        if (prefix.equals("HEA")) {
            t = new Trade(); // Record must start with header
        }
        else if (prefix.equals("NCU")) {
            Assert.notNull(t, "No header was found.");
            t.setLast(line.readString(1));
            t.setFirst(line.readString(2));
            ...
        }
        else if (prefix.equals("BAD")) {
            Assert.notNull(t, "No header was found.");
            t.setCity(line.readString(4));
            t.setState(line.readString(6));
          ...
        }
        else if (prefix.equals("FOT")) {
            return t; // Record must end with footer
        }
    }
    Assert.isNull(t, "No 'END' was found.");
    return null;
}

システムコマンドの実行

多くのバッチジョブでは、バッチジョブ内から外部コマンドを呼び出す必要があります。このようなプロセスは、スケジューラによって個別に開始できますが、実行に関する一般的なメタデータの利点は失われます。さらに、マルチステップジョブも複数のジョブに分割する必要があります。

必要性が非常に一般的であるため、Spring Batch はシステムコマンドを呼び出すための Tasklet 実装を提供します。

  • Java

  • XML

次の例は、Java で外部コマンドを呼び出す方法を示しています。

Java 構成
@Bean
public SystemCommandTasklet tasklet() {
	SystemCommandTasklet tasklet = new SystemCommandTasklet();

	tasklet.setCommand("echo hello");
	tasklet.setTimeout(5000);

	return tasklet;
}

次の例は、XML で外部コマンドを呼び出す方法を示しています。

XML 構成
<bean class="org.springframework.batch.core.step.tasklet.SystemCommandTasklet">
    <property name="command" value="echo hello" />
    <!-- 5 second timeout for the command to complete -->
    <property name="timeout" value="5000" />
</bean>

入力が見つからない場合のステップ完了の処理

多くのバッチシナリオでは、データベースまたはファイル内で処理する行が見つからないことは例外ではありません。Step は、単に作業が見つからなかったと見なされ、0 項目の読み取りで完了します。Spring Batch でデフォルトで提供される ItemReader 実装はすべて、このアプローチがデフォルトです。これは、入力が存在する場合でも何も書き出されない場合、混乱を招く可能性があります(通常、ファイルの名前が間違っているか、同様の問題が発生した場合に起こります)。このため、フレームワークが処理されていることがわかった作業量を判断するために、メタデータ自体をインスペクションする必要があります。ただし、入力がないことが例外と見なされた場合はどうなるでしょうか? この場合、メタデータをプログラムで処理してアイテムが処理されておらず、失敗の原因になっていないかをプログラムで確認することが最善のソリューションです。これは一般的な使用例であるため、Spring Batch は、NoWorkFoundStepExecutionListener のクラス定義に示されているように、リスナーにまさにこの機能を提供します。

public class NoWorkFoundStepExecutionListener extends StepExecutionListenerSupport {

    public ExitStatus afterStep(StepExecution stepExecution) {
        if (stepExecution.getReadCount() == 0) {
            return ExitStatus.FAILED;
        }
        return null;
    }

}

前の StepExecutionListener は、"afterStep" フェーズで StepExecution の readCount プロパティをインスペクションして、読み取られたアイテムがないかどうかを判別します。その場合、終了コード FAILED が返され、Step が失敗することを示します。そうでない場合は、null が返されますが、これは Step のステータスには影響しません。

データを将来のステップに渡す

多くの場合、あるステップから別のステップに情報を渡すと便利です。これは、ExecutionContext を介して実行できます。キャッチは、2 つの ExecutionContexts があることです。1 つは Step レベルで、もう 1 つは Job レベルです。StepExecutionContext はステップの間だけ残りますが、JobExecutionContext は Job 全体を通して残ります。一方、StepExecutionContext は、Step がチャンクをコミットするたびに更新されますが、JobExecutionContext は、各 Step の最後でのみ更新されます。

この分離の結果、Step の実行中に、すべてのデータを StepExecutionContext に配置する必要があります。そうすることで、Step の実行中にデータが適切に保存されるようになります。データが JobExecutionContext に保存されている場合、そのデータは Step の実行中に保持されません。Step に障害が発生すると、そのデータは失われます。

public class SavingItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...

        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }

    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

データを将来の Steps で利用できるようにするには、ステップが終了した後、データを JobExecutionContext に「プロモート」する必要があります。Spring Batch は、この目的のために ExecutionContextPromotionListener を提供します。リスナーは、プロモートする必要がある ExecutionContext のデータに関連するキーを使用して構成する必要があります。オプションで、プロモーションを実行する必要のある終了コードパターンのリストを使用して構成することもできます(COMPLETED がデフォルトです)。すべてのリスナーと同様に、Step に登録する必要があります。

  • Java

  • XML

次の例は、Java で JobExecutionContext へのステップをプロモートする方法を示しています。

Java 構成
@Bean
public Job job1(JobRepository jobRepository, Step step1, Step step2) {
	return new JobBuilder("job1", jobRepository)
				.start(step1)
				.next(step2)
				.build();
}

@Bean
public Step step1(JobRepository jobRepository, PlatformTransactionManager transactionManager) {
	return new StepBuilder("step1", jobRepository)
				.<String, String>chunk(10, transactionManager)
				.reader(reader())
				.writer(savingWriter())
				.listener(promotionListener())
				.build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
	ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();

	listener.setKeys(new String[] {"someKey"});

	return listener;
}

次の例は、XML で JobExecutionContext へのステップをプロモートする方法を示しています。

XML 構成
<job id="job1">
    <step id="step1">
        <tasklet>
            <chunk reader="reader" writer="savingWriter" commit-interval="10"/>
        </tasklet>
        <listeners>
            <listener ref="promotionListener"/>
        </listeners>
    </step>

    <step id="step2">
       ...
    </step>
</job>

<beans:bean id="promotionListener" class="org.spr....ExecutionContextPromotionListener">
    <beans:property name="keys">
        <list>
            <value>someKey</value>
        </list>
    </beans:property>
</beans:bean>

最後に、次の例に示すように、保存された値を JobExecutionContext から取得する必要があります。

public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;

    public void write(Chunk<? extends Object> items) throws Exception {
        // ...
    }

    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}