メッセージを介したバッチジョブの起動
コア Spring Batch API を使用してバッチジョブを開始する場合、基本的に次の 2 つのオプションがあります。
コマンドラインから、
CommandLineJobRunner
プログラムにより、
JobOperator.start()
またはJobLauncher.run()
のいずれかを使用
例: シェルスクリプトを使用してバッチジョブを呼び出すときに、CommandLineJobRunner
を使用することができます。または、JobOperator
を直接使用することもできます (たとえば、Spring Batch を Web アプリケーションの一部として使用する場合)。しかし、より複雑なユースケースはどうでしょうか ? リモート (S)FTP サーバーをポーリングしてバッチジョブのデータを取得する必要がある場合や、アプリケーションが複数の異なるデータソースを同時にサポートする必要がある場合があります。例: Web だけでなく、FTP やその他のソースからもデータファイルを受信する場合があります。Spring Batch を呼び出す前に、入力ファイルの追加の変換が必要になる場合があります。
Spring Integration とその多数のアダプターを使用してバッチジョブを実行すると、はるかに強力になります。例: ファイル受信チャネルアダプターを使用して、ファイルシステム内のディレクトリを監視し、入力ファイルが到着したらすぐにバッチジョブを開始できます。さらに、複数の異なるアダプターを使用する Spring Integration フローを作成して、構成のみを使用して複数のソースからバッチジョブのデータを同時に簡単に取り込むことができます。これらすべてのシナリオを Spring Integration で実装するのは簡単です。これにより、分離されたイベント駆動型の JobLauncher
実行が可能になります。
Spring Batch Integration は、バッチジョブを起動するために使用できる JobLaunchingMessageHandler
クラスを提供します。JobLaunchingMessageHandler
の入力は、JobLaunchRequest
型のペイロードを持つ Spring Integration メッセージによって提供されます。このクラスは、起動される Job
と、バッチジョブを起動するために必要な JobParameters
のラッパーです。
次の図は、バッチジョブを開始するために必要な典型的な Spring Integration メッセージフローを示しています。EIP(Enterprise Integration Patterns)Web サイト (英語) は、メッセージアイコンとその説明の完全な概要を提供します。
ファイルを JobLaunchRequest に変換する
次の例では、ファイルを JobLaunchRequest
に変換します。
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.integration.launch.JobLaunchRequest;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import java.io.File;
public class FileMessageToJobRequest {
private Job job;
private String fileParameterName;
public void setFileParameterName(String fileParameterName) {
this.fileParameterName = fileParameterName;
}
public void setJob(Job job) {
this.job = job;
}
@Transformer
public JobLaunchRequest toRequest(Message<File> message) {
JobParametersBuilder jobParametersBuilder =
new JobParametersBuilder();
jobParametersBuilder.addString(fileParameterName,
message.getPayload().getAbsolutePath());
return new JobLaunchRequest(job, jobParametersBuilder.toJobParameters());
}
}
JobExecution レスポンス
バッチジョブの実行中は、JobExecution
インスタンスが返されます。このインスタンスを使用して、実行のステータスを判断できます。JobExecution
を正常に作成できた場合は、実際の実行が成功したかどうかに関係なく、常に JobExecution
が返されます。
JobExecution
インスタンスが返される正確な動作は、提供された TaskExecutor
によって異なります。synchronous
(シングルスレッド) TaskExecutor
実装が使用されている場合、JobExecution
レスポンスは、ジョブが完了する after
のみが返されます。asynchronous
TaskExecutor
を使用すると、JobExecution
インスタンスがすぐに返されます。次に、JobExecution
インスタンスの id
( JobExecution.getJobId()
を使用) を取得し、JobExplorer
を使用してジョブの更新されたステータスを JobRepository
にクエリできます。詳細については、リポジトリのクエリを参照してください。
Spring Batch Integration の設定
提供されたディレクトリで CSV ファイルをリッスンし、トランスフォーマー (FileMessageToJobRequest
) に渡し、ジョブ起動ゲートウェイを介してジョブを起動し、JobExecution
の出力を logging-channel-adapter
でログに記録するために、誰かがファイル inbound-channel-adapter
を作成する必要がある場合を考えてみましょう。
Java
XML
次の例は、その一般的なケースを Java で構成する方法を示しています。
@Bean
public FileMessageToJobRequest fileMessageToJobRequest() {
FileMessageToJobRequest fileMessageToJobRequest = new FileMessageToJobRequest();
fileMessageToJobRequest.setFileParameterName("input.file.name");
fileMessageToJobRequest.setJob(personJob());
return fileMessageToJobRequest;
}
@Bean
public JobLaunchingGateway jobLaunchingGateway() {
TaskExecutorJobLauncher jobLauncher = new TaskExecutorJobLauncher();
jobLauncher.setJobRepository(jobRepository);
jobLauncher.setTaskExecutor(new SyncTaskExecutor());
JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher);
return jobLaunchingGateway;
}
@Bean
public IntegrationFlow integrationFlow(JobLaunchingGateway jobLaunchingGateway) {
return IntegrationFlow.from(Files.inboundAdapter(new File("/tmp/myfiles")).
filter(new SimplePatternFileListFilter("*.csv")),
c -> c.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(1))).
transform(fileMessageToJobRequest()).
handle(jobLaunchingGateway).
log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
get();
}
次の例は、その一般的なケースを XML で構成する方法を示しています。
<int:channel id="inboundFileChannel"/>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-file:inbound-channel-adapter id="filePoller"
channel="inboundFileChannel"
directory="file:/tmp/myfiles/"
filename-pattern="*.csv">
<int:poller fixed-rate="1000"/>
</int-file:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="io.spring.sbi.FileMessageToJobRequest">
<property name="job" ref="personJob"/>
<property name="fileParameterName" value="input.file.name"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel"/>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
ItemReader 構成の例
ファイルをポーリングしてジョブを起動したため、次の Bean 構成が示すように、"input.file.name" というジョブパラメーターによって定義された場所で見つかったファイルを使用するように、Spring Batch ItemReader
(たとえば) を構成する必要があります。
Java
XML
次の Java の例は、必要な Bean 構成を示しています。
@Bean
@StepScope
public ItemReader sampleReader(@Value("#{jobParameters[input.file.name]}") String resource) {
...
FlatFileItemReader flatFileItemReader = new FlatFileItemReader();
flatFileItemReader.setResource(new FileSystemResource(resource));
...
return flatFileItemReader;
}
次の XML の例は、必要な Bean 構成を示しています。
<bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader"
scope="step">
<property name="resource" value="file://#{jobParameters['input.file.name']}"/>
...
</bean>
前の例の主な関心点は、リソースプロパティ値として #{jobParameters['input.file.name']}
の値を挿入し、ItemReader
Bean をステップスコープを持つように設定することです。ステップスコープを持つように Bean を設定すると、jobParameters
変数へのアクセスを許可するレイトバインディングサポートを利用できます。