最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。 |
RabbitMQ ストリームプラグインの初期コンシューマーサポート
RabbitMQ ストリームプラグイン (英語) の基本的なサポートが提供されるようになりました。この機能を有効にするには、spring-rabbit-stream
jar をクラスパスに追加する必要があります。これは spring-amqp
および spring-rabbit
と同じバージョンである必要があります。
containerType プロパティを stream に設定した場合、上記のコンシューマープロパティはサポートされません。concurrency は、スーパーストリームでのみサポートされます。各バインドで使用できるストリームキューは 1 つだけです。 |
containerType=stream
を使用するようにバインダーを構成するために、Spring Boot はアプリケーションのプロパティから Environment
@Bean
を自動的に構成します。オプションで、カスタマイザーを追加して、リスナーコンテナーをカスタマイズできます。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
カスタマイザーに渡される name
引数は destination + '.' + group + '.container'
です。
ストリーム name()
(オフセット追跡の目的で)は、バインディング destination + '.' + group
に設定されます。上記の ConsumerCustomizer
を使用して変更できます。手動オフセット追跡を使用する場合は、Context
をメッセージヘッダーとして使用できます。
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
環境およびコンシューマービルダーの構成については、RabbitMQStreamJava クライアントのドキュメント (英語) を参照してください。
RabbitMQ Super Streams のコンシューマーサポート
スーパーストリームについては、スーパーストリーム (英語) を参照してください。
スーパーストリームを使用すると、スーパーストリームの各パーティションで 1 つのアクティブなコンシューマーを使用して、自動的にスケールアップ / スケールダウンできます。
構成例:
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
フレームワークは、9 つのパーティションを持つ super
という名前のスーパーストリームを作成します。このアプリケーションの最大 3 つのインスタンスをデプロイできます。