オペレーター gateway()
IntegrationFlow 定義の gateway() オペレーターは、特別なサービスアクティベーターの実装であり、入力チャネルを介して他のエンドポイントまたは統合フローを呼び出し、応答を待ちます。技術的には、<chain> 定義(チェーン内からチェーンを呼び出すを参照)のネストされた <gateway> コンポーネントと同じロールを果たし、フローをよりクリーンでわかりやすくします。論理的に、ビジネスの観点から、ターゲット統合ソリューションの異なる部分の間で機能の分散と再利用を可能にするメッセージングゲートウェイです(メッセージングゲートウェイを参照)。この演算子には、さまざまなゴールに対していくつかのオーバーロードがあります。
gateway(String requestChannel)は、名前でエンドポイントの入力チャネルにメッセージを送信します。gateway(MessageChannel requestChannel)は、直接注入によってエンドポイントの入力チャネルにメッセージを送信します。gateway(IntegrationFlow flow)は、提供されたIntegrationFlowの入力チャネルにメッセージを送信します。
これらのすべてには、ターゲット GatewayMessageHandler およびそれぞれの AbstractEndpoint を構成するための 2 番目の Consumer<GatewayEndpointSpec> 引数を持つバリアントがあります。また、IntegrationFlow ベースのメソッドでは、既存の IntegrationFlow Bean を呼び出したり、IntegrationFlow 関数インターフェースのインプレースラムダを介してフローをサブフローとして宣言したり、private メソッドクリーナーコードスタイルで抽出したりすることができます。
@Bean
IntegrationFlow someFlow() {
return IntegrationFlow
.from(...)
.gateway(subFlow())
.handle(...)
.get();
}
private static IntegrationFlow subFlow() {
return f -> f
.scatterGather(s -> s.recipientFlow(...),
g -> g.outputProcessor(MessageGroup::getOne))
} ダウンストリームフローが常に応答を返すとは限らない場合は、requestTimeout を 0 に設定して、呼び出し元のスレッドが無期限にハングしないようにする必要があります。その場合、フローはその時点で終了し、スレッドはさらなる作業のために解放されます。 |
バージョン 6.5 以降、この gateway() 演算子は async(true) の動作を完全にサポートします。内部的には、GatewayProxyFactoryBean 用に AsyncRequestReplyExchanger サービスインターフェースが提供されています。また、AsyncRequestReplyExchanger 契約は CompletableFuture<Message<?>> であるため、リクエストと応答全体が非同期で実行されます。