メッセージ送信
メッセージを送信するときは、次のいずれかの方法を使用できます。
void send(Message message) throws AmqpException;
void send(String routingKey, Message message) throws AmqpException;
void send(String exchange, String routingKey, Message message) throws AmqpException;
上のリストの最後のメソッドから説明を始めます。これが実際最も明示的なメソッドです。このメソッドを使用すると、実行時に AMQP 交換名 (およびルーティングキー) を提供できます。最後のパラメーターは、メッセージインスタンスを実際に作成するコールバックです。このメソッドを使用してメッセージを送信する例は次のようになります。次の例は、send
メソッドを使用してメッセージを送信する方法を示しています。
amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
new Message("12.34".getBytes(), someProperties));
そのテンプレートインスタンスを使用してほとんどまたは常に同じ取引所に送信する予定の場合は、テンプレート自体に exchange
プロパティを設定できます。このような場合は、前のリストの 2 番目の方法を使用できます。次の例は、前の例と関数に同等です。
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));
テンプレートに exchange
プロパティと routingKey
プロパティの両方が設定されている場合は、Message
のみを受け入れるメソッドを使用できます。次の例は、その方法を示しています。
amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));
交換キーとルーティングキーのプロパティについて考えるより良い方法は、明示的なメソッドパラメーターが常にテンプレートの既定値をオーバーライドすることです。実際、テンプレートでこれらのプロパティを明示的に設定しなくても、常に既定値が設定されています。どちらの場合も、デフォルトは空の String
ですが、これは実際には実用的なデフォルトです。ルーティングキーに関する限り、最初から必ずしも必要というわけではありません (たとえば、Fanout
交換の場合)。さらに、空の String
を使用してキューを交換にバインドできます。これらはどちらも、テンプレートのルーティングキープロパティのデフォルトの空の String
値に依存する正当なシナリオです。交換名に関する限り、空の String
が一般的に使用されます。これは、AMQP 仕様が「デフォルト交換」を名前なしと定義しているためです。すべてのキューは、その名前をバインディング値として使用して、デフォルトの交換 (直接交換) に自動的にバインドされるため、前述のリストの 2 番目の方法を使用して、デフォルトを介して任意のキューへの単純なポイントツーポイントメッセージングを行うことができます。両替。実行時にメソッドパラメーターを指定することにより、キュー名を routingKey
として指定できます。次の例は、その方法を示しています。
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));
または、主にまたは排他的に単一のキューに発行するために使用できるテンプレートを作成できます。次の例は、その方法を示しています。
RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));
メッセージビルダー API
バージョン 1.3 以降、MessageBuilder
および MessagePropertiesBuilder
によってメッセージビルダ API が提供されます。これらのメソッドは、メッセージまたはメッセージプロパティを作成するための便利な " 流れるような " 手段を提供します。次の例は、流れるような API の動作を示しています。
Message message = MessageBuilder.withBody("foo".getBytes())
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
.setMessageId("123")
.setHeader("bar", "baz")
.build();
Message message = MessageBuilder.withBody("foo".getBytes())
.andProperties(props)
.build();
MessageProperties
(Javadoc) で定義された各プロパティを設定できます。他のメソッドには setHeader(String key, String value)
、removeHeader(String key)
、removeHeaders()
、copyProperties(MessageProperties properties)
が含まれます。各プロパティ設定メソッドには、set*IfAbsent()
バリアントがあります。デフォルトの初期値が存在する場合、メソッドは set*IfAbsentOrDefault()
という名前になります。
初期メッセージビルダーを作成するために、5 つの静的メソッドが用意されています。
public static MessageBuilder withBody(byte[] body) (1)
public static MessageBuilder withClonedBody(byte[] body) (2)
public static MessageBuilder withBody(byte[] body, int from, int to) (3)
public static MessageBuilder fromMessage(Message message) (4)
public static MessageBuilder fromClonedMessage(Message message) (5)
1 | Builder によって作成されたメッセージには、引数への直接参照である本文があります。 |
2 | ビルダーによって作成されたメッセージには、引数にバイトのコピーを含む新しい配列である本文があります。 |
3 | ビルダーによって作成されたメッセージには、引数からのバイト範囲を含む新しい配列である本文があります。詳細については、Arrays.copyOfRange() (標準 Javadoc) (英語) を参照してください。 |
4 | Builder によって作成されたメッセージには、引数の本文への直接参照である本文があります。引数のプロパティは、新しい MessageProperties オブジェクトにコピーされます。 |
5 | ビルダーによって作成されたメッセージには、引数の本文のコピーを含む新しい配列である本文があります。引数のプロパティは、新しい MessageProperties オブジェクトにコピーされます。 |
MessagePropertiesBuilder
インスタンスを作成するために、3 つの静的メソッドが提供されています。
public static MessagePropertiesBuilder newInstance() (1)
public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)
public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 | 新しいメッセージプロパティオブジェクトがデフォルト値で初期化されます。 |
2 | ビルダーは初期化され、build() は提供されたプロパティオブジェクトを返します。 |
3 | 引数のプロパティは、新しい MessageProperties オブジェクトにコピーされます。 |
AmqpTemplate
の RabbitTemplate
実装では、各 send()
メソッドには、追加の CorrelationData
オブジェクトを取るオーバーロードされたバージョンがあります。パブリッシャーの確認が有効になっている場合、このオブジェクトは AmqpTemplate
で説明されているコールバックで返されます。これにより、送信者は確認 (ack
または nack
) を送信メッセージと関連付けることができます。
バージョン 1.6.7 から、CorrelationAwareMessagePostProcessor
インターフェースが導入され、メッセージの変換後に相関データを変更できるようになりました。次の例は、その使用方法を示しています。
Message postProcessMessage(Message message, Correlation correlation);
バージョン 2.0 では、このインターフェースは推奨されていません。このメソッドは、postProcessMessage(Message message)
に委譲するデフォルトの実装で MessagePostProcessor
に移動されました。
また、バージョン 1.6.7 から、CorrelationDataPostProcessor
と呼ばれる新しいコールバックインターフェースが提供されます。これは、すべての MessagePostProcessor
インスタンス ( send()
メソッドで提供されるものと setBeforePublishPostProcessors()
で提供されるもの) の後に呼び出されます。実装は、send()
メソッドで提供される相関データ (存在する場合) を更新または置換できます。Message
と元の CorrelationData
(存在する場合) が引数として提供されます。次の例は、postProcess
メソッドの使用方法を示しています。
CorrelationData postProcess(Message message, CorrelationData correlationData);
パブリッシャーの return
テンプレートの mandatory
プロパティが true
の場合、返されるメッセージは AmqpTemplate
で説明されているコールバックによって提供されます。
バージョン 1.4 から、RabbitTemplate
は SpEL mandatoryExpression
プロパティをサポートします。これは、ルート評価オブジェクトとして各リクエストメッセージに対して評価され、boolean
値に解決されます。@myBean.isMandatory(#root)
などの Bean 参照を式で使用できます。
パブリッシャーのリターンは、送受信操作で RabbitTemplate
によって内部的に使用することもできます。詳細については、返信タイムアウトを参照してください。
バッチ処理
バージョン 1.4.2 は BatchingRabbitTemplate
を導入しました。これは、BatchingStrategy
に従ってメッセージをバッチ処理するオーバーライドされた send
メソッドを持つ RabbitTemplate
のサブクラスです。バッチが完了したときにのみ、RabbitMQ にメッセージが送信されます。次のリストは、BatchingStrategy
インターフェース定義を示しています。
public interface BatchingStrategy {
MessageBatch addToBatch(String exchange, String routingKey, Message message);
Date nextRelease();
Collection<MessageBatch> releaseBatches();
}
バッチ処理されたデータはメモリに保持されます。システム障害が発生した場合、未送信のメッセージが失われる可能性があります。 |
SimpleBatchingStrategy
が提供されます。単一の交換キーまたはルーティングキーへのメッセージの送信をサポートします。次のプロパティがあります。
batchSize
: 送信前のバッチ内のメッセージ数。bufferLimit
: バッチメッセージの最大サイズ。これにより、超過した場合はbatchSize
がプリエンプトされ、部分的なバッチが送信されます。timeout
: バッチにメッセージを追加する新しいアクティビティがない場合に、部分的なバッチが送信されるまでの時間。
SimpleBatchingStrategy
は、埋め込まれた各メッセージの前に 4 バイトのバイナリ長を付けて、バッチをフォーマットします。これは、springBatchFormat
メッセージプロパティを lengthHeader4
に設定することにより、受信側システムに伝達されます。
バッチ処理されたメッセージは、デフォルトで (springBatchFormat メッセージヘッダーを使用して) リスナーコンテナーによって自動的にバッチ処理解除されます。バッチからのメッセージを拒否すると、バッチ全体が拒否されます。 |
ただし、詳細については、バッチ処理を伴う @RabbitListener を参照してください。