非同期送信ゲートウェイ

前のセクションで説明したゲートウェイは同期的であり、応答を受信する(またはタイムアウトが発生する)まで送信スレッドが中断されます。Spring Integration バージョン 4.3 は、Spring AMQP からの AsyncRabbitTemplate を使用する非同期ゲートウェイを追加しました。メッセージが送信されると、スレッドは送信操作が完了した直後に戻り、メッセージが受信されると、テンプレートのリスナーコンテナースレッドで応答が送信されます。これは、ゲートウェイがポーラースレッドで呼び出されるときに便利です。スレッドがリリースされ、フレームワーク内の他のタスクで使用できます。

次のリストは、AMQP 非同期送信ゲートウェイの可能な構成オプションを示しています。

  • Java DSL

  • Java

  • XML

@Configuration
public class AmqpAsyncApplication {

    @Bean
    public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
        return f -> f
                .handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
                        .routingKey("queue1")); // default exchange - route to queue 'queue1'
    }

    @MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
    public interface MyGateway {

        String sendToRabbit(String data);

    }

}
@Configuration
public class AmqpAsyncConfig {

    @Bean
    @ServiceActivator(inputChannel = "amqpOutboundChannel")
    public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
        AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
        outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
        return outbound;
    }

    @Bean
    public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
                     SimpleMessageListenerContainer replyContainer) {

        return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
    }

    @Bean
    public SimpleMessageListenerContainer replyContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
        container.setQueueNames("asyncRQ1");
        return container;
    }

    @Bean
    public MessageChannel amqpOutboundChannel() {
        return new DirectChannel();
    }

}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway"    (1)
                           request-channel="myRequestChannel" (2)
                           async-template=""                  (3)
                           exchange-name=""                   (4)
                           exchange-name-expression=""        (5)
                           order="1"                          (6)
                           reply-channel=""                   (7)
                           reply-timeout=""                   (8)
                           requires-reply=""                  (9)
                           routing-key=""                     (10)
                           routing-key-expression=""          (11)
                           default-delivery-mode""            (12)
                           confirm-correlation-expression=""  (13)
                           confirm-ack-channel=""             (14)
                           confirm-nack-channel=""            (15)
                           confirm-timeout=""                 (16)
                           return-channel=""                  (17)
                           lazy-connect="true" />             (18)
1 このアダプターの一意の ID。オプション。
2 メッセージを AMQP 交換に変換して公開するために送信するメッセージチャネル。必須。
3 構成された AsyncRabbitTemplate への Bean 参照。オプション(デフォルトは asyncRabbitTemplate)。
4 メッセージの送信先となる AMQP 交換の名前。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name-expression" と相互に排他的です。オプション。
5 メッセージがルートオブジェクトとして送信される AMQP 交換の名前を決定するために評価される SpEL 式。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name" と相互に排他的。オプション。
6 複数のコンシューマーが登録されている場合のこのコンシューマーの順序。これにより、負荷分散とフェイルオーバーが可能になります。オプション(デフォルトは Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE])。
7AMQP キューから受信して変換した後、応答が送信されるメッセージチャネル。オプション。
8reply-channel に応答メッセージを送信するときにゲートウェイが待機する時間。これは、reply-channel がブロックできる場合にのみ適用されます。たとえば、容量制限が現在いっぱいの QueueChannel などです。デフォルトは無限です。
9AsyncRabbitTemplate’s `receiveTimeout プロパティ内で応答メッセージが受信されず、この設定が true の場合、ゲートウェイは受信メッセージの errorChannel ヘッダーにエラーメッセージを送信します。AsyncRabbitTemplate’s `receiveTimeout プロパティ内で応答メッセージが受信されず、この設定が false である場合、ゲートウェイはデフォルトの errorChannel (利用可能な場合)にエラーメッセージを送信します。デフォルトは true です。
10 メッセージを送信するときに使用するルーティングキー。デフォルトでは、これは空の String です。"routing-key-expression" と相互に排他的です。オプション。
11 メッセージをルートオブジェクトとして使用してメッセージを送信するときに使用するルーティングキーを決定するために評価される SpEL 式(たとえば、"payload.key" )。デフォルトでは、これは空の String です。「ルーティングキー」と相互に排他的。オプション。
12 メッセージのデフォルト配信モード: PERSISTENT または NON_PERSISTENT header-mapper が配信モードを設定するとオーバーライドされます。Spring Integration メッセージヘッダー(amqp_deliveryMode)が存在する場合、DefaultHeaderMapper は値を設定します。この属性が指定されておらず、ヘッダーマッパーが設定しない場合、デフォルトは RabbitTemplate によって使用される基になる Spring AMQP MessagePropertiesConverter に依存します。カスタマイズされていない場合、デフォルトは PERSISTENT です。オプション。
13 相関データを定義する式。提供される場合、これは、パブリッシャーの確認を受信するように基盤となる AMQP テンプレートを構成します。専用の RabbitTemplate と、publisherConfirms プロパティが true に設定された CachingConnectionFactory が必要です。パブリッシャーの確認を受け取り、相関データが提供されると、確認の型に応じて、確認が confirm-ack-channel または confirm-nack-channel のいずれかに書き込まれます。確認のペイロードは、この式で定義された相関データであり、メッセージの "amqp_publishConfirm" ヘッダーは true (ack)または false (nack)に設定されています。nack インスタンスの場合、追加のヘッダー(amqp_publishConfirmNackCause)が提供されます。例: headers['myCorrelationData']payload。式が Message<?> インスタンス( "#this" など)に解決される場合、ack/nack チャネルで発行されるメッセージは、追加のヘッダーが追加されたそのメッセージに基づいています。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。
14 肯定的な(ack)パブリッシャー確認が送信されるチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。基になる AsyncRabbitTemplate の enableConfirms プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel)。
15 バージョン 4.2 以降。否定的な(nack)パブリッシャー確認の送信先のチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。基になる AsyncRabbitTemplate の enableConfirms プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel)。
16 設定すると、この時間内にパブリッシャー確認が受信されない場合、ゲートウェイは否定応答(nack)を合成します(ミリ秒単位)。保留中の確認はこの値の 50% ごとにチェックされるため、nack が送信される実際の時間は 1x と 1.5x の間になります。パブリッシャーの確認と return の代替メカニズムも参照してください。デフォルトはなし(nack は生成されません)。
17 返されたメッセージが送信されるチャネル。提供されると、基盤となる AMQP テンプレートは、配信不能メッセージをゲートウェイに返すように構成されます。メッセージは、AMQP から受信したデータと、次の追加ヘッダーで構成されます: amqp_returnReplyCodeamqp_returnReplyTextamqp_returnExchangeamqp_returnRoutingKey。基になる AsyncRabbitTemplate で、mandatory プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。
18false に設定すると、エンドポイントはアプリケーションコンテキストの初期化中にブローカーへの接続を試行します。これにより、ブローカがダウンしている場合にエラーメッセージを記録することにより、不正な構成の「フェイルファースト」検出が可能になります。true (デフォルト)の場合、最初のメッセージが送信されたときに接続が確立されます(他のコンポーネントが接続を確立したためにすでに存在しない場合)。

詳細については、非同期サービスアクティベーターも参照してください。

RabbitTemplate

確認・返却をご利用の際は、AsyncRabbitTemplate に接続する RabbitTemplate を専用にすることをお勧めします。そうしないと、予期しない副作用が発生する可能性があります。