最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。

RabbitMQ ストリームプラグインの初期コンシューマーサポート

RabbitMQ ストリームプラグイン (英語) の基本的なサポートが提供されるようになりました。この機能を有効にするには、spring-rabbit-stream jar をクラスパスに追加する必要があります。これは spring-amqp および spring-rabbit と同じバージョンである必要があります。

containerType プロパティを stream に設定した場合、上記のコンシューマープロパティはサポートされません。concurrency は、スーパーストリームでのみサポートされます。各バインドで使用できるストリームキューは 1 つだけです。

containerType=stream を使用するようにバインダーを構成するために、Spring Boot はアプリケーションのプロパティから Environment @Bean を自動的に構成します。オプションで、カスタマイザーを追加して、リスナーコンテナーをカスタマイズできます。

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

カスタマイザーに渡される name 引数は destination + '.' + group + '.container' です。

ストリーム name() (オフセット追跡の目的で)は、バインディング destination + '.' + group に設定されます。上記の ConsumerCustomizer を使用して変更できます。手動オフセット追跡を使用する場合は、Context をメッセージヘッダーとして使用できます。

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

環境およびコンシューマービルダーの構成については、RabbitMQStreamJava クライアントのドキュメント (英語) を参照してください。

RabbitMQ Super Streams のコンシューマーサポート

スーパーストリームについては、スーパーストリーム (英語) を参照してください。

スーパーストリームを使用すると、スーパーストリームの各パーティションで 1 つのアクティブなコンシューマーを使用して、自動的にスケールアップ / スケールダウンできます。

構成例:

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

フレームワークは、9 つのパーティションを持つ super という名前のスーパーストリームを作成します。このアプリケーションの最大 3 つのインスタンスをデプロイできます。