RabbitMQ ストリームプラグインの使用
バージョン 2.4 では、RabbitMQ ストリームプラグイン (英語) に対する RabbitMQ ストリームプラグイン Java クライアント [GitHub] (英語) の初期サポートが導入されています。
RabbitStreamTemplate
StreamListenerContainer
プロジェクトに spring-rabbit-stream
依存関係を追加します。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>3.1.6</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:3.1.6'
RabbitAdmin
Bean を使用し、QueueBuilder.stream()
メソッドを使用してキュー型を指定することで、通常どおりキューをプロビジョニングできます。例:
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
ただし、AMQP 接続が開かれるときに管理者がトリガーされて定義された Bean を宣言するため、これは非ストリームコンポーネント ( SimpleMessageListenerContainer
や DirectMessageListenerContainer
など) も使用している場合にのみ機能します。アプリケーションがストリームコンポーネントのみを使用する場合、または高度なストリーム構成機能を使用したい場合は、代わりに StreamAdmin
を構成する必要があります。
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
StreamCreator
の詳細については、RabbitMQ のドキュメントを参照してください。
メッセージ送信
RabbitStreamTemplate
は、RabbitTemplate
(AMQP) 機能のサブセットを提供します。
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
RabbitStreamTemplate
実装には、次のコンストラクターとプロパティがあります。
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
MessageConverter
は convertAndSend
メソッドで使用され、オブジェクトを Spring AMQP Message
に変換します。
StreamMessageConverter
は、Spring AMQP Message
からネイティブストリーム Message
に変換するために使用されます。
ネイティブストリーム Message
を直接送信することもできます。Producer
のメッセージビルダへのアクセスを提供する messageBuilder()
メソッドを使用します。
ProducerCustomizer
は、ビルド前にプロデューサーをカスタマイズするメカニズムを提供します。
Environment
および Producer
のカスタマイズについては、Java クライアントのドキュメント (英語) を参照してください。
バージョン 3.0 以降、メソッドの戻り値の型は ListenableFuture ではなく CompletableFuture です。 |
メッセージの受信
非同期メッセージ受信は、StreamListenerContainer
(および @RabbitListener
を使用する場合は StreamRabbitListenerContainerFactory
) によって提供されます。
リスナーコンテナーには、Environment
と単一のストリーム名が必要です。
従来の MessageListener
を使用して Spring AMQP Message
を受け取るか、新しいインターフェースを使用してネイティブストリーム Message
を受け取ることができます。
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
サポートされているプロパティについては、メッセージリスナーコンテナーの設定を参照してください。
テンプレートと同様に、コンテナーには ConsumerCustomizer
プロパティがあります。
Environment
および Consumer
のカスタマイズについては、Java クライアントのドキュメント (英語) を参照してください。
@RabbitListener
を使用する場合は、StreamRabbitListenerContainerFactory
を構成します。現時点では、ほとんどの @RabbitListener
プロパティ (concurrency
など) は無視されます。id
、queues
、autoStartup
、containerFactory
のみがサポートされています。さらに、queues
にはストリーム名を 1 つしか含めることができません。
サンプル
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
バージョン 2.4.5 では、adviceChain
プロパティが StreamListenerContainer
(およびそのファクトリ) に追加されました。生のストリームメッセージを消費するときに使用するオプションの StreamMessageRecoverer
を使用して、ステートレスな再試行インターセプターを作成するために、新しいファクトリ Bean も提供されます。
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
このコンテナーでは、ステートフルな再試行はサポートされていません。 |
スーパーストリーム
スーパーストリームは、分割されたストリームの抽象的な概念であり、引数 x-super-stream: true
を持つ交換に多数のストリームキューをバインドすることによって実装されます。
プロビジョニング
便宜上、型 SuperStream
の単一の Bean を定義することによって、スーパーストリームをプロビジョニングできます。
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
RabbitAdmin
はこの Bean を検出し、交換 (my.super.stream
) と 3 つのキュー (パーティション) - my.super-stream-n
を宣言します。ここで、n
は 0
、1
、2
であり、n
に等しいルーティングキーでバインドされます。
また、AMQP を介して取引所に公開したい場合は、カスタムルーティングキーを提供できます。
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
キーの数は、パーティションの数と同じでなければなりません。
SuperStream へのプロデュース
RabbitStreamTemplate
に superStreamRoutingFunction
を追加する必要があります。
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
RabbitTemplate
を使用して、AMQP 経由で公開することもできます。
単一のアクティブコンシューマーによるスーパーストリームの消費
リスナーコンテナーで superStream
メソッドを呼び出して、スーパーストリームで単一のアクティブなコンシューマーを有効にします。
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
この時点で、同時実行数が 1 より大きい場合、実際の同時実行数は Environment によってさらに制御されます。完全な並行性を実現するには、環境の maxConsumersByConnection を 1 に設定します。環境の構成 (英語) を参照してください。 |
Micrometer Observation
バージョン 3.0.5 以降、RabbitStreamTemplate
およびストリームリスナーコンテナーに対して、監視に Micrometer を使用することがサポートされるようになりました。コンテナーは、Micrometer タイマーもサポートするようになりました (監視が有効になっていない場合)。
各コンポーネントに observationEnabled
を設定して観察を有効にします。これにより、各観測ごとにタイマーが管理されるため、Micrometer タイマーが無効になります。アノテーション付きリスナーを使用する場合は、コンテナーファクトリで observationEnabled
を設定します。
詳細については、Micrometer トレース (英語) を参照してください。
タイマー / トレースにタグを追加するには、カスタム RabbitStreamTemplateObservationConvention
または RabbitStreamListenerObservationConvention
をテンプレートまたはリスナーコンテナーにそれぞれ構成します。
デフォルトの実装では、テンプレートの監視用に name
タグが追加され、コンテナー用に listener.id
タグが追加されます。
DefaultRabbitStreamTemplateObservationConvention
または DefaultStreamRabbitListenerObservationConvention
をサブクラス化するか、まったく新しい実装を提供できます。
詳細については、Micrometer 観測資料を参照してください。