AMQP (RabbitMQ) のサポート

Spring Integration は、Advanced Message Queuing Protocol(AMQP)を使用してメッセージを送受信するためのチャネルアダプターを提供します。

この依存関係をプロジェクトに含める必要があります。

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-amqp</artifactId>
    <version>6.0.5</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-amqp:6.0.5"

次のアダプターが利用可能です。

Spring Integration は、AMQP Exchange および Queue によってサポートされるポイントツーポイントメッセージチャネルおよびパブリッシュ / サブスクライブメッセージチャネルも提供します。

AMQP サポートを提供するため、Spring Integration は(Spring AMQP)に依存しています。これは、コア Spring コンセプトを AMQP ベースのメッセージングソリューションの開発に適用します。Spring AMQP は(Spring JMS)と同様のセマンティクスを提供します。

提供されている AMQP チャネルアダプターは単方向メッセージング(送信または受信)のみを対象としていますが、Spring Integration はリクエスト応答操作用の受信および送信 AMQP ゲートウェイも提供します。

TIP: Spring AMQP プロジェクトのリファレンスドキュメントをよく理解しましょう。Spring の AMQP 全般および特に RabbitMQ との統合に関する詳細な情報を提供します。

受信チャネルアダプター

以下のリストは、AMQP 受信チャネルアダプターの可能な構成オプションを示しています。

Java DSL
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
Java
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
XML
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  tx-size=""                      (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)

<1> The unique ID for this adapter.
Optional.
<2> Message channel to which converted messages should be sent.
Required.
<3> Names of the AMQP queues (comma-separated list) from which messages should be consumed.
Required.
<4> Acknowledge mode for the `MessageListenerContainer`.
When set to `MANUAL`, the delivery tag and channel are provided in message headers `amqp_deliveryTag` and `amqp_channel`, respectively.
The user application is responsible for acknowledgement.
`NONE` means no acknowledgements (`autoAck`).
`AUTO` means the adapter's container acknowledges when the downstream flow completes.
Optional (defaults to AUTO).
See <<amqp-inbound-ack>>.
<5> Extra AOP Advices to handle cross-cutting behavior associated with this inbound channel adapter.
Optional.
<6> Flag to indicate that channels created by this component are transactional.
If true, it tells the framework to use a transactional channel and to end all operations (send or receive) with a commit or rollback, depending on the outcome, with an exception that signals a rollback.
Optional (Defaults to false).
<7> Specify the number of concurrent consumers to create.
The default is `1`.
We recommend raising the number of concurrent consumers to scale the consumption of messages coming in from a queue.
However, note that any ordering guarantees are lost once multiple consumers are registered.
In general, use one consumer for low-volume queues.
Not allowed when 'consumers-per-queue' is set.
Optional.
<8> Bean reference to the RabbitMQ `ConnectionFactory`.
Optional (defaults to `connectionFactory`).
<9> Message channel to which error messages should be sent.
Optional.
<10> Whether the listener channel (com.rabbitmq.client.Channel) is exposed to a registered `ChannelAwareMessageListener`.
Optional (defaults to true).
<11> A reference to an `AmqpHeaderMapper` to use when receiving AMQP Messages.
Optional.
By default, only standard AMQP properties (such as `contentType`) are copied to Spring Integration `MessageHeaders`.
Any user-defined headers within the AMQP `MessageProperties` are NOT copied to the message by the default `DefaultAmqpHeaderMapper`.
Not allowed if 'request-header-names' is provided.
<12> Comma-separated list of the names of AMQP Headers to be mapped from the AMQP request into the `MessageHeaders`.
This can only be provided if the 'header-mapper' reference is not provided.
The values in this list can also be simple patterns to be matched against the header names (such as "\*" or "thing1*, thing2" or "*something").
<13> Reference to the `AbstractMessageListenerContainer` to use for receiving AMQP Messages.
If this attribute is provided, no other attribute related to the listener container configuration should be provided.
In other words, by setting this reference, you must take full responsibility for the listener container configuration.
The only exception is the `MessageListener` itself.
Since that is actually the core responsibility of this channel adapter implementation, the referenced listener container must not already have its own `MessageListener`.
Optional.
<14> The `MessageConverter` to use when receiving AMQP messages.
Optional.
<15> The `MessagePropertiesConverter` to use when receiving AMQP messages.
Optional.
<16> Specifies the phase in which the underlying `AbstractMessageListenerContainer` should be started and stopped.
The startup order proceeds from lowest to highest, and the shutdown order is the reverse of that.
By default, this value is `Integer.MAX_VALUE`, meaning that this container starts as late as possible and stops as soon as possible.
Optional.
<17> Tells the AMQP broker how many messages to send to each consumer in a single request.
Often, you can set this value high to improve throughput.
It should be greater than or equal to the transaction size (see the `tx-size` attribute, later in this list).
Optional (defaults to `1`).
<18> Receive timeout in milliseconds.
Optional (defaults to `1000`).
<19> Specifies the interval between recovery attempts of the underlying `AbstractMessageListenerContainer` (in milliseconds).
Optional (defaults to `5000`).
<20> If 'true' and none of the queues are available on the broker, the container throws a fatal exception during startup and stops if the queues are deleted when the container is running (after making three attempts to passively declare the queues).
If `false`, the container does not throw an exception and goes into recovery mode, attempting to restart according to the `recovery-interval`.
Optional (defaults to `true`).
<21> The time to wait for workers (in milliseconds) after the underlying `AbstractMessageListenerContainer` is stopped and before the AMQP connection is forced closed.
If any workers are active when the shutdown signal comes, they are allowed to finish processing as long as they can finish within this timeout.
Otherwise, the connection is closed and messages remain unacknowledged (if the channel is transactional).
Optional (defaults to `5000`).
<22> By default, the underlying `AbstractMessageListenerContainer` uses a `SimpleAsyncTaskExecutor` implementation, that fires up a new thread for each task, running it asynchronously.
By default, the number of concurrent threads is unlimited.
Note that this implementation does not reuse threads.
Consider using a thread-pooling `TaskExecutor` implementation as an alternative.
Optional (defaults to `SimpleAsyncTaskExecutor`).
<23> By default, the underlying `AbstractMessageListenerContainer` creates a new instance of the `DefaultTransactionAttribute` (it takes the EJB approach to rolling back on runtime but not checked exceptions).
Optional (defaults to `DefaultTransactionAttribute`).
<24> Sets a bean reference to an external `PlatformTransactionManager` on the underlying `AbstractMessageListenerContainer`.
The transaction manager works in conjunction with the `channel-transacted` attribute.
If there is already a transaction in progress when the framework is sending or receiving a message and the `channelTransacted` flag is `true`, the commit or rollback of the messaging transaction is deferred until the end of the current transaction.
If the `channelTransacted` flag is `false`, no transaction semantics apply to the messaging operation (it is auto-acked).
For further information, see
https://docs.spring.io/spring-amqp/reference/html/%255Freference.html#%5Ftransactions[Transactions with Spring AMQP].
Optional.
<25> Tells the `SimpleMessageListenerContainer` how many messages to process in a single transaction (if the channel is transactional).
For best results, it should be less than or equal to the value set in `prefetch-count`.
Not allowed when 'consumers-per-queue' is set.
Optional (defaults to `1`).
<26> Indicates that the underlying listener container should be a `DirectMessageListenerContainer` instead of the default `SimpleMessageListenerContainer`.
See the https://docs.spring.io/spring-amqp/reference/html/[Spring AMQP Reference Manual] for more information.
<27> When the container's `consumerBatchEnabled` is `true`, determines how the adapter presents the batch of messages in the message payload.
When set to `MESSAGES` (default), the payload is a `List<Message<?>>` where each message has headers mapped from the incoming AMQP `Message` and the payload is the converted `body`.
When set to `EXTRACT_PAYLOADS`, the payload is a `List<?>` where the elements are converted from the AMQP `Message` body.
`EXTRACT_PAYLOADS_WITH_HEADERS` is similar to `EXTRACT_PAYLOADS` but, in addition, the headers from each message are mapped from the `MessageProperties` into a `List<Map<String, Object>` at the corresponding index; the header name is `AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS`.
コンテナー

XML を使用して外部コンテナーを構成する場合、Spring AMQP 名前空間を使用してコンテナーを定義することはできないことに注意してください。これは、名前空間に少なくとも 1 つの <listener/> 要素が必要なためです。この環境では、リスナーはアダプターの内部にあります。このため、次の例に示すように、通常の Spring <bean/> 定義を使用してコンテナーを定義する必要があります。

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
Spring Integration JMS と AMQP のサポートは似ていますが、重要な違いがあります。JMS 受信チャネルアダプターは、カバーで JmsDestinationPollingSource を使用しており、設定済みのポーラーを想定しています。AMQP 受信チャネルアダプターは AbstractMessageListenerContainer を使用し、メッセージ駆動型です。その点では、JMS メッセージ駆動型チャネルアダプターにより似ています。

バージョン 5.5 以降、AmqpInboundChannelAdapter は、再試行操作が内部で呼び出されたときに RecoveryCallback で使用される org.springframework.amqp.rabbit.retry.MessageRecoverer 戦略を使用して構成できます。詳細については、setMessageRecoverer() JavaDocs を参照してください。

バッチメッセージ

バッチメッセージの詳細については、Spring AMQP ドキュメントを参照してください。

Spring Integration でバッチメッセージを作成するには、発信エンドポイントを BatchingRabbitTemplate で構成するだけです。

バッチメッセージを受信すると、デフォルトで、リスナーコンテナーは各フラグメントメッセージを抽出し、アダプターは各フラグメントに対して Message<?> を生成します。バージョン 5.2 以降、コンテナーの deBatchingEnabled プロパティが false に設定されている場合、代わりにアダプターによってデバッチ処理が実行され、ペイロードがフラグメントペイロードのリストである単一の Message<List<?>> が生成されます(適切な場合は変換後)。

デフォルトの BatchingStrategy は SimpleBatchingStrategy ですが、これはアダプターでオーバーライドできます。

再試行操作でリカバリが必要な場合は、org.springframework.amqp.rabbit.retry.MessageBatchRecoverer をバッチで使用する必要があります。

ポーリングされた受信チャネルアダプター

概要

バージョン 5.0.1 では、ポーリングチャネルアダプターが導入され、MessageSourcePollingTemplate またはポーラーなどを使用して、個々のメッセージをオンデマンドでフェッチできます。詳細については、遅延確認応答可能なメッセージソースを参照してください。

現在、XML 構成はサポートされていません。

次の例は、AmqpMessageSource を構成する方法を示しています。

Java DSL
@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Amqp.inboundPolledAdapter(connectionFactory(), DSL_QUEUE),
                    e -> e.poller(Pollers.fixedDelay(1_000)).autoStartup(false))
            .handle(p -> {
                ...
            })
            .get();
}
Java
@Bean
public AmqpMessageSource source(ConnectionFactory connectionFactory) {
    return new AmqpMessageSource(connectionFactory, "someQueue");
}

構成プロパティについては、Javadoc を参照してください。

XML
This adapter currently does not have XML configuration support.

バッチメッセージ

バッチメッセージを参照してください。

ポーリングされたアダプターの場合、リスナーコンテナーはなく、バッチメッセージは常にデバッチされます(BatchingStrategy がサポートしている場合)。

受信ゲートウェイ

受信ゲートウェイは、受信チャネルアダプターのすべての属性( "channel" が "request-channel" に置き換えられることを除く)、およびいくつかの追加属性をサポートします。次のリストは、使用可能な属性を示しています。

Java DSL
@Bean // return the upper cased payload
public IntegrationFlow amqpInboundGateway(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundGateway(connectionFactory, "foo"))
            .transform(String.class, String::toUpperCase)
            .get();
}
Java
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundGateway inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundGateway gateway = new AmqpInboundGateway(listenerContainer);
    gateway.setRequestChannel(channel);
    gateway.setDefaultReplyTo("bar");
    return gateway;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                    new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("foo");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new AbstractReplyProducingMessageHandler() {

        @Override
        protected Object handleRequestMessage(Message<?> requestMessage) {
            return "reply to " + requestMessage.getPayload();
        }

    };
}
XML
<int-amqp:inbound-gateway
                          id="inboundGateway"                (1)
                          request-channel="myRequestChannel" (2)
                          header-mapper=""                   (3)
                          mapped-request-headers=""          (4)
                          mapped-reply-headers=""            (5)
                          reply-channel="myReplyChannel"     (6)
                          reply-timeout="1000"               (7)
                          amqp-template=""                   (8)
                          default-reply-to="" />             (9)
1 このアダプターの固有 ID。オプション。
2 変換されたメッセージの送信先のメッセージチャネル。必須。
3AMQP メッセージを受信するときに使用する AmqpHeaderMapper への参照。オプション。デフォルトでは、標準の AMQP プロパティ(contentType など)のみが Spring Integration MessageHeaders との間でコピーされます。AMQP MessageProperties 内のユーザー定義ヘッダーは、デフォルト DefaultAmqpHeaderMapper によって AMQP メッセージとの間でコピーされません。"request-header-names" または "reply-header-names" が指定されている場合は許可されません。
4AMQP リクエストから MessageHeaders にマップされる AMQP ヘッダーの名前のコンマ区切りリスト。この属性は、"header-mapper" 参照が提供されていない場合にのみ提供できます。このリストの値は、ヘッダー名と照合する単純なパターンにすることもできます(例: "*" または "thing1*, thing2" または "*thing1")。
5AMQP 応答メッセージの AMQP メッセージプロパティにマッピングされる MessageHeaders の名前のカンマ区切りリスト。すべての標準ヘッダー(contentType など)は AMQP メッセージプロパティにマップされますが、ユーザー定義ヘッダーは 'headers' プロパティにマップされます。この属性は、"header-mapper" 参照が提供されていない場合にのみ提供できます。このリストの値は、ヘッダー名(たとえば、"*" または "foo*, bar" または "*foo" など)と照合する単純なパターンにすることもできます。
6 返信メッセージが期待されるメッセージチャネル。オプション。
7 応答チャネルからメッセージを受信するために、基礎となる o.s.i.core.MessagingTemplate に receiveTimeout を設定します。指定しない場合、このプロパティのデフォルトは 1000 (1 秒)です。応答が送信される前にコンテナースレッドが別のスレッドに渡される場合にのみ適用されます。
8 カスタマイズされた AmqpTemplate Bean 参照(送信する応答メッセージをさらに制御するため)。RabbitTemplate の代替実装を提供できます。
9replyTo  requestMessage に replyTo プロパティがない場合に使用される o.s.amqp.core.Address。このオプションが指定されていない場合、amqp-template は提供されず、replyTo プロパティはリクエストメッセージに存在せず、応答をルーティングできないために IllegalStateException がスローされます。このオプションが指定されておらず、外部 amqp-template が提供されている場合、例外はスローされません。リクエストメッセージに replyTo プロパティが存在しない場合が予想される場合は、このオプションを指定するか、そのテンプレートにデフォルトの exchange および routingKey を構成する必要があります。

listener-container 属性の構成については、受信チャネルアダプターの注を参照してください。

バージョン 5.5 以降、AmqpInboundChannelAdapter は、再試行操作が内部で呼び出されたときに RecoveryCallback で使用される org.springframework.amqp.rabbit.retry.MessageRecoverer 戦略を使用して構成できます。詳細については、setMessageRecoverer() JavaDocs を参照してください。

バッチメッセージ

バッチメッセージを参照してください。

受信エンドポイント確認モード

デフォルトでは、受信エンドポイントは AUTO 確認モードを使用します。これは、ダウンストリーム統合フローが完了すると(または、QueueChannel または ExecutorChannel を使用してメッセージが別のスレッドに渡されると)コンテナーがメッセージを自動的に確認することを意味します。モードを NONE に設定すると、確認応答がまったく使用されないようにコンシューマーが構成されます(ブローカーは、送信されるとすぐにメッセージを自動的に確認応答します)。モードを MANUAL に設定すると、ユーザーコードは処理中の他のポイントでメッセージを確認できます。これをサポートするために、このモードでは、エンドポイントはそれぞれ amqp_channel および amqp_deliveryTag ヘッダーで Channel および deliveryTag を提供します。

Channel で任意の有効な Rabbit コマンドを実行できますが、通常は basicAck と basicNack (または basicReject)のみが使用されます。コンテナーの操作を妨げないために、チャネルへの参照を保持せず、現在のメッセージのコンテキストでのみ使用する必要があります。

Channel は「ライブ」オブジェクトへの参照であるため、直列化できず、メッセージが永続化されると失われます。

次の例は、MANUAL 確認応答の使用方法を示しています。

@ServiceActivator(inputChannel = "foo", outputChannel = "bar")
public Object handle(@Payload String payload, @Header(AmqpHeaders.CHANNEL) Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws Exception {

    // Do some processing

    if (allOK) {
        channel.basicAck(deliveryTag, false);

        // perhaps do some more processing

    }
    else {
        channel.basicNack(deliveryTag, false, true);
    }
    return someResultForDownStreamProcessing;
}

送信エンドポイント

次の発信エンドポイントには、多くの同様の構成オプションがあります。バージョン 5.2 から、confirm-timeout が追加されました。通常、パブリッシャーが有効になっていることを確認すると、ブローカーはすぐに適切なチャネルに送信される ack(または nack)を返します。確認が受信される前にチャネルが閉じられると、Spring AMQP フレームワークはナックを合成します。「欠落」ACK は決して発生しませんが、このプロパティを設定すると、エンドポイントは定期的にチェックし、受信が確認されずに時間が経過すると、ACK を合成します。

送信チャネルアダプター

次の例は、AMQP 送信チャネルアダプターの使用可能なプロパティを示しています。

Java DSL
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate,
        MessageChannel amqpOutboundChannel) {
    return IntegrationFlow.from(amqpOutboundChannel)
            .handle(Amqp.outboundAdapter(amqpTemplate)
                        .routingKey("queue1")) // default exchange - route to queue 'queue1'
            .get();
}
Java
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setRoutingKey("queue1"); // default exchange - route to queue 'queue1'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}
XML
<int-amqp:outbound-channel-adapter id="outboundAmqp"             (1)
                               channel="outboundChannel"         (2)
                               amqp-template="myAmqpTemplate"    (3)
                               exchange-name=""                  (4)
                               exchange-name-expression=""       (5)
                               order="1"                         (6)
                               routing-key=""                    (7)
                               routing-key-expression=""         (8)
                               default-delivery-mode""           (9)
                               confirm-correlation-expression="" (10)
                               confirm-ack-channel=""            (11)
                               confirm-nack-channel=""           (12)
                               confirm-timeout=""                (13)
                               wait-for-confirm=""               (14)
                               return-channel=""                 (15)
                               error-message-strategy=""         (16)
                               header-mapper=""                  (17)
                               mapped-request-headers=""         (18)
                               lazy-connect="true"               (19)
                               multi-send="false"/>              (20)
1 このアダプターの一意の ID。オプション。
2 メッセージを AMQP 交換に変換して公開するために送信するメッセージチャネル。必須。
3 構成された AMQP テンプレートへの Bean 参照。オプション(デフォルトは amqpTemplate)。
4 メッセージの送信先の AMQP 交換の名前。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name-expression" と相互に排他的です。オプション。
5 メッセージがルートオブジェクトとして送信される AMQP 交換の名前を決定するために評価される SpEL 式。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name" と相互に排他的。オプション。
6 複数のコンシューマーが登録されている場合のこのコンシューマーの順序。これにより、負荷分散とフェイルオーバーが可能になります。オプション(デフォルトは Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7 メッセージを送信するときに使用する固定ルーティングキー。デフォルトでは、これは空の String です。"routing-key-expression" と相互に排他的です。オプション。
8 メッセージをルートオブジェクトとして使用してメッセージを送信するときに使用するルーティングキーを決定するために評価される SpEL 式( "payload.key" など)。デフォルトでは、これは空の String です。「ルーティングキー」と相互に排他的。オプション。
9 メッセージのデフォルト配信モード: PERSISTENT または NON_PERSISTENTheader-mapper が配信モードを設定するとオーバーライドされます。Spring Integration メッセージヘッダー amqp_deliveryMode が存在する場合、DefaultHeaderMapper は値を設定します。この属性が指定されておらず、ヘッダーマッパーが設定しない場合、デフォルトは RabbitTemplate によって使用される基になる Spring AMQP MessagePropertiesConverter に依存します。それがまったくカスタマイズされていない場合、デフォルトは PERSISTENT です。オプション。
10 相関データを定義する式。提供される場合、これは、パブリッシャーの確認を受信するように基盤となる AMQP テンプレートを構成します。専用の RabbitTemplate と、publisherConfirms プロパティが true に設定された CachingConnectionFactory が必要です。パブリッシャーの確認を受信して相関データを提供すると、確認の種類に応じて、confirm-ack-channel または confirm-nack-channel のいずれかに書き込まれます。確認のペイロードは、この式で定義されている相関データです。メッセージには、'amqp_publishConfirm' ヘッダーが true (ack)または false (nack)に設定されています。例: headers['myCorrelationData'] および payload。バージョン 4.1 では、amqp_publishConfirmNackCause メッセージヘッダーが導入されました。これには、発行者の確認用の「ナック」の cause が含まれています。バージョン 4.2 以降、式が Message<?> インスタンス(#this など)に解決される場合、ack/nack チャネルで発行されるメッセージはそのメッセージに基づいており、追加のヘッダーが追加されています。以前は、型に関係なく、ペイロードとして相関データを使用して新しいメッセージが作成されていました。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。
11 ポジティブ(ack)パブリッシャーが確認するチャネルが送信されます。ペイロードは、confirm-correlation-expression によって定義された相関データです。式が #root または #this の場合、メッセージは元のメッセージから構築され、amqp_publishConfirm ヘッダーは true に設定されます。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel)。
12 否定的な(nack)パブリッシャー確認の送信先のチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです(ErrorMessageStrategy が構成されていない場合)。式が #root または #this の場合、メッセージは元のメッセージから構築され、amqp_publishConfirm ヘッダーは false に設定されます。ErrorMessageStrategy がある場合、メッセージは NackedAmqpMessageException ペイロードを持つ ErrorMessage です。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel)。
13 設定すると、この時間内にパブリッシャー確認が受信されない場合、アダプターは否定応答(nack)を合成します(ミリ秒単位)。保留中の確認はこの値の 50% ごとにチェックされるため、nack が送信される実際の時間は 1x と 1.5x の間になります。パブリッシャーの確認と return の代替メカニズムも参照してください。デフォルトはなし(nack は生成されません)。
14true に設定すると、呼び出しスレッドはブロックされ、パブリッシャーの確認を待ちます。これには、確認用に構成された RabbitTemplate と confirm-correlation-expression が必要です。スレッドは最大 confirm-timeout (デフォルトでは 5 秒) までブロックされます。タイムアウトが発生すると、MessageTimeoutException がスローされます。return が有効で、メッセージが返された場合、確認を待っている間にその他の例外が発生した場合は、適切なメッセージとともに MessageHandlingException がスローされます。
15 返されたメッセージが送信されるチャネル。指定すると、基になる AMQP テンプレートは、配信不能メッセージをアダプターに返すように構成されます。ErrorMessageStrategy が構成されていない場合、メッセージは AMQP から受信したデータと次の追加ヘッダー amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey から構成されます。ErrorMessageStrategy がある場合、メッセージは ReturnedAmqpMessageException ペイロードを持つ ErrorMessage です。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。
16 返されたメッセージまたは否定応答メッセージを送信するときに ErrorMessage インスタンスを構築するために使用される ErrorMessageStrategy 実装への参照。
17AMQP メッセージを送信するときに使用する AmqpHeaderMapper への参照。デフォルトでは、標準の AMQP プロパティ(contentType など)のみが Spring Integration MessageHeaders にコピーされます。デフォルトの `DefaultAmqpHeaderMapper` では、ユーザー定義のヘッダーはメッセージにコピーされません。'request-header-names' が指定されている場合は許可されません。オプション。
18MessageHeaders から AMQP メッセージにマッピングされる AMQP ヘッダーの名前のコンマ区切りリスト。"header-mapper" 参照が提供されている場合は許可されません。このリストの値は、ヘッダー名と照合する単純なパターンにすることもできます(例: "*" または "thing1*, thing2" または "*thing1")。
19false に設定すると、エンドポイントはアプリケーションコンテキストの初期化中にブローカーへの接続を試行します。これにより、不良構成の「フェイルファースト」検出が可能になりますが、ブローカーがダウンしていると初期化が失敗します。true (デフォルト)の場合、最初のメッセージが送信されたときに接続が確立されます(他のコンポーネントが接続を確立したためにすでに存在しない場合)。
20true に設定すると、型 Iterable<Message<?>> のペイロードは、単一の RabbitTemplate 呼び出しのスコープ内の同じチャネルで個別のメッセージとして送信されます。RabbitTemplate が必要です。wait-for-confirms が true の場合、メッセージが送信された後に RabbitTemplate.waitForConfirmsOrDie() が呼び出されます。トランザクションテンプレートでは、送信は新しいトランザクションまたはすでに開始されているトランザクション(存在する場合)で実行されます。
return-channel

return-channel を使用するには、mandatory プロパティが true に設定された RabbitTemplate と、publisherReturns プロパティが true に設定された CachingConnectionFactory が必要です。リターンで複数の送信エンドポイントを使用する場合、エンドポイントごとに個別の RabbitTemplate が必要です。

送信ゲートウェイ

次のリストは、AMQP 送信ゲートウェイの可能なプロパティを示しています。

Java DSL
@Bean
public IntegrationFlow amqpOutbound(AmqpTemplate amqpTemplate) {
    return f -> f.handle(Amqp.outboundGateway(amqpTemplate)
                    .routingKey("foo")) // default exchange - route to queue 'foo'
            .get();
}

@MessagingGateway(defaultRequestChannel = "amqpOutbound.input")
public interface MyGateway {

    String sendToRabbit(String data);

}
Java
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AmqpOutboundEndpoint amqpOutbound(AmqpTemplate amqpTemplate) {
    AmqpOutboundEndpoint outbound = new AmqpOutboundEndpoint(amqpTemplate);
    outbound.setExpectReply(true);
    outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
    return outbound;
}

@Bean
public MessageChannel amqpOutboundChannel() {
    return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "amqpOutboundChannel")
public interface MyGateway {

    String sendToRabbit(String data);

}
XML
<int-amqp:outbound-gateway id="outboundGateway"               (1)
                           request-channel="myRequestChannel" (2)
                           amqp-template=""                   (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           error-message-strategy=""          (18)
                           lazy-connect="true" />             (19)
1 このアダプターの一意の ID。オプション。
2 メッセージを AMQP 交換に変換して公開するために送信されるメッセージチャネル。必須。
3 構成された AMQP テンプレートへの Bean 参照。オプション(デフォルトは amqpTemplate)。
4 メッセージの送信先となる AMQP 交換の名前。指定しない場合、メッセージはデフォルトの名前なしの cxchange に送信されます。"exchange-name-expression" と相互に排他的です。オプション。
5 メッセージをルートオブジェクトとして、メッセージの送信先 AMQP 交換の名前を決定するために評価される SpEL 式。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name" と相互に排他的。オプション。
6 複数のコンシューマーが登録されている場合のこのコンシューマーの順序。これにより、負荷分散とフェイルオーバーが可能になります。オプション(デフォルトは Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7AMQP キューから受信して変換した後、応答が送信されるメッセージチャネル。オプション。
8reply-channel に応答メッセージを送信するときにゲートウェイが待機する時間。これは、reply-channel がブロックできる場合にのみ適用されます。たとえば、容量制限が現在いっぱいの QueueChannel などです。デフォルトは無限大です。
9true の場合、AmqpTemplate’s `replyTimeout プロパティ内で応答メッセージが受信されない場合、ゲートウェイは例外をスローします。デフォルトは true です。
10 メッセージを送信するときに使用する routing-key。デフォルトでは、これは空の String です。"routing-key-expression" と相互に排他的です。オプション。
11 メッセージをルートオブジェクトとして使用してメッセージを送信するときに使用する routing-key を決定するために評価される SpEL 式(たとえば、"payload.key" )。デフォルトでは、これは空の String です。「ルーティングキー」と相互に排他的。オプション。
12 メッセージのデフォルト配信モード: PERSISTENT または NON_PERSISTENTheader-mapper が配信モードを設定するとオーバーライドされます。Spring Integration メッセージヘッダー amqp_deliveryMode が存在する場合、DefaultHeaderMapper は値を設定します。この属性が指定されておらず、ヘッダーマッパーが設定しない場合、デフォルトは RabbitTemplate によって使用される基になる Spring AMQP MessagePropertiesConverter に依存します。それがまったくカスタマイズされていない場合、デフォルトは PERSISTENT です。オプション。
13 バージョン 4.2 以降。相関データを定義する式。提供される場合、これは、発行者の確認を受信するように基盤となる AMQP テンプレートを構成します。専用の RabbitTemplate と、publisherConfirms プロパティが true に設定された CachingConnectionFactory が必要です。パブリッシャーの確認が受信され、相関データが提供されると、確認の型に応じて、confirm-ack-channel または confirm-nack-channel のいずれかに書き込まれます。確認のペイロードは、この式で定義されている相関データです。メッセージのヘッダー 'amqp_publishConfirm' は true (ack)または false (nack)に設定されています。nack 確認の場合、Spring Integration は追加のヘッダー amqp_publishConfirmNackCause を提供します。例: headers['myCorrelationData'] および payload。式が Message<?> インスタンス(#this など)に解決される場合、ack/nack チャネルで発行されるメッセージは、追加のヘッダーが追加されたそのメッセージに基づいています。以前は、型に関係なく、ペイロードとして相関データを使用して新しいメッセージが作成されていました。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。
14 肯定的な(ack)パブリッシャー確認が送信されるチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。式が #root または #this の場合、メッセージは元のメッセージから構築され、amqp_publishConfirm ヘッダーは true に設定されます。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel)。
15 否定的な(nack)パブリッシャー確認の送信先のチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです(ErrorMessageStrategy が構成されていない場合)。式が #root または #this の場合、メッセージは元のメッセージから構築され、amqp_publishConfirm ヘッダーは false に設定されます。ErrorMessageStrategy がある場合、メッセージは NackedAmqpMessageException ペイロードを持つ ErrorMessage です。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel)。
16 設定すると、パブリッシャーがミリ秒単位のこの時間内に確認を受信しない場合、ゲートウェイは否定応答(nack)を合成します。保留確認は、この値の 50% ごとにチェックされるため、ナックが送信される実際の時間は、この値 1x と 1.5x の間になります。デフォルトなし(ナックは生成されません)。
17 返されたメッセージが送信されるチャネル。指定すると、基になる AMQP テンプレートは、配信不能メッセージをアダプターに返すように構成されます。ErrorMessageStrategy が構成されていない場合、メッセージは AMQP から受信したデータと次の追加ヘッダー amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey から構成されます。ErrorMessageStrategy がある場合、メッセージは ReturnedAmqpMessageException ペイロードを持つ ErrorMessage です。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。
18 返されたメッセージまたは否定応答メッセージを送信するときに ErrorMessage インスタンスを構築するために使用される ErrorMessageStrategy 実装への参照。
19false に設定すると、エンドポイントはアプリケーションコンテキストの初期化中にブローカーへの接続を試行します。これにより、ブローカがダウンしている場合にエラーメッセージを記録することにより、不正な構成の「フェイルファースト」検出が可能になります。true (デフォルト)の場合、最初のメッセージが送信されたときに接続が確立されます(他のコンポーネントが接続を確立したためにすでに存在しない場合)。
return-channel

return-channel を使用するには、mandatory プロパティが true に設定された RabbitTemplate と、publisherReturns プロパティが true に設定された CachingConnectionFactory が必要です。リターンで複数の送信エンドポイントを使用する場合、エンドポイントごとに個別の RabbitTemplate が必要です。

基礎となる AmqpTemplate のデフォルトの replyTimeout は 5 秒です。より長いタイムアウトが必要な場合は、template で設定する必要があります。

送信アダプターと送信ゲートウェイの構成の唯一の違いは、expectReply プロパティの設定であることに注意してください。

非同期送信ゲートウェイ

前のセクションで説明したゲートウェイは同期的であり、応答を受信する(またはタイムアウトが発生する)まで送信スレッドが中断されます。Spring Integration バージョン 4.3 は、Spring AMQP からの AsyncRabbitTemplate を使用する非同期ゲートウェイを追加しました。メッセージが送信されると、スレッドは送信操作が完了した直後に戻り、メッセージが受信されると、テンプレートのリスナーコンテナースレッドで応答が送信されます。これは、ゲートウェイがポーラースレッドで呼び出されるときに便利です。スレッドがリリースされ、フレームワーク内の他のタスクで使用できます。

次のリストは、AMQP 非同期送信ゲートウェイの可能な構成オプションを示しています。

Java DSL
@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
Java
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
XML
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 このアダプターの一意の ID。オプション。
2 メッセージを AMQP 交換に変換して公開するために送信するメッセージチャネル。必須。
3 構成された AsyncRabbitTemplate への Bean 参照。オプション(デフォルトは asyncRabbitTemplate)。
4 メッセージの送信先となる AMQP 交換の名前。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name-expression" と相互に排他的です。オプション。
5 メッセージがルートオブジェクトとして送信される AMQP 交換の名前を決定するために評価される SpEL 式。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name" と相互に排他的。オプション。
6 複数のコンシューマーが登録されている場合のこのコンシューマーの順序。これにより、負荷分散とフェイルオーバーが可能になります。オプション(デフォルトは Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7AMQP キューから受信して変換した後、応答が送信されるメッセージチャネル。オプション。
8reply-channel に応答メッセージを送信するときにゲートウェイが待機する時間。これは、reply-channel がブロックできる場合にのみ適用されます。たとえば、容量制限が現在いっぱいの QueueChannel などです。デフォルトは無限です。
9AsyncRabbitTemplate’s `receiveTimeout プロパティ内で応答メッセージが受信されず、この設定が true の場合、ゲートウェイは受信メッセージの errorChannel ヘッダーにエラーメッセージを送信します。AsyncRabbitTemplate’s `receiveTimeout プロパティ内で応答メッセージが受信されず、この設定が false である場合、ゲートウェイはデフォルトの errorChannel (利用可能な場合)にエラーメッセージを送信します。デフォルトは true です。
10 メッセージを送信するときに使用するルーティングキー。デフォルトでは、これは空の String です。"routing-key-expression" と相互に排他的です。オプション。
11 メッセージをルートオブジェクトとして使用してメッセージを送信するときに使用するルーティングキーを決定するために評価される SpEL 式(たとえば、"payload.key" )。デフォルトでは、これは空の String です。「ルーティングキー」と相互に排他的。オプション。
12 メッセージのデフォルト配信モード: PERSISTENT または NON_PERSISTENTheader-mapper が配信モードを設定するとオーバーライドされます。Spring Integration メッセージヘッダー(amqp_deliveryMode)が存在する場合、DefaultHeaderMapper は値を設定します。この属性が指定されておらず、ヘッダーマッパーが設定しない場合、デフォルトは RabbitTemplate によって使用される基になる Spring AMQP MessagePropertiesConverter に依存します。カスタマイズされていない場合、デフォルトは PERSISTENT です。オプション。
13 相関データを定義する式。提供される場合、これは、パブリッシャーの確認を受信するように基盤となる AMQP テンプレートを構成します。専用の RabbitTemplate と、publisherConfirms プロパティが true に設定された CachingConnectionFactory が必要です。パブリッシャーの確認を受け取り、相関データが提供されると、確認の型に応じて、確認が confirm-ack-channel または confirm-nack-channel のいずれかに書き込まれます。確認のペイロードは、この式で定義された相関データであり、メッセージの "amqp_publishConfirm" ヘッダーは true (ack)または false (nack)に設定されています。nack インスタンスの場合、追加のヘッダー(amqp_publishConfirmNackCause)が提供されます。例: headers['myCorrelationData']payload。式が Message<?> インスタンス( "#this" など)に解決される場合、ack/nack チャネルで発行されるメッセージは、追加のヘッダーが追加されたそのメッセージに基づいています。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。
14 肯定的な(ack)パブリッシャー確認が送信されるチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。基になる AsyncRabbitTemplate の enableConfirms プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel)。
15 バージョン 4.2 以降。否定的な(nack)パブリッシャー確認の送信先のチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。基になる AsyncRabbitTemplate の enableConfirms プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel)。
16 設定すると、この時間内にパブリッシャー確認が受信されない場合、ゲートウェイは否定応答(nack)を合成します(ミリ秒単位)。保留中の確認はこの値の 50% ごとにチェックされるため、nack が送信される実際の時間は 1x と 1.5x の間になります。パブリッシャーの確認と return の代替メカニズムも参照してください。デフォルトはなし(nack は生成されません)。
17 返されたメッセージが送信されるチャネル。提供されると、基盤となる AMQP テンプレートは、配信不能メッセージをゲートウェイに返すように構成されます。メッセージは、AMQP から受信したデータと、次の追加ヘッダーで構成されます: amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。基になる AsyncRabbitTemplate で、mandatory プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。
18false に設定すると、エンドポイントはアプリケーションコンテキストの初期化中にブローカーへの接続を試行します。これにより、ブローカがダウンしている場合にエラーメッセージを記録することにより、不正な構成の「フェイルファースト」検出が可能になります。true (デフォルト)の場合、最初のメッセージが送信されたときに接続が確立されます(他のコンポーネントが接続を確立したためにすでに存在しない場合)。

詳細については、非同期サービスアクティベーターも参照してください。

RabbitTemplate

確認・返却をご利用の際は、AsyncRabbitTemplate に接続する RabbitTemplate を専用にすることをお勧めします。そうしないと、予期しない副作用が発生する可能性があります。

パブリッシャーの確認と return の代替メカニズム

接続ファクトリがパブリッシャーの確認と戻りに対して構成されている場合、上記のセクションでは、確認を受け取り非同期で返すメッセージチャネルの構成について説明します。バージョン 5.4 以降、一般的に使いやすいメカニズムが追加されています。

この場合、confirm-correlation-expression または確認チャネルと戻りチャネルを構成しないでください。代わりに、AmqpHeaders.PUBLISH_CONFIRM_CORRELATION ヘッダーに CorrelationData インスタンスを追加します。その後、メッセージを送信した CorrelationData インスタンスの将来の状態を確認することにより、後で結果を待つことができます。returnedMessage フィールドは、未来が完了する前に(メッセージが返された場合)常に入力されます。

CorrelationData corr = new CorrelationData("someId"); // <--- Unique "id" is required for returns
someFlow.getInputChannel().send(MessageBuilder.withPayload("test")
        .setHeader("rk", "someKeyThatWontRoute")
        .setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
        .build());
...
try {
    Confirm Confirm = corr.getFuture().get(10, TimeUnit.SECONDS);
    Message returned = corr.getReturnedMessage();
    if (returned !- null) {
        // message could not be routed
    }
}
catch { ... }

パフォーマンスを向上させるために、一度に 1 つずつではなく、複数のメッセージを送信して後で確認を待つことをお勧めします。返されるメッセージは、変換後の生のメッセージです。必要な追加データを使用して CorrelationData をサブクラス化できます。

受信メッセージの変換

チャネルアダプターまたはゲートウェイに到着した受信メッセージは、メッセージコンバーターを使用して spring-messagingMessage<?> ペイロードに変換されます。デフォルトでは、java シリアライゼーションとテキストを処理する SimpleMessageConverter が使用されます。ヘッダーは、デフォルトで DefaultHeaderMapper.inboundMapper() を使用してマップされます。変換エラーが発生し、エラーチャネルが定義されていない場合、例外がコンテナーにスローされ、リスナーコンテナーのエラーハンドラーによって処理されます。デフォルトのエラーハンドラーは変換エラーを致命的なものとして扱い、メッセージは拒否されます (キューがそのように構成されている場合は、配信不能交換にルーティングされます)。エラーチャネルが定義されている場合、ErrorMessage ペイロードは、プロパティ failedMessage (変換できなかった Spring AMQP メッセージ) および cause を持つ ListenerExecutionFailedException です。コンテナー AcknowledgeMode が AUTO (デフォルト) であり、エラーフローが例外をスローせずにエラーを消費する場合、元のメッセージが確認されます。エラーフローが例外をスローする場合、コンテナーのエラーハンドラーと組み合わせた例外の種類によって、メッセージが再度キューに入れられるかどうかが決まります。コンテナーが AcknowledgeMode.MANUAL で構成されている場合、ペイロードは追加のプロパティ channel および deliveryTag を持つ ManualAckListenerExecutionFailedException です。これにより、エラーフローがメッセージの basicAck または basicNack (または basicReject) を呼び出して、メッセージの処理を制御できるようになります。

送信メッセージの変換

Spring AMQP 1.4 は ContentTypeDelegatingMessageConverter を導入しましたが、実際のコンバーターは受信コンテンツ型メッセージプロパティに基づいて選択されます。これは、受信エンドポイントで使用できます。

Spring Integration バージョン 4.3 では、使用するコンバーターを指定する contentType ヘッダーを使用して、送信エンドポイントでも ContentTypeDelegatingMessageConverter を使用できます。

次の例では、ContentTypeDelegatingMessageConverter を設定します。デフォルトのコンバーターは SimpleMessageConverter (Java 直列化とプレーンテキストを処理します)で、JSON コンバーターを使用します。

<amqp:outbound-channel-adapter id="withContentTypeConverter" channel="ctRequestChannel"
                               exchange-name="someExchange"
                               routing-key="someKey"
                               amqp-template="amqpTemplateContentTypeConverter" />

<int:channel id="ctRequestChannel"/>

<rabbit:template id="amqpTemplateContentTypeConverter"
        connection-factory="connectionFactory" message-converter="ctConverter" />

<bean id="ctConverter"
        class="o.s.amqp.support.converter.ContentTypeDelegatingMessageConverter">
    <property name="delegates">
        <map>
            <entry key="application/json">
                <bean class="o.s.amqp.support.converter.Jackson2JsonMessageConverter" />
            </entry>
        </map>
    </property>
</bean>

contentType ヘッダーを application/json に設定してメッセージを ctRequestChannel に送信すると、JSON コンバーターが選択されます。

これは、発信チャネルアダプターとゲートウェイの両方に適用されます。

バージョン 5.0 以降、発信メッセージの MessageProperties に追加されるヘッダーは、マップされたヘッダーによって上書きされることはありません(デフォルト)。以前は、これはメッセージコンバーターが ContentTypeDelegatingMessageConverter の場合のみでした(その場合、適切なコンバーターを選択できるようにヘッダーが最初にマップされました)。SimpleMessageConverter などの他のコンバーターの場合、マップされたヘッダーは、コンバーターによって追加されたヘッダーを上書きしました。これにより、送信メッセージに contentType ヘッダーが残っていて(おそらく受信チャネルアダプターから)、正しい送信 contentType が誤って上書きされたときに問題が発生しました。回避策は、メッセージを送信エンドポイントに送信する前にヘッダーフィルターを使用してヘッダーを削除することでした。

ただし、以前の動作が必要な場合があります。たとえば、JSON を含む String ペイロードの場合、SimpleMessageConverter はコンテンツを認識せず、contentType メッセージプロパティを text/plain に設定しますが、アプリケーションはそれを application/json にオーバーライドしたいと考えています。送信エンドポイントに送信されるメッセージの contentType ヘッダーを設定します。ObjectToJsonTransformer はまさにそれを行います(デフォルト)。

送信チャネルアダプターとゲートウェイ(および AMQP でバックアップされたチャネル)に headersMappedLast というプロパティが追加されました。これを true に設定すると、コンバーターによって追加されたプロパティを上書きする動作が復元されます。

バージョン 5.1.9 以降、同様の replyHeadersMappedLast が AmqpInboundGateway に対して提供されます。これは、応答を生成し、コンバーターによって入力されたヘッダーをオーバーライドする場合に使用されます。詳細については、JavaDocs を参照してください。

発信ユーザー ID

Spring AMQP バージョン 1.6 は、送信メッセージのデフォルトユーザー ID を指定できるメカニズムを導入しました。AmqpHeaders.USER_ID ヘッダーを設定することは常に可能であり、現在ではデフォルトよりも優先されます。これは、メッセージの受信者に役立つ場合があります。受信メッセージの場合、メッセージ発行者がプロパティを設定すると、AmqpHeaders.RECEIVED_USER_ID ヘッダーで使用可能になります。RabbitMQ は、ユーザー ID が接続の実際のユーザー ID であるか、接続で偽装が許可されていることを検証すること (英語) に注意してください。

送信メッセージのデフォルトユーザー ID を構成するには、RabbitTemplate でそれを構成し、そのテンプレートを使用するように送信アダプターまたはゲートウェイを構成します。同様に、返信にユーザー ID プロパティを設定するには、適切に構成されたテンプレートを受信ゲートウェイに挿入します。詳細については、Spring AMQP ドキュメントを参照してください。

遅延メッセージ交換

Spring AMQP は RabbitMQ 遅延メッセージ交換プラグインをサポートします。受信メッセージの場合、x-delay ヘッダーは AmqpHeaders.RECEIVED_DELAY ヘッダーにマップされます。AMQPHeaders.DELAY ヘッダーを設定すると、対応する x-delay ヘッダーが送信メッセージに設定されます。発信エンドポイントで delay および delayExpression プロパティを指定することもできます(XML 構成を使用する場合は delay-expression)。これらのプロパティは、AmqpHeaders.DELAY ヘッダーよりも優先されます。

AMQP-backed メッセージチャネル

2 つのメッセージチャネル実装が利用可能です。1 つはポイントツーポイントで、もう 1 つはパブリッシュ / サブスクライブです。これらのチャネルは両方とも、基礎となる AmqpTemplate および SimpleMessageListenerContainer の幅広い構成属性を提供します(この章でチャネルアダプターとゲートウェイについて前述したとおり)。ただし、ここで示す例の構成は最小限です。XML スキーマを調べて、使用可能な属性を表示します。

ポイントツーポイントチャネルは、次の例のようになります。

<int-amqp:channel id="p2pChannel"/>

上記の例では、カバーで si.p2pChannel という名前の Queue が宣言され、このチャネルは(技術的には、この Queue の名前と一致するルーティングキーを使用して、無名直接交換に送信することにより) Queue に送信します。このチャネルは、Queue のコンシューマーも登録します。チャネルをメッセージ駆動型ではなく「ポーリング可能」にする場合は、次の例に示すように、message-driven フラグに false の値を指定します。

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

パブリッシュ / サブスクライブチャネルは次のようになります。

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

上記の例では、カバーで、si.fanout.pubSubChannel という名前のファンアウト交換が宣言され、このチャネルはそのファンアウト交換に送信します。このチャネルは、サーバー名の排他的、自動削除、非永続 Queue も宣言し、メッセージを受信するためにその Queue にコンシューマーを登録している間、それをファンアウト交換にバインドします。publish-subscribe-channel には「ポーリング可能な」オプションはありません。メッセージ駆動型でなければなりません。

バージョン 4.1 以降、AMQP-backed メッセージチャネルは(channel-transacted と組み合わせて) template-channel-transacted をサポートし、AbstractMessageListenerContainer と RabbitTemplate の transactional 構成を分離します。以前は、channel-transacted はデフォルトで true でした。現在、デフォルトでは、AbstractMessageListenerContainer の場合は false です。

バージョン 4.3 より前は、AMQP-backed チャネルは Serializable ペイロードとヘッダーを持つメッセージのみをサポートしていました。メッセージ全体が変換(直列化)され、RabbitMQ に送信されました。これで、extract-payload 属性(または Java 構成を使用する場合は setExtractPayload())を true に設定できます。このフラグが true の場合、チャネルアダプターを使用する場合と同様の方法で、メッセージペイロードが変換され、ヘッダーがマップされます。この配置により、AMQP でバックアップされたチャネルを、直列化できないペイロード(おそらく、Jackson2JsonMessageConverter などの別のメッセージコンバーター)で使用できます。デフォルトのマッピングされたヘッダーの詳細については、AMQP メッセージヘッダーを参照してください。outbound-header-mapper および inbound-header-mapper 属性を使用するカスタムマッパーを提供することにより、マッピングを変更できます。default-delivery-mode も指定できるようになりました。これは、amqp_deliveryMode ヘッダーがない場合に配信モードを設定するために使用されます。デフォルトでは、Spring AMQP MessageProperties は PERSISTENT 配信モードを使用します。

他の永続化に裏付けられたチャネルと同様に、AMQP に裏付けられたチャネルは、メッセージの永続化を提供してメッセージの損失を回避することを目的としています。他のピアアプリケーションに作業を分配することを目的としていません。そのためには、代わりにチャネルアダプターを使用してください。
バージョン 5.0 以降、ポーリング可能チャネルは、指定された receiveTimeout のポーラースレッドをブロックするようになりました(デフォルトは 1 秒です)。以前は、他の PollableChannel 実装とは異なり、受信タイムアウトに関係なく、メッセージが利用できない場合、スレッドはすぐにスケジューラーに戻りました。各メッセージを受信するにはコンシューマーを作成する必要があるため、ブロッキングは basicGet() を使用してメッセージを取得する(タイムアウトなし)よりも少し高負荷です。以前の動作を復元するには、ポーラーの receiveTimeout を 0 に設定します。

Java 構成を使用した構成

次の例は、Java 構成でチャネルを構成する方法を示しています。

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

Java DSL を使用した構成

次の例は、Java DSL でチャネルを構成する方法を示しています。

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}

AMQP メッセージヘッダー

概要

Spring Integration AMQP アダプターは、すべての AMQP プロパティとヘッダーを自動的にマッピングします。(これは 4.3 からの変更です - 以前は、標準ヘッダーのみがマップされていました)。デフォルトでは、これらのプロパティは、DefaultAmqpHeaderMapper (Javadoc) を使用して Spring Integration MessageHeaders との間でコピーされます。

アダプターには、それをサポートするプロパティがあるため、AMQP 固有のヘッダーマッパーの独自の実装を渡すことができます。

DefaultAmqpHeaderMapper の requestHeaderNames または replyHeaderNames プロパティによって明示的に否定されない限り、AMQP MessageProperties (Javadoc) 内のユーザー定義ヘッダーは AMQP メッセージとの間でコピーされます。デフォルトでは、送信マッパーの場合、x-* ヘッダーはマップされません。理由については、このセクションで後述する注意を参照してください

デフォルトを上書きして 4.3 より前の動作に戻すには、プロパティで STANDARD_REQUEST_HEADERS および STANDARD_REPLY_HEADERS を使用します。

ユーザー定義のヘッダーをマッピングする場合、値には、照合する単純なワイルドカードパターン(thing* や *thing など)も含めることができます。* はすべてのヘッダーに一致します。

バージョン 4.1 から、AbstractHeaderMapper (DefaultAmqpHeaderMapper スーパークラス)により、NON_STANDARD_HEADERS トークンを(既存の STANDARD_REQUEST_HEADERS および STANDARD_REPLY_HEADERS に加えて) requestHeaderNames および replyHeaderNames プロパティ用に構成して、すべてのユーザー定義ヘッダーをマップできます。

org.springframework.amqp.support.AmqpHeaders クラスは、DefaultAmqpHeaderMapper によって使用されるデフォルトヘッダーを識別します。

  • amqp_appId

  • amqp_clusterId

  • amqp_contentEncoding

  • amqp_contentLength

  • content-type (contentType ヘッダーを参照してください)

  • amqp_correlationId

  • amqp_delay

  • amqp_deliveryMode

  • amqp_deliveryTag

  • amqp_expiration

  • amqp_messageCount

  • amqp_messageId

  • amqp_receivedDelay

  • amqp_receivedDeliveryMode

  • amqp_receivedExchange

  • amqp_receivedRoutingKey

  • amqp_redelivered

  • amqp_replyTo

  • amqp_timestamp

  • amqp_type

  • amqp_userId

  • amqp_publishConfirm

  • amqp_publishConfirmNackCause

  • amqp_returnReplyCode

  • amqp_returnReplyText

  • amqp_returnExchange

  • amqp_returnRoutingKey

  • amqp_channel

  • amqp_consumerTag

  • amqp_consumerQueue

このセクションで前述したように、すべてのヘッダーをコピーする一般的な方法は、* のヘッダーマッピングパターンを使用することです。ただし、特定の RabbitMQ 独自のプロパティ / ヘッダーもコピーされるため、予期しない副作用が生じる可能性があります。例: federation (英語) を使用する場合、受信したメッセージには、メッセージを送信したノードを含む x-received-from という名前のプロパティがある場合があります。ワイルドカード文字 * を受信ゲートウェイのリクエストおよびリプライヘッダーマッピングに使用すると、このヘッダーがコピーされ、フェデレーションで問題が発生する可能性があります。この応答メッセージは送信ブローカーにフェデレートされ、メッセージがループしていると見なされ、結果としてメッセージを表示せずにドロップする場合があります。ワイルドカードヘッダーマッピングの便利な機能を使用する場合は、ダウンストリームフローの一部のヘッダーを除外する必要がある場合があります。例: x-received-from ヘッダーを返信にコピーしないようにするために、AMQP 受信ゲートウェイに返信を送信する前に <int:header-filter …​ header-names="x-received-from"> を使用できます。または、ワイルドカードを使用する代わりに、実際にマップしたいプロパティを明示的にリストすることもできます。これらの理由により、受信メッセージの場合、マッパーは(デフォルトで) x-* ヘッダーをマップしません。また、受信メッセージから送信メッセージへのヘッダーの伝搬を回避するために、deliveryMode を amqp_deliveryMode ヘッダーにマップしません。代わりに、このヘッダーは amqp_receivedDeliveryMode にマップされますが、出力にはマップされません。

バージョン 4.3 以降、パターンの前に ! を付けることで、ヘッダーマッピングのパターンを無効にすることができます。否定パターンが優先されるため、STANDARD_REQUEST_HEADERS,thing1,ba*,!thing2,!thing3,qux,!thing1 などのリストは thing1 をマップしません(thing2 も thing3 もマップしません)。標準ヘッダーと bad および qux がマッピングされます。たとえば、受信側のダウンストリームで JSON 逆直列化ロジックが異なる方法で JSON 逆直列化ロジックが実行されたときに、受信メッセージの JSON 型ヘッダーをマップしないようにするために、否定手法は役立ちます。この目的のために、受信チャネルアダプター / ゲートウェイのヘッダーマッパーに !json_* パターンを構成する必要があります。

マッピングしたい ! で始まるユーザー定義ヘッダーがある場合、次のように \ でエスケープする必要があります: STANDARD_REQUEST_HEADERS,\!myBangHeader!myBangHeader という名前のヘッダーがマップされました。
バージョン 5.1 以降、対応する amqp_messageId または amqp_timestamp ヘッダーが送信メッセージに存在しない場合、DefaultAmqpHeaderMapper は、それぞれ MessageHeaders.ID および MessageHeaders.TIMESTAMP から MessageProperties.messageId および MessageProperties.timestamp へのマッピングにフォールバックします。受信プロパティは、以前と同様に amqp_* ヘッダーにマップされます。メッセージコンシューマーがステートフルリトライを使用している場合は、messageId プロパティを設定すると便利です。

contentType ヘッダー

他のヘッダーとは異なり、AmqpHeaders.CONTENT_TYPE の先頭には amqp_ が付きません。これにより、異なるテクノロジー間で contentType ヘッダーを透過的に渡すことができます。たとえば、RabbitMQ キューに送信される受信 HTTP メッセージ。

contentType ヘッダーは Spring AMQP の MessageProperties.contentType プロパティにマップされ、その後 RabbitMQ の content_type プロパティにマップされます。

バージョン 5.1 より前のバージョンでは、このヘッダーも MessageProperties.headers マップのエントリとしてマップされていました。これは誤りであり、さらに、基になる Spring AMQP メッセージコンバーターがコンテンツ型を変更した可能性があるため、値が誤っている可能性があります。このような変更は、ファーストクラスの content_type プロパティに反映されますが、RabbitMQ ヘッダーマップには反映されません。受信マッピングでヘッダーマップ値が無視されました。contentType は、ヘッダーマップのエントリにマップされなくなりました。

厳密なメッセージ順序

このセクションでは、受信および送信メッセージのメッセージ順序について説明します。

受信

受信メッセージの厳密な順序付けが必要な場合は、受信リスナーコンテナーの prefetchCount プロパティを 1 に設定する必要があります。これは、メッセージが失敗して再配信された場合、既存のプリフェッチされたメッセージの後に到着するためです。Spring AMQP バージョン 2.0 以降、prefetchCount はパフォーマンスを向上させるために 250 にデフォルト設定されています。厳密なオーダー要件には、パフォーマンスの低下という代償が伴います。

送信

次の統合フローを検討してください。

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .split(s -> s.delimiters(","))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

メッセージ ABC をゲートウェイに送信するとします。メッセージ ABC が順番に送信される可能性は高いですが、保証はありません。これは、テンプレートが送信操作ごとにキャッシュからチャネルを「借用」し、各メッセージに同じチャネルが使用されるという保証がないためです。解決策の 1 つは、スプリッターの前にトランザクションを開始することですが、RabbitMQ ではトランザクションのコストが高く、パフォーマンスが数百分の 1 に低下する可能性があります。

この問題をより効率的な方法で解決するために、バージョン 5.1 以降、Spring Integration は HandleMessageAdvice である BoundRabbitChannelAdvice を提供しています。メッセージアドバイスの処理を参照してください。スプリッターの前に適用すると、すべてのダウンストリーム操作が同じチャネルで実行され、オプションで、すべての送信メッセージのパブリッシャー確認が受信されるまで待機できます(接続ファクトリが確認用に構成されている場合)。次の例は、BoundRabbitChannelAdvice の使用方法を示しています。

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
    return IntegrationFlow.from(Gateway.class)
            .split(s -> s.delimiters(",")
                    .advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
            .<String, String>transform(String::toUpperCase)
            .handle(Amqp.outboundAdapter(template).routingKey("rk"))
            .get();
}

同じ RabbitTemplate (RabbitOperations を実装)がアドバイスと送信アダプターで使用されていることに注意してください。アドバイスは、すべての操作が同じチャネルで実行されるように、テンプレートの invoke メソッド内でダウンストリームフローを実行します。オプションのタイムアウトが提供されている場合、フローが完了すると、アドバイスは waitForConfirmsOrDie メソッドを呼び出します。waitForConfirmsOrDie メソッドは、指定された時間内に確認が受信されない場合に例外をスローします。

ダウンストリームフロー(QueueChannelExecutorChannel など)にスレッドハンドオフがあってはなりません。

AMQP サンプル

AMQP アダプターを試すには、https://github.com/SpringSource/spring-integration-samples (英語) の Spring Integration サンプル git リポジトリで入手可能なサンプルを確認してください。

現在、1 つのサンプルは、送信チャネルアダプターと受信チャネルアダプターを使用して、Spring Integration AMQP アダプターの基本機能を示しています。サンプルの AMQP ブローカー実装では RabbitMQ (英語) を使用します。

この例を実行するには、RabbitMQ の実行中のインスタンスが必要です。基本的なデフォルトだけのローカルインストールで十分です。RabbitMQ の詳細なインストール手順については、https://www.rabbitmq.com/install.html (英語) を参照してください

サンプルアプリケーションが起動したら、コマンドプロンプトにテキストを入力すると、入力したテキストを含むメッセージが AMQP キューにディスパッチされます。その代わりに、そのメッセージは Spring Integration によって取得され、コンソールに出力されます。

次の図は、このサンプルで使用される Spring Integration コンポーネントの基本セットを示しています。

RabbitMQ ストリームキューのサポート

AMQP サンプル image::images/spring-integration-amqp-sample-graph.png[] の Spring Integration グラフ

バージョン 6.0 では、RabbitMQ ストリームキューのサポートが導入されました。

これらのエンドポイントの DSL ファクトリクラスは Rabbit です。

RabbitMQ ストリーム 受信 チャネルアダプター

@Bean
IntegrationFlow flow(Environment env) {
    @Bean
	IntegrationFlow simpleStream(Environment env) {
		return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
						.configureContainer(container -> container.queueName("my.stream")))
				// ...
				.get();
	}

	@Bean
	IntegrationFlow superStream(Environment env) {
		return IntegrationFlow.from(RabbitStream.inboundAdapter(env)
						.configureContainer(container -> container.superStream("my.stream", "my.consumer")))
				// ...
				.get();
	}
}

RabbitMQ ストリーム 送信 チャネルアダプター

@Bean
IntegrationFlow outbound(RabbitStreamTemplate template) {
    return f -> f
            // ...
            .handle(RabbitStream.outboundStreamAdapter(template));

}