スキャッターギャザー

バージョン 4.1 から、Spring Integration はスキャッターギャザー (英語) エンタープライズ統合パターンの実装を提供します。これは、メッセージを受信者に送信し、結果を集約することを目的とする複合エンドポイントです。エンタープライズ統合パターン (英語) に記載されているように、「最良の見積もり」などのシナリオのコンポーネントであり、複数のサプライヤーに情報をリクエストし、リクエストされたアイテムに最適な用語を提供するサプライヤーを決定する必要があります。

以前は、個別のコンポーネントを使用してパターンを構成できました。この機能強化により、より便利な構成が実現します。

ScatterGatherHandler は、PublishSubscribeChannel (または RecipientListRouter)と AggregatingMessageHandler を組み合わせたリクエスト / 応答エンドポイントです。リクエストメッセージは scatter チャネルに送信され、ScatterGatherHandler はアグリゲーターが outputChannel に送信する応答を待ちます。

機能性

Scatter-Gather パターンは、「オークション」と「配信」の 2 つのシナリオを提案しています。どちらの場合も、aggregation 機能は同じであり、AggregatingMessageHandler で利用可能なすべてのオプションを提供します。(実際には、ScatterGatherHandler はコンストラクター引数として AggregatingMessageHandler のみを必要とします)詳細については、アグリゲーターを参照してください。

競売

オークション Scatter-Gather バリアントは、リクエストメッセージに「パブリッシュ / サブスクライブ」ロジックを使用します。ここで、「スキャッター」チャネルは apply-sequence="true" の PublishSubscribeChannel です。ただし、このチャネルは任意の MessageChannel 実装にすることができます(ContentEnricher の request-channel の場合のように - コンテンツエンリッチャーを参照)。ただし、この場合、aggregation 関数用に独自のカスタム correlationStrategy を作成する必要があります。

分布

配布 Scatter-Gather バリアントは、RecipientListRouter (RecipientListRouter を参照)に基づいており、RecipientListRouter で使用可能なすべてのオプションがあります。これは、2 番目の ScatterGatherHandler コンストラクター引数です。recipient-list-router および aggregator のデフォルトの correlationStrategy のみに依存する場合は、apply-sequence="true" を指定する必要があります。それ以外の場合は、aggregator 用のカスタム correlationStrategy を提供する必要があります。PublishSubscribeChannel バリアント(オークションバリアント)とは異なり、recipient-list-router selector オプションを使用すると、メッセージに基づいてターゲットサプライヤーをフィルタリングできます。apply-sequence="true" では、デフォルトの sequenceSize が提供され、aggregator はグループを正しく解放できます。配布オプションは、オークションオプションと相互に排他的です。

フレームワークは外部から提供されたコンポーネントを変更できないため、applySequence=true は ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) コンストラクター構成に基づくプレーンな Java 構成にのみ必要です。便宜上、Scatter-Gather 用の XML および Java DSL は、バージョン 6.0 から applySequence を true に設定します。

オークションとディストリビューションの両方のバリエーションで、リクエスト(スキャター)メッセージは gatherResultChannel ヘッダーで強化され、aggregator からの応答メッセージを待機します。

デフォルトでは、すべてのサプライヤーは結果を replyChannel ヘッダーに送信する必要があります(通常、最終エンドポイントから output-channel を省略します)。ただし、gatherChannel オプションも提供されているため、サプライヤーは集約のためにそのチャネルに応答を送信できます。

スキャッターギャザーエンドポイントの構成

次の例は、Scatter-Gather の Bean 定義の Java 構成を示しています。

@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}

@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}

@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}

前の例では、applySequence="true" と受信者チャネルのリストを使用して RecipientListRouter distributor Bean を構成します。次の Bean は AggregatingMessageHandler 用です。最後に、これらの Bean を両方とも ScatterGatherHandler Bean 定義に挿入し、それを @ServiceActivator としてマークして、スキャッターギャザーコンポーネントを統合フローにワイヤリングします。

次の例は、XML 名前空間を使用して <scatter-gather> エンドポイントを構成する方法を示しています。

<scatter-gather
		id=""  (1)
		auto-startup=""  (2)
		input-channel=""  (3)
		output-channel=""  (4)
		scatter-channel=""  (5)
		gather-channel=""  (6)
		order=""  (7)
		phase=""  (8)
		send-timeout=""  (9)
		gather-timeout=""  (10)
		requires-reply="" > (11)
			<scatterer/>  (12)
			<gatherer/>  (13)
</scatter-gather>
1 エンドポイントの ID。ScatterGatherHandler Bean は、id + '.handler' のエイリアスで登録されます。RecipientListRouter Bean は、id + '.scatterer' のエイリアスで登録されます。AggregatingMessageHandler Bean は、id + '.gatherer' のエイリアスで登録されます。オプション。(BeanFactory はデフォルトの id 値を生成します。)
2 アプリケーションコンテキストの初期化中にエンドポイントを起動する必要があるかどうかを示すライフサイクル属性。さらに、ScatterGatherHandler は Lifecycle も実装し、gather-channel が提供された場合に内部で作成される gatherEndpoint を開始および停止します。オプション。(デフォルトは true です。)
3ScatterGatherHandler で処理するリクエストメッセージを受信するチャネル。必須。
4ScatterGatherHandler が集約結果を送信するチャネル。オプション。(受信メッセージは、replyChannel メッセージヘッダーで応答チャネル自体を指定できます)。
5 オークションシナリオの散布メッセージの送信先のチャネル。オプション。<scatterer> サブ要素と相互に排他的。
6 集計に対する各サプライヤーからの返信を受信するチャネル。スキャッタメッセージの replyChannel ヘッダーとして使用されます。オプション。デフォルトでは、FixedSubscriberChannel が作成されます。
7 複数のハンドラーが同じ DirectChannel にサブスクライブされる場合のこのコンポーネントの順序(負荷分散の目的で使用)。オプション。
8 エンドポイントを開始および停止するフェーズを指定します。起動順序は最低から最高に進み、シャットダウン順序は最高から最低になります。デフォルトでは、この値は Integer.MAX_VALUE です。これは、このコンテナーが可能な限り遅く起動し、できるだけ早く停止することを意味します。オプション。
9 応答 Message を output-channel に送信するときに待機するタイムアウト間隔。デフォルトでは、send() は 1 秒間ブロックします。これは、出力チャネルに何らかの「送信」制限がある場合にのみ適用されます。たとえば、固定「容量」がいっぱいの QueueChannel などです。この場合、MessageDeliveryException がスローされます。send-timeout は、AbstractSubscribableChannel 実装では無視されます。group-timeout(-expression) の場合、スケジュールされた期限切れタスクの MessageDeliveryException により、このタスクが再スケジュールされます。オプション。
10 スキャッターギャザーが応答メッセージを返すまで待機する時間を指定できます。デフォルトでは、30 秒間待機します。応答がタイムアウトした場合は "null" が返されます。オプション。
11scatter-gather が null 以外の値を返す必要があるかどうかを指定します。この値は、デフォルトでは true です。そのため、基になるアグリゲーターが gather-timeout の後に NULL 値を返すと、ReplyRequiredException がスローされます。null が可能性がある場合、無期限の待機を避けるために gather-timeout を指定する必要があります。
12<recipient-list-router> オプション。オプション。scatter-channel 属性と相互に排他的。
13<aggregator> オプション。必須。

エラー処理

Scatter-Gather は複数のリクエスト / 応答コンポーネントであるため、エラー処理にはさらに複雑さが伴います。場合によっては、ReleaseStrategy がプロセスがリクエストよりも少ない応答で終了することを許可する場合、ダウンストリームの例外をキャッチして無視する方が良い場合があります。その他の場合、エラーが発生したときにサブフローから戻るために「カバレッジメッセージ」のようなものを考慮する必要があります。

すべての非同期サブフローは、MessagePublishingErrorHandler から送信される適切なエラーメッセージの errorChannel ヘッダーで構成する必要があります。そうでない場合、共通のエラー処理ロジックを使用して、エラーがグローバル errorChannel に送信されます。非同期エラー処理の詳細については、エラー処理を参照してください。

同期フローは、ExpressionEvaluatingRequestHandlerAdvice を使用して例外を無視したり、カバレッジメッセージを返したりする場合があります。サブフローの 1 つから ScatterGatherHandler に例外がスローされると、上流に再スローされます。このようにして、他のすべてのサブフローは何もせずに機能し、その応答は ScatterGatherHandler で無視されます。これは予期される動作である場合がありますが、ほとんどの場合、他のすべておよび Gatherer の期待に影響を与えることなく、特定のサブフローのエラーを処理する方が適切です。

バージョン 5.1.3 以降、ScatterGatherHandler には errorChannelName オプションが付属しています。これはスキャッタメッセージの errorChannel ヘッダーに設定され、非同期エラーが発生した場合に使用されるか、エラーメッセージを直接送信するために通常の同期サブフローで使用できます。

以下のサンプル構成は、補正メッセージを返すことによる非同期エラー処理を示しています。

@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}

@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}

適切な応答を生成するには、MessagePublishingErrorHandler によって scatterGatherErrorChannel に送信された MessagingException の failedMessage からヘッダー(replyChannel および errorChannel を含む)をコピーする必要があります。このようにして、応答メッセージグループの完了のために、ターゲット例外が ScatterGatherHandler の Gatherer に返されます。このような例外 payload は、Gatherer の MessageGroupProcessor でフィルターで除外するか、スキャッター / ギャザーエンドポイントの後にダウンストリームで処理できます。

分散結果を収集元に送信する前に、ScatterGatherHandler はリクエストメッセージヘッダーを元に戻します。これには、応答チャネルとエラーチャネル (存在する場合) が含まれます。このようにして、スキャッター受信者サブフローで非同期ハンドオフが適用された場合でも、AggregatingMessageHandler からのエラーが呼び出し元に伝搬されます。操作を成功させるには、スキャッター受信者サブフローからの応答に gatherResultChanneloriginalReplyChanneloriginalErrorChannel ヘッダーを転送する必要があります。この場合、合理的で有限の gatherTimeout を ScatterGatherHandler 用に構成する必要があります。そうしないと、デフォルトで、Gatherer からの応答を永久に待機してブロックされます。