ポーリングコンシューマー
AmqpTemplate
自体は、ポーリングされた Message
受信に使用できます。デフォルトでは、利用可能なメッセージがない場合、null
がすぐに返されます。ブロッキングはありません。バージョン 1.5 から、ミリ秒単位で receiveTimeout
を設定できます。受信メソッドは、最大でメッセージを待機します。ゼロ未満の値は、無期限に (または少なくともブローカーへの接続が失われるまで) ブロックすることを意味します。バージョン 1.6 では、呼び出しごとにタイムアウトを渡すことができる receive
メソッドのバリアントが導入されました。
受信操作ではメッセージごとに新しい QueueingConsumer が作成されるため、この手法は大量の環境にはあまり適していません。これらのユースケースでは、非同期コンシューマーまたはゼロの receiveTimeout を使用することを検討してください。 |
バージョン 2.4.8 以降、ゼロ以外のタイムアウトを使用する場合、コンシューマーをチャネルに関連付けるために使用される basicConsume
メソッドに渡される引数を指定できます。例: template.addConsumerArg("x-priority", 10)
。
利用可能な 4 つの単純な receive
メソッドがあります。送信側の Exchange
と同様に、デフォルトのキュープロパティがテンプレート自体に直接設定されている必要があるメソッドと、実行時にキューパラメーターを受け入れるメソッドがあります。バージョン 1.6 では、リクエストごとに receiveTimeout
をオーバーライドするために timeoutMillis
を受け入れるバリアントが導入されました。次のリストは、4 つのメソッドの定義を示しています。
Message receive() throws AmqpException;
Message receive(String queueName) throws AmqpException;
Message receive(long timeoutMillis) throws AmqpException;
Message receive(String queueName, long timeoutMillis) throws AmqpException;
メッセージを送信する場合と同様に、AmqpTemplate
には、Message
インスタンスの代わりに POJO を受信するためのいくつかの便利なメソッドがあり、実装は、返される Object
を作成するために使用される MessageConverter
をカスタマイズするメソッドを提供します: 次のリストは、それらのメソッドを示しています。
Object receiveAndConvert() throws AmqpException;
Object receiveAndConvert(String queueName) throws AmqpException;
Object receiveAndConvert(long timeoutMillis) throws AmqpException;
Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
バージョン 2.0 以降、複雑な型を変換するために追加の ParameterizedTypeReference
引数を取るこれらのメソッドのバリアントがあります。テンプレートは SmartMessageConverter
で構成する必要があります。詳細については、RabbitTemplate
を使用した Message
からの変換を参照してください。
sendAndReceive
メソッドと同様に、バージョン 1.3 以降、AmqpTemplate
には、メッセージを同期的に受信、処理、返信するための便利な receiveAndReply
メソッドがいくつかあります。次のリストは、これらのメソッド定義を示しています。
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;
<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
AmqpTemplate
実装は、receive
および reply
フェーズを処理します。ほとんどの場合、受信したメッセージに対してビジネスロジックを実行し、必要に応じて応答オブジェクトまたはメッセージを作成するために、ReceiveAndReplyCallback
の実装のみを提供する必要があります。ReceiveAndReplyCallback
は null
を返す場合があることに注意してください。この場合、応答は送信されず、receiveAndReply
は receive
メソッドのように機能します。これにより、メッセージの混合に同じキューを使用できますが、その一部は返信を必要としない場合があります。
自動メッセージ (リクエストと応答) 変換は、提供されたコールバックが生のメッセージ交換契約を提供する ReceiveAndReplyMessageCallback
のインスタンスでない場合にのみ適用されます。
ReplyToAddressCallback
は、受信したメッセージと ReceiveAndReplyCallback
からの応答に対して実行時に replyTo
アドレスを決定するカスタムロジックが必要な場合に役立ちます。デフォルトでは、リクエストメッセージ内の replyTo
情報を使用して応答がルーティングされます。
次のリストは、POJO ベースの受信と応答の例を示しています。
boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {
public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}