ポーリングコンシューマー

AmqpTemplate 自体は、ポーリングされた Message 受信に使用できます。デフォルトでは、利用可能なメッセージがない場合、null がすぐに返されます。ブロッキングはありません。バージョン 1.5 から、ミリ秒単位で receiveTimeout を設定できます。受信メソッドは、最大でメッセージを待機します。ゼロ未満の値は、無期限に (または少なくともブローカーへの接続が失われるまで) ブロックすることを意味します。バージョン 1.6 では、呼び出しごとにタイムアウトを渡すことができる receive メソッドのバリアントが導入されました。

受信操作ではメッセージごとに新しい QueueingConsumer が作成されるため、この手法は大量の環境にはあまり適していません。これらのユースケースでは、非同期コンシューマーまたはゼロの receiveTimeout を使用することを検討してください。

バージョン 2.4.8 以降、ゼロ以外のタイムアウトを使用する場合、コンシューマーをチャネルに関連付けるために使用される basicConsume メソッドに渡される引数を指定できます。例: template.addConsumerArg("x-priority", 10)

利用可能な 4 つの単純な receive メソッドがあります。送信側の Exchange と同様に、デフォルトのキュープロパティがテンプレート自体に直接設定されている必要があるメソッドと、実行時にキューパラメーターを受け入れるメソッドがあります。バージョン 1.6 では、リクエストごとに receiveTimeout をオーバーライドするために timeoutMillis を受け入れるバリアントが導入されました。次のリストは、4 つのメソッドの定義を示しています。

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;

メッセージを送信する場合と同様に、AmqpTemplate には、Message インスタンスの代わりに POJO を受信するためのいくつかの便利なメソッドがあり、実装は、返される Object を作成するために使用される MessageConverter をカスタマイズするメソッドを提供します: 次のリストは、それらのメソッドを示しています。

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;

バージョン 2.0 以降、複雑な型を変換するために追加の ParameterizedTypeReference 引数を取るこれらのメソッドのバリアントがあります。テンプレートは SmartMessageConverter で構成する必要があります。詳細については、RabbitTemplate を使用した Message からの変換を参照してください。

sendAndReceive メソッドと同様に、バージョン 1.3 以降、AmqpTemplate には、メッセージを同期的に受信、処理、返信するための便利な receiveAndReply メソッドがいくつかあります。次のリストは、これらのメソッド定義を示しています。

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
       throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
     throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
    String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
     ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
            ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

AmqpTemplate 実装は、receive および reply フェーズを処理します。ほとんどの場合、受信したメッセージに対してビジネスロジックを実行し、必要に応じて応答オブジェクトまたはメッセージを作成するために、ReceiveAndReplyCallback の実装のみを提供する必要があります。ReceiveAndReplyCallback は null を返す場合があることに注意してください。この場合、応答は送信されず、receiveAndReply は receive メソッドのように機能します。これにより、メッセージの混合に同じキューを使用できますが、その一部は返信を必要としない場合があります。

自動メッセージ (リクエストと応答) 変換は、提供されたコールバックが生のメッセージ交換契約を提供する ReceiveAndReplyMessageCallback のインスタンスでない場合にのみ適用されます。

ReplyToAddressCallback は、受信したメッセージと ReceiveAndReplyCallback からの応答に対して実行時に replyTo アドレスを決定するカスタムロジックが必要な場合に役立ちます。デフォルトでは、リクエストメッセージ内の replyTo 情報を使用して応答がルーティングされます。

次のリストは、POJO ベースの受信と応答の例を示しています。

boolean received =
        this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {

                public Invoice handle(Order order) {
                        return processOrder(order);
                }
        });
if (received) {
        log.info("We received an order!");
}