厳密なメッセージ順序
このセクションでは、受信および送信メッセージのメッセージ順序について説明します。
受信
受信メッセージの厳密な順序付けが必要な場合は、受信リスナーコンテナーの prefetchCount
プロパティを 1
に設定する必要があります。これは、メッセージが失敗して再配信された場合、既存のプリフェッチされたメッセージの後に到着するためです。Spring AMQP バージョン 2.0 以降、prefetchCount
はパフォーマンスを向上させるために 250
にデフォルト設定されています。厳密なオーダー要件には、パフォーマンスの低下という代償が伴います。
送信
次の統合フローを検討してください。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
メッセージ A
、B
、C
をゲートウェイに送信するとします。メッセージ A
、B
、C
が順番に送信される可能性は高いですが、保証はありません。これは、テンプレートが送信操作ごとにキャッシュからチャネルを「借用」し、各メッセージに同じチャネルが使用されるという保証がないためです。解決策の 1 つは、スプリッターの前にトランザクションを開始することですが、RabbitMQ ではトランザクションのコストが高く、パフォーマンスが数百分の 1 に低下する可能性があります。
この問題をより効率的な方法で解決するために、バージョン 5.1 以降、Spring Integration は HandleMessageAdvice
である BoundRabbitChannelAdvice
を提供しています。メッセージアドバイスの処理を参照してください。スプリッターの前に適用すると、すべてのダウンストリーム操作が同じチャネルで実行され、オプションで、すべての送信メッセージのパブリッシャー確認が受信されるまで待機できます(接続ファクトリが確認用に構成されている場合)。次の例は、BoundRabbitChannelAdvice
の使用方法を示しています。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
同じ RabbitTemplate
(RabbitOperations
を実装)がアドバイスと送信アダプターで使用されていることに注意してください。アドバイスは、すべての操作が同じチャネルで実行されるように、テンプレートの invoke
メソッド内でダウンストリームフローを実行します。オプションのタイムアウトが提供されている場合、フローが完了すると、アドバイスは waitForConfirmsOrDie
メソッドを呼び出します。waitForConfirmsOrDie
メソッドは、指定された時間内に確認が受信されない場合に例外をスローします。
ダウンストリームフロー (QueueChannel 、ExecutorChannel など) ではスレッドのハンドオフがあってはなりません。 |