RabbitMQ ストリームプラグインの使用

バージョン 2.4 では、RabbitMQ ストリームプラグイン (英語) に対する RabbitMQ ストリームプラグイン Java クライアント [GitHub] (英語) の初期サポートが導入されています。

  • RabbitStreamTemplate

  • StreamListenerContainer

プロジェクトに spring-rabbit-stream 依存関係を追加します。

maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.1.6</version>
</dependency>
gradle
compile 'org.springframework.amqp:spring-rabbit-stream:3.1.6'

RabbitAdmin Bean を使用し、QueueBuilder.stream() メソッドを使用してキュー型を指定することで、通常どおりキューをプロビジョニングできます。例:

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

ただし、AMQP 接続が開かれるときに管理者がトリガーされて定義された Bean を宣言するため、これは非ストリームコンポーネント ( SimpleMessageListenerContainer や DirectMessageListenerContainer など) も使用している場合にのみ機能します。アプリケーションがストリームコンポーネントのみを使用する場合、または高度なストリーム構成機能を使用したい場合は、代わりに StreamAdmin を構成する必要があります。

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

StreamCreator の詳細については、RabbitMQ のドキュメントを参照してください。

メッセージ送信

RabbitStreamTemplate は、RabbitTemplate (AMQP) 機能のサブセットを提供します。

RabbitStreamOperations
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

RabbitStreamTemplate 実装には、次のコンストラクターとプロパティがあります。

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter は convertAndSend メソッドで使用され、オブジェクトを Spring AMQP Message に変換します。

StreamMessageConverter は、Spring AMQP Message からネイティブストリーム Message に変換するために使用されます。

ネイティブストリーム Message を直接送信することもできます。Producer のメッセージビルダへのアクセスを提供する messageBuilder() メソッドを使用します。

ProducerCustomizer は、ビルド前にプロデューサーをカスタマイズするメカニズムを提供します。

Environment および Producer のカスタマイズについては、Java クライアントのドキュメント (英語) を参照してください。

バージョン 3.0 以降、メソッドの戻り値の型は ListenableFuture ではなく CompletableFuture です。

メッセージの受信

非同期メッセージ受信は、StreamListenerContainer (および @RabbitListener を使用する場合は StreamRabbitListenerContainerFactory ) によって提供されます。

リスナーコンテナーには、Environment と単一のストリーム名が必要です。

従来の MessageListener を使用して Spring AMQP Message を受け取るか、新しいインターフェースを使用してネイティブストリーム Message を受け取ることができます。

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

サポートされているプロパティについては、メッセージリスナーコンテナーの設定を参照してください。

テンプレートと同様に、コンテナーには ConsumerCustomizer プロパティがあります。

Environment および Consumer のカスタマイズについては、Java クライアントのドキュメント (英語) を参照してください。

@RabbitListener を使用する場合は、StreamRabbitListenerContainerFactory を構成します。現時点では、ほとんどの @RabbitListener プロパティ (concurrency など) は無視されます。idqueuesautoStartupcontainerFactory のみがサポートされています。さらに、queues にはストリーム名を 1 つしか含めることができません。

サンプル

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

バージョン 2.4.5 では、adviceChain プロパティが StreamListenerContainer (およびそのファクトリ) に追加されました。生のストリームメッセージを消費するときに使用するオプションの StreamMessageRecoverer を使用して、ステートレスな再試行インターセプターを作成するために、新しいファクトリ Bean も提供されます。

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
このコンテナーでは、ステートフルな再試行はサポートされていません。

スーパーストリーム

スーパーストリームは、分割されたストリームの抽象的な概念であり、引数 x-super-stream: true を持つ交換に多数のストリームキューをバインドすることによって実装されます。

プロビジョニング

便宜上、型 SuperStream の単一の Bean を定義することによって、スーパーストリームをプロビジョニングできます。

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

RabbitAdmin はこの Bean を検出し、交換 (my.super.stream) と 3 つのキュー (パーティション) - my.super-stream-n を宣言します。ここで、n は 012 であり、n に等しいルーティングキーでバインドされます。

また、AMQP を介して取引所に公開したい場合は、カスタムルーティングキーを提供できます。

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

キーの数は、パーティションの数と同じでなければなりません。

SuperStream へのプロデュース

RabbitStreamTemplate に superStreamRoutingFunction を追加する必要があります。

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

RabbitTemplate を使用して、AMQP 経由で公開することもできます。

単一のアクティブコンシューマーによるスーパーストリームの消費

リスナーコンテナーで superStream メソッドを呼び出して、スーパーストリームで単一のアクティブなコンシューマーを有効にします。

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
この時点で、同時実行数が 1 より大きい場合、実際の同時実行数は Environment によってさらに制御されます。完全な並行性を実現するには、環境の maxConsumersByConnection を 1 に設定します。環境の構成 (英語) を参照してください。

Micrometer Observation

バージョン 3.0.5 以降、RabbitStreamTemplate およびストリームリスナーコンテナーに対して、監視に Micrometer を使用することがサポートされるようになりました。コンテナーは、Micrometer タイマーもサポートするようになりました (監視が有効になっていない場合)。

各コンポーネントに observationEnabled を設定して観察を有効にします。これにより、各観測ごとにタイマーが管理されるため、Micrometer タイマーが無効になります。アノテーション付きリスナーを使用する場合は、コンテナーファクトリで observationEnabled を設定します。

詳細については、Micrometer トレース (英語) を参照してください。

タイマー / トレースにタグを追加するには、カスタム RabbitStreamTemplateObservationConvention または RabbitStreamListenerObservationConvention をテンプレートまたはリスナーコンテナーにそれぞれ構成します。

デフォルトの実装では、テンプレートの監視用に name タグが追加され、コンテナー用に listener.id タグが追加されます。

DefaultRabbitStreamTemplateObservationConvention または DefaultStreamRabbitListenerObservationConvention をサブクラス化するか、まったく新しい実装を提供できます。

詳細については、Micrometer 観測資料を参照してください。