リクエスト / 応答メッセージ
AmqpTemplate
は、一方向送信操作 (exchange
、routingKey
、Message
) について前述したのと同じ引数オプションを受け入れるさまざまな sendAndReceive
メソッドも提供します。これらのメソッドは、送信前に必要な reply-to
プロパティの構成を処理し、その目的のために内部的に作成された専用キューで応答メッセージをリッスンできるため、リクエストと応答のシナリオで非常に役立ちます。
MessageConverter
がリクエストと応答の両方に適用される、同様のリクエスト応答メソッドも使用できます。これらのメソッドの名前は convertSendAndReceive
です。詳細については、AmqpTemplate
の Javadoc を参照してください。
バージョン 1.5.0 以降、各 sendAndReceive
メソッドバリアントには、CorrelationData
を取るオーバーロードされたバージョンがあります。適切に構成された接続ファクトリと共に、これにより、操作の送信側のパブリッシャー確認の受信が可能になります。詳細については、相関するパブリッシャーの確認と return および RabbitOperations
用の Javadoc を参照してください。
バージョン 2.0 以降、これらのメソッドのバリアント (convertSendAndReceiveAsType
) があり、追加の ParameterizedTypeReference
引数を使用して複雑な戻り型を変換します。テンプレートは SmartMessageConverter
で構成する必要があります。詳細については、RabbitTemplate
を使用した Message
からの変換を参照してください。
バージョン 2.1 以降では、応答コンシューマーの noLocal
フラグを制御する noLocalReplyConsumer
オプションを使用して RabbitTemplate
を構成できます。これはデフォルトで false
です。
返信タイムアウト
デフォルトでは、send メソッドと receive メソッドは 5 秒後にタイムアウトし、null を返します。この動作は、replyTimeout
プロパティを設定することで変更できます。バージョン 1.5 以降、mandatory
プロパティを true
に設定した場合 (または特定のメッセージに対して mandatory-expression
が true
に評価された場合)、メッセージをキューに配信できない場合、AmqpMessageReturnedException
がスローされます。この例外には、returnedMessage
、replyCode
、replyText
プロパティと、送信に使用される exchange
および routingKey
があります。
この機能は、発行元の return を使用します。CachingConnectionFactory で publisherReturns を true に設定することで有効にできます ( パブリッシャーの確認と return を参照)。また、独自の ReturnCallback を RabbitTemplate に登録していてはなりません。 |
バージョン 2.1.2 から、replyTimedOut
メソッドが追加され、保持された状態をクリーンアップできるように、サブクラスにタイムアウトを通知できるようになりました。
バージョン 2.0.11 および 2.1.3 以降、デフォルトの DirectReplyToMessageListenerContainer
を使用する場合、テンプレートの replyErrorHandler
プロパティを設定することでエラーハンドラーを追加できます。このエラーハンドラーは、遅延応答や相関ヘッダーなしで受信したメッセージなど、配信に失敗した場合に呼び出されます。渡される例外は、failedMessage
プロパティを持つ ListenerExecutionFailedException
です。
RabbitMQ 直接返信
バージョン 3.4.0 以降、RabbitMQ サーバーは直接の返信先 (英語) をサポートします。これにより、応答キューを固定する主な理由がなくなります (リクエストごとに一時キューを作成する必要がなくなります)。Spring AMQP バージョン以降、一時的な返信キューを作成する代わりに、1.4.1 直接返信先がデフォルトで使用されます (サーバーでサポートされている場合)。replyQueue が指定されていない場合 (または amq.rabbitmq.reply-to の名前が設定されている場合)、RabbitTemplate は、直接の返信先がサポートされているかどうかを自動的に検出し、それを使用するか、一時的な返信キューを使用するようにフォールバックします。直接返信先を使用する場合、reply-listener は必要ないため、構成しないでください。 |
応答リスナーは、名前付きキュー ( amq.rabbitmq.reply-to
以外) で引き続きサポートされており、応答の同時実行などを制御できます。
バージョン 1.6 以降、応答ごとに一時的で排他的な自動削除キューを使用する場合は、useTemporaryReplyQueues
プロパティを true
に設定します。replyAddress
を設定すると、このプロパティは無視されます。
RabbitTemplate
をサブクラス化し、useDirectReplyTo()
をオーバーライドしてさまざまな条件をチェックすることにより、直接返信先を使用するかどうかを決定する条件を変更できます。このメソッドは、最初のリクエストが送信されたときに 1 回だけ呼び出されます。
バージョン 2.0 より前では、RabbitTemplate
はリクエストごとに新しいコンシューマーを作成し、応答が受信された (またはタイムアウトした) ときにコンシューマーをキャンセルしていました。テンプレートは代わりに DirectReplyToMessageListenerContainer
を使用し、コンシューマーを再利用できるようになりました。テンプレートは引き続き返信の関連付けを処理するため、返信が遅れて別の送信者に送られる危険はありません。以前の動作に戻す場合は、useDirectReplyToContainer
(XML 構成を使用する場合は direct-reply-to-container
) プロパティを false に設定します。
AsyncRabbitTemplate
にはそのようなオプションはありません。直接返信先が使用されている場合、返信には常に DirectReplyToContainer
が使用されていました。
バージョン 2.3.7 以降、テンプレートには新しいプロパティ useChannelForCorrelation
があります。これが true
の場合、サーバーは相関 ID をリクエストメッセージヘッダーから応答メッセージにコピーする必要はありません。代わりに、リクエストの送信に使用されたチャネルを使用して、リクエストへの応答が関連付けられます。
応答キューとのメッセージ相関
固定応答キュー ( amq.rabbitmq.reply-to
以外) を使用する場合は、相関データを提供して、応答をリクエストに関連付けることができるようにする必要があります。RabbitMQ リモートプロシージャコール (RPC) (英語) を参照してください。デフォルトでは、相関データを保持するために標準の correlationId
プロパティが使用されます。ただし、カスタムプロパティを使用して相関データを保持する場合は、<rabbit-template/> で correlation-key
属性を設定できます。属性を明示的に correlationId
に設定することは、属性を省略することと同じです。クライアントとサーバーは、相関データに同じヘッダーを使用する必要があります。
Spring AMQP バージョン 1.1 は、このデータに spring_reply_correlation というカスタムプロパティを使用しました。現在のバージョンでこの動作に戻したい場合 (おそらく 1.1 を使用する別のアプリケーションとの互換性を維持するため)、属性を spring_reply_correlation に設定する必要があります。 |
デフォルトでは、テンプレートは独自の相関 ID を生成します (ユーザーが指定した値は無視されます)。独自の相関 ID を使用する場合は、RabbitTemplate
インスタンスの userCorrelationId
プロパティを true
に設定します。
相関 ID は、リクエストに対して間違った応答が返される可能性を回避するために、一意である必要があります。 |
応答リスナーコンテナー
3.4.0 より前のバージョンの RabbitMQ を使用する場合、応答ごとに新しい一時キューが使用されます。ただし、テンプレートで 1 つの応答キューを構成できます。これはより効率的であり、そのキューに引数を設定することもできます。ただし、この場合、<reply-listener/> サブ要素も提供する必要があります。この要素は、応答キューのリスナーコンテナーを提供します。テンプレートはリスナーです。テンプレートの構成から継承される connection-factory
および message-converter
を除き、<listener-container/> で許可されるすべてのメッセージリスナーコンテナーの設定属性が要素で許可されます。
アプリケーションの複数のインスタンスを実行するか、複数の RabbitTemplate インスタンスを使用する場合、MUST はそれぞれに固有の応答キューを使用します。RabbitMQ にはキューからメッセージを選択する機能がないため、すべてが同じキューを使用する場合、各インスタンスは応答を求めて競合し、必ずしも独自の応答を受信するとは限りません。 |
次の例では、接続ファクトリを使用して rabbit テンプレートを定義しています。
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
コンテナーとテンプレートは接続ファクトリを共有しますが、チャネルは共有しません。リクエストと応答は同じトランザクション内では実行されません (トランザクションの場合)。
バージョン 1.5.0 より前では、reply-address 属性は使用できませんでした。返信は常に、デフォルトの交換と reply-queue 名をルーティングキーとして使用してルーティングされていました。これはデフォルトのままですが、新しい reply-address 属性を指定できるようになりました。reply-address には <exchange>/<routingKey> 形式のアドレスを含めることができ、応答は指定された交換にルーティングされ、ルーティングキーにバインドされたキューにルーティングされます。reply-address は reply-queue よりも優先されます。reply-address のみを使用する場合、<reply-listener> は別の <listener-container> コンポーネントとして構成する必要があります。reply-address および reply-queue (または <listener-container> の queues 属性) は、論理的に同じキューを参照する必要があります。 |
この構成では、応答を受信するために SimpleListenerContainer
が使用され、RabbitTemplate
が MessageListener
になります。前の例に示すように、<rabbit:template/>
名前空間要素を使用してテンプレートを定義すると、パーサーはテンプレート内のコンテナーとワイヤーをリスナーとして定義します。
テンプレートが固定 replyQueue を使用しない場合 (または直接返信を使用している場合 — RabbitMQ 直接返信を参照)、リスナーコンテナーは必要ありません。RabbitMQ 3.4.0 以降を使用する場合は、ダイレクト reply-to が推奨されるメカニズムです。 |
RabbitTemplate
を <bean/>
として定義するか、@Configuration
クラスを使用して @Bean
として定義する場合、またはプログラムでテンプレートを作成する場合は、応答リスナーコンテナーを自分で定義して接続する必要があります。これを行わないと、テンプレートは応答を受信せず、最終的にタイムアウトになり、sendAndReceive
メソッドの呼び出しに対する応答として null が返されます。
バージョン 1.5 以降、RabbitTemplate
は、応答を受信する MessageListener
として構成されているかどうかを検出します。そうでない場合、応答アドレスを使用してメッセージを送受信しようとすると、IllegalStateException
で失敗します (応答がまったく受信されないため)。
さらに、単純な replyAddress
(キュー名) が使用されている場合、応答リスナーコンテナーは、同じ名前のキューをリッスンしていることを確認します。返信アドレスが交換およびルーティングキーであり、デバッグログメッセージが書き込まれている場合、このチェックは実行できません。
応答リスナーとテンプレートを自分で接続するときは、テンプレートの replyAddress とコンテナーの queues (または queueNames ) プロパティが同じキューを参照していることを確認することが重要です。テンプレートは、送信メッセージの replyTo プロパティに返信アドレスを挿入します。 |
次のリストは、Bean を手動で接続する方法の例を示しています。
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
このテストケース [GitHub] (英語) では、リクエストを処理して返信を返す「リモート」リスナーコンテナーとともに、固定の返信キューに接続された RabbitTemplate
の完全な例が示されています。
応答がタイムアウトになると (replyTimeout )、sendAndReceive() メソッドは null を返します。 |
バージョン 1.3.6 より前のバージョンでは、タイムアウトメッセージに対する遅れた返信はログに記録されるだけでした。これで、返信が遅れた場合は拒否されます (テンプレートは AmqpRejectAndDontRequeueException
をスローします)。拒否されたメッセージをデッドレター交換に送信するように返信キューが構成されている場合、後で分析するために返信を取得できます。これを行うには、応答キューの名前と同じルーティングキーを使用して、構成されたデッドレター交換にキューをバインドします。
デッドレタリングの設定の詳細については、RabbitMQ デッドレターのドキュメント (英語) を参照してください。例として、FixedReplyQueueDeadLetterTests
テストケースを参照することもできます。
非同期 Rabbit テンプレート
バージョン 1.6 は AsyncRabbitTemplate
を導入しました。これには、AmqpTemplate
のものと同様の sendAndReceive
(および convertSendAndReceive
) メソッドがあります。ただし、ブロックする代わりに、CompletableFuture
を返します。
sendAndReceive
メソッドは RabbitMessageFuture
を返します。convertSendAndReceive
メソッドは RabbitConverterFuture
を返します。
後で get()
を呼び出して結果を同期的に取得するか、結果と非同期的に呼び出されるコールバックを登録することができます。次のリストは、両方のアプローチを示しています。
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
mandatory
が設定されていて、メッセージを配信できない場合、Future は AmqpMessageReturnedException
の原因で ExecutionException
をスローします。これは、返されたメッセージと返された情報をカプセル化します。
enableConfirms
が設定されている場合、フューチャには confirm
というプロパティがあり、これはそれ自体が CompletableFuture<Boolean>
であり、true
は発行の成功を示します。確認の未来が false
である場合、RabbitFuture
には nackCause
と呼ばれる追加のプロパティがあり、利用可能な場合は失敗の理由が含まれています。
返信はパブリッシュの成功を意味するため、返信後にパブリッシャー確認が受信された場合、パブリッシャー確認は破棄されます。 |
テンプレートの receiveTimeout
プロパティを設定して、応答をタイムアウトにすることができます (デフォルトは 30000
- 30 秒)。タイムアウトが発生した場合、未来は AmqpReplyTimeoutException
で完了します。
テンプレートは SmartLifecycle
を実装しています。保留中の応答があるときにテンプレートを停止すると、保留中の Future
インスタンスがキャンセルされます。
バージョン 2.0 以降、非同期テンプレートは、構成された返信キューの代わりに直接返信先 (英語) をサポートするようになりました。この機能を有効にするには、次のコンストラクターのいずれかを使用します。
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
同期 RabbitTemplate
で直接返信を使用するには、RabbitMQ 直接返信を参照してください。
バージョン 2.0 では、これらのメソッドのバリアント (convertSendAndReceiveAsType
) が導入されました。これらのメソッドは、追加の ParameterizedTypeReference
引数を使用して、返される複雑な型を変換します。基盤となる RabbitTemplate
を SmartMessageConverter
で構成する必要があります。詳細については、RabbitTemplate
を使用した Message
からの変換を参照してください。