RabbitMQ ストリームプラグインの初期プロデューサーサポート

RabbitMQ ストリームプラグイン (英語) の基本的なサポートが提供されるようになりました。この機能を有効にするには、spring-rabbit-stream jar をクラスパスに追加する必要があります。これは spring-amqp および spring-rabbit と同じバージョンである必要があります。

上記のプロデューサープロパティは、producerType プロパティを STREAM_SYNC または STREAM_ASYNC に設定した場合はサポートされません。

ストリーム ProducerType を使用するようにバインダーを構成するために、Spring Boot はアプリケーションのプロパティから Environment@Bean を構成します。オプションで、カスタマイザーを追加して、メッセージハンドラーをカスタマイズできます。

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

環境およびプロデューサービルダーの構成については、RabbitMQStreamJava クライアントのドキュメント (英語) を参照してください。

RabbitMQ スーパーストリームのプロデューサーサポート

スーパーストリームについては、スーパーストリーム (英語) を参照してください。

スーパーストリームを使用すると、スーパーストリームの各パーティションで 1 つのアクティブなコンシューマーを使用して、自動的にスケールアップ / スケールダウンできます。Spring Cloud Stream を使用すると、AMQP またはストリームクライアントを使用してスーパーストリームに公開できます。

スーパーストリームはすでに存在している必要があります。スーパーストリームの作成は、プロデューサーバーインディングではサポートされていません。

AMQP を介したスーパーストリームへのパブリッシュ:

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

ストリームクライアントを使用してスーパーストリームに公開する:

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

ストリームクライアントを使用する場合、confirmAckChannel を設定すると、正常に送信されたメッセージのコピーがそのチャネルに送信されます。