メッセージ送信

メッセージを送信するときは、次のいずれかの方法を使用できます。

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

前のリストの最後の方法から議論を始めることができます。これは、実際には最も明示的であるためです。AMQP 交換名を (ルーティングキーと共に) 実行時に提供できます。最後のパラメーターは、メッセージインスタンスの実際の作成を担当するコールバックです。このメソッドを使用してメッセージを送信する例は、次のようになります。次の例は、send メソッドを使用してメッセージを送信する方法を示しています。

amqpTemplate.send("marketData.topic", "quotes.nasdaq.THING1",
    new Message("12.34".getBytes(), someProperties));

そのテンプレートインスタンスを使用してほとんどまたは常に同じ取引所に送信する予定の場合は、テンプレート自体に exchange プロパティを設定できます。このような場合は、前のリストの 2 番目の方法を使用できます。次の例は、前の例と関数に同等です。

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.send("quotes.nasdaq.FOO", new Message("12.34".getBytes(), someProperties));

テンプレートに exchange プロパティと routingKey プロパティの両方が設定されている場合は、Message のみを受け入れるメソッドを使用できます。次の例は、その方法を示しています。

amqpTemplate.setExchange("marketData.topic");
amqpTemplate.setRoutingKey("quotes.nasdaq.FOO");
amqpTemplate.send(new Message("12.34".getBytes(), someProperties));

交換キーとルーティングキーのプロパティについて考えるより良い方法は、明示的なメソッドパラメーターが常にテンプレートの既定値をオーバーライドすることです。実際、テンプレートでこれらのプロパティを明示的に設定しなくても、常に既定値が設定されています。どちらの場合も、デフォルトは空の String ですが、これは実際には実用的なデフォルトです。ルーティングキーに関する限り、最初から必ずしも必要というわけではありません (たとえば、Fanout 交換の場合)。さらに、空の String を使用してキューを交換にバインドできます。これらはどちらも、テンプレートのルーティングキープロパティのデフォルトの空の String 値に依存する正当なシナリオです。交換名に関する限り、空の String が一般的に使用されます。これは、AMQP 仕様が「デフォルト交換」を名前なしと定義しているためです。すべてのキューは、その名前をバインディング値として使用して、デフォルトの交換 (直接交換) に自動的にバインドされるため、前述のリストの 2 番目の方法を使用して、デフォルトを介して任意のキューへの単純なポイントツーポイントメッセージングを行うことができます。両替。実行時にメソッドパラメーターを指定することにより、キュー名を routingKey として指定できます。次の例は、その方法を示しています。

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.send("queue.helloWorld", new Message("Hello World".getBytes(), someProperties));

または、主にまたは排他的に単一のキューに発行するために使用できるテンプレートを作成できます。次の例は、その方法を示しています。

RabbitTemplate template = new RabbitTemplate(); // using default no-name Exchange
template.setRoutingKey("queue.helloWorld"); // but we'll always send to this Queue
template.send(new Message("Hello World".getBytes(), someProperties));

メッセージビルダー API

バージョン 1.3 以降、MessageBuilder および MessagePropertiesBuilder によってメッセージビルダ API が提供されます。これらのメソッドは、メッセージまたはメッセージプロパティを作成するための便利な " 流れるような " 手段を提供します。次の例は、流れるような API の動作を示しています。

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
MessageProperties props = MessagePropertiesBuilder.newInstance()
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .andProperties(props)
    .build();

MessageProperties (Javadoc) で定義された各プロパティを設定できます。他のメソッドには setHeader(String key, String value)removeHeader(String key)removeHeaders()copyProperties(MessageProperties properties) が含まれます。各プロパティ設定メソッドには、set*IfAbsent() バリアントがあります。デフォルトの初期値が存在する場合、メソッドは set*IfAbsentOrDefault() という名前になります。

初期メッセージビルダーを作成するために、5 つの静的メソッドが用意されています。

public static MessageBuilder withBody(byte[] body) (1)

public static MessageBuilder withClonedBody(byte[] body) (2)

public static MessageBuilder withBody(byte[] body, int from, int to) (3)

public static MessageBuilder fromMessage(Message message) (4)

public static MessageBuilder fromClonedMessage(Message message) (5)
1Builder によって作成されたメッセージには、引数への直接参照である本文があります。
2 ビルダーによって作成されたメッセージには、引数にバイトのコピーを含む新しい配列である本文があります。
3 ビルダーによって作成されたメッセージには、引数からのバイト範囲を含む新しい配列である本文があります。詳細については、Arrays.copyOfRange() (標準 Javadoc) (英語) を参照してください。
4Builder によって作成されたメッセージには、引数の本文への直接参照である本文があります。引数のプロパティは、新しい MessageProperties オブジェクトにコピーされます。
5 ビルダーによって作成されたメッセージには、引数の本文のコピーを含む新しい配列である本文があります。引数のプロパティは、新しい MessageProperties オブジェクトにコピーされます。

MessagePropertiesBuilder インスタンスを作成するために、3 つの静的メソッドが提供されています。

public static MessagePropertiesBuilder newInstance() (1)

public static MessagePropertiesBuilder fromProperties(MessageProperties properties) (2)

public static MessagePropertiesBuilder fromClonedProperties(MessageProperties properties) (3)
1 新しいメッセージプロパティオブジェクトがデフォルト値で初期化されます。
2 ビルダーは初期化され、build() は提供されたプロパティオブジェクトを返します。
3 引数のプロパティは、新しい MessageProperties オブジェクトにコピーされます。

AmqpTemplate の RabbitTemplate 実装では、各 send() メソッドには、追加の CorrelationData オブジェクトを取るオーバーロードされたバージョンがあります。パブリッシャーの確認が有効になっている場合、このオブジェクトは AmqpTemplate で説明されているコールバックで返されます。これにより、送信者は確認 (ack または nack) を送信メッセージと関連付けることができます。

バージョン 1.6.7 から、CorrelationAwareMessagePostProcessor インターフェースが導入され、メッセージの変換後に相関データを変更できるようになりました。次の例は、その使用方法を示しています。

Message postProcessMessage(Message message, Correlation correlation);

バージョン 2.0 では、このインターフェースは推奨されていません。このメソッドは、postProcessMessage(Message message) に委譲するデフォルトの実装で MessagePostProcessor に移動されました。

また、バージョン 1.6.7 から、CorrelationDataPostProcessor と呼ばれる新しいコールバックインターフェースが提供されます。これは、すべての MessagePostProcessor インスタンス ( send() メソッドで提供されるものと setBeforePublishPostProcessors() で提供されるもの) の後に呼び出されます。実装は、send() メソッドで提供される相関データ (存在する場合) を更新または置換できます。Message と元の CorrelationData (存在する場合) が引数として提供されます。次の例は、postProcess メソッドの使用方法を示しています。

CorrelationData postProcess(Message message, CorrelationData correlationData);

パブリッシャーの return

テンプレートの mandatory プロパティが true の場合、返されるメッセージは AmqpTemplate で説明されているコールバックによって提供されます。

バージョン 1.4 から、RabbitTemplate は SpEL mandatoryExpression プロパティをサポートします。これは、ルート評価オブジェクトとして各リクエストメッセージに対して評価され、boolean 値に解決されます。@myBean.isMandatory(#root) などの Bean 参照を式で使用できます。

パブリッシャーのリターンは、送受信操作で RabbitTemplate によって内部的に使用することもできます。詳細については、返信タイムアウトを参照してください。

バッチ処理

バージョン 1.4.2 は BatchingRabbitTemplate を導入しました。これは、BatchingStrategy に従ってメッセージをバッチ処理するオーバーライドされた send メソッドを持つ RabbitTemplate のサブクラスです。バッチが完了したときにのみ、RabbitMQ にメッセージが送信されます。次のリストは、BatchingStrategy インターフェース定義を示しています。

public interface BatchingStrategy {

    MessageBatch addToBatch(String exchange, String routingKey, Message message);

    Date nextRelease();

    Collection<MessageBatch> releaseBatches();

}
バッチ処理されたデータはメモリに保持されます。システム障害が発生した場合、未送信のメッセージが失われる可能性があります。

SimpleBatchingStrategy が提供されます。単一の交換キーまたはルーティングキーへのメッセージの送信をサポートします。次のプロパティがあります。

  • batchSize: 送信前のバッチ内のメッセージ数。

  • bufferLimit: バッチメッセージの最大サイズ。これにより、超過した場合は batchSize がプリエンプトされ、部分的なバッチが送信されます。

  • timeout: バッチにメッセージを追加する新しいアクティビティがない場合に、部分的なバッチが送信されるまでの時間。

SimpleBatchingStrategy は、埋め込まれた各メッセージの前に 4 バイトのバイナリ長を付けて、バッチをフォーマットします。これは、springBatchFormat メッセージプロパティを lengthHeader4 に設定することにより、受信側システムに伝達されます。

バッチ処理されたメッセージは、デフォルトで (springBatchFormat メッセージヘッダーを使用して) リスナーコンテナーによって自動的にバッチ処理解除されます。バッチからのメッセージを拒否すると、バッチ全体が拒否されます。

ただし、詳細については、バッチ処理を伴う @RabbitListener を参照してください。