非同期送信ゲートウェイ
前のセクションで説明したゲートウェイは同期的であり、応答を受信する(またはタイムアウトが発生する)まで送信スレッドが中断されます。Spring Integration バージョン 4.3 は、Spring AMQP からの AsyncRabbitTemplate
を使用する非同期ゲートウェイを追加しました。メッセージが送信されると、スレッドは送信操作が完了した直後に戻り、メッセージが受信されると、テンプレートのリスナーコンテナースレッドで応答が送信されます。これは、ゲートウェイがポーラースレッドで呼び出されるときに便利です。スレッドがリリースされ、フレームワーク内の他のタスクで使用できます。
次のリストは、AMQP 非同期送信ゲートウェイの可能な構成オプションを示しています。
Java DSL
Java
XML
@Configuration
public class AmqpAsyncApplication {
@Bean
public IntegrationFlow asyncAmqpOutbound(AsyncRabbitTemplate asyncRabbitTemplate) {
return f -> f
.handle(Amqp.asyncOutboundGateway(asyncRabbitTemplate)
.routingKey("queue1")); // default exchange - route to queue 'queue1'
}
@MessagingGateway(defaultRequestChannel = "asyncAmqpOutbound.input")
public interface MyGateway {
String sendToRabbit(String data);
}
}
@Configuration
public class AmqpAsyncConfig {
@Bean
@ServiceActivator(inputChannel = "amqpOutboundChannel")
public AsyncAmqpOutboundGateway amqpOutbound(AsyncRabbitTemplate asyncTemplate) {
AsyncAmqpOutboundGateway outbound = new AsyncAmqpOutboundGateway(asyncTemplate);
outbound.setRoutingKey("foo"); // default exchange - route to queue 'foo'
return outbound;
}
@Bean
public AsyncRabbitTemplate asyncTemplate(RabbitTemplate rabbitTemplate,
SimpleMessageListenerContainer replyContainer) {
return new AsyncRabbitTemplate(rabbitTemplate, replyContainer);
}
@Bean
public SimpleMessageListenerContainer replyContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(ccf);
container.setQueueNames("asyncRQ1");
return container;
}
@Bean
public MessageChannel amqpOutboundChannel() {
return new DirectChannel();
}
}
<int-amqp:outbound-async-gateway id="asyncOutboundGateway" (1)
request-channel="myRequestChannel" (2)
async-template="" (3)
exchange-name="" (4)
exchange-name-expression="" (5)
order="1" (6)
reply-channel="" (7)
reply-timeout="" (8)
requires-reply="" (9)
routing-key="" (10)
routing-key-expression="" (11)
default-delivery-mode"" (12)
confirm-correlation-expression="" (13)
confirm-ack-channel="" (14)
confirm-nack-channel="" (15)
confirm-timeout="" (16)
return-channel="" (17)
lazy-connect="true" /> (18)
1 | このアダプターの一意の ID。オプション。 |
2 | メッセージを AMQP 交換に変換して公開するために送信するメッセージチャネル。必須。 |
3 | 構成された AsyncRabbitTemplate への Bean 参照。オプション(デフォルトは asyncRabbitTemplate )。 |
4 | メッセージの送信先となる AMQP 交換の名前。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name-expression" と相互に排他的です。オプション。 |
5 | メッセージがルートオブジェクトとして送信される AMQP 交換の名前を決定するために評価される SpEL 式。指定しない場合、メッセージはデフォルトの名前のない交換に送信されます。"exchange-name" と相互に排他的。オプション。 |
6 | 複数のコンシューマーが登録されている場合のこのコンシューマーの順序。これにより、負荷分散とフェイルオーバーが可能になります。オプション(デフォルトは Ordered.LOWEST_PRECEDENCE [=Integer.MAX_VALUE] )。 |
7 | AMQP キューから受信して変換した後、応答が送信されるメッセージチャネル。オプション。 |
8 | reply-channel に応答メッセージを送信するときにゲートウェイが待機する時間。これは、reply-channel がブロックできる場合にのみ適用されます。たとえば、容量制限が現在いっぱいの QueueChannel などです。デフォルトは無限です。 |
9 | AsyncRabbitTemplate’s `receiveTimeout プロパティ内で応答メッセージが受信されず、この設定が true の場合、ゲートウェイは受信メッセージの errorChannel ヘッダーにエラーメッセージを送信します。AsyncRabbitTemplate’s `receiveTimeout プロパティ内で応答メッセージが受信されず、この設定が false である場合、ゲートウェイはデフォルトの errorChannel (利用可能な場合)にエラーメッセージを送信します。デフォルトは true です。 |
10 | メッセージを送信するときに使用するルーティングキー。デフォルトでは、これは空の String です。"routing-key-expression" と相互に排他的です。オプション。 |
11 | メッセージをルートオブジェクトとして使用してメッセージを送信するときに使用するルーティングキーを決定するために評価される SpEL 式(たとえば、"payload.key" )。デフォルトでは、これは空の String です。「ルーティングキー」と相互に排他的。オプション。 |
12 | メッセージのデフォルト配信モード: PERSISTENT または NON_PERSISTENT header-mapper が配信モードを設定するとオーバーライドされます。Spring Integration メッセージヘッダー(amqp_deliveryMode )が存在する場合、DefaultHeaderMapper は値を設定します。この属性が指定されておらず、ヘッダーマッパーが設定しない場合、デフォルトは RabbitTemplate によって使用される基になる Spring AMQP MessagePropertiesConverter に依存します。カスタマイズされていない場合、デフォルトは PERSISTENT です。オプション。 |
13 | 相関データを定義する式。提供される場合、これは、パブリッシャーの確認を受信するように基盤となる AMQP テンプレートを構成します。専用の RabbitTemplate と、publisherConfirms プロパティが true に設定された CachingConnectionFactory が必要です。パブリッシャーの確認を受け取り、相関データが提供されると、確認の型に応じて、確認が confirm-ack-channel または confirm-nack-channel のいずれかに書き込まれます。確認のペイロードは、この式で定義された相関データであり、メッセージの "amqp_publishConfirm" ヘッダーは true (ack )または false (nack )に設定されています。nack インスタンスの場合、追加のヘッダー(amqp_publishConfirmNackCause )が提供されます。例: headers['myCorrelationData'] 、payload 。式が Message<?> インスタンス( "#this" など)に解決される場合、ack /nack チャネルで発行されるメッセージは、追加のヘッダーが追加されたそのメッセージに基づいています。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。 |
14 | 肯定的な(ack )パブリッシャー確認が送信されるチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。基になる AsyncRabbitTemplate の enableConfirms プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel )。 |
15 | バージョン 4.2 以降。否定的な(nack )パブリッシャー確認の送信先のチャネル。ペイロードは、confirm-correlation-expression によって定義された相関データです。基になる AsyncRabbitTemplate の enableConfirms プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション(デフォルトは nullChannel )。 |
16 | 設定すると、この時間内にパブリッシャー確認が受信されない場合、ゲートウェイは否定応答(nack)を合成します(ミリ秒単位)。保留中の確認はこの値の 50% ごとにチェックされるため、nack が送信される実際の時間は 1x と 1.5x の間になります。パブリッシャーの確認と return の代替メカニズムも参照してください。デフォルトはなし(nack は生成されません)。 |
17 | 返されたメッセージが送信されるチャネル。提供されると、基盤となる AMQP テンプレートは、配信不能メッセージをゲートウェイに返すように構成されます。メッセージは、AMQP から受信したデータと、次の追加ヘッダーで構成されます: amqp_returnReplyCode 、amqp_returnReplyText 、amqp_returnExchange 、amqp_returnRoutingKey 。基になる AsyncRabbitTemplate で、mandatory プロパティを true に設定する必要があります。パブリッシャーの確認と return の代替メカニズムも参照してください。オプション。 |
18 | false に設定すると、エンドポイントはアプリケーションコンテキストの初期化中にブローカーへの接続を試行します。これにより、ブローカがダウンしている場合にエラーメッセージを記録することにより、不正な構成の「フェイルファースト」検出が可能になります。true (デフォルト)の場合、最初のメッセージが送信されたときに接続が確立されます(他のコンポーネントが接続を確立したためにすでに存在しない場合)。 |
詳細については、非同期サービスアクティベーターも参照してください。
RabbitTemplate 確認・返却をご利用の際は、 |