スレッドバリア

他の非同期イベントが発生するまで、メッセージフロースレッドを中断する必要がある場合があります。例: メッセージを RabbitMQ に発行する HTTP リクエストを検討します。RabbitMQ ブローカーがメッセージを受信したという確認を発行するまで、ユーザーに返信したくない場合があります。

バージョン 4.2 では、Spring Integration はこの目的のために <barrier/> コンポーネントを導入しました。基になる MessageHandler は BarrierMessageHandler です。このクラスは MessageTriggerAction も実装します。trigger() メソッドに渡されたメッセージは、handleRequestMessage() メソッドの対応するスレッドを解放します(存在する場合)。

中断されたスレッドとトリガースレッドは、メッセージで CorrelationStrategy を呼び出すことにより関連付けられます。メッセージが input-channel に送信されると、スレッドは最大 requestTimeout ミリ秒中断され、対応するトリガーメッセージを待機します。デフォルトの相関戦略では、IntegrationMessageHeaderAccessor.CORRELATION_ID ヘッダーを使用します。トリガーメッセージが同じ相関で到着すると、スレッドは解放されます。リリース後に output-channel に送信されるメッセージは、MessageGroupProcessor を使用して構築されます。デフォルトでは、メッセージは 2 つのペイロードの Collection<?> であり、ヘッダーは DefaultAggregatingMessageGroupProcessor を使用してマージされます。

trigger() メソッドが最初に呼び出された場合(またはメインスレッドがタイムアウトした後)、中断メッセージが到着するのを待つ triggerTimeout まで待機します。トリガースレッドを一時停止したくない場合は、代わりに TaskExecutor にハンドオフして、そのスレッドを一時停止することを検討してください。
以前のバージョンの 5.4 では、リクエストメッセージとトリガーメッセージの両方に timeout オプションが 1 つしかありませんでしたが、一部のユースケースでは、これらのアクションに異なるタイムアウトを設定することをお勧めします。そのため、requestTimeout および triggerTimeout オプションが導入されました。

requires-reply プロパティは、トリガーメッセージが到着する前に中断されたスレッドがタイムアウトした場合に実行するアクションを決定します。デフォルトでは、false です。これは、エンドポイントが null を返し、フローが終了し、スレッドが呼び出し元に戻ることを意味します。true の場合、ReplyRequiredException がスローされます。

trigger() メソッドをプログラムで呼び出すことができます(名前 barrier.handler を使用して Bean 参照を取得します。barrier はバリアエンドポイントの Bean 名です)。または、<outbound-channel-adapter/> を設定してリリースをトリガーできます。

同じ相関関係で中断できるスレッドは 1 つだけです。同じ相関を複数回使用できますが、同時に 1 回しか使用できません。2 番目のスレッドが同じ相関で到着すると、例外がスローされます。

次の例は、相関にカスタムヘッダーを使用する方法を示しています。

  • Java

  • XML

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
    BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
    barrier.setOutputChannel(out());
    barrier.setDiscardChannel(lateTriggerChannel);
    return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
    return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
        correlation-strategy-expression="headers['myHeader']"
        output-processor="myOutputProcessor"
        discard-channel="lateTriggerChannel"
        timeout="10000">
</int:barrier>

<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />

どちらが最初にメッセージを到着させるかに応じて、in にメッセージを送信するスレッドまたは release にメッセージを送信するスレッドは、他のメッセージが到着するまで最大 10 秒間待機します。メッセージが解放されると、out チャネルには、myOutputProcessor という名前のカスタム MessageGroupProcessor Bean を呼び出した結果を組み合わせたメッセージが送信されます。メインスレッドがタイムアウトし、トリガーが後で到着した場合、遅延トリガーが送信される破棄チャネルを構成できます。

このコンポーネントの例については、バリアサンプルアプリケーション [GitHub] (英語) を参照してください。