メッセージを介したバッチジョブの起動

コア Spring Batch API を使用してバッチジョブを開始する場合、基本的に次の 2 つのオプションがあります。

  • コマンドラインから、CommandLineJobRunner

  • プログラムにより、JobOperator.start() または JobLauncher.run() のいずれかを使用

例: シェルスクリプトを使用してバッチジョブを呼び出すときに、CommandLineJobRunner を使用することができます。または、JobOperator を直接使用することもできます (たとえば、Spring Batch を Web アプリケーションの一部として使用する場合)。しかし、より複雑なユースケースはどうでしょうか ? リモート (S)FTP サーバーをポーリングしてバッチジョブのデータを取得する必要がある場合や、アプリケーションが複数の異なるデータソースを同時にサポートする必要がある場合があります。例: Web だけでなく、FTP やその他のソースからもデータファイルを受信する場合があります。Spring Batch を呼び出す前に、入力ファイルの追加の変換が必要になる場合があります。

Spring Integration とその多数のアダプターを使用してバッチジョブを実行すると、はるかに強力になります。例: ファイル受信チャネルアダプターを使用して、ファイルシステム内のディレクトリを監視し、入力ファイルが到着したらすぐにバッチジョブを開始できます。さらに、複数の異なるアダプターを使用する Spring Integration フローを作成して、構成のみを使用して複数のソースからバッチジョブのデータを同時に簡単に取り込むことができます。これらすべてのシナリオを Spring Integration で実装するのは簡単です。これにより、分離されたイベント駆動型の JobLauncher 実行が可能になります。

Spring Batch Integration は、バッチジョブを起動するために使用できる JobLaunchingMessageHandler クラスを提供します。JobLaunchingMessageHandler の入力は、JobLaunchRequest 型のペイロードを持つ Spring Integration メッセージによって提供されます。このクラスは、起動される Job と、バッチジョブを起動するために必要な JobParameters のラッパーです。

次の図は、バッチジョブを開始するために必要な典型的な Spring Integration メッセージフローを示しています。EIP(Enterprise Integration Patterns)Web サイト (英語) は、メッセージアイコンとその説明の完全な概要を提供します。

Launch Batch Job
図 1: バッチジョブの起動

ファイルを JobLaunchRequest に変換する

次の例では、ファイルを JobLaunchRequest に変換します。

import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;

import java.io.File;

public class FileMessageToJobRequest {
    private Job job;
    private String fileParameterName;

    public void setFileParameterName(String fileParameterName) {
        this.fileParameterName = fileParameterName;
    }

    public void setJob(Job job) {
        this.job = job;
    }

    @Transformer
    public JobLaunchRequest toRequest(Message<File> message) {
        JobParametersBuilder jobParametersBuilder =
            new JobParametersBuilder();

        jobParametersBuilder.addString(fileParameterName,
            message.getPayload().getAbsolutePath());

        return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
    }
}

JobExecution レスポンス

バッチジョブの実行中は、JobExecution インスタンスが返されます。このインスタンスを使用して、実行のステータスを判断できます。JobExecution を正常に作成できた場合は、実際の実行が成功したかどうかに関係なく、常に JobExecution が返されます。

JobExecution インスタンスが返される正確な動作は、提供された TaskExecutor によって異なります。synchronous (シングルスレッド) TaskExecutor 実装が使用されている場合、JobExecution レスポンスは、ジョブが完了する after のみが返されます。asynchronous TaskExecutor を使用すると、JobExecution インスタンスがすぐに返されます。次に、JobExecution インスタンスの id ( JobExecution.getJobId() を使用) を取得し、JobExplorer を使用してジョブの更新されたステータスを JobRepository にクエリできます。詳細については、リポジトリのクエリを参照してください。

Spring Batch Integration の設定

提供されたディレクトリで CSV ファイルをリッスンし、トランスフォーマー (FileMessageToJobRequest) に渡し、ジョブ起動ゲートウェイを介してジョブを起動し、JobExecution の出力を logging-channel-adapter でログに記録するために、誰かがファイル inbound-channel-adapter を作成する必要がある場合を考えてみましょう。

  • Java

  • XML

次の例は、その一般的なケースを Java で構成する方法を示しています。

Java 構成
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
    FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
    fileMessageToJobRequest.setFileParameterName("input.file.name");
    fileMessageToJobRequest.setJob(personJob());
    return fileMessageToJobRequest;
}

@Bean
public JobLaunchingGateway jobLaunchingGateway() {
    TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
    jobLauncher.setJobRepository(jobRepository);
    jobLauncher.setTaskExecutor(new SyncTaskExecutor());
    JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);

    return jobLaunchingGateway;
}

@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
                    filter(new SimplePatternFileListFilter("*.csv")),
            c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
            transform(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

次の例は、その一般的なケースを XML で構成する方法を示しています。

XML 構成
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>

<int-file:inbound-channel-adapter id="filePoller"
    channel="inboundFileChannel"
    directory="file:/tmp/myfiles/"
    filename-pattern="*.csv">
  <int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>

<int:transformer input-channel="inboundFileChannel"
    output-channel="outboundJobRequestChannel">
  <bean class="io.spring.sbi.FileMessageToJobRequest">
    <property name="job" ref="personJob"/>
    <property name="fileParameterName" value="input.file.name"/>
  </bean>
</int:transformer>

<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
    reply-channel="jobLaunchReplyChannel"/>

<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>

ItemReader 構成の例

ファイルをポーリングしてジョブを起動したため、次の Bean 構成が示すように、"input.file.name" というジョブパラメーターによって定義された場所で見つかったファイルを使用するように、Spring Batch ItemReader (たとえば) を構成する必要があります。

  • Java

  • XML

次の Java の例は、必要な Bean 構成を示しています。

Java 構成
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
    FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
    flatFileItemReader.setResource(new FileSystemResource(resource));
...
    return flatFileItemReader;
}

次の XML の例は、必要な Bean 構成を示しています。

XML 構成
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
    scope="step">
  <property name="resource" value="file://#{jobParameters['input.file.name']}"/>
    ...
</bean>

前の例の主な関心点は、リソースプロパティ値として #{jobParameters['input.file.name']} の値を挿入し、ItemReader Bean をステップスコープを持つように設定することです。ステップスコープを持つように Bean を設定すると、jobParameters 変数へのアクセスを許可するレイトバインディングサポートを利用できます。