AMQP 1.0 サポート
バージョン 7.0 以降、Spring Integration は RabbitMQ AMQP 1.0 サポート用のチャネルアダプターを提供します。これらのチャネルアダプターは org.springframework.amqp:spring-rabbitmq-client ライブラリに基づいています。
Spring AMQP ドキュメントには、RabbitMQ AMQP 1.0 サポートに関する詳細情報が記載されています。
AMQP 1.0 送信チャネルアダプター
AmqpClientMessageHandler は AbstractReplyProducingMessageHandler 実装であり、setRequiresReply() の設定に応じて、一方向チャネルアダプターまたは送信ゲートウェイとして機能します。このチャネルアダプターのインスタンスには、AMQP 1.0 プロトコル用の AsyncAmqpTemplate 実装(前述の spring-rabbitmq-client ライブラリの RabbitAmqpTemplate など)が必要です。このメッセージハンドラーはデフォルトで非同期であるため、パブリケーションエラーはリクエストメッセージの errorChannel ヘッダー、またはアプリケーションコンテキストのグローバルデフォルト errorChannel を介して処理する必要があります。
メッセージをパブリッシュするための exchange (オプションの routingKey と併用)は、パブリッシュするための queue と相互に排他的です。どちらも指定されていない場合、AsyncAmqpTemplate の実装では、これらの宛先パートに何らかのデフォルト設定を保証する必要があります。そうでない場合、メッセージは配信されなかったものとして拒否されます。
デフォルトでは、MessageConverter は文字列、Serializable インスタンス、バイト配列を処理する org.springframework.amqp.support.converter.SimpleMessageConverter です。また、デフォルトの AmqpHeaderMapper は DefaultAmqpHeaderMapper.outboundMapper() です。このヘッダーマッパーは、AMQP メッセージのプロパティを応答のヘッダーにマッピングするためにも使用されます。
ゲートウェイモードでは、応答メッセージ本体を変換するために replyPayloadType を指定できます。ただし、MessageConverter は JacksonJsonMessageConverter と同様に SmartMessageConverter の実装である必要があります。また、replyPayloadType とは相互排他的な returnMessage フラグを true に設定すると、org.springframework.amqp.core.Message のインスタンス全体を応答メッセージペイロードとして返すことができます。
次の例は、AmqpClientMessageHandler を単純な @ServiceActivator として構成する方法を示しています。
Java DSL
Kotlin DSL
Groovy DSL
Java
@Bean
IntegrationFlow sendFlow(RabbitAmqpTemplate rabbitTemplate) {
return f -> f
.handle(AmqpClient.outboundAdapter(rabbitTemplate)
.exchange("e1")
.routingKeyExpression("'k1'"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
integrationFlow {
handle(AmqpClient.outboundAdapter(rabbitTemplate)
.apply {
exchange("e1")
routingKeyExpression("'k1'")
}
)
}
@Bean
sendFlow() {
integrationFlow {
handle(AmqpClient.outboundAdapter(rabbitTemplate)
.with {
exchange 'e1'
routingKeyExpression '''k1'''
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendChannel")
AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) {
AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
messageHandler.setExchangeExpressionString("headers[exchange]");
messageHandler.setRoutingKeyExpressionString("headers[routingKey]");
return messageHandler;
}
AmqpClientMessageHandler のゲートウェイバリアントは次のようになります。
Java DSL
Kotlin DSL
Groovy DSL
Java
@Bean
IntegrationFlow requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
return f -> f
.handle(AmqpClient.outboundGateway(rabbitTemplate)
.queueFunction(m -> "requestReply"));
}
@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
integrationFlow {
handle(AmqpClient.outboundGateway(rabbitTemplate)
.queueFunction { "requestReply" }
)
}
@Bean
sendFlow() {
integrationFlow {
handle(AmqpClient.outboundGateway(rabbitTemplate)
.with {
queueFunction { 'requestReply' }
}
)
}
}
@Bean
@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel")
AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) {
AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
messageHandler.setRequiresReply(true);
messageHandler.setReplyPayloadType(String.class);
messageHandler.setMessageConverter(new JacksonJsonMessageConverter());
messageHandler.setQueue("q1");
return messageHandler;
}
AMQP 1.0 メッセージドライバーチャネルアダプター
AmqpClientMessageProducer は、RabbitMQ AMQP 1.0 プロトコルを介してキューからメッセージをコンシュームするためのメッセージドライバチャネルアダプターとしての MessageProducerSupport 実装です。AmqpConnectionFactory と少なくとも 1 つのキューが必要です。内部ロジックは RabbitAmqpListenerContainer と IntegrationRabbitAmqpMessageListener に基づいており、コンシュームされた AMQP メッセージ(変換後)を outputChannel に中継します。RabbitAmqpListenerContainer の設定オプションの一部は、AmqpClientMessageProducer から setter として公開されます。
デフォルトでは、MessageConverter は文字列、Serializable インスタンス、バイト配列を処理する org.springframework.amqp.support.converter.SimpleMessageConverter です。また、デフォルトの AmqpHeaderMapper は DefaultAmqpHeaderMapper.inboundMapper() です。messageConverter オプションを null に設定すると、変換(ヘッダーマッピングを含む)を完全にスキップし、受信した AMQP メッセージを生成対象の Spring メッセージのペイロードとして返すことができます。
また、AmqpClientMessageProducer は Pausable 契約を実装し、それぞれの RabbitAmqpListenerContainer API に委譲します。
AmqpClientMessageProducer.setBatchSize() > 1 の場合、このチャネルアダプターはバッチモードで動作します。この場合、受信メッセージはバッチサイズが満たされるか、batchReceiveTimeout 期間が満了するまで収集されます。バッチ処理されたすべての AMQP メッセージは Spring メッセージに変換され、結果リストが折り返しメッセージのペイロードとして生成され、outputChannel に送信されます。バッチモードでは、バッチ処理されたすべてのメッセージが一度に処理されるため、パフォーマンスが向上します。
autoSettle フラグが false に設定されている場合、受信したメッセージまたはバッチ全体の決済決定を行うために、AcknowledgmentCallback インスタンスが IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK メッセージヘッダーとして提供されます。
次の例は、AmqpClientMessageProducer を単純な受信エンドポイントとして構成する方法を示しています。
Java DSL
Kotlin DSL
Groovy DSL
Java
@Bean
IntegrationFlow receiveFlow(AmqpConnectionFactory connectionFactory) {
return IntegrationFlow.from(AmqpClient.inboundChannelAdapter(connectionFactory, "q1"))
.channel(c -> c.queue("receiveChannel"))
.get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, "q1")) {
channel("inputChannel")
}
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, 'q1')) {
channel 'inputChannel'
}
}
@Bean
AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
QueueChannel inputChannel) {
AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3");
amqpClientMessageProducer.setOutputChannel(inputChannel);
amqpClientMessageProducer.setBatchSize(2);
return amqpClientMessageProducer;
}
AMQP 1.0 受信ゲートウェイ
AmqpClientInboundGateway は、RabbitMQ AMQP 1.0 プロトコルを介してリクエストを受信し、応答を生成するための MessagingGatewaySupport 実装です。前述の AmqpClientMessageProducer に類似しており、多くの RabbitAmqpListenerContainer 設定オプションを共有しています。さらに、AmqpClientInboundGateway は AMQP 1.0 応答を生成するために、内部的に RabbitAmqpTemplate を使用します。
リクエストと自動返信の相関関係を設定するには、リクエストメッセージの replyTo プロパティを指定する必要があります。例: RabbitAmqpTemplate.sendAndReceive() は、排他的かつ自動削除されるキューを生成する RabbitMQ AMQP 1.0 ライブラリの RpcClient に依存しています。あるいは、返信アドレスを AmqpClientInboundGateway の replyExchange(およびオプションで replyRoutingKey)または replyQueue (両方ではない)に設定することもできます。AmqpClientInboundGateway は、RabbitAmqpTemplate のデフォルトオプションに委譲されます。リクエストメッセージの messageId または correlationId プロパティは、返信との関連付けに使用できます。RabbitAmqpTemplate.sendAndReceive() の RpcClient は、応答が欠落している場合に返信を生成します。AmqpClientInboundGateway は、このような相関キーを返信メッセージにマッピングすることができます。
次の例は、AmqpClientInboundGateway を単純な受信ゲートウェイとして構成する方法を示しています。
Java DSL
Kotlin DSL
Groovy DSL
Java
@Bean
IntegrationFlow amqpClientInboundGatewayFlow(AmqpConnectionFactory connectionFactory) {
return IntegrationFlow.from(AmqpClient.inboundGateway(connectionFactory, "requestReply"))
.channel(c -> c.queue("inputChannel"))
.get();
}
@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
integrationFlow(AmqpClient.inboundGateway(connectionFactory, "requestReply")) {
channel { queue("inputChannel") }
}
@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
integrationFlow(AmqpClient.inboundGateway(connectionFactory, 'requestReply')) {
channel { queue 'inputChannel' }
}
}
@Bean
AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) {
AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "requestReply");
amqpClientInboundGateway.setRequestChannelName("inputChannel");
return amqpClientInboundGateway;
}