RabbitMQ AMQP 1.0 サポート

バージョン 4.0 では、RabbitMQ での AMQP 1.0 (英語) プロトコルサポート用の spring-rabbitmq-client モジュールが導入されています。

このアーティファクトは com.rabbitmq.client:amqp-client [GitHub] (英語) ライブラリに基づいているため、RabbitMQ とその AMQP 1.0 プロトコルサポートでのみ動作します。任意の AMQP 1.0 ブローカーでは使用できません。そのため、現時点では JMS ブリッジ [Apache] (英語) および対応する Spring JMS との統合が推奨されています。

RabbitMQ AMQP 1.0 サポートと対話できるようにするには、この依存関係をプロジェクトに追加する必要があります。

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbitmq-client</artifactId>
  <version>4.0.1</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:4.0.1'

spring-rabbit (AMQP 0.9.1 プロトコル用)は、この新しいクライアントで共通 API(例外、@RabbitListener サポートなど)を再利用するための推移的な依存関係として提供されます。対象プロジェクトで両方の機能を使用する必要はありませんが、RabbitMQ では AMQP 0.9.1 および 1.0 と AMQP 0.9.1 の両方が共存できます。

RabbitMQ AMQP 1.0 Java クライアントの詳細については、ドキュメント (英語) を参照してください。

RabbitMQ AMQP 1.0 環境

com.rabbitmq.client.amqp.Environment は、接続管理やその他の共通設定のためにプロジェクトに最初に追加する必要があるものです。これは、ノードまたはノードクラスターへのエントリポイントです。この環境では接続を作成できます。接続間で共有されるインフラストラクチャ関連の設定(例: スレッドプール、メトリクス、監視)を含めることができます。

@Bean
Environment environment() {
    return new AmqpEnvironmentBuilder()
            .connectionSettings()
            .port(5672)
            .environmentBuilder()
            .build();
}

同じ Environment インスタンスを異なる RabbitMQ ブローカーへの接続に使用できますが、その場合は接続ごとに接続設定を指定する必要があります。詳細は以下を参照してください。

AMQP 接続ファクトリ

org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory 抽象化は com.rabbitmq.client.amqp.Connection を管理するために導入されました。AMQP 0.9.1 プロトコル専用の org.springframework.amqp.rabbit.connection.ConnectionFactory と混同しないでください。SingleAmqpConnectionFactory 実装は、1 つの接続とその設定を管理するために存在します。同じ Connection を、複数のプロデューサー、コンシューマー、管理で共有できます。多重化は、AMQP クライアントライブラリ内部で、AMQP 1.0 プロトコル実装用のリンク抽象化によって処理されます。Connection はリカバリ機能を備え、トポロジも処理します。

ほとんどの場合、この Bean をプロジェクトに追加するだけで十分です。

@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
    return new SingleAmqpConnectionFactory(environment);
}

すべての接続固有の設定については、SingleAmqpConnectionFactory setter を参照してください。

RabbitMQ トポロジ管理

アプリケーションの観点からのトポロジ管理 (交換、キュー、バインディング) については、既存の AmqpAdmin インターフェースの実装である RabbitAmqpAdmin が存在します。

@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpAdmin(connectionFactory);
}

ExchangeQueueBindingDeclarables インスタンスのトポロジ管理には、ブローカーの構成で説明されているのと同じ Bean 定義を使用する必要があります。spring-rabbit の RabbitAdmin でもトポロジ管理は可能ですが、AMQP 0.9.1 接続に対して発生します。RabbitAmqpAdmin は AMQP 1.0 接続をベースとしているため、パブリッシャーとコンシューマーのリカバリと共に、トポロジのリカバリはそこからスムーズに処理されます。

RabbitAmqpAdmin は、start() ライフサイクルコールバックで各 Bean のスキャンを実行します。initialize()、およびその他のすべての RabbitMQ エンティティ管理メソッドは、実行時に手動で呼び出すことができます。RabbitAmqpAdmin は内部的に com.rabbitmq.client.amqp.Connection.management() API を使用して、それぞれのトポロジ操作を実行します。

RabbitAmqpTemplate

RabbitAmqpTemplate は AsyncAmqpTemplate の実装であり、AMQP 1.0 プロトコルを使用して様々な送受信操作を実行します。AmqpConnectionFactory が必要で、いくつかのデフォルト設定で構成できます。com.rabbitmq.client:amqp-client ライブラリには com.rabbitmq.client.amqp.Message が付属していますが、RabbitAmqpTemplate は、MessageProperties や MessageConverter 抽象化などのサポートクラスを含む、よく知られている org.springframework.amqp.core.Message に基づく API を公開しています。com.rabbitmq.client.amqp.Message との変換は、RabbitAmqpTemplate 内部で行われます。すべてのメソッドは、最終的に操作結果を取得するために CompletableFuture を返します。プレーンオブジェクトを使用した操作ではメッセージ本文の変換が必要であり、デフォルトでは SimpleMessageConverter が使用されます。変換の詳細については、メッセージコンバーターを参照してください。

通常、次のような Bean を 1 つ使用するだけで、すべての可能なテンプレートパターン操作を実行できます。

@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpTemplate(connectionFactory);
}

デフォルトの交換キーとルーティングキー、またはキューのみを設定することができます。RabbitAmqpTemplate には、受信操作用のデフォルトキューと、リクエスト / 応答操作用のデフォルトキューがあります。これらのキューが存在しない場合は、クライアントからのリクエスト用に一時的なキューが作成されます。

RabbitAmqpTemplate 操作のサンプルをいくつか示します。

@Bean
DirectExchange e1() {
    return new DirectExchange("e1");
}

@Bean
Queue q1() {
    return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}

@Bean
Binding b1() {
    return BindingBuilder.bind(q1()).to(e1()).with("k1");
}

...

@Test
void defaultExchangeAndRoutingKey() {
    this.rabbitAmqpTemplate.setExchange("e1");
    this.rabbitAmqpTemplate.setRoutingKey("k1");
	this.rabbitAmqpTemplate.setReceiveQueue("q1");

    assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
            .succeedsWithin(Duration.ofSeconds(10));

    assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
            .succeedsWithin(Duration.ofSeconds(10))
            .isEqualTo("test1");
}

ここでは、e1 エクスチェンジと q1 キューを宣言し、k1 ルーティングキーを使用してこれらのエクスチェンジにバインドします。次に、RabbitAmqpTemplate のデフォルト設定を使用して、対応するルーティングキーを使用して上記のエクスチェンジにメッセージをパブリッシュし、受信操作のデフォルトキューとして q1 を使用します。これらのメソッドには、特定のエクスチェンジまたはキュー(送信および受信用)に送信するためのオーバーロードされたバリアントがあります。ParameterizedTypeReference<T> を使用した receiveAndConvert() 操作では、RabbitAmqpTemplate に SmartMessageConverter を挿入する必要があります。

次の例は、RabbitAmqpTemplate を使用した RPC 実装を示しています (前の例と同じ RabbitMQ オブジェクトを前提としています)。

@Test
void verifyRpc() {
    String testRequest = "rpc-request";
    String testReply = "rpc-reply";

    CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);

    AtomicReference<String> receivedRequest = new AtomicReference<>();
    CompletableFuture<Boolean> rpcServerResult =
            this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
                     payload -> {
                         receivedRequest.set(payload);
                         return testReply;
                     });

    assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
    assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
    assertThat(receivedRequest.get()).isEqualTo(testRequest);
}

相関関係と replyTo キューは内部で管理されます。サーバー側は、後述する @RabbitListener POJO メソッドを使用して実装できます。

RabbitMQ AMQP 1.0 コンシューマー

他の多くのコンシューマー側メッセージング実装と同様に、spring-rabbitmq-client モジュールには、基本的によく知られている MessageListenerContainer の実装である RabbitAmqpListenerContainer が付属しています。これは DirectMessageListenerContainer と全く同じですが、RabbitMQ AMQP 1.0 をサポートします。AmqpConnectionFactory と、少なくとも 1 つのキューが必要です。また、MessageListener (または AMQP 1.0 固有の RabbitAmqpMessageListener)も提供する必要があります。AcknowledgeMode.MANUAL と同じ意味を持つ autoSettle = false で構成できます。その場合、MessageListener に提供される Message の MessageProperties には、ターゲットロジックを考慮するための AmqpAcknowledgment コールバックが含まれます。

RabbitAmqpMessageListener には com.rabbitmq.client:amqp-client 抽象化の契約があります。

/**
 * Process an AMQP message.
 * @param message the message to process.
 * @param context the consumer context to settle message.
 *                Null if container is configured for {@code autoSettle}.
 */
void onAmqpMessage(Message message, Consumer.Context context);

ここで、最初の引数はネイティブ受信 com.rabbitmq.client.amqp.Message であり、context は、前述の AmqpAcknowledgment 抽象化と同様に、メッセージ解決用のネイティブコールバックです。

batchSize オプションが指定されている場合、RabbitAmqpMessageListener はメッセージをバッチ処理して決済できます。この目的のためには、MessageListener.onMessageBatch() 契約を実装する必要があります。batchReceiveDuration オプションは、メモリとコンシューマークレジット (英語) の枯渇を回避するために、バッチが満杯でない場合に強制的に解放をスケジュールするために使用されます。

通常、RabbitAmqpMessageListener クラスは対象プロジェクトで直接使用されることはなく、宣言的なコンシューマー設定には @RabbitListener を介した POJO メソッドアノテーション設定が選択されます。RabbitAmqpListenerContainerFactory は RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME に登録する必要があり、@RabbitListener アノテーションプロセスは RabbitAmqpMessageListener インスタンスを RabbitListenerEndpointRegistry に登録します。対象 POJO メソッドの呼び出しは、MessagingMessageListenerAdapter を継承し、リクエストリプライシナリオ(非同期または非同期)を含む多くの機能を再利用する特定の RabbitAmqpMessageListenerAdapter 実装によって処理されます。アノテーション駆動型のリスナーエンドポイントで説明されているすべての概念は、この RabbitAmqpMessageListener にも適用されます。

従来のメッセージング payload および headers に加えて、@RabbitListener POJO メソッド契約には次のパラメーターを使用できます。

  • com.rabbitmq.client.amqp.Message - 変換なしのネイティブ AMQP 1.0 メッセージ。

  • org.springframework.amqp.core.Message - ネイティブ AMQP 1.0 メッセージからの変換結果としての Spring AMQP メッセージ抽象化。

  • org.springframework.messaging.Message - Spring AMQP メッセージからの変換結果としての Spring メッセージング抽象化。

  • Consumer.Context - RabbitMQ AMQP クライアントコンシューマー決済 API;

  • org.springframework.amqp.core.AmqpAcknowledgment - Spring AMQP 確認応答の抽象化: Consumer.Context への代表者。

次の例は、手動決済による RabbitMQ AMQP 1.0 の単純な @RabbitListener の相互作用を示しています。

@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
    return new RabbitAmqpListenerContainerFactory(connectionFactory);
}

final List<String> received = Collections.synchronizedList(new ArrayList<>());

CountDownLatch consumeIsDone = new CountDownLatch(11);

@RabbitListener(queues = {"q1", "q2"},
        ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
        concurrency = "2",
        id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
    try {
        if ("discard".equals(data)) {
            if (!this.received.contains(data)) {
                context.discard();
            }
            else {
                throw new MessageConversionException("Test message is rejected");
            }
        }
        else if ("requeue".equals(data) && !this.received.contains(data)) {
            acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
        }
        else {
            acknowledgment.acknowledge();
        }
        this.received.add(data);
    }
    finally {
        this.consumeIsDone.countDown();
    }
}