RabbitMQ AMQP 1.0 サポート

バージョン 4.0 では、RabbitMQ での AMQP 1.0 (英語) プロトコルサポート用の spring-rabbitmq-client モジュールが導入されています。

このアーティファクトは com.rabbitmq.client:amqp-client [GitHub] (英語) ライブラリに基づいているため、RabbitMQ とその AMQP 1.0 プロトコルサポートでのみ動作します。任意の AMQP 1.0 ブローカーでは使用できません。そのため、現時点では JMS bridge [Apache] (英語) および対応する Spring JMS との統合が推奨されています。

RabbitMQ AMQP 1.0 サポートと対話できるようにするには、この依存関係をプロジェクトに追加する必要があります。

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbitmq-client</artifactId>
  <version>3.2.8</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:3.2.8'

spring-rabbit (AMQP 0.9.1 プロトコル用)は、この新しいクライアントで共通 API(例外、@RabbitListener サポートなど)を再利用するための推移的な依存関係として提供されます。対象プロジェクトで両方の機能を使用する必要はありませんが、RabbitMQ では AMQP 0.9.1 および 1.0 と AMQP 0.9.1 の両方が共存できます。

For more information about RabbitMQ AMQP 1.0 Java Client see its documentation (英語) .

RabbitMQ AMQP 1.0 環境

com.rabbitmq.client.amqp.Environment は、接続管理やその他の共通設定のためにプロジェクトに最初に追加する必要があるものです。これは、ノードまたはノードクラスターへのエントリポイントです。この環境では接続を作成できます。接続間で共有されるインフラストラクチャ関連の設定(例: スレッドプール、メトリクス、監視)を含めることができます。

@Bean
Environment environment() {
    return new AmqpEnvironmentBuilder()
            .connectionSettings()
            .port(5672)
            .environmentBuilder()
            .build();
}

同じ Environment インスタンスを異なる RabbitMQ ブローカーへの接続に使用できますが、その場合は接続ごとに接続設定を指定する必要があります。詳細は以下を参照してください。

AMQP 接続ファクトリ

org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory 抽象化は com.rabbitmq.client.amqp.Connection を管理するために導入されました。AMQP 0.9.1 プロトコル専用の org.springframework.amqp.rabbit.connection.ConnectionFactory と混同しないでください。SingleAmqpConnectionFactory 実装は、1 つの接続とその設定を管理するために存在します。同じ Connection を、複数のプロデューサー、コンシューマー、管理で共有できます。多重化は、AMQP クライアントライブラリ内部で、AMQP 1.0 プロトコル実装用のリンク抽象化によって処理されます。Connection はリカバリ機能を備え、トポロジも処理します。

ほとんどの場合、この Bean をプロジェクトに追加するだけで十分です。

@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
    return new SingleAmqpConnectionFactory(environment);
}

すべての接続固有の設定については、SingleAmqpConnectionFactory setter を参照してください。

RabbitMQ トポロジ管理

アプリケーションの観点からのトポロジ管理 (交換、キュー、バインディング) については、既存の AmqpAdmin インターフェースの実装である RabbitAmqpAdmin が存在します。

@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpAdmin(connectionFactory);
}

ExchangeQueueBindingDeclarables インスタンスのトポロジ管理には、ブローカーの構成で説明されているのと同じ Bean 定義を使用する必要があります。spring-rabbit の RabbitAdmin でもトポロジ管理は可能ですが、AMQP 0.9.1 接続に対して発生します。RabbitAmqpAdmin は AMQP 1.0 接続をベースとしているため、パブリッシャーとコンシューマーのリカバリと共に、トポロジのリカバリはそこからスムーズに処理されます。

RabbitAmqpAdmin は、start() ライフサイクルコールバックで各 Bean のスキャンを実行します。initialize()、およびその他のすべての RabbitMQ エンティティ管理メソッドは、実行時に手動で呼び出すことができます。RabbitAmqpAdmin は内部的に com.rabbitmq.client.amqp.Connection.management() API を使用して、それぞれのトポロジ操作を実行します。

RabbitAmqpTemplate

RabbitAmqpTemplate は AsyncAmqpTemplate の実装であり、AMQP 1.0 プロトコルを使用して様々な送受信操作を実行します。AmqpConnectionFactory が必要で、いくつかのデフォルト設定で構成できます。com.rabbitmq.client:amqp-client ライブラリには com.rabbitmq.client.amqp.Message が付属していますが、RabbitAmqpTemplate は、MessageProperties や MessageConverter 抽象化などのサポートクラスを含む、よく知られている org.springframework.amqp.core.Message に基づく API を公開しています。com.rabbitmq.client.amqp.Message との変換は、RabbitAmqpTemplate 内部で行われます。すべてのメソッドは、最終的に操作結果を取得するために CompletableFuture を返します。プレーンオブジェクトを使用した操作ではメッセージ本文の変換が必要であり、デフォルトでは SimpleMessageConverter が使用されます。変換の詳細については、メッセージコンバーターを参照してください。

通常、次のような Bean を 1 つ使用するだけで、すべての可能なテンプレートパターン操作を実行できます。

@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpTemplate(connectionFactory);
}

デフォルトの交換キーとルーティングキー、またはキューのみを設定することができます。RabbitAmqpTemplate には、受信操作用のデフォルトキューと、リクエスト / 応答操作用のデフォルトキューがあります。これらのキューが存在しない場合は、クライアントからのリクエスト用に一時的なキューが作成されます。

RabbitAmqpTemplate 操作のサンプルをいくつか示します。

@Bean
DirectExchange e1() {
    return new DirectExchange("e1");
}

@Bean
Queue q1() {
    return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}

@Bean
Binding b1() {
    return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

...

@Test
void defaultExchangeAndRoutingKey() {
    this.rabbitAmqpTemplate.setExchange("e1");
    this.rabbitAmqpTemplate.setRoutingKey("k1");
	this.rabbitAmqpTemplate.setReceiveQueue("q1");

    assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
            .succeedsWithin(Duration.ofSeconds(10));

    assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
            .succeedsWithin(Duration.ofSeconds(10))
            .isEqualTo("test1");
}

Here we declared an e1 exchange, q1 queue and bind it into that exchange with a k1 routing key. Then we use a default setting for RabbitAmqpTemplate to publish messages to the mentioned exchange with the respective routing key and use q1 as default queue for receiving operations. There are overloaded variants for those methods to send to specific exchange or queue (for send and receive). The receiveAndConvert() operations with a ParameterizedTypeReference<T> requires a SmartMessageConverter to be injected into the RabbitAmqpTemplate.

The next example demonstrate and RPC implementation with RabbitAmqpTemplate (assuming same RabbitMQ objects as in the previous example):

@Test
void verifyRpc() {
    String testRequest = "rpc-request";
    String testReply = "rpc-reply";

    CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);

    AtomicReference<String> receivedRequest = new AtomicReference<>();
    CompletableFuture<Boolean> rpcServerResult =
            this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
                     payload -> {
                         receivedRequest.set(payload);
                         return testReply;
                     });

    assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
    assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
    assertThat(receivedRequest.get()).isEqualTo(testRequest);
}

The correlation and replyTo queue are managed internally. The server side can be implemented with a @RabbitListener POJO method described below.

The RabbitMQ AMQP 1.0 Consumer

As with many other messaging implementations for consumer side, the spring-rabbitmq-client modules comes with the RabbitAmqpListenerContainer which is, essentially, an implementation of well-know MessageListenerContainer. It does exactly the same as DirectMessageListenerContainer, but for RabbitMQ AMQP 1.0 support. Requires an AmqpConnectionFactory and at least one queue to consume from. Also, the MessageListener (or AMQP 1.0 specific RabbitAmqpMessageListener) must be provided. Can be configured with an autoSettle = false, with the meaning of AcknowledgeMode.MANUAL. In that case, the Message provided to the MessageListener has in its MessageProperties an AmqpAcknowledgment callback for target logic consideration.

The RabbitAmqpMessageListener has a contract for com.rabbitmq.client:amqp-client abstractions:

/**
 * Process an AMQP message.
 * @param message the message to process.
 * @param context the consumer context to settle message.
 *                Null if container is configured for {@code autoSettle}.
 */
void onAmqpMessage(Message message, Consumer.Context context);

Where the first argument is a native received com.rabbitmq.client.amqp.Message and context is a native callback for message settlement, similar to the mentioned above AmqpAcknowledgment abstraction.

The RabbitAmqpMessageListener can handle and settle messages in batches when batchSize option is provided. For this purpose the MessageListener.onMessageBatch() contract must be implemented. The batchReceiveDuration option is used to schedule a force release for not full batches to avoid memory and consumer credits (英語) exhausting.

Usually, the RabbitAmqpMessageListener class is not used directly in the target project, and POJO method annotation configuration via @RabbitListener is chosen for declarative consumer configuration. The RabbitAmqpListenerContainerFactory must be registered under the RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME, and @RabbitListener annotation process will register RabbitAmqpMessageListener instance into the RabbitListenerEndpointRegistry. The target POJO method invocation is handled by specific RabbitAmqpMessageListenerAdapter implementation, which extends a MessagingMessageListenerAdapter and reuses a lot of its functionality, including request-reply scenarios (async or not). So, all the concepts described in the アノテーション駆動型のリスナーエンドポイント are applied with this RabbitAmqpMessageListener as well.

In addition to traditional messaging payload and headers, the @RabbitListener POJO method contract can be with these parameters:

  • com.rabbitmq.client.amqp.Message - the native AMQP 1.0 message without any conversions;

  • org.springframework.amqp.core.Message - Spring AMQP message abstraction as conversion result from the native AMQP 1.0 message;

  • org.springframework.messaging.Message - Spring Messaging abstraction as conversion result from the Spring AMQP message;

  • Consumer.Context - RabbitMQ AMQP client consumer settlement API;

  • org.springframework.amqp.core.AmqpAcknowledgment - Spring AMQP 確認応答の抽象化: delegates to the Consumer.Context.

The following example demonstrates a simple @RabbitListener for RabbitMQ AMQP 1.0 interaction with the manual settlement:

@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpListenerContainerFactory(connectionFactory);
}

final List<String> received = Collections.synchronizedList(new ArrayList<>());

CountDownLatch consumeIsDone = new CountDownLatch(11);

@RabbitListener(queues = {"q1", "q2"},
        ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
        concurrency = "2",
        id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
    try {
        if ("discard".equals(data)) {
            if (!this.received.contains(data)) {
                context.discard();
            }
            else {
                throw new MessageConversionException("Test message is rejected");
            }
        }
        else if ("requeue".equals(data) && !this.received.contains(data)) {
            acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
        }
        else {
            acknowledgment.acknowledge();
        }
        this.received.add(data);
    }
    finally {
        this.consumeIsDone.countDown();
    }
}