AMQP-backed メッセージチャネル

2 つのメッセージチャネル実装が利用可能です。1 つはポイントツーポイントで、もう 1 つはパブリッシュ / サブスクライブです。これらのチャネルは両方とも、基礎となる AmqpTemplate および SimpleMessageListenerContainer の幅広い構成属性を提供します(この章でチャネルアダプターとゲートウェイについて前述したとおり)。ただし、ここで示す例の構成は最小限です。XML スキーマを調べて、使用可能な属性を表示します。

ポイントツーポイントチャネルは、次の例のようになります。

<int-amqp:channel id="p2pChannel"/>

上記の例では、カバーで si.p2pChannel という名前の Queue が宣言され、このチャネルは(技術的には、この Queue の名前と一致するルーティングキーを使用して、無名直接交換に送信することにより) Queue に送信します。このチャネルは、Queue のコンシューマーも登録します。チャネルをメッセージ駆動型ではなく「ポーリング可能」にする場合は、次の例に示すように、message-driven フラグに false の値を指定します。

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

パブリッシュ / サブスクライブチャネルは次のようになります。

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

上記の例では、カバーで、si.fanout.pubSubChannel という名前のファンアウト交換が宣言され、このチャネルはそのファンアウト交換に送信します。このチャネルは、サーバー名の排他的、自動削除、非永続 Queue も宣言し、メッセージを受信するためにその Queue にコンシューマーを登録している間、それをファンアウト交換にバインドします。publish-subscribe-channel には「ポーリング可能な」オプションはありません。メッセージ駆動型でなければなりません。

バージョン 4.1 以降、AMQP-backed メッセージチャネルは(channel-transacted と組み合わせて) template-channel-transacted をサポートし、AbstractMessageListenerContainer と RabbitTemplate の transactional 構成を分離します。以前は、channel-transacted はデフォルトで true でした。現在、デフォルトでは、AbstractMessageListenerContainer の場合は false です。

バージョン 4.3 より前は、AMQP-backed チャネルは Serializable ペイロードとヘッダーを持つメッセージのみをサポートしていました。メッセージ全体が変換(直列化)され、RabbitMQ に送信されました。これで、extract-payload 属性(または Java 構成を使用する場合は setExtractPayload())を true に設定できます。このフラグが true の場合、チャネルアダプターを使用する場合と同様の方法で、メッセージペイロードが変換され、ヘッダーがマップされます。この配置により、AMQP でバックアップされたチャネルを、直列化できないペイロード(おそらく、Jackson2JsonMessageConverter などの別のメッセージコンバーター)で使用できます。デフォルトのマッピングされたヘッダーの詳細については、AMQP メッセージヘッダーを参照してください。outbound-header-mapper および inbound-header-mapper 属性を使用するカスタムマッパーを提供することにより、マッピングを変更できます。default-delivery-mode も指定できるようになりました。これは、amqp_deliveryMode ヘッダーがない場合に配信モードを設定するために使用されます。デフォルトでは、Spring AMQP MessageProperties は PERSISTENT 配信モードを使用します。

他の永続化に裏付けられたチャネルと同様に、AMQP に裏付けられたチャネルは、メッセージの永続化を提供してメッセージの損失を回避することを目的としています。他のピアアプリケーションに作業を分配することを目的としていません。そのためには、代わりにチャネルアダプターを使用してください。
バージョン 5.0 以降、ポーリング可能チャネルは、指定された receiveTimeout のポーラースレッドをブロックするようになりました(デフォルトは 1 秒です)。以前は、他の PollableChannel 実装とは異なり、受信タイムアウトに関係なく、メッセージが利用できない場合、スレッドはすぐにスケジューラーに戻りました。各メッセージを受信するにはコンシューマーを作成する必要があるため、ブロッキングは basicGet() を使用してメッセージを取得する(タイムアウトなし)よりも少し高負荷です。以前の動作を復元するには、ポーラーの receiveTimeout を 0 に設定します。

Java 構成を使用した構成

次の例は、Java 構成でチャネルを構成する方法を示しています。

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

Java DSL を使用した構成

次の例は、Java DSL でチャネルを構成する方法を示しています。

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}