厳密なメッセージ順序
このセクションでは、受信および送信メッセージのメッセージ順序について説明します。
受信
受信メッセージの厳密な順序付けが必要な場合は、受信リスナーコンテナーの prefetchCount プロパティを 1 に設定する必要があります。これは、メッセージが失敗して再配信された場合、既存のプリフェッチされたメッセージの後に到着するためです。Spring AMQP バージョン 2.0 以降、prefetchCount はパフォーマンスを向上させるために 250 にデフォルト設定されています。厳密なオーダー要件には、パフォーマンスの低下という代償が伴います。
送信
次の統合フローを検討してください。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
} メッセージ A、B、C をゲートウェイに送信するとします。メッセージ A、B、C が順番に送信される可能性は高いですが、保証はありません。これは、テンプレートが送信操作ごとにキャッシュからチャネルを「借用」し、各メッセージに同じチャネルが使用されるという保証がないためです。解決策の 1 つは、スプリッターの前にトランザクションを開始することですが、RabbitMQ ではトランザクションのコストが高く、パフォーマンスが数百分の 1 に低下する可能性があります。
この問題をより効率的な方法で解決するために、バージョン 5.1 以降、Spring Integration は HandleMessageAdvice である BoundRabbitChannelAdvice を提供しています。メッセージアドバイスの処理を参照してください。スプリッターの前に適用すると、すべてのダウンストリーム操作が同じチャネルで実行され、オプションで、すべての送信メッセージのパブリッシャー確認が受信されるまで待機できます(接続ファクトリが確認用に構成されている場合)。次の例は、BoundRabbitChannelAdvice の使用方法を示しています。
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
} 同じ RabbitTemplate (RabbitOperations を実装)がアドバイスと送信アダプターで使用されていることに注意してください。アドバイスは、すべての操作が同じチャネルで実行されるように、テンプレートの invoke メソッド内でダウンストリームフローを実行します。オプションのタイムアウトが提供されている場合、フローが完了すると、アドバイスは waitForConfirmsOrDie メソッドを呼び出します。waitForConfirmsOrDie メソッドは、指定された時間内に確認が受信されない場合に例外をスローします。
ダウンストリームフロー (QueueChannel、ExecutorChannel など) ではスレッドのハンドオフがあってはなりません。 |