AMQP-backed メッセージチャネル
2 つのメッセージチャネル実装が利用可能です。1 つはポイントツーポイントで、もう 1 つはパブリッシュ / サブスクライブです。これらのチャネルは両方とも、基礎となる AmqpTemplate および SimpleMessageListenerContainer の幅広い構成属性を提供します(この章でチャネルアダプターとゲートウェイについて前述したとおり)。ただし、ここで示す例の構成は最小限です。XML スキーマを調べて、使用可能な属性を表示します。
ポイントツーポイントチャネルは、次の例のようになります。
<int-amqp:channel id="p2pChannel"/> 上記の例では、カバーで si.p2pChannel という名前の Queue が宣言され、このチャネルは(技術的には、この Queue の名前と一致するルーティングキーを使用して、無名直接交換に送信することにより) Queue に送信します。このチャネルは、Queue のコンシューマーも登録します。チャネルをメッセージ駆動型ではなく「ポーリング可能」にする場合は、次の例に示すように、message-driven フラグに false の値を指定します。
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>パブリッシュ / サブスクライブチャネルは次のようになります。
<int-amqp:publish-subscribe-channel id="pubSubChannel"/> 上記の例では、カバーで、si.fanout.pubSubChannel という名前のファンアウト交換が宣言され、このチャネルはそのファンアウト交換に送信します。このチャネルは、サーバー名の排他的、自動削除、非永続 Queue も宣言し、メッセージを受信するためにその Queue にコンシューマーを登録している間、それをファンアウト交換にバインドします。publish-subscribe-channel には「ポーリング可能な」オプションはありません。メッセージ駆動型でなければなりません。
バージョン 4.1 以降、AMQP-backed メッセージチャネルは(channel-transacted と組み合わせて) template-channel-transacted をサポートし、AbstractMessageListenerContainer と RabbitTemplate の transactional 構成を分離します。以前は、channel-transacted はデフォルトで true でした。現在、デフォルトでは、AbstractMessageListenerContainer の場合は false です。
バージョン 4.3 より前は、AMQP-backed チャネルは Serializable ペイロードとヘッダーを持つメッセージのみをサポートしていました。メッセージ全体が変換(直列化)され、RabbitMQ に送信されました。これで、extract-payload 属性(または Java 構成を使用する場合は setExtractPayload())を true に設定できます。このフラグが true の場合、チャネルアダプターを使用する場合と同様の方法で、メッセージペイロードが変換され、ヘッダーがマップされます。この配置により、AMQP でバックアップされたチャネルを、直列化できないペイロード(おそらく、Jackson2JsonMessageConverter などの別のメッセージコンバーター)で使用できます。デフォルトのマッピングされたヘッダーの詳細については、AMQP メッセージヘッダーを参照してください。outbound-header-mapper および inbound-header-mapper 属性を使用するカスタムマッパーを提供することにより、マッピングを変更できます。default-delivery-mode も指定できるようになりました。これは、amqp_deliveryMode ヘッダーがない場合に配信モードを設定するために使用されます。デフォルトでは、Spring AMQP MessageProperties は PERSISTENT 配信モードを使用します。
| 他の永続化に裏付けられたチャネルと同様に、AMQP に裏付けられたチャネルは、メッセージの永続化を提供してメッセージの損失を回避することを目的としています。他のピアアプリケーションに作業を分配することを目的としていません。そのためには、代わりにチャネルアダプターを使用してください。 |
バージョン 5.0 以降、ポーリング可能チャネルは、指定された receiveTimeout のポーラースレッドをブロックするようになりました(デフォルトは 1 秒です)。以前は、他の PollableChannel 実装とは異なり、受信タイムアウトに関係なく、メッセージが利用できない場合、スレッドはすぐにスケジューラーに戻りました。各メッセージを受信するにはコンシューマーを作成する必要があるため、ブロッキングは basicGet() を使用してメッセージを取得する(タイムアウトなし)よりも少し高負荷です。以前の動作を復元するには、ポーラーの receiveTimeout を 0 に設定します。 |
Java 構成を使用した構成
次の例は、Java 構成でチャネルを構成する方法を示しています。
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}Java DSL を使用した構成
次の例は、Java DSL でチャネルを構成する方法を示しています。
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}