受信チャネルアダプター

以下のリストは、AMQP 受信チャネルアダプターの可能な構成オプションを示しています。

  • Java DSL

  • Java

  • XML

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  batch-size=""                   (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)
1 このアダプターの一意の ID。オプション。
2 変換されたメッセージの送信先のメッセージチャネル。必須。
3 メッセージをコンシュームする AMQP キュー(コンマ区切りリスト)の名前。必須。
4MessageListenerContainer の確認モード。MANUAL に設定すると、配信タグとチャネルはそれぞれメッセージヘッダー amqp_deliveryTag と amqp_channel で提供されます。ユーザーアプリケーションは確認の責任を負います。NONE は、確認応答なし(autoAck)を意味します。AUTO は、ダウンストリームフローが完了すると、アダプターのコンテナーが確認することを意味します。オプション(デフォルトは AUTO)。受信エンドポイント確認モードを参照してください。
5 この受信チャネルアダプターに関連付けられた横断的な動作を処理するための追加の AOP アドバイス。オプション。
6 このコンポーネントによって作成されたチャネルがトランザクションであることを示すフラグ。true の場合、トランザクションチャネルを使用し、結果に応じて、ロールバックを通知する例外を除き、コミットまたはロールバックですべての操作(送信または受信)を終了するようにフレームワークに指示します。オプション(デフォルトは false)。
7 作成する同時コンシューマーの数を指定します。デフォルトは 1 です。同時コンシューマーの数を増やして、キューから受信するメッセージの消費を拡大することをお勧めします。ただし、複数のコンシューマーが登録されると、順序付けの保証は失われます。一般に、少量のキューには 1 つのコンシューマーを使用します。"consumers-per-queue" が設定されている場合は許可されません。オプション。
8RabbitMQ ConnectionFactory への Bean 参照。オプション(デフォルトは connectionFactory)。
9 エラーメッセージの送信先のメッセージチャネル。オプション。
10 リスナーチャネル (com.rabbitmq.client.Channel) が登録済みの ChannelAwareMessageListener に公開されるかどうか。オプション (デフォルトは true)。
11AMQP メッセージ受信時に使用する AmqpHeaderMapper への参照。オプション。デフォルトでは、標準の AMQP プロパティ(contentType など)のみが Spring Integration MessageHeaders にコピーされます。AMQP MessageProperties 内のユーザー定義ヘッダーは、デフォルトの DefaultAmqpHeaderMapper によってメッセージにコピーされません。request-header-names が指定されている場合はコピーされません。
12AMQP リクエストから MessageHeaders にマップされる AMQP ヘッダーの名前のコンマ区切りリスト。これは、"header-mapper" 参照が提供されていない場合にのみ提供できます。このリストの値は、ヘッダー名( "*" または「thing1 *、thing2」または "* something" など)と照合する単純なパターンにすることもできます。
13AMQP メッセージの受信に使用する AbstractMessageListenerContainer への参照です。この属性を指定した場合、リスナーコンテナーの設定に関連する他の属性は指定しないでください。つまり、この参照を設定することで、リスナーコンテナーの設定について全責任を負うことになります。唯一の例外は MessageListener 自体です。これはこのチャネルアダプター実装の中心的なロールであるため、参照先のリスナーコンテナーには独自の MessageListener がまだ存在してはなりません。省略可能。
14AMQP メッセージを受信するときに使用する MessageConverter。オプション。
15AMQP メッセージを受信するときに使用する MessagePropertiesConverter。オプション。
16 基礎となる AbstractMessageListenerContainer を開始および停止するフェーズを指定します。起動順序は最低から最高に進み、シャットダウンの順序はその逆です。デフォルトでは、この値は Integer.MAX_VALUE です。これは、このコンテナーが可能な限り遅く起動し、できるだけ早く停止することを意味します。オプション。
17AMQP ブローカーに、単一のリクエストで各コンシューマーに送信するメッセージの数を伝えます。多くの場合、スループットを改善するためにこの値を高く設定できます。トランザクションサイズ以上でなければなりません(このリストで後述する batch-size 属性を参照)。オプション(デフォルトは 1)。
18 ミリ秒単位の受信タイムアウト。オプション(デフォルトは 1000)。
19 基礎となる AbstractMessageListenerContainer の回復試行の間隔を指定します(ミリ秒単位)。オプション(デフォルトは 5000)。
20 "true" であり、ブローカーで使用できるキューがない場合、コンテナーは起動時に致命的な例外をスローし、コンテナーの実行中にキューが削除されると停止します(キューを受動的に宣言しようと 3 回試行した後)。false の場合、コンテナーは例外をスローせず、リカバリモードに入り、recovery-interval に従って再起動を試みます。オプション(デフォルトは true)。
21 基礎となる AbstractMessageListenerContainer が停止してから AMQP 接続が強制的に閉じられるまでの、ワーカーを待機する時間(ミリ秒)。シャットダウンシグナルが来たときにワーカーがアクティブである場合、このタイムアウト内で終了できる限り、処理を終了することができます。それ以外の場合、接続は閉じられ、メッセージは未確認のままになります(チャネルがトランザクションの場合)。オプション(デフォルトは 5000)。
22 デフォルトでは、基礎となる AbstractMessageListenerContainer は SimpleAsyncTaskExecutor 実装を使用します。これは、各タスクに対して新しいスレッドを起動し、非同期で実行します。デフォルトでは、同時スレッドの数は無制限です。この実装はスレッドを再利用しないことに注意してください。代替手段として、スレッドプーリング TaskExecutor 実装の使用を検討してください。オプション(デフォルトは SimpleAsyncTaskExecutor)。
23 デフォルトでは、基礎となる AbstractMessageListenerContainer は DefaultTransactionAttribute の新しいインスタンスを作成します(EJB アプローチを使用して実行時にロールバックしますが、チェック済み例外はありません)。オプション(デフォルトは DefaultTransactionAttribute)。
24 基になる AbstractMessageListenerContainer の外部 PlatformTransactionManager への Bean 参照を設定します。トランザクションマネージャーは、channel-transacted 属性と連携して機能します。フレームワークがメッセージを送信または受信していて、channelTransacted フラグが true であるときに進行中のトランザクションがすでに存在する場合、メッセージングトランザクションのコミットまたはロールバックは、現在のトランザクションが終了するまで延期されます。channelTransacted フラグが false の場合、トランザクションのセマンティクスはメッセージング操作に適用されません(自動確認されます)。詳細については、Spring AMQP を使用したトランザクションを参照してください。オプション。
25SimpleMessageListenerContainer に、1 回のリクエストで処理するメッセージ数を指定します。最適な結果を得るには、prefetch-count で設定された値以下である必要があります。"consumers-per-queue" が設定されている場合は許可されません。オプション(デフォルトは 1)。
26 基礎となるリスナーコンテナーがデフォルトの SimpleMessageListenerContainer ではなく DirectMessageListenerContainer であることを示します。詳細については、Spring AMQP リファレンスマニュアルを参照してください。
27 コンテナーの consumerBatchEnabled が true の場合、アダプターがメッセージペイロードでメッセージのバッチをどのように表示するかを決定します。MESSAGES (デフォルト)に設定すると、ペイロードは List<Message<?>> になり、各メッセージには受信 AMQP Message からマッピングされたヘッダーがあり、ペイロードは変換された body です。EXTRACT_PAYLOADS に設定した場合、ペイロードは List<?> であり、要素は AMQP Message 本体から変換されます。EXTRACT_PAYLOADS_WITH_HEADERS は EXTRACT_PAYLOADS に似ていますが、さらに、各メッセージのヘッダーは MessageProperties から対応するインデックスの List<Map<String, Object> にマップされます。ヘッダー名は AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS です。
コンテナー

XML を使用して外部コンテナーを構成する場合、Spring AMQP 名前空間を使用してコンテナーを定義することはできないことに注意してください。これは、名前空間に少なくとも 1 つの <listener/> 要素が必要なためです。この環境では、リスナーはアダプターの内部にあります。このため、次の例に示すように、通常の Spring <bean/> 定義を使用してコンテナーを定義する必要があります。

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
Spring Integration JMS と AMQP のサポートは似ていますが、重要な違いがあります。JMS 受信チャネルアダプターは、カバーで JmsDestinationPollingSource を使用しており、設定済みのポーラーを想定しています。AMQP 受信チャネルアダプターは AbstractMessageListenerContainer を使用し、メッセージ駆動型です。その点では、JMS メッセージ駆動型チャネルアダプターにより似ています。

バージョン 5.5 以降、AmqpInboundChannelAdapter は、再試行操作が内部で呼び出されたときに RecoveryCallback で使用される org.springframework.amqp.rabbit.retry.MessageRecoverer 戦略を使用して構成できます。詳細については、setMessageRecoverer() JavaDocs を参照してください。

@Publisher アノテーションは、@RabbitListener と組み合わせて使用することもできます。

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

デフォルトでは、@Publisher AOP インターセプターはメソッド呼び出しからの戻り値を処理します。ただし、@RabbitListener メソッドからの戻り値は AMQP 応答メッセージとして扱われます。このようなアプローチは @Publisher と一緒に使用することはできないため、メソッド引数に対するそれぞれの SpEL 式を含む @Payload アノテーションがこの組み合わせで推奨される方法です。@Publisher の詳細については、"アノテーション主導の構成" セクションを参照してください。

リスナーコンテナーで排他的コンシューマーまたはシングルアクティブコンシューマーを使用する場合は、コンテナープロパティ forceStop を true に設定することをお勧めします。これにより、コンテナーを停止した後、このインスタンスが完全に停止する前に別のコンシューマーがメッセージの消費を開始する可能性がある競合状態が防止されます。

バッチメッセージ

バッチメッセージの詳細については、Spring AMQP ドキュメントを参照してください。

Spring Integration でバッチメッセージを作成するには、発信エンドポイントを BatchingRabbitTemplate で構成するだけです。

バッチメッセージを受信すると、デフォルトで、リスナーコンテナーは各フラグメントメッセージを抽出し、アダプターは各フラグメントに対して Message<?> を生成します。バージョン 5.2 以降、コンテナーの deBatchingEnabled プロパティが false に設定されている場合、代わりにアダプターによってデバッチ処理が実行され、ペイロードがフラグメントペイロードのリストである単一の Message<List<?>> が生成されます(適切な場合は変換後)。

デフォルトの BatchingStrategy は SimpleBatchingStrategy ですが、これはアダプターでオーバーライドできます。

再試行操作でリカバリが必要な場合は、org.springframework.amqp.rabbit.retry.MessageBatchRecoverer をバッチで使用する必要があります。