AMQP-backed メッセージチャネル
2 つのメッセージチャネル実装が利用可能です。1 つはポイントツーポイントで、もう 1 つはパブリッシュ / サブスクライブです。これらのチャネルは両方とも、基礎となる AmqpTemplate
および SimpleMessageListenerContainer
の幅広い構成属性を提供します(この章でチャネルアダプターとゲートウェイについて前述したとおり)。ただし、ここで示す例の構成は最小限です。XML スキーマを調べて、使用可能な属性を表示します。
ポイントツーポイントチャネルは、次の例のようになります。
<int-amqp:channel id="p2pChannel"/>
上記の例では、カバーで si.p2pChannel
という名前の Queue
が宣言され、このチャネルは(技術的には、この Queue
の名前と一致するルーティングキーを使用して、無名直接交換に送信することにより) Queue
に送信します。このチャネルは、Queue
のコンシューマーも登録します。チャネルをメッセージ駆動型ではなく「ポーリング可能」にする場合は、次の例に示すように、message-driven
フラグに false
の値を指定します。
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
パブリッシュ / サブスクライブチャネルは次のようになります。
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
上記の例では、カバーで、si.fanout.pubSubChannel
という名前のファンアウト交換が宣言され、このチャネルはそのファンアウト交換に送信します。このチャネルは、サーバー名の排他的、自動削除、非永続 Queue
も宣言し、メッセージを受信するためにその Queue
にコンシューマーを登録している間、それをファンアウト交換にバインドします。publish-subscribe-channel には「ポーリング可能な」オプションはありません。メッセージ駆動型でなければなりません。
バージョン 4.1 以降、AMQP-backed メッセージチャネルは(channel-transacted
と組み合わせて) template-channel-transacted
をサポートし、AbstractMessageListenerContainer
と RabbitTemplate
の transactional
構成を分離します。以前は、channel-transacted
はデフォルトで true
でした。現在、デフォルトでは、AbstractMessageListenerContainer
の場合は false
です。
バージョン 4.3 より前は、AMQP-backed チャネルは Serializable
ペイロードとヘッダーを持つメッセージのみをサポートしていました。メッセージ全体が変換(直列化)され、RabbitMQ に送信されました。これで、extract-payload
属性(または Java 構成を使用する場合は setExtractPayload()
)を true
に設定できます。このフラグが true
の場合、チャネルアダプターを使用する場合と同様の方法で、メッセージペイロードが変換され、ヘッダーがマップされます。この配置により、AMQP でバックアップされたチャネルを、直列化できないペイロード(おそらく、Jackson2JsonMessageConverter
などの別のメッセージコンバーター)で使用できます。デフォルトのマッピングされたヘッダーの詳細については、AMQP メッセージヘッダーを参照してください。outbound-header-mapper
および inbound-header-mapper
属性を使用するカスタムマッパーを提供することにより、マッピングを変更できます。default-delivery-mode
も指定できるようになりました。これは、amqp_deliveryMode
ヘッダーがない場合に配信モードを設定するために使用されます。デフォルトでは、Spring AMQP MessageProperties
は PERSISTENT
配信モードを使用します。
他の永続化に裏付けられたチャネルと同様に、AMQP に裏付けられたチャネルは、メッセージの永続化を提供してメッセージの損失を回避することを目的としています。他のピアアプリケーションに作業を分配することを目的としていません。そのためには、代わりにチャネルアダプターを使用してください。 |
バージョン 5.0 以降、ポーリング可能チャネルは、指定された receiveTimeout のポーラースレッドをブロックするようになりました(デフォルトは 1 秒です)。以前は、他の PollableChannel 実装とは異なり、受信タイムアウトに関係なく、メッセージが利用できない場合、スレッドはすぐにスケジューラーに戻りました。各メッセージを受信するにはコンシューマーを作成する必要があるため、ブロッキングは basicGet() を使用してメッセージを取得する(タイムアウトなし)よりも少し高負荷です。以前の動作を復元するには、ポーラーの receiveTimeout を 0 に設定します。 |
Java 構成を使用した構成
次の例は、Java 構成でチャネルを構成する方法を示しています。
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
Java DSL を使用した構成
次の例は、Java DSL でチャネルを構成する方法を示しています。
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}