厳密なメッセージ順序

このセクションでは、受信および送信メッセージのメッセージ順序について説明します。

受信

受信メッセージの厳密な順序付けが必要な場合は、受信リスナーコンテナーの prefetchCount プロパティを 1 に設定する必要があります。これは、メッセージが失敗して再配信された場合、既存のプリフェッチされたメッセージの後に到着するためです。Spring AMQP バージョン 2.0 以降、prefetchCount はパフォーマンスを向上させるために 250 にデフォルト設定されています。厳密なオーダー要件には、パフォーマンスの低下という代償が伴います。

送信

次の統合フローを検討してください。

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

メッセージ ABC をゲートウェイに送信するとします。メッセージ ABC が順番に送信される可能性は高いですが、保証はありません。これは、テンプレートが送信操作ごとにキャッシュからチャネルを「借用」し、各メッセージに同じチャネルが使用されるという保証がないためです。解決策の 1 つは、スプリッターの前にトランザクションを開始することですが、RabbitMQ ではトランザクションのコストが高く、パフォーマンスが数百分の 1 に低下する可能性があります。

この問題をより効率的な方法で解決するために、バージョン 5.1 以降、Spring Integration は HandleMessageAdvice である BoundRabbitChannelAdvice を提供しています。メッセージアドバイスの処理を参照してください。スプリッターの前に適用すると、すべてのダウンストリーム操作が同じチャネルで実行され、オプションで、すべての送信メッセージのパブリッシャー確認が受信されるまで待機できます(接続ファクトリが確認用に構成されている場合)。次の例は、BoundRabbitChannelAdvice の使用方法を示しています。

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .splitWith(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

同じ RabbitTemplate (RabbitOperations を実装)がアドバイスと送信アダプターで使用されていることに注意してください。アドバイスは、すべての操作が同じチャネルで実行されるように、テンプレートの invoke メソッド内でダウンストリームフローを実行します。オプションのタイムアウトが提供されている場合、フローが完了すると、アドバイスは waitForConfirmsOrDie メソッドを呼び出します。waitForConfirmsOrDie メソッドは、指定された時間内に確認が受信されない場合に例外をスローします。

ダウンストリームフロー (QueueChannelExecutorChannel など) ではスレッドのハンドオフがあってはなりません。