バッチ
このセクションでは、Spring Cloud Task と Spring Batch の統合について詳しく説明します。このセクションでは、ジョブの実行とそれが実行されたタスク間の関連付けの追跡、および Spring Cloud Deployer による リモートパーティショニングについて説明します。
ジョブ実行をそれが実行されたタスクに関連付ける
Spring Boot は、Spring Boot Uber-jar 内でバッチジョブを実行するための機能を提供します。Spring Boot によるこの機能のサポートにより、開発者はその実行内で複数のバッチジョブを実行できます。Spring Cloud Task は、ジョブの実行 (ジョブ実行) をタスクの実行に関連付けて、一方を他方に追跡できるようにする機能を提供します。
Spring Cloud Task は、TaskBatchExecutionListener
を使用してこの機能を実現します。デフォルトでは、このリスナーは、Spring Batch ジョブ (コンテキスト内で定義された型 Job
の Bean を持つことによって) とクラスパス上の spring-cloud-task-batch
jar の両方を持つコンテキストで自動的に設定されます。リスナーは、これらの条件を満たすすべてのジョブに挿入されます。
TaskBatchExecutionListener のオーバーライド
現在のコンテキスト内のバッチジョブにリスナーが挿入されないようにするには、標準の Spring Boot メカニズムを使用して自動構成を無効にします。
コンテキスト内の特定のジョブにのみリスナーを挿入するには、次の例に示すように、batchTaskExecutionListenerBeanPostProcessor
をオーバーライドし、ジョブ Bean ID のリストを指定します。
public static TaskBatchExecutionListenerBeanPostProcessor batchTaskExecutionListenerBeanPostProcessor() {
TaskBatchExecutionListenerBeanPostProcessor postProcessor =
new TaskBatchExecutionListenerBeanPostProcessor();
postProcessor.setJobNames(Arrays.asList(new String[] {"job1", "job2"}));
return postProcessor;
}
サンプルバッチアプリケーションは、Spring Cloud Task プロジェクトのサンプルモジュール ( こちら) [GitHub] (英語) にあります。 |
リモートパーティショニング
Spring Cloud Deployer は、ほとんどのクラウドインフラストラクチャ上で Spring Boot ベースのアプリケーションを起動するための機能を提供します。DeployerPartitionHandler
および DeployerStepExecutionHandler
は、ワーカーステップ実行の起動を Spring Cloud Deployer に委譲します。
DeployerStepExecutionHandler
を構成するには、実行される Spring Boot Uber-jar を表す Resource
、TaskLauncherHandler
、JobExplorer
を指定する必要があります。環境プロパティのほか、一度に実行する ワーカー の最大数、結果をポーリングする間隔 (デフォルトは 10 秒)、およびタイムアウト (デフォルトは -1 またはタイムアウトなし) を構成できます。次の例は、この PartitionHandler
の構成がどのように見えるかを示しています。
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher,
JobExplorer jobExplorer) throws Exception {
MavenProperties mavenProperties = new MavenProperties();
mavenProperties.setRemoteRepositories(new HashMap<>(Collections.singletonMap("springRepo",
new MavenProperties.RemoteRepository(repository))));
Resource resource =
MavenResource.parse(String.format("%s:%s:%s",
"io.spring.cloud",
"partitioned-batch-job",
"1.1.0.RELEASE"), mavenProperties);
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource, "workerStep");
List<String> commandLineArgs = new ArrayList<>(3);
commandLineArgs.add("--spring.profiles.active=worker");
commandLineArgs.add("--spring.cloud.task.initialize.enable=false");
commandLineArgs.add("--spring.batch.initializer.enabled=false");
partitionHandler.setCommandLineArgsProvider(
new PassThroughCommandLineArgsProvider(commandLineArgs));
partitionHandler.setEnvironmentVariablesProvider(new NoOpEnvironmentVariablesProvider());
partitionHandler.setMaxWorkers(2);
partitionHandler.setApplicationName("PartitionedBatchJobTask");
return partitionHandler;
}
環境変数をパーティションに渡す場合、各パーティションは異なる環境設定を持つ異なるマシン上に存在する可能性があります。必要な環境変数のみを渡す必要があります。 |
上の例では、ワーカー の最大数を 2 に設定していることに注意してください。ワーカー の最大数を設定すると、一度に実行する必要があるパーティションの最大数が確立されます。
実行される Resource
は、現在のコンテキストで CommandLineRunner
として構成された DeployerStepExecutionHandler
を備えた Spring Boot Uber-jar であることが予想されます。前の例で列挙されたリポジトリは、Spring Boot Uber-jar が配置されている リモートリポジトリである必要があります。マネージャーと ワーカー は両方とも、ジョブリポジトリおよびタスクリポジトリとして使用されている同じデータストアを認識できることが期待されます。基盤となるインフラストラクチャが Spring Boot jar をブートストラップし、Spring Boot が DeployerStepExecutionHandler
を起動すると、ステップハンドラーはリクエストされた Step
を実行します。次の例は、DeployerStepExecutionHandler
を構成する方法を示しています。
@Bean
public DeployerStepExecutionHandler stepExecutionHandler(JobExplorer jobExplorer) {
DeployerStepExecutionHandler handler =
new DeployerStepExecutionHandler(this.context, jobExplorer, this.jobRepository);
return handler;
}
リモートパーティションアプリケーションのサンプルは、Spring Cloud Task プロジェクトのサンプルモジュール ( こちら) [GitHub] (英語) にあります。 |
リモートバッチパーティションを非同期的に起動する
デフォルトでは、バッチパーティションは順番に起動されます。ただし、リソース (Kubernetes での pod のプロビジョニングなど) がプロビジョニングされるまで各起動がブロックされるため、場合によってはこれがパフォーマンスに影響を与える可能性があります。このような場合は、ThreadPoolTaskExecutor
を DeployerPartitionHandler
に提供できます。これにより、ThreadPoolTaskExecutor
の構成に基づいて リモートバッチパーティションが起動されます。例:
@Bean
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setThreadNamePrefix("default_task_executor_thread");
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.initialize();
return executor;
}
@Bean
public PartitionHandler partitionHandler(TaskLauncher taskLauncher, JobExplorer jobExplorer,
TaskRepository taskRepository, ThreadPoolTaskExecutor executor) throws Exception {
Resource resource = this.resourceLoader
.getResource("maven://io.spring.cloud:partitioned-batch-job:2.2.0.BUILD-SNAPSHOT");
DeployerPartitionHandler partitionHandler =
new DeployerPartitionHandler(taskLauncher, jobExplorer, resource,
"workerStep", taskRepository, executor);
...
}
ThreadPoolTaskExecutor を使用するとスレッドがアクティブなままになり、アプリが終了しないため、コンテキストを閉じる必要があります。アプリケーションを適切に閉じるには、spring.cloud.task.closecontextEnabled プロパティを true に設定する必要があります。 |
Kubernetes Platform 用のバッチパーティションアプリケーションの開発に関する注意事項
Kubernetes プラットフォームにパーティション分割されたアプリをデプロイする場合は、Spring Cloud Kubernetes Deployer に対して次の依存関係を使用する必要があります。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-deployer-kubernetes</artifactId> </dependency>
タスクアプリケーションとそのパーティションのアプリケーション名は、正規表現パターン
[a-z0-9]([-a-z0-9]*[a-z0-9])
に従う必要があります。それ以外の場合は、例外がスローされます。
バッチ情報メッセージ
Spring Cloud Task は、バッチジョブが情報メッセージを発行する機能を提供します。"Spring Batch イベント" セクションでは、この機能について詳しく説明します。
バッチジョブの終了コード
前に説明したように、Spring Cloud Task アプリケーションは、タスク実行の終了コードを記録する機能をサポートしています。ただし、タスク内で Spring Batch ジョブを実行する場合、バッチジョブの実行がどのように完了したかに関係なく、デフォルトのバッチ /Boot 動作を使用すると、タスクの結果は常に 0 になります。タスクは Boot アプリケーションであり、タスクから返される終了コードは Boot アプリケーションと同じであることに注意してください。この動作をオーバーライドして、バッチジョブが FAILED
の BatchStatus を返したときにタスクがゼロ以外の終了コードを返せるようにするには、spring.cloud.task.batch.fail-on-job-failure
を true
に設定します。この場合、終了コードは 1 (デフォルト)、または指定された ExitCodeGenerator
に基づくことができます)
この機能は、Spring Boot によって提供されるものを置き換える新しい ApplicationRunner
を使用します。デフォルトでは、同じ順序で構成されます。ただし、ApplicationRunner
の実行順序をカスタマイズする場合は、spring.cloud.task.batch.applicationRunnerOrder
プロパティを設定することで順序を設定できます。タスクがバッチジョブの実行結果に基づいて終了コードを返すようにするには、独自の CommandLineRunner
を作成する必要があります。