Spring Cloud Stream 統合

タスクはそれ自体でも便利ですが、タスクをより大きなエコシステムに統合すると、より複雑な処理やオーケストレーションに役立つようになります。このセクションでは、Spring Cloud Task と Spring Cloud Stream の統合オプションについて説明します。

Spring Cloud Stream からのタスクの起動

ストリームからタスクを起動できます。これを行うには、ペイロードとして TaskLaunchRequest を含むメッセージをリッスンするシンクを作成します。TaskLaunchRequest には次のものが含まれます。

  • uri: 実行されるタスクアーティファクトへ。

  • applicationName: タスクに関連付けられた名前。applicationName が設定されていない場合、TaskLaunchRequest は次の内容で構成されるタスク名を生成します。: Task-<UUID>.

  • commandLineArguments: タスクのコマンドライン引数を含むリスト。

  • environmentProperties: タスクで使用される環境変数を含むマップ。

  • deploymentProperties: デプロイヤーがタスクをデプロイするために使用するプロパティを含むマップ。

ペイロードの型が異なる場合、シンクは例外をスローします。

例: HTTP ソースからデータを取り込み、TaskLaunchRequest を含む GenericMessage を作成し、その出力チャネルにメッセージを送信するプロセッサーを備えたストリームを作成できます。タスクシンクは入力チャネルからメッセージを受信し、タスクを起動します。

taskSink を作成するには、次の例に示すように、EnableTaskLauncher アノテーションを含む Spring Boot アプリケーションを作成するだけで済みます。

@SpringBootApplication
@EnableTaskLauncher
public class TaskSinkApplication {
	public static void main(String[] args) {
		SpringApplication.run(TaskSinkApplication.class, args);
	}
}

Spring Cloud Task プロジェクトのサンプルモジュール [GitHub] (英語) には、サンプルのシンクとプロセッサーが含まれています。これらのサンプルをローカル maven リポジトリにインストールするには、次の例に示すように、skipInstall プロパティを false に設定して spring-cloud-task-samples ディレクトリから maven ビルドを実行します。

mvn clean install

maven.remoteRepositories.springRepo.url プロパティは、Spring Boot Uber-jar が配置されている リモートリポジトリの場所に設定する必要があります。設定されていない場合、リモートリポジトリは存在しないため、ローカルリポジトリのみに依存します。

Spring Cloud Data Flow

Spring Cloud Data Flow でストリームを作成するには、まず作成したタスクシンクアプリケーションを登録する必要があります。次の例では、Spring Cloud Data Flow シェルを使用してプロセッサーとシンクのサンプルアプリケーションを登録しています。

app register --name taskSink --type sink --uri maven://io.spring.cloud:tasksink:<version>
app register --name taskProcessor --type processor --uri maven:io.spring.cloud:taskprocessor:<version>

次の例は、Spring Cloud Data Flow シェルからストリームを作成する方法を示しています。

stream create foo --definition "http --server.port=9000|taskProcessor|taskSink" --deploy

Spring Cloud Task イベント

Spring Cloud Task は、タスクが Spring Cloud Stream チャネルを通じて実行されるときに、Spring Cloud Stream チャネルを通じてイベントを発行する機能を提供します。タスクリスナーは、task-events という名前のメッセージチャネルで TaskExecution をパブリッシュするために使用されます。この機能は、spring-cloud-streamspring-cloud-stream-<binder>、クラスパス上に定義されたタスクを持つタスクに自動接続されます。

イベント発行リスナーを無効にするには、spring.cloud.task.events.enabled プロパティを false に設定します。

適切なクラスパスが定義されていると、次のタスクは task-events チャネル上のイベントとして TaskExecution を発行します (タスクの開始時と終了時の両方で)。

@SpringBootApplication
public class TaskEventsApplication {

	public static void main(String[] args) {
		SpringApplication.run(TaskEventsApplication.class, args);
	}

	@Configuration
	public static class TaskConfiguration {

		@Bean
		public ApplicationRunner applicationRunner() {
			return new ApplicationRunner() {
				@Override
				public void run(ApplicationArguments args) {
					System.out.println("The ApplicationRunner was executed");
				}
			};
		}
	}
}
バインダー実装もクラスパス上にある必要があります。
サンプルタスクイベントアプリケーションは、Spring Cloud Task プロジェクトのサンプルモジュール ( こちら) [GitHub] (英語) にあります。

特定のタスクイベントの無効化

タスクイベントを無効にするには、spring.cloud.task.events.enabled プロパティを false に設定します。

Spring Batch イベント

タスクを通じて Spring Batch ジョブを実行する場合、Spring Batch で使用可能な Spring Batch リスナーに基づいて情報メッセージを発行するように Spring Cloud Task を構成できます。具体的には、次の Spring Batch リスナーが各バッチジョブに自動構成され、Spring Cloud Task を通じて実行されるときに、関連する Spring Cloud Stream チャネルにメッセージを送信します。

  • JobExecutionListener は job-execution-events をリッスンします

  • StepExecutionListener は step-execution-events をリッスンします

  • ChunkListener は chunk-events をリッスンします

  • ItemReadListener は item-read-events をリッスンします

  • ItemProcessListener は item-process-events をリッスンします

  • ItemWriteListener は item-write-events をリッスンします

  • SkipListener は skip-events をリッスンします

適切な Bean ( Job および TaskLifecycleListener) がコンテキスト内に存在する場合、これらのリスナーは AbstractJob に自動構成されます。これらのイベントをリッスンするための構成は、他の Spring Cloud Stream チャネルへのバインドと同じ方法で処理されます。私たちのタスク (バッチジョブを実行するタスク) は Source として機能し、リスニングアプリケーションは Processor または Sink として機能します。

たとえば、アプリケーションがジョブの開始と停止のために job-execution-events チャネルをリッスンすることが考えられます。リスニングアプリケーションを構成するには、次のように入力が job-execution-events になるように構成します。

spring.cloud.stream.bindings.input.destination=job-execution-events

バインダー実装もクラスパス上にある必要があります。
バッチイベントアプリケーションのサンプルは、Spring Cloud Task プロジェクトのサンプルモジュール ( こちら) [GitHub] (英語) にあります。

バッチイベントを別のチャネルに送信する

Spring Cloud Task がバッチイベントに対して提供するオプションの 1 つは、特定のリスナーがメッセージを送信できるチャネルを変更する機能です。これを行うには、次の構成を使用します: spring.cloud.stream.bindings.<the channel>.destination=<new destination>。例: StepExecutionListener がデフォルトの step-execution-events ではなく my-step-execution-events という別のチャネルにメッセージを送信する必要がある場合、次の構成を追加できます。

spring.cloud.task.batch.events.step-execution-events-binding-name=my-step-execution-events

バッチイベントの無効化

すべてのバッチイベントのリスナー機能を無効にするには、次の構成を使用します。

spring.cloud.task.batch.events.enabled=false

特定のバッチイベントを無効にするには、次の構成を使用します。

spring.cloud.task.batch.events.<batch event listener>.enabled=false:

次のリストは、無効にできる個々のリスナーを示しています。

spring.cloud.task.batch.events.job-execution.enabled=false
spring.cloud.task.batch.events.step-execution.enabled=false
spring.cloud.task.batch.events.chunk.enabled=false
spring.cloud.task.batch.events.item-read.enabled=false
spring.cloud.task.batch.events.item-process.enabled=false
spring.cloud.task.batch.events.item-write.enabled=false
spring.cloud.task.batch.events.skip.enabled=false

バッチイベントの発行順序

デフォルトでは、バッチイベントには Ordered.LOWEST_PRECEDENCE があります。この値を変更するには (たとえば、5 に)、次の構成を使用します。

spring.cloud.task.batch.events.job-execution-order=5
spring.cloud.task.batch.events.step-execution-order=5
spring.cloud.task.batch.events.chunk-order=5
spring.cloud.task.batch.events.item-read-order=5
spring.cloud.task.batch.events.item-process-order=5
spring.cloud.task.batch.events.item-write-order=5
spring.cloud.task.batch.events.skip-order=5