サブフローのサポート

if…​else および publish-subscribe コンポーネントの一部には、サブフローを使用してロジックまたはマッピングを指定する機能があります。次の例が示すように、最も単純なサンプルは .publishSubscribeChannel() です。

@Bean
public IntegrationFlow subscribersFlow() {
    return flow -> flow
            .publishSubscribeChannel(Executors.newCachedThreadPool(), s -> s
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p / 2)
                            .channel(c -> c.queue("subscriber1Results")))
                    .subscribe(f -> f
                            .<Integer>handle((p, h) -> p * 2)
                            .channel(c -> c.queue("subscriber2Results"))))
            .<Integer>handle((p, h) -> p * 3)
            .channel(c -> c.queue("subscriber3Results"));
}

個別の IntegrationFlow @Bean 定義を使用しても同じ結果を得ることができますが、論理構成のサブフロースタイルが役立つことを願っています。その結果、コードが短く(そして読みやすく)なることがわかりました。

バージョン 5.3 以降、BroadcastCapableChannel ベースの publishSubscribeChannel() 実装が提供され、ブローカーが支援するメッセージチャネルでサブフローサブスクライバーを構成します。例: Jms.publishSubscribeChannel() で複数のサブスクライバーをサブフローとして構成できるようになりました。

@Bean
public JmsPublishSubscribeMessageChannelSpec jmsPublishSubscribeChannel() {
    return Jms.publishSubscribeChannel(jmsConnectionFactory())
                .destination("pubsub");
}

@Bean
public IntegrationFlow pubSubFlow(BroadcastCapableChannel jmsPublishSubscribeChannel) {
    return f -> f
            .publishSubscribeChannel(jmsPublishSubscribeChannel,
                    pubsub -> pubsub
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel1")))
                            .subscribe(subFlow -> subFlow
                                .channel(c -> c.queue("jmsPubSubBridgeChannel2"))));
}

同様の publish-subscribe サブフロー構成は、.routeToRecipients() メソッドを提供します。

別の例は、.filter() メソッドで .discardChannel() の代わりに .discardFlow() を使用しています。

.route() には特別な注意が必要です。次の例を考えてみましょう。

@Bean
public IntegrationFlow routeFlow() {
    return f -> f
            .<Integer, Boolean>route(p -> p % 2 == 0,
                    m -> m.channelMapping("true", "evenChannel")
                            .subFlowMapping("false", sf ->
                                    sf.<Integer>handle((p, h) -> p * 3)))
            .transform(Object::toString)
            .channel(c -> c.queue("oddChannel"));
}

.channelMapping() は、通常の Router マッピングと同じように機能し続けますが、.subFlowMapping() はそのサブフローをメインフローに結び付けました。つまり、ルーターのサブフローは .route() の後にメインフローに戻ります。

場合によっては、.subFlowMapping() から既存の IntegrationFlow @Bean を参照する必要があります。次の例は、その方法を示しています。

@Bean
public IntegrationFlow splitRouteAggregate() {
    return f -> f
            .split()
            .<Integer, Boolean>route(o -> o % 2 == 0,
                    m -> m
                            .subFlowMapping(true, oddFlow())
                            .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
            .aggregate();
}

@Bean
public IntegrationFlow oddFlow() {
    return f -> f.handle(m -> System.out.println("odd"));
}

@Bean
public IntegrationFlow evenFlow() {
    return f -> f.handle((p, h) -> "even");
}


この場合、そのようなサブフローから応答を受信してメインフローを続行する必要がある場合、この IntegrationFlow Bean 参照(またはその入力チャネル)は、前の例に示すように .gateway() でラップする必要があります。前の例の oddFlow() 参照は、.gateway() にラップされていません。このルーティングブランチからの応答は期待していません。そうしないと、次のような例外が発生します。

Caused by: org.springframework.beans.factory.BeanCreationException:
    The 'currentComponent' (org.springframework.integration.router.MethodInvokingRouter@7965a51c)
    is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'.
    This is the end of the integration flow.

サブフローをラムダとして構成すると、フレームワークがサブフローとのリクエストと応答の対話を処理し、ゲートウェイは必要ありません。

サブフローは任意の深さにネストできますが、ネストすることはお勧めしません。実際、ルーターの場合でも、フロー内に複雑なサブフローを追加すると、すぐにスパゲッティのプレートのように見え始め、人間が解析するのが難しくなります。

DSL がサブフロー構成をサポートする場合、構成するコンポーネントに通常チャネルが必要であり、そのサブフローが channel() 要素で始まる場合、フレームワークはコンポーネントの出力チャネルとフローの入力チャネルの間に bridge() を暗黙的に配置します。例: この filter 定義では:

.filter(p -> p instanceof String, e -> e
	.discardFlow(df -> df
                         .channel(MessageChannels.queue())
                         ...)

フレームワークは、MessageFilter.discardChannel に注入するために DirectChannel Bean を内部的に作成します。次に、サブフローをサブスクリプションのこの暗黙的なチャネルで始まる IntegrationFlow にラップし、フローで指定された channel() の前に bridge を配置します。既存の IntegrationFlow Bean を(ラムダなどのインラインサブフローの代わりに)サブフロー参照として使用する場合、フレームワークがフロー Bean から最初のチャネルを解決できるため、そのようなブリッジは必要ありません。インラインサブフローでは、入力チャネルはまだ使用できません。