スレッドバリア
他の非同期イベントが発生するまで、メッセージフロースレッドを中断する必要がある場合があります。例: メッセージを RabbitMQ に発行する HTTP リクエストを検討します。RabbitMQ ブローカーがメッセージを受信したという確認を発行するまで、ユーザーに返信したくない場合があります。
バージョン 4.2 では、Spring Integration はこの目的のために <barrier/>
コンポーネントを導入しました。基になる MessageHandler
は BarrierMessageHandler
です。このクラスは MessageTriggerAction
も実装します。trigger()
メソッドに渡されたメッセージは、handleRequestMessage()
メソッドの対応するスレッドを解放します(存在する場合)。
中断されたスレッドとトリガースレッドは、メッセージで CorrelationStrategy
を呼び出すことにより関連付けられます。メッセージが input-channel
に送信されると、スレッドは最大 requestTimeout
ミリ秒中断され、対応するトリガーメッセージを待機します。デフォルトの相関戦略では、IntegrationMessageHeaderAccessor.CORRELATION_ID
ヘッダーを使用します。トリガーメッセージが同じ相関で到着すると、スレッドは解放されます。リリース後に output-channel
に送信されるメッセージは、MessageGroupProcessor
を使用して構築されます。デフォルトでは、メッセージは 2 つのペイロードの Collection<?>
であり、ヘッダーは DefaultAggregatingMessageGroupProcessor
を使用してマージされます。
trigger() メソッドが最初に呼び出された場合(またはメインスレッドがタイムアウトした後)、中断メッセージが到着するのを待つ triggerTimeout まで待機します。トリガースレッドを一時停止したくない場合は、代わりに TaskExecutor にハンドオフして、そのスレッドを一時停止することを検討してください。 |
以前のバージョンの 5.4 では、リクエストメッセージとトリガーメッセージの両方に timeout オプションが 1 つしかありませんでしたが、一部のユースケースでは、これらのアクションに異なるタイムアウトを設定することをお勧めします。そのため、requestTimeout および triggerTimeout オプションが導入されました。 |
requires-reply
プロパティは、トリガーメッセージが到着する前に中断されたスレッドがタイムアウトした場合に実行するアクションを決定します。デフォルトでは、false
です。これは、エンドポイントが null
を返し、フローが終了し、スレッドが呼び出し元に戻ることを意味します。true
の場合、ReplyRequiredException
がスローされます。
trigger()
メソッドをプログラムで呼び出すことができます(名前 barrier.handler
を使用して Bean 参照を取得します。barrier
はバリアエンドポイントの Bean 名です)。または、<outbound-channel-adapter/>
を設定してリリースをトリガーできます。
同じ相関関係で中断できるスレッドは 1 つだけです。同じ相関を複数回使用できますが、同時に 1 回しか使用できません。2 番目のスレッドが同じ相関で到着すると、例外がスローされます。 |
次の例は、相関にカスタムヘッダーを使用する方法を示しています。
Java
XML
@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}
@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
<int:barrier id="barrier1" input-channel="in" output-channel="out"
correlation-strategy-expression="headers['myHeader']"
output-processor="myOutputProcessor"
discard-channel="lateTriggerChannel"
timeout="10000">
</int:barrier>
<int:outbound-channel-adapter channel="release" ref="barrier1.handler" method="trigger" />
どちらにメッセージが最初に到着したかによって、in
にメッセージを送信するスレッドまたは release
にメッセージを送信するスレッドのいずれかが、もう一方のメッセージが到着するまで最大 10 秒間待機します。メッセージが解放されると、out
チャネルに、カスタム MessageGroupProcessor
Bean を呼び出した結果を組み合わせた myOutputProcessor
というメッセージが送信されます。メインスレッドがタイムアウトし、トリガーが後で到着した場合、遅れたトリガーが送信される破棄チャネルを構成できます。リクエストメッセージが時間内に到着しない場合、トリガーメッセージも破棄されます。
このコンポーネントの例については、バリアサンプルアプリケーション [GitHub] (英語) を参照してください。