AMQP 1.0 サポート

バージョン 7.0 以降、Spring Integration は RabbitMQ AMQP 1.0 サポート用のチャネルアダプターを提供します。これらのチャネルアダプターは org.springframework.amqp:spring-rabbitmq-client ライブラリに基づいています。

Spring AMQP ドキュメントには、RabbitMQ AMQP 1.0 サポートに関する詳細情報が記載されています。

AMQP 1.0 送信チャネルアダプター

AmqpClientMessageHandler は AbstractReplyProducingMessageHandler 実装であり、setRequiresReply() の設定に応じて、一方向チャネルアダプターまたは送信ゲートウェイとして機能します。このチャネルアダプターのインスタンスには、AMQP 1.0 プロトコル用の AsyncAmqpTemplate 実装(前述の spring-rabbitmq-client ライブラリの RabbitAmqpTemplate など)が必要です。このメッセージハンドラーはデフォルトで非同期であるため、パブリケーションエラーはリクエストメッセージの errorChannel ヘッダー、またはアプリケーションコンテキストのグローバルデフォルト errorChannel を介して処理する必要があります。

メッセージをパブリッシュするための exchange (オプションの routingKey と併用)は、パブリッシュするための queue と相互に排他的です。どちらも指定されていない場合、AsyncAmqpTemplate の実装では、これらの宛先パートに何らかのデフォルト設定を保証する必要があります。そうでない場合、メッセージは配信されなかったものとして拒否されます。

デフォルトでは、MessageConverter は文字列、Serializable インスタンス、バイト配列を処理する org.springframework.amqp.support.converter.SimpleMessageConverter です。また、デフォルトの AmqpHeaderMapper は DefaultAmqpHeaderMapper.outboundMapper() です。このヘッダーマッパーは、AMQP メッセージのプロパティを応答のヘッダーにマッピングするためにも使用されます。

ゲートウェイモードでは、応答メッセージ本体を変換するために replyPayloadType を指定できます。ただし、MessageConverter は JacksonJsonMessageConverter と同様に SmartMessageConverter の実装である必要があります。また、replyPayloadType とは相互排他的な returnMessage フラグを true に設定すると、org.springframework.amqp.core.Message のインスタンス全体を応答メッセージペイロードとして返すことができます。

次の例は、AmqpClientMessageHandler を単純な @ServiceActivator として構成する方法を示しています。

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow sendFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundAdapter(rabbitTemplate)
                    .exchange("e1")
                    .routingKeyExpression("'k1'"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
                handle(AmqpClient.outboundAdapter(rabbitTemplate)
    		            .apply {
    		                exchange("e1")
                            routingKeyExpression("'k1'")
    		            }
    		    )
    }
@Bean
sendFlow() {
    integrationFlow {
        handle(AmqpClient.outboundAdapter(rabbitTemplate)
                .with {
                     exchange 'e1'
                     routingKeyExpression '''k1'''
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendChannel")
AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    messageHandler.setExchangeExpressionString("headers[exchange]");
    messageHandler.setRoutingKeyExpressionString("headers[routingKey]");
    return messageHandler;
}

AmqpClientMessageHandler のゲートウェイバリアントは次のようになります。

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundGateway(rabbitTemplate)
                    .queueFunction(m -> "requestReply"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
                handle(AmqpClient.outboundGateway(rabbitTemplate)
    		            .queueFunction { "requestReply" }
                )
    }
@Bean
sendFlow() {
    integrationFlow {
        handle(AmqpClient.outboundGateway(rabbitTemplate)
                .with {
                     queueFunction { 'requestReply' }
                }
        )
    }
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel")
AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    messageHandler.setRequiresReply(true);
    messageHandler.setReplyPayloadType(String.class);
    messageHandler.setMessageConverter(new JacksonJsonMessageConverter());
    messageHandler.setQueue("q1");
    return messageHandler;
}

AMQP 1.0 メッセージドライバーチャネルアダプター

AmqpClientMessageProducer は、RabbitMQ AMQP 1.0 プロトコルを介してキューからメッセージをコンシュームするためのメッセージドライバチャネルアダプターとしての MessageProducerSupport 実装です。AmqpConnectionFactory と少なくとも 1 つのキューが必要です。内部ロジックは RabbitAmqpListenerContainer と IntegrationRabbitAmqpMessageListener に基づいており、コンシュームされた AMQP メッセージ(変換後)を outputChannel に中継します。RabbitAmqpListenerContainer の設定オプションの一部は、AmqpClientMessageProducer から setter として公開されます。

デフォルトでは、MessageConverter は文字列、Serializable インスタンス、バイト配列を処理する org.springframework.amqp.support.converter.SimpleMessageConverter です。また、デフォルトの AmqpHeaderMapper は DefaultAmqpHeaderMapper.inboundMapper() です。messageConverter オプションを null に設定すると、変換(ヘッダーマッピングを含む)を完全にスキップし、受信した AMQP メッセージを生成対象の Spring メッセージのペイロードとして返すことができます。

また、AmqpClientMessageProducer は Pausable 契約を実装し、それぞれの RabbitAmqpListenerContainer API に委譲します。

AmqpClientMessageProducer.setBatchSize() > 1 の場合、このチャネルアダプターはバッチモードで動作します。この場合、受信メッセージはバッチサイズが満たされるか、batchReceiveTimeout 期間が満了するまで収集されます。バッチ処理されたすべての AMQP メッセージは Spring メッセージに変換され、結果リストが折り返しメッセージのペイロードとして生成され、outputChannel に送信されます。バッチモードでは、バッチ処理されたすべてのメッセージが一度に処理されるため、パフォーマンスが向上します。

autoSettle フラグが false に設定されている場合、受信したメッセージまたはバッチ全体の決済決定を行うために、AcknowledgmentCallback インスタンスが IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK メッセージヘッダーとして提供されます。

次の例は、AmqpClientMessageProducer を単純な受信エンドポイントとして構成する方法を示しています。

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow receiveFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundChannelAdapter(connectionFactory, "q1"))
            .channel(c -> c.queue("receiveChannel"))
            .get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, "q1")) {
            channel("inputChannel")
        }
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, 'q1')) {
        channel 'inputChannel'
    }
}
@Bean
AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
        QueueChannel inputChannel) {

    AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3");
    amqpClientMessageProducer.setOutputChannel(inputChannel);
    amqpClientMessageProducer.setBatchSize(2);
    return amqpClientMessageProducer;
}

AMQP 1.0 受信ゲートウェイ

AmqpClientInboundGateway は、RabbitMQ AMQP 1.0 プロトコルを介してリクエストを受信し、応答を生成するための MessagingGatewaySupport 実装です。前述の AmqpClientMessageProducer に類似しており、多くの RabbitAmqpListenerContainer 設定オプションを共有しています。さらに、AmqpClientInboundGateway は AMQP 1.0 応答を生成するために、内部的に RabbitAmqpTemplate を使用します。

リクエストと自動返信の相関関係を設定するには、リクエストメッセージの replyTo プロパティを指定する必要があります。例: RabbitAmqpTemplate.sendAndReceive() は、排他的かつ自動削除されるキューを生成する RabbitMQ AMQP 1.0 ライブラリの RpcClient に依存しています。あるいは、返信アドレスを AmqpClientInboundGateway の replyExchange(およびオプションで replyRoutingKey)または replyQueue (両方ではない)に設定することもできます。AmqpClientInboundGateway は、RabbitAmqpTemplate のデフォルトオプションに委譲されます。リクエストメッセージの messageId または correlationId プロパティは、返信との関連付けに使用できます。RabbitAmqpTemplate.sendAndReceive() の RpcClient は、応答が欠落している場合に返信を生成します。AmqpClientInboundGateway は、このような相関キーを返信メッセージにマッピングすることができます。

次の例は、AmqpClientInboundGateway を単純な受信ゲートウェイとして構成する方法を示しています。

  • Java DSL

  • Kotlin DSL

  • Groovy DSL

  • Java

@Bean
IntegrationFlow amqpClientInboundGatewayFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundGateway(connectionFactory, "requestReply"))
            .channel(c -> c.queue("inputChannel"))
            .get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundGateway(connectionFactory, "requestReply")) {
            channel { queue("inputChannel") }
        }
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundGateway(connectionFactory, 'requestReply')) {
        channel { queue 'inputChannel' }
    }
}
@Bean
AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) {
    AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "requestReply");
    amqpClientInboundGateway.setRequestChannelName("inputChannel");
    return amqpClientInboundGateway;
}