Spring Cloud Stream 統合
タスクはそれ自体でも便利ですが、タスクをより大きなエコシステムに統合すると、より複雑な処理やオーケストレーションに役立つようになります。このセクションでは、Spring Cloud Task と Spring Cloud Stream の統合オプションについて説明します。
Spring Cloud Stream からのタスクの起動
ストリームからタスクを起動できます。これを行うには、ペイロードとして TaskLaunchRequest
を含むメッセージをリッスンするシンクを作成します。TaskLaunchRequest
には次のものが含まれます。
uri
: 実行されるタスクアーティファクトへ。applicationName
: タスクに関連付けられた名前。applicationName が設定されていない場合、TaskLaunchRequest
は次の内容で構成されるタスク名を生成します。:Task-<UUID>
.commandLineArguments
: タスクのコマンドライン引数を含むリスト。environmentProperties
: タスクで使用される環境変数を含むマップ。deploymentProperties
: デプロイヤーがタスクをデプロイするために使用するプロパティを含むマップ。
ペイロードの型が異なる場合、シンクは例外をスローします。 |
例: HTTP ソースからデータを取り込み、TaskLaunchRequest
を含む GenericMessage
を作成し、その出力チャネルにメッセージを送信するプロセッサーを備えたストリームを作成できます。タスクシンクは入力チャネルからメッセージを受信し、タスクを起動します。
taskSink を作成するには、次の例に示すように、EnableTaskLauncher
アノテーションを含む Spring Boot アプリケーションを作成するだけで済みます。
@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
public static void main(String[] args) {
SpringApplication.run(TaskSinkApplication.class, args);
}
}
Spring Cloud Task プロジェクトのサンプルモジュール [GitHub] (英語) には、サンプルのシンクとプロセッサーが含まれています。これらのサンプルをローカル maven リポジトリにインストールするには、次の例に示すように、skipInstall
プロパティを false
に設定して spring-cloud-task-samples
ディレクトリから maven ビルドを実行します。
mvn clean install
maven.remoteRepositories.springRepo.url プロパティは、Spring Boot Uber-jar が配置されている リモートリポジトリの場所に設定する必要があります。設定されていない場合、リモートリポジトリは存在しないため、ローカルリポジトリのみに依存します。 |
Spring Cloud Data Flow
Spring Cloud Data Flow でストリームを作成するには、まず作成したタスクシンクアプリケーションを登録する必要があります。次の例では、Spring Cloud Data Flow シェルを使用してプロセッサーとシンクのサンプルアプリケーションを登録しています。
app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>
次の例は、Spring Cloud Data Flow シェルからストリームを作成する方法を示しています。
stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy
Spring Cloud Task イベント
Spring Cloud Task は、タスクが Spring Cloud Stream チャネルを通じて実行されるときに、Spring Cloud Stream チャネルを通じてイベントを発行する機能を提供します。タスクリスナーは、task-events
という名前のメッセージチャネルで TaskExecution
をパブリッシュするために使用されます。この機能は、spring-cloud-stream
、spring-cloud-stream-<binder>
、クラスパス上に定義されたタスクを持つタスクに自動接続されます。
イベント発行リスナーを無効にするには、spring.cloud.task.events.enabled プロパティを false に設定します。 |
適切なクラスパスが定義されていると、次のタスクは task-events
チャネル上のイベントとして TaskExecution
を発行します (タスクの開始時と終了時の両方で)。
@SpringBootApplication
public class TaskEventsApplication {
public static void main(String[] args) {
SpringApplication.run(TaskEventsApplication.class, args);
}
@Configuration
public static class TaskConfiguration {
@Bean
public ApplicationRunner applicationRunner() {
return new ApplicationRunner() {
@Override
public void run(ApplicationArguments args) {
System.out.println("The ApplicationRunner was executed");
}
};
}
}
}
バインダー実装もクラスパス上にある必要があります。 |
サンプルタスクイベントアプリケーションは、Spring Cloud Task プロジェクトのサンプルモジュール ( こちら) [GitHub] (英語) にあります。 |
Spring Batch イベント
タスクを通じて Spring Batch ジョブを実行する場合、Spring Batch で使用可能な Spring Batch リスナーに基づいて情報メッセージを発行するように Spring Cloud Task を構成できます。具体的には、次の Spring Batch リスナーが各バッチジョブに自動構成され、Spring Cloud Task を通じて実行されるときに、関連する Spring Cloud Stream チャネルにメッセージを送信します。
JobExecutionListener
はjob-execution-events
をリッスンしますStepExecutionListener
はstep-execution-events
をリッスンしますChunkListener
はchunk-events
をリッスンしますItemReadListener
はitem-read-events
をリッスンしますItemProcessListener
はitem-process-events
をリッスンしますItemWriteListener
はitem-write-events
をリッスンしますSkipListener
はskip-events
をリッスンします
適切な Bean ( Job
および TaskLifecycleListener
) がコンテキスト内に存在する場合、これらのリスナーは AbstractJob
に自動構成されます。これらのイベントをリッスンするための構成は、他の Spring Cloud Stream チャネルへのバインドと同じ方法で処理されます。私たちのタスク (バッチジョブを実行するタスク) は Source
として機能し、リスニングアプリケーションは Processor
または Sink
として機能します。
たとえば、アプリケーションがジョブの開始と停止のために job-execution-events
チャネルをリッスンすることが考えられます。リスニングアプリケーションを構成するには、次のように入力が job-execution-events
になるように構成します。
spring.cloud.stream.bindings.input.destination=job-execution-events
バインダー実装もクラスパス上にある必要があります。 |
バッチイベントアプリケーションのサンプルは、Spring Cloud Task プロジェクトのサンプルモジュール ( こちら) [GitHub] (英語) にあります。 |
バッチイベントを別のチャネルに送信する
Spring Cloud Task がバッチイベントに対して提供するオプションの 1 つは、特定のリスナーがメッセージを送信できるチャネルを変更する機能です。これを行うには、次の構成を使用します: spring.cloud.stream.bindings.<the channel>.destination=<new destination>
。例: StepExecutionListener
がデフォルトの step-execution-events
ではなく my-step-execution-events
という別のチャネルにメッセージを送信する必要がある場合、次の構成を追加できます。
spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events
バッチイベントの無効化
すべてのバッチイベントのリスナー機能を無効にするには、次の構成を使用します。
spring.cloud.task.batch.events.enabled=false
特定のバッチイベントを無効にするには、次の構成を使用します。
spring.cloud.task.batch.events.<batch event listener>.enabled=false
:
次のリストは、無効にできる個々のリスナーを示しています。
spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false
バッチイベントの発行順序
デフォルトでは、バッチイベントには Ordered.LOWEST_PRECEDENCE
があります。この値を変更するには (たとえば、5 に)、次の構成を使用します。
spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5