4.0.5

リファレンスガイド

このガイドでは、Spring Cloud Stream バインダーの RabbitMQ 実装について説明します。設計、使用箇所、構成オプションに関する情報、StreamCloudStream の概念が RabbitMQ 固有の構成にどのようにマッピングされるかに関する情報が含まれています。

1. 使用方法

RabbitMQ バインダーを使用するには、次の Maven 座標を使用して、Spring Cloud Stream アプリケーションに追加できます。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

または、次のように Spring Cloud Stream RabbitMQ スターターを使用することもできます。

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. RabbitMQ バインダーの概要

次の簡略図は、RabbitMQ バインダーがどのように動作するかを示しています。

rabbit binder
図 1: RabbitMQ バインダー

デフォルトでは、RabbitMQ バインダーの実装は各宛先を TopicExchange にマップします。コンシューマーグループごとに、Queue はその TopicExchange にバインドされます。各コンシューマーインスタンスには、そのグループの Queue に対応する RabbitMQ Consumer インスタンスがあります。パーティション化されたプロデューサーとコンシューマーの場合、キューにはパーティションインデックスが接尾辞として付けられ、ルーティングキーとしてパーティションインデックスが使用されます。匿名のコンシューマー(group プロパティを持たないコンシューマー)の場合、自動削除キュー(ランダム化された一意の名前)が使用されます。

オプションの autoBindDlq オプションを使用することにより、デッドレターキュー(DLQ)(およびデッドレター交換 DLX、およびルーティングインフラストラクチャ)を作成および構成するようにバインダーを構成できます。デフォルトでは、デッドレターキューには宛先の名前があり、.dlq が追加されています。再試行が有効になっている場合(maxAttempts > 1)、再試行が終了した後、失敗したメッセージが DLQ に配信されます。再試行が無効になっている場合(maxAttempts = 1)、requeueRejected を false (デフォルト)に設定して、失敗したメッセージが再キューイングされるのではなく、DLQ にルーティングされるようにする必要があります。さらに、republishToDlq により、バインダーは失敗したメッセージを(拒否するのではなく)DLQ に発行します。この機能により、追加情報(x-exception-stacktrace ヘッダーのスタックトレースなど)をヘッダーのメッセージに追加できます。切り捨てられたスタックトレースについては、frameMaxHeadroom プロパティを参照してください。このオプションでは、再試行を有効にする必要はありません。失敗したメッセージは、1 回試行するだけで再公開できます。バージョン 1.2 以降、再発行されたメッセージの配信モードを構成できます。republishDeliveryMode プロパティを参照してください。

ストリームリスナーが ImmediateAcknowledgeAmqpException をスローした場合、DLQ はバイパスされ、メッセージは単に破棄されます。バージョン 2.1 以降、これは republishToDlq の設定に関係なく当てはまります。以前は、republishToDlq が false の場合のみでした。

requeueRejected を true に設定すると(republishToDlq=false を使用)、メッセージは継続的に再キューイングおよび再配信されます。これは、失敗の理由が一時的なものでない限り、意図したものではない可能性があります。一般に、maxAttempts を 1 より大きい値に設定するか、republishToDlq を true に設定することにより、バインダー内で再試行を有効にする必要があります。

バージョン 3.1.2 以降、コンシューマーが transacted としてマークされている場合、DLQ への公開はトランザクションに参加します。これにより、何らかの理由で公開が失敗した場合(たとえば、ユーザーがデッドレターエクスチェンジへの公開を認可されていない場合)にトランザクションをロールバックできます。さらに、接続ファクトリが発行者の確認または返送用に構成されている場合、DLQ への発行は確認を待機し、返されたメッセージを確認します。否定応答または返されたメッセージが受信された場合、バインダーは AmqpRejectAndDontRequeueException をスローし、ブローカーが republishToDlq プロパティが false であるかのように DLQ への公開を処理できるようにします。

これらのプロパティの詳細については、RabbitMQ バインダーのプロパティを参照してください。

フレームワークは、デッドレターメッセージを消費する(またはそれらをプライマリキューに再ルーティングする)ための標準的なメカニズムを提供していません。いくつかのオプションはデッドレターキュー処理で説明されています。

Spring Cloud Stream アプリケーションで複数の RabbitMQ バインダーが使用されている場合、2 つのバインダーに RabbitAutoConfiguration の同じ構成が適用されないように、"RabbitAutoConfiguration" を無効にすることが重要です。@SpringBootApplication アノテーションを使用して、クラスを除外できます。

バージョン 2.0 以降、RabbitMessageChannelBinder は RabbitTemplate.userPublisherConnection プロパティを true に設定して、非トランザクションのプロデューサーがコンシューマーのデッドロックを回避するようにします。これは、ブローカーのメモリアラーム (英語) のためにキャッシュされた接続がブロックされた場合に発生する可能性があります。

現在、multiplex コンシューマー(複数のキューをリッスンしている単一のコンシューマー)は、メッセージ駆動型のコンシューマーに対してのみサポートされています。ポーリングされたコンシューマーは、単一のキューからのみメッセージを取得できます。

3. 構成オプション

このセクションには、RabbitMQ バインダーとバインドされたチャネルに固有の設定が含まれています。

一般的なバインディング構成オプションとプロパティについては、Spring Cloud Stream コアドキュメント (英語) を参照してください。

3.1. RabbitMQ バインダーのプロパティ

デフォルトでは、RabbitMQ バインダーは Spring Boot の ConnectionFactory を使用します。その結果、RabbitMQ のすべての Spring Boot 構成オプションをサポートします。(参考までに、Spring Boot ドキュメントを参照してください)。RabbitMQ 構成オプションは、spring.rabbitmq プレフィックスを使用します。

Spring Boot オプションに加えて、RabbitMQ バインダーは次のプロパティをサポートします。

spring.cloud.stream.rabbit.binder.adminAddresses

RabbitMQ 管理プラグインの URL のコンマ区切りのリスト。nodes に複数のエントリが含まれている場合にのみ使用されます。このリストの各エントリには、spring.rabbitmq.addresses に対応するエントリが必要です。RabbitMQ クラスターを使用していて、キューをホストするノードから消費したい場合にのみ必要です。詳細については、キューアフィニティと LocalizedQueueConnectionFactory を参照してください。

デフォルト: 空。

spring.cloud.stream.rabbit.binder.nodes

RabbitMQ ノード名のコンマ区切りのリスト。複数のエントリがある場合、キューが配置されているサーバーアドレスを見つけるために使用されます。このリストの各エントリには、spring.rabbitmq.addresses に対応するエントリが必要です。RabbitMQ クラスターを使用していて、キューをホストするノードから消費したい場合にのみ必要です。詳細については、キューアフィニティと LocalizedQueueConnectionFactory を参照してください。

デフォルト: 空。

spring.cloud.stream.rabbit.binder.compressionLevel

圧縮されたバインディングの圧縮レベル。java.util.zip.Deflater を参照してください。

デフォルト: 1 (BEST_LEVEL)。

spring.cloud.stream.binder.connection-name-prefix

このバインダーによって作成された接続に名前を付けるために使用される接続名プレフィックス。名前はこのプレフィックスの後に #n が続きます。ここで、n は、新しい接続が開かれるたびに増分します。

デフォルト: none(Spring AMQP デフォルト)。

3.2. RabbitMQ コンシューマープロパティ

次のプロパティは Rabbit コンシューマーでのみ使用可能であり、接頭辞 spring.cloud.stream.rabbit.bindings.<channelName>.consumer. を付ける必要があります。

ただし、同じプロパティのセットをほとんどのバインディングに適用する必要がある場合は、繰り返しを避けるために、Spring Cloud Stream はすべてのチャネルの値を spring.cloud.stream.rabbit.default.<property>=<value> の形式で設定することをサポートしています。

また、特定のプロパティをバインドすると、デフォルトで同等のプロパティが上書きされることに注意してください。

acknowledgeMode

確認応答モード。

デフォルト: AUTO.

anonymousGroupPrefix

バインディングに group プロパティがない場合、匿名の自動削除キューが宛先交換にバインドされます。このようなキューのデフォルトの命名戦略により、anonymous.<base64 representation of a UUID> という名前のキューが作成されます。このプロパティを設定して、プレフィックスをデフォルト以外のものに変更します。

デフォルト: anonymous..

autoBindDlq

DLQ を自動的に宣言し、それをバインダー DLX にバインドするかどうか。

デフォルト: false.

bindingRoutingKey

キューを交換にバインドするためのルーティングキー(bindQueue が true の場合)。複数のキーにすることができます。bindingRoutingKeyDelimiter を参照してください。パーティション化された宛先の場合、-<instanceIndex> が各キーに追加されます。

デフォルト: #.

bindingRoutingKeyDelimiter

これが null でない場合、"bindingRoutingKey" はこの値で区切られたキーのリストであると見なされ、多くの場合、コンマが使用されます。

デフォルト: null.

bindQueue

キューを宣言して宛先交換にバインドするかどうか。独自のインフラストラクチャをセットアップし、以前にキューを作成してバインドしたことがある場合は、false に設定します。

デフォルト: true.

consumerTagPrefix

コンシューマータグを作成するために使用されます。#n が追加されます。ここで、n は、作成されたコンシューマーごとに増分します。例: ${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}

デフォルト: なし - ブローカーはランダムなコンシューマータグを生成します。

containerType

使用するリスナーコンテナーの種類を選択します。詳細については、Spring AMQP ドキュメントのコンテナーの選択を参照してください。[rabbitmq-stream] も参照してください。

デフォルト: simple

deadLetterQueueName

DLQ の名前

デフォルト: prefix+destination.dlq

deadLetterExchange

キューに割り当てる DLX。autoBindDlq が true の場合にのみ関連します。

デフォルト: 「プレフィックス +DLX」

deadLetterExchangeType

キューに割り当てる DLX の型。autoBindDlq が true の場合にのみ関連します。

デフォルト: ' 直接 '

deadLetterRoutingKey

キューに割り当てるデッドレタールーティングキー。autoBindDlq が true の場合にのみ関連します。

デフォルト: destination

declareDlx

宛先のデッドレター交換を宣言するかどうか。autoBindDlq が true の場合にのみ関連します。DLX が事前構成されている場合は、false に設定します。

デフォルト: true.

declareExchange

宛先の交換を宣言するかどうか。

デフォルト: true.

delayedExchange

交換を Delayed Message Exchange として宣言するかどうか。ブローカーに遅延メッセージ交換プラグインが必要です。x-delayed-type 引数は exchangeType に設定されます。

デフォルト: false.

dlqBindingArguments

dlq をデッドレター交換にバインドするときに適用される引数。headers deadLetterExchangeType とともに使用して、一致するヘッダーを指定します。たとえば、…​dlqBindingArguments.x-match=any…​dlqBindingArguments.someHeader=someValue

デフォルト: 空

dlqDeadLetterExchange

DLQ が宣言されている場合、そのキューに割り当てる DLX。

デフォルト: none

dlqDeadLetterRoutingKey

DLQ が宣言されている場合、そのキューに割り当てるデッドレタールーティングキー。

デフォルト: none

dlqExpires

未使用のデッドレターキューが削除されるまでの時間(ミリ秒単位)。

デフォルト: no expiration

dlqLazy

x-queue-mode=lazy 引数を使用してデッドレターキューを宣言します。“怠惰なキュー” (英語) を参照してください。ポリシーを使用すると、キューを削除せずに設定を変更できるため、この設定の代わりにポリシーを使用することを検討してください。

デフォルト: false.

dlqMaxLength

デッドレターキュー内のメッセージの最大数。

デフォルト: no limit

dlqMaxLengthBytes

すべてのメッセージからのデッドレターキューの合計バイト数の最大数。

デフォルト: no limit

dlqMaxPriority

デッドレターキュー(0-255)内のメッセージの最大優先度。

デフォルト: none

dlqOverflowBehavior

dlqMaxLength または dlqMaxLengthBytes を超えたときに実行するアクション。現在 drop-head または reject-publish ですが、RabbitMQ のドキュメントを参照してください。

デフォルト: none

dlqQuorum.deliveryLimit

quorum.enabled=true の場合、配信制限を設定してから、メッセージがドロップまたはデッドレターになります。

デフォルト: なし - ブローカーのデフォルトが適用されます。

dlqQuorum.enabled

true の場合、従来のキューの代わりにクォーラムデッドレターキューを作成します。

デフォルト: false

dlqQuorum.initialGroupSize

quorum.enabled=true の場合、初期クォーラムサイズを設定します。

デフォルト: なし - ブローカーのデフォルトが適用されます。

dlqSingleActiveConsumer

true に設定すると、x-single-active-consumer キュープロパティが true に設定されます。

デフォルト: false

dlqTtl

宣言されたときにデッドレターキューに適用されるデフォルトの存続時間(ミリ秒単位)。

デフォルト: no limit

durableSubscription

サブスクリプションが永続的である必要があるかどうか。group も設定されている場合にのみ有効です。

デフォルト: true.

exchangeAutoDelete

declareExchange が true の場合、交換を自動削除する(つまり、最後のキューが削除された後に削除する)かどうか。

デフォルト: true.

exchangeDurable

declareExchange が真の場合、交換が永続的であるかどうか(つまり、ブローカーの再起動後も存続するかどうか)。

デフォルト: true.

exchangeType

交換型: パーティション化されていない宛先の場合は directfanoutheaders または topic、パーティション化された宛先の場合は direct、ヘッダー、topic

デフォルト: topic.

exclusive

排他的なコンシューマーを作成するかどうか。これが true の場合、同時実行性は 1 である必要があります。厳密な順序付けが必要な場合によく使用されますが、障害後にホットスタンバイインスタンスが引き継ぐことができます。スタンバイインスタンスが消費を試みる頻度を制御する recoveryInterval を参照してください。RabbitMQ 3.8 以降を使用する場合は、代わりに singleActiveConsumer の使用を検討してください。

デフォルト: false.

expires

未使用のキューが削除されるまでの時間(ミリ秒単位)。

デフォルト: no expiration

failedDeclarationRetryInterval

キューが欠落している場合に、キューからの消費を試行する間隔(ミリ秒単位)。

デフォルト: 5000

frameMaxHeadroom

スタックトレースを DLQ メッセージヘッダーに追加するときに他のヘッダー用に予約するバイト数。すべてのヘッダーは、ブローカーで構成された frame_max サイズ内に収まる必要があります。スタックトレースは大きくなる可能性があります。サイズにこのプロパティを加えたものが frame_max を超える場合、スタックトレースは切り捨てられます。警告ログが書き込まれます。例外をキャッチし、スタックトレースが小さいものをスローすることにより、frame_max を増やすか、スタックトレースを減らすことを検討してください。

デフォルト: 20000

headerPatterns

受信メッセージからマッピングされるヘッダーのパターン。

デフォルト: ['*'] (すべてのヘッダー)。

lazy

x-queue-mode=lazy 引数を使用してキューを宣言します。“怠惰なキュー” (英語) を参照してください。ポリシーを使用すると、キューを削除せずに設定を変更できるため、この設定の代わりにポリシーを使用することを検討してください。

デフォルト: false.

maxConcurrency

コンシューマーの最大数。containerType が direct の場合はサポートされません。

デフォルト: 1.

maxLength

キュー内のメッセージの最大数。

デフォルト: no limit

maxLengthBytes

すべてのメッセージからのキュー内の合計バイト数の最大数。

デフォルト: no limit

maxPriority

キュー内のメッセージの最大優先度(0-255)。

デフォルト: none

missingQueuesFatal

キューが見つからない場合、その状態を致命的なものとして扱い、リスナーコンテナーを停止するかどうか。デフォルトは false であるため、コンテナーはキューからの消費を継続します。たとえば、クラスターを使用していて、非 HA キューをホストしているノードがダウンしている場合です。

デフォルト: false

overflowBehavior

maxLength または maxLengthBytes を超えたときに実行するアクション。現在 drop-head または reject-publish ですが、RabbitMQ のドキュメントを参照してください。

デフォルト: none

prefetch

プリフェッチカウント。

デフォルト: 1.

prefix

destination およびキューの名前に追加されるプレフィックス。

デフォルト: "".

queueBindingArguments

キューを取引所にバインドするときに適用される引数。headers exchangeType とともに使用して、一致するヘッダーを指定します。たとえば、…​queueBindingArguments.x-match=any…​queueBindingArguments.someHeader=someValue

デフォルト: 空

queueDeclarationRetries

キューが欠落している場合に、キューからの消費を再試行する回数。missingQueuesFatal が true の場合にのみ関連します。それ以外の場合、コンテナーは無期限に再試行し続けます。containerType が direct の場合はサポートされません。

デフォルト: 3

queueNameGroupOnly

true の場合、group と等しい名前のキューから消費します。それ以外の場合、キュー名は destination.group です。これは、たとえば、Spring Cloud Stream を使用して既存の RabbitMQ キューから消費する場合に役立ちます。

デフォルト: false。

quorum.deliveryLimit

quorum.enabled=true の場合、配信制限を設定してから、メッセージがドロップまたはデッドレターになります。

デフォルト: なし - ブローカーのデフォルトが適用されます。

quorum.enabled

true の場合、従来のキューの代わりにクォーラムキューを作成します。

デフォルト: false

quorum.initialGroupSize

quorum.enabled=true の場合、初期クォーラムサイズを設定します。

デフォルト: なし - ブローカーのデフォルトが適用されます。

recoveryInterval

接続回復の試行間隔(ミリ秒単位)。

デフォルト: 5000.

requeueRejected

再試行が無効になっている場合、または republishToDlq が false の場合に、配信の失敗を再キューイングする必要があるかどうか。

デフォルト: false.

republishDeliveryMode

republishToDlq が true の場合、再発行されたメッセージの配信モードを指定します。

デフォルト: DeliveryMode.PERSISTENT

republishToDlq

デフォルトでは、再試行回数の上限に達した後に失敗したメッセージは拒否されます。デッドレターキュー (DLQ) が設定されている場合、RabbitMQ は失敗したメッセージ (変更なし) を DLQ にルーティングします。true に設定されている場合、バインダーは、最終的な失敗の原因からの例外メッセージとスタックトレースを含む追加ヘッダーとともに、失敗したメッセージを DLQ に再発行します。frameMaxHeadroom プロパティも参照してください。

デフォルト: true

singleActiveConsumer

true に設定すると、x-single-active-consumer キュープロパティが true に設定されます。

デフォルト: false

transacted

トランザクションチャネルを使用するかどうか。

デフォルト: false.

ttl

宣言されたときにキューに適用するためのデフォルトの存続時間(ミリ秒単位)。

デフォルト: no limit

txSize

ラック間の配信数。containerType が direct の場合はサポートされません。

デフォルト: 1.

3.3. RabbitMQ ストリームプラグインの初期コンシューマーサポート

RabbitMQ ストリームプラグイン (英語) の基本的なサポートが提供されるようになりました。この機能を有効にするには、spring-rabbit-stream jar をクラスパスに追加する必要があります。これは spring-amqp および spring-rabbit と同じバージョンである必要があります。

containerType プロパティを stream に設定した場合、上記のコンシューマープロパティはサポートされません。concurrency は、スーパーストリームでのみサポートされます。各バインドで使用できるストリームキューは 1 つだけです。

containerType=stream を使用するようにバインダーを構成するために、Spring Boot はアプリケーションのプロパティから Environment @Bean を自動的に構成します。オプションで、カスタマイザーを追加して、リスナーコンテナーをカスタマイズできます。

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

カスタマイザーに渡される name 引数は destination + '.' + group + '.container' です。

ストリーム name() (オフセット追跡の目的で)は、バインディング destination + '.' + group に設定されます。上記の ConsumerCustomizer を使用して変更できます。手動オフセット追跡を使用する場合は、Context をメッセージヘッダーとして使用できます。

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

環境およびコンシューマービルダーの構成については、RabbitMQStreamJava クライアントのドキュメント (英語) を参照してください。

3.3.1. RabbitMQ Super Streams のコンシューマーサポート

スーパーストリームについては、スーパーストリーム (英語) を参照してください。

スーパーストリームを使用すると、スーパーストリームの各パーティションで 1 つのアクティブなコンシューマーを使用して、自動的にスケールアップ / スケールダウンできます。

構成例:

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

フレームワークは、9 つのパーティションを持つ super という名前のスーパーストリームを作成します。このアプリケーションの最大 3 つのインスタンスをデプロイできます。

3.4. 高度なリスナーコンテナー構成

バインダーまたはバインディングプロパティとして公開されていないリスナーコンテナープロパティを設定するには、型 ListenerContainerCustomizer の単一の Bean をアプリケーションコンテキストに追加します。バインダーとバインディングのプロパティが設定され、カスタマイザーが呼び出されます。カスタマイザー(configure() メソッド)には、キュー名とコンシューマーグループが引数として提供されます。

3.5. 高度なキュー / 交換 / バインディング構成

時々、RabbitMQ チームは、キューなどを宣言するときに引数を設定することで有効になる新機能を追加します。通常、このような機能は適切なプロパティを追加することでバインダーで有効になりますが、現在のバージョンではすぐに利用できない場合があります。バージョン 3.0.1 以降、宣言が実行される直前に DeclarableCustomizer Bean をアプリケーションコンテキストに追加して、Declarable (QueueExchangeBinding)を変更できるようになりました。これにより、現在バインダーで直接サポートされていない引数を追加できます。

3.6. バッチメッセージの受信

RabbitMQ バインダーでは、コンシューマーバインディングによって処理されるバッチには次の 2 つの型があります。

3.6.1. プロデューサーによって作成されたバッチ

通常、プロデューサーバーインディングに batch-enabled=true (Rabbit プロデューサーのプロパティを参照)がある場合、またはメッセージが BatchingRabbitTemplate によって作成される場合、バッチの要素はリスナーメソッドへの個別の呼び出しとして返されます。バージョン 3.0 以降、spring.cloud.stream.bindings.<name>.consumer.batch-mode が true に設定されている場合、そのようなバッチは List<?> としてリスナーメソッドに提示できます。

3.6.2. コンシューマー側のバッチ処理

バージョン 3.1 以降、コンシューマーは、変換されたペイロードの List<?> としてアプリケーションに提示されるバッチに複数の受信メッセージをアセンブルするように構成できます。次の簡単なアプリケーションは、この手法の使用方法を示しています。

spring.cloud.stream.bindings.input-in-0.group=someGroup

spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	Consumer<List<Thing>> input() {
		return list -> {
			System.out.println("Received " + list.size());
			list.forEach(thing -> {
				System.out.println(thing);

				// ...

			});
		};
	}

	@Bean
	public ApplicationRunner runner(RabbitTemplate template) {
		return args -> {
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
		};
	}

	public static class Thing {

		private String field;

		public Thing() {
		}

		public Thing(String field) {
			this.field = field;
		}

		public String getField() {
			return this.field;
		}

		public void setField(String field) {
			this.field = field;
		}

		@Override
		public String toString() {
			return "Thing [field=" + this.field + "]";
		}

	}

}
Received 2
Thing [field=value1]
Thing [field=value2]

バッチ内のメッセージの数は、batch-size および receive-timeout プロパティによって指定されます。receive-timeout が新しいメッセージなしで経過した場合、「短い」バッチが配信されます。

コンシューマー側のバッチ処理は、container-type=simple (デフォルト)でのみサポートされます。

コンシューマー側のバッチメッセージのヘッダーを調べたい場合は、Message<List<?>> を使用する必要があります。ヘッダーは、ヘッダー AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS 内の List<Map<String, Object>> であり、対応するインデックス内の各ペイロード要素のヘッダーがあります。繰り返しますが、ここに簡単な例があります:

@SpringBootApplication
public class Application {

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	Consumer<Message<List<Thing>>> input() {
		return msg -> {
			List<Thing> things = msg.getPayload();
			System.out.println("Received " + things.size());
			List<Map<String, Object>> headers =
					(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
			for (int i = 0; i < things.size(); i++) {
				System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));

				// ...

			}
		};
	}

	@Bean
	public ApplicationRunner runner(RabbitTemplate template) {
		return args -> {
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
				msg.getMessageProperties().setHeader("myHeader", "headerValue1");
				return msg;
			});
			template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
				msg.getMessageProperties().setHeader("myHeader", "headerValue2");
				return msg;
			});
		};
	}

	public static class Thing {

		private String field;

		public Thing() {
		}

		public Thing(String field) {
			this.field = field;
		}

		public String getfield() {
			return this.field;
		}

		public void setfield(String field) {
			this.field = field;
		}

		@Override
		public String toString() {
			return "Thing [field=" + this.field + "]";
		}

	}

}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2

3.7. Rabbit プロデューサーのプロパティ

次のプロパティは Rabbit プロデューサーのみが使用でき、接頭辞 spring.cloud.stream.rabbit.bindings.<channelName>.producer. を付ける必要があります。

ただし、同じプロパティのセットをほとんどのバインディングに適用する必要がある場合は、繰り返しを避けるために、Spring Cloud Stream はすべてのチャネルの値を spring.cloud.stream.rabbit.default.<property>=<value> の形式で設定することをサポートしています。

また、特定のプロパティをバインドすると、デフォルトで同等のプロパティが上書きされることに注意してください。

altermateExchange.binding.queue

交換がまだ存在せず、name が提供されている場合は、このキューを代替交換にバインドします。引数のない単純な永続キューがプロビジョニングされます。より高度な構成が必要な場合は、自分でキューを構成してバインドする必要があります。

デフォルト: null alternateExchange.binding.routingKey 交換がまだ存在せず、name と queue が提供されている場合は、このルーティングキーを使用してキューを代替交換にバインドします。

デフォルト: # (デフォルトの topic 代替交換用)

alternateExchange.exists

代替交換が存在するかどうか、プロビジョニングが必要かどうか。

デフォルト: false

alternateExchange.type

代替取引所がまだ存在しない場合は、プロビジョニングする取引所の型。

デフォルト: topic

alternateExchange.name

宛先交換機で代替交換機を構成します。

デフォルト: null

autoBindDlq

DLQ を自動的に宣言し、それをバインダー DLX にバインドするかどうか。

デフォルト: false.

batchingEnabled

プロデューサーによるメッセージのバッチ処理を有効にするかどうか。メッセージは、次のプロパティ (このリストの次の 3 つのエントリで説明) に従って 1 つのメッセージにバッチ処理されます: 'batchSize'、batchBufferLimitbatchTimeout。詳細については、バッチ処理を参照してください。また、バッチメッセージの受信も参照してください。

デフォルト: false.

batchSize

バッチ処理が有効になっているときにバッファリングするメッセージの数。

デフォルト: 100.

batchBufferLimit

バッチ処理が有効になっている場合の最大バッファサイズ。

デフォルト: 10000.

batchTimeout

バッチ処理が有効になっている場合のバッチタイムアウト。

デフォルト: 5000.

bindingRoutingKey

キューを交換にバインドするためのルーティングキー(bindQueue が true の場合)。複数のキーにすることができます。bindingRoutingKeyDelimiter を参照してください。パーティション化された宛先の場合、-n が各キーに追加されます。requiredGroups が提供されている場合にのみ適用され、それらのグループにのみ適用されます。

デフォルト: #.

bindingRoutingKeyDelimiter

これが null でない場合、"bindingRoutingKey" はこの値で区切られたキーのリストであると見なされます。多くの場合、コンマが使用されます。requiredGroups が提供されている場合にのみ適用され、その場合はそれらのグループにのみ適用されます。

デフォルト: null.

bindQueue

キューを宣言して宛先交換にバインドするかどうか。独自のインフラストラクチャをセットアップし、以前にキューを作成してバインドしたことがある場合は、false に設定します。requiredGroups が提供されている場合にのみ適用され、それらのグループにのみ適用されます。

デフォルト: true.

compress

送信時にデータを圧縮する必要があるかどうか。

デフォルト: false.

confirmAckChannel

errorChannelEnabled が true の場合、肯定的な配信確認を送信するチャネル(別名パブリッシャーが確認)。チャネルが存在しない場合は、DirectChannel がこの名前で登録されます。接続ファクトリは、発行者が確認できるように構成する必要があります。useConfirmHeader と相互に排他的です。

デフォルト: nullChannel (ack は破棄されます)。

deadLetterQueueName

DLQ の名前は、requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: prefix+destination.dlq

deadLetterExchange

キューに割り当てる DLX。autoBindDlq が true の場合にのみ関連します。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: 「プレフィックス +DLX」

deadLetterExchangeType

キューに割り当てる DLX の型。autoBindDlq が true の場合にのみ関連します。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: ' 直接 '

deadLetterRoutingKey

キューに割り当てるデッドレタールーティングキー。autoBindDlq が true の場合にのみ関連します。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: destination

declareDlx

宛先のデッドレター交換を宣言するかどうか。autoBindDlq が true の場合にのみ関連します。DLX が事前構成されている場合は、false に設定します。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: true.

declareExchange

宛先の交換を宣言するかどうか。

デフォルト: true.

delayExpression

メッセージに適用する遅延を評価するための SpEL 式(x-delay ヘッダー)。交換が遅延メッセージ交換でない場合は効果がありません。

デフォルト: x-delay ヘッダーは設定されていません。

delayedExchange

交換を Delayed Message Exchange として宣言するかどうか。ブローカーに遅延メッセージ交換プラグインが必要です。x-delayed-type 引数は exchangeType に設定されます。

デフォルト: false.

deliveryMode

配信モード。

デフォルト: PERSISTENT.

dlqBindingArguments

dlq をデッドレター交換にバインドするときに適用される引数。headers deadLetterExchangeType とともに使用して、一致するヘッダーを指定します。たとえば、…​dlqBindingArguments.x-match=any…​dlqBindingArguments.someHeader=someValuerequiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: 空

dlqDeadLetterExchange

DLQ が宣言されると、そのキューに割り当てる DLX。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: none

dlqDeadLetterRoutingKey

DLQ が宣言されると、そのキューに割り当てるデッドレタールーティングキー。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: none

dlqExpires

未使用のデッドレターキューが削除されるまでの時間(ミリ秒単位)。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: no expiration

dlqLazy

x-queue-mode=lazy 引数を使用してデッドレターキューを宣言します。“怠惰なキュー” (英語) を参照してください。ポリシーを使用すると、キューを削除せずに設定を変更できるため、この設定の代わりにポリシーを使用することを検討してください。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

dlqMaxLength

デッドレターキュー内のメッセージの最大数。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: no limit

dlqMaxLengthBytes

すべてのメッセージからのデッドレターキューの合計バイト数の最大数。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: no limit

dlqMaxPriority

デッドレターキュー内のメッセージの最大優先度(0-255) requiredGroups が提供されている場合にのみ適用され、次にそれらのグループにのみ適用されます。

デフォルト: none

dlqQuorum.deliveryLimit

quorum.enabled=true の場合、配信制限を設定してから、メッセージがドロップまたはデッドレターになります。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: なし - ブローカーのデフォルトが適用されます。

dlqQuorum.enabled

true の場合、従来のキューの代わりにクォーラムデッドレターキューを作成します。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: false

dlqQuorum.initialGroupSize

quorum.enabled=true の場合、初期クォーラムサイズを設定します。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: なし - ブローカーのデフォルトが適用されます。

dlqSingleActiveConsumer

true に設定すると、x-single-active-consumer キュープロパティが true に設定されます。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: false

dlqTtl

宣言されたときにデッドレターキューに適用されるデフォルトの存続時間(ミリ秒単位)。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: no limit

exchangeAutoDelete

declareExchange が true の場合、交換を自動削除するかどうか(最後のキューが削除された後に削除されます)。

デフォルト: true.

exchangeDurable

declareExchange が true の場合、交換が永続的である必要があるかどうか(ブローカーの再起動後も存続します)。

デフォルト: true.

exchangeType

交換型: パーティション化されていない宛先の場合は directfanoutheaders または topic、パーティション化された宛先の場合は directheaders または topic

デフォルト: topic.

expires

未使用のキューが削除されるまでの時間(ミリ秒単位)。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: no expiration

headerPatterns

送信メッセージにマップされるヘッダーのパターン。

デフォルト: ['*'] (すべてのヘッダー)。

lazy

x-queue-mode=lazy 引数を使用してキューを宣言します。“怠惰なキュー” (英語) を参照してください。ポリシーを使用すると、キューを削除せずに設定を変更できるため、この設定の代わりにポリシーを使用することを検討してください。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: false.

maxLength

キュー内のメッセージの最大数。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: no limit

maxLengthBytes

すべてのメッセージからのキュー内の合計バイト数の最大数。requiredGroups が提供されている場合にのみ適用され、それらのグループにのみ適用されます。

デフォルト: no limit

maxPriority

キュー内のメッセージの最大優先度(0-255)。requiredGroups が提供されている場合にのみ適用され、それらのグループにのみ適用されます。

デフォルト: none

prefix

destination 交換の名前に追加されるプレフィックス。

デフォルト: "".

producerType

プロデューサーの型。

  • クラシックキューおよびクォーラムキュー用の AMQP AMQP クライアント

  • STREAM_SYNC RabbitMQ Streams プラグインクライアント、確認を受信するまでブロック

  • STREAM_ASYNC RabbitMQ ストリームプラグインクライアント、ブロックしない

    デフォルト: "".

queueBindingArguments

キューを取引所にバインドするときに適用される引数。headers exchangeType とともに使用して、一致するヘッダーを指定します。たとえば、…​queueBindingArguments.x-match=any…​queueBindingArguments.someHeader=someValuerequiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: 空

queueNameGroupOnly

true の場合、group と等しい名前のキューから消費します。それ以外の場合、キュー名は destination.group です。これは、たとえば、Spring Cloud Stream を使用して既存の RabbitMQ キューから消費する場合に役立ちます。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: false。

quorum.deliveryLimit

quorum.enabled=true の場合、配信制限を設定してから、メッセージがドロップまたはデッドレターになります。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: なし - ブローカーのデフォルトが適用されます。

quorum.enabled

true の場合、従来のキューの代わりにクォーラムキューを作成します。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: false

quorum.initialGroupSize

quorum.enabled=true の場合、初期クォーラムサイズを設定します。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: なし - ブローカーのデフォルトが適用されます。

routingKeyExpression

メッセージのパブリッシュ時に使用するルーティングキーを決定する SpEL 式。固定ルーティングキーの場合は、routingKey を使用します。

デフォルト: パーティション化された宛先の場合は destination または destination-<partition>

routingKey

メッセージの発行時に使用する固定ルーティングキーを定義する文字列。

デフォルト: routingKeyExpression を参照

singleActiveConsumer

true に設定すると、x-single-active-consumer キュープロパティが true に設定されます。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: false

transacted

トランザクションチャネルを使用するかどうか。

デフォルト: false.

ttl

宣言されたときにキューに適用するために存続するデフォルトの時間(ミリ秒単位)。requiredGroups が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。

デフォルト: no limit

useConfirmHeader

パブリッシャーが確認を参照してください。confirmAckChannel と相互に排他的です。

RabbitMQ の場合、コンテンツ型ヘッダーは外部アプリケーションによって設定できます。Spring Cloud Stream は、ヘッダーをネイティブにサポートしない Kafka(0.11 より前)などのトランスポートを含む、あらゆる型のトランスポートに使用される拡張内部プロトコルの一部としてサポートします。

3.8. パブリッシャーが確認

メッセージの公開結果を取得するには、2 つのメカニズムがあります。いずれの場合も、接続ファクトリには publisherConfirmType セット ConfirmType.CORRELATED が必要です。「レガシー」メカニズムは、confirmAckChannel をメッセージチャネルの Bean 名に設定し、そこから非同期で確認を取得できるようにすることです。負の ack がエラーチャネルに送信されます(有効になっている場合)- エラーチャネルを参照してください。

バージョン 3.1 で追加された推奨メカニズムは、相関データヘッダーを使用し、その Future<Confirm> プロパティを介して結果を待機することです。これは、結果を待つ前に複数のメッセージを送信できるため、バッチリスナーで特に役立ちます。この手法を使用するには、useConfirmHeader プロパティを true に設定します。次の簡単なアプリケーションは、この手法の使用例です。

spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true

spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true

spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Autowired
	private StreamBridge bridge;

	@Bean
	Consumer<List<String>> input() {
		return list -> {
			List<MyCorrelationData> results = new ArrayList<>();
			list.forEach(str -> {
				log.info("Received: " + str);
				MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
				results.add(corr);
				this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
						.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
						.build());
			});
			results.forEach(correlation -> {
				try {
					Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
					log.info(confirm + " for " + correlation.getPayload());
					if (correlation.getReturnedMessage() != null) {
						log.error("Message for " + correlation.getPayload() + " was returned ");

						// throw some exception to invoke binder retry/error handling

					}
				}
				catch (InterruptedException e) {
					Thread.currentThread().interrupt();
					throw new IllegalStateException(e);
				}
				catch (ExecutionException | TimeoutException e) {
					throw new IllegalStateException(e);
				}
			});
		};
	}


	@Bean
	public ApplicationRunner runner(BatchingRabbitTemplate template) {
		return args -> IntStream.range(0, 10).forEach(i ->
				template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
	}

	@Bean
	public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
		BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
		return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
	}

}

class MyCorrelationData extends CorrelationData {

	private final String payload;

	MyCorrelationData(String id, String payload) {
		super(id);
		this.payload = payload;
	}

	public String getPayload() {
		return this.payload;
	}

}

ご覧のとおり、各メッセージを送信してから、公開結果を待ちます。メッセージをルーティングできない場合は、将来が完了する前に、返されたメッセージが相関データに入力されます。

フレームワークが相関を実行できるように、相関データには一意の id を提供する必要があります。

useConfirmHeader と confirmAckChannel の両方を設定することはできませんが、useConfirmHeader が true の場合でも、エラーチャネルで返されたメッセージを受信できますが、相関ヘッダーを使用する方が便利です。

3.9. RabbitMQ ストリームプラグインの初期プロデューサーサポート

RabbitMQ ストリームプラグイン (英語) の基本的なサポートが提供されるようになりました。この機能を有効にするには、spring-rabbit-stream jar をクラスパスに追加する必要があります。これは spring-amqp および spring-rabbit と同じバージョンである必要があります。

上記のプロデューサープロパティは、producerType プロパティを STREAM_SYNC または STREAM_ASYNC に設定した場合はサポートされません。

ストリーム ProducerType を使用するようにバインダーを構成するために、Spring Boot はアプリケーションのプロパティから Environment @Bean を構成します。オプションで、カスタマイザーを追加して、メッセージハンドラーをカスタマイズできます。

@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
    return (hand, dest) -> {
        RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
        handler.setConfirmTimeout(5000);
        ((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
                (name, builder) -> {
                    ...
                });
    };
}

環境およびプロデューサービルダーの構成については、RabbitMQStreamJava クライアントのドキュメント (英語) を参照してください。

3.9.1. RabbitMQ スーパーストリームのプロデューサーサポート

スーパーストリームについては、スーパーストリーム (英語) を参照してください。

スーパーストリームを使用すると、スーパーストリームの各パーティションで 1 つのアクティブなコンシューマーを使用して、自動的にスケールアップ / スケールダウンできます。Spring Cloud Stream を使用すると、AMQP またはストリームクライアントを使用してスーパーストリームに公開できます。

スーパーストリームはすでに存在している必要があります。スーパーストリームの作成は、プロデューサーバーインディングではサポートされていません。

AMQP を介したスーパーストリームへのパブリッシュ:

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

ストリームクライアントを使用してスーパーストリームに公開する:

spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false

ストリームクライアントを使用する場合、confirmAckChannel を設定すると、正常に送信されたメッセージのコピーがそのチャネルに送信されます。

4. 既存のキュー / 交換の使用

デフォルトでは、バインダーは、宛先バインディングプロパティ <prefix><destination> の値から派生した名前でトピック交換を自動的にプロビジョニングします。指定しない場合、宛先はデフォルトでバインディング名になります。コンシューマーをバインドする場合、キューは <prefix><destination>.<group> という名前で自動的にプロビジョニングされます(group バインディングプロパティが指定されている場合)。または、group がない場合は、匿名の自動削除キューがプロビジョニングされます。キューは、非パーティションバインディングの場合は "match-all" ワイルドカードルーティングキー(#)、パーティションバインディングの場合は <destination>-<instanceIndex> を使用して交換にバインドされます。プレフィックスは、デフォルトでは空の String です。出力バインディングが requiredGroups で指定されている場合、キュー / バインディングはグループごとにプロビジョニングされます。

このデフォルトの動作を変更できるウサギ固有のバインディングプロパティがいくつかあります。

使用したい既存の交換 / キューがある場合、交換の名前が myExchange で、キューの名前が myQueue であると仮定すると、次のように自動プロビジョニングを完全に無効にできます。

  • spring.cloud.stream.bindings.<binding name>.destination=myExchange

  • spring.cloud.stream.bindings.<binding name>.group=myQueue

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true

バインダーにキュー / 交換をプロビジョニングさせたいが、ここで説明したデフォルト以外のものを使用してプロビジョニングしたい場合は、次のプロパティを使用します。詳細については、上記のプロパティドキュメントを参照してください。

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey

  • spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>

  • spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'

autoBindDlq が true の場合、デッドレター交換 / キューを宣言するときに使用される同様のプロパティがあります。

5. RabbitMQ バインダーで再試行する

バインダー内で再試行が有効になっている場合、リスナーコンテナースレッドは、構成されているバックオフ期間の間中断されます。これは、単一のコンシューマーで厳密なオーダーが必要な場合に重要になる可能性があります。ただし、他のユースケースでは、他のメッセージがそのスレッドで処理されるのを防ぎます。バインダーの再試行を使用する代わりに、デッドレターキュー(DLQ)に存続する時間のあるデッドレタリングと、DLQ 自体のデッドレター構成を設定することもできます。ここで説明するプロパティの詳細については、"RabbitMQ バインダーのプロパティ" を参照してください。次の設定例を使用して、この機能を有効にできます。

  • autoBindDlq を true に設定します。バインダーは DLQ を作成します。オプションで、deadLetterQueueName で名前を指定できます。

  • dlqTtl を、再配信の間に待機するバックオフ時間に設定します。

  • dlqDeadLetterExchange をデフォルトの交換に設定します。デフォルトの deadLetterRoutingKey はキュー名(destination.group)であるため、DLQ からの期限切れのメッセージは元のキューにルーティングされます。次の例に示すように、デフォルトの交換に設定するには、値を指定せずにプロパティを設定します。

メッセージを強制的にデッドレターにするには、AmqpRejectAndDontRequeueException をスローするか、requeueRejected を false (デフォルト)に設定して例外をスローします。

ループは終わりなく続きます。これは一時的な問題には問題ありませんが、何度か試行した後はあきらめたい場合があります。幸い、RabbitMQ は x-death ヘッダーを提供します。これにより、発生したサイクル数を判別できます。

諦めた後にメッセージを確認するには、ImmediateAcknowledgeAmqpException をスローします。

5.1. すべてまとめる

次の構成では、ワイルドカードルーティングキー # を使用してトピック交換にバインドされたキュー myDestination.consumerGroup を使用して交換 myDestination を作成します。

---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---

この構成では、myDestination.consumerGroup のルーティングキーを使用して直接交換(DLX)にバインドされた DLQ が作成されます。メッセージが拒否されると、メッセージは DLQ にルーティングされます。次の例に示すように、5 秒後、メッセージは期限切れになり、キュー名をルーティングキーとして使用して元のキューにルーティングされます。

Spring Boot アプリケーション
@SpringBootApplication
public class XDeathApplication {

    public static void main(String[] args) {
        SpringApplication.run(XDeathApplication.class, args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            Map<?,?> death = message.getHeaders().get("x-death");
            if (death != null && death.get("count").equals(3L)) {
                // giving up - don't send to DLX
                throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
            }
            throw new AmqpRejectAndDontRequeueException("failed");
        };
    }

}

x-death ヘッダーの count プロパティが Long であることに注意してください。

6. エラーチャネル

バージョン 1.3 以降、バインダーは各コンシューマー宛先のエラーチャネルに無条件に例外を送信し、非同期プロデューサー送信エラーをエラーチャネルに送信するように構成することもできます。詳細については、"[Spring- クラウド - ストリーム - 概要 - エラー処理 ]" を参照してください。

RabbitMQ には、2 種類の送信失敗があります。

後者はまれです。RabbitMQ のドキュメントによると、「[A nack] は、キューを担当する Erlang プロセスで内部エラーが発生した場合にのみ配信されます。」 reject-publish キューオーバーフロー動作を使用して制限付きキューに公開する場合も、否定応答を受け取る可能性があります。

( "[Spring- クラウド - ストリーム - 概要 - エラー処理 ]" に従って)プロデューサーエラーチャネルを有効にするだけでなく、RabbitMQ バインダーは、接続ファクトリが次のように適切に構成されている場合にのみ、チャネルにメッセージを送信します。

  • ccf.setPublisherConfirms(true);

  • ccf.setPublisherReturns(true);

接続ファクトリに Spring Boot 構成を使用する場合は、次のプロパティを設定します。

  • spring.rabbitmq.publisher-confirms

  • spring.rabbitmq.publisher-returns

返されたメッセージの ErrorMessage のペイロードは、次のプロパティを持つ ReturnedAmqpMessageException です。

  • failedMessage: 送信に失敗した spring-messaging Message<?>

  • amqpMessage: 生の spring-amqp Message

  • replyCode: 失敗の理由を示す整数値(たとえば、312- ルートなし)。

  • replyText: 失敗の理由を示すテキスト値(たとえば、NO_ROUTE)。

  • exchange: メッセージが公開された交換。

  • routingKey: メッセージが公開されたときに使用されたルーティングキー。

返されたメッセージを受信するための代替メカニズムについては、パブリッシャーが確認も参照してください。

否定的に確認された確認の場合、ペイロードは次のプロパティを持つ NackedAmqpMessageException です。

  • failedMessage: 送信に失敗した spring-messaging Message<?>

  • nackReason: 理由(利用可能な場合 - 詳細については、ブローカーのログを調べる必要がある場合があります)。

これらの例外(デッドレターキューへの送信など)の自動処理はありません。これらの例外は、独自の Spring Integration フローで使用できます。

7. Rabbit バインダーヘルスインジケーター

Rabbit バインダーのヘルスインジケーターは、Spring Boot から提供されたものに委譲されます。詳細については、こちらを参照してください。

プロパティ - management.health.binders.enabled を使用してバインダーレベルでこのヘルスインジケーターを無効にし、これを false に設定できます。マルチバインダー環境の場合、これはバインダーの環境プロパティで設定する必要があります。

ヘルスインジケーターが無効になっている場合、ヘルスアクチュエーターのエンドポイントに次のようなものが表示されます。

"rabbit": {
  "status": "UNKNOWN"
}

Spring Boot レベルで、Rabbit ヘルスインジケーターを無効にする場合は、プロパティ management.health.rabbit.enabled を使用し、false に設定する必要があります。

8. デッドレターキュー処理

ユーザーがデッドレターメッセージをどのように処理するかを予測できないため、フレームワークはそれらを処理するための標準的なメカニズムを提供しません。デッドレタリングの理由が一時的なものである場合は、メッセージを元のキューに戻すことをお勧めします。ただし、課題が永続的な課題である場合は、無限ループが発生する可能性があります。次の Spring Boot アプリケーションは、これらのメッセージを元のキューに戻す方法の例を示していますが、3 回試行した後、3 番目の「駐車場」キューに移動します。2 番目の例では、RabbitMQ 遅延メッセージ交換 (英語) を使用して、再キューイングされたメッセージに遅延を導入します。この例では、試行ごとに遅延が増加します。これらの例では、@RabbitListener を使用して DLQ からメッセージを受信します。バッチプロセスで RabbitTemplate.receive() を使用することもできます。

例では、元の宛先が so8400in であり、コンシューマーグループが so8400 であると想定しています。

8.1. パーティション化されていない宛先

最初の 2 つの例は、宛先がパーティション化されていない場合のものです。

@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Press enter to exit");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
            this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}
@SpringBootApplication
public class ReRouteDlqApplication {

    private static final String ORIGINAL_QUEUE = "so8400in.so8400";

    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

    private static final String X_RETRIES_HEADER = "x-retries";

    private static final String DELAY_EXCHANGE = "dlqReRouter";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
        System.out.println("Press enter to exit");
        System.in.read();
        context.close();
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(queues = DLQ)
    public void rePublish(Message failedMessage) {
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
        if (retriesHeader == null) {
            retriesHeader = Integer.valueOf(0);
        }
        if (retriesHeader < 3) {
            headers.put(X_RETRIES_HEADER, retriesHeader + 1);
            headers.put("x-delay", 5000 * retriesHeader);
            this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
        }
        else {
            this.rabbitTemplate.send(PARKING_LOT, failedMessage);
        }
    }

    @Bean
    public DirectExchange delayExchange() {
        DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
        exchange.setDelayed(true);
        return exchange;
    }

    @Bean
    public Binding bindOriginalToDelay() {
        return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
    }

    @Bean
    public Queue parkingLot() {
        return new Queue(PARKING_LOT);
    }

}

8.2. パーティション化された宛先

パーティション化された宛先では、すべてのパーティションに 1 つの DLQ があります。ヘッダーから元のキューを決定します。

8.2.1. republishToDlq=false

republishToDlq が false の場合、次の例に示すように、RabbitMQ は元の宛先に関する情報を含む x-death ヘッダーを使用してメッセージを DLX/DLQ に公開します。

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_DEATH_HEADER = "x-death";

	private static final String X_RETRIES_HEADER = "x-retries";

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Press enter to exit");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
			String exchange = (String) xDeath.get(0).get("exchange");
			List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
			this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

8.2.2. republishToDlq=true

republishToDlq が true の場合、次の例に示すように、再発行リカバリは元の交換キーとルーティングキーをヘッダーに追加します。

@SpringBootApplication
public class ReRouteDlqApplication {

	private static final String ORIGINAL_QUEUE = "so8400in.so8400";

	private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

	private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";

	private static final String X_RETRIES_HEADER = "x-retries";

	private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;

	private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;

	public static void main(String[] args) throws Exception {
		ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
		System.out.println("Press enter to exit");
		System.in.read();
		context.close();
	}

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@RabbitListener(queues = DLQ)
	public void rePublish(Message failedMessage) {
		Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
		Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
		if (retriesHeader == null) {
			retriesHeader = Integer.valueOf(0);
		}
		if (retriesHeader < 3) {
			headers.put(X_RETRIES_HEADER, retriesHeader + 1);
			String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
			String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
			this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
		}
		else {
			this.rabbitTemplate.send(PARKING_LOT, failedMessage);
		}
	}

	@Bean
	public Queue parkingLot() {
		return new Queue(PARKING_LOT);
	}

}

9. RabbitMQ バインダーを使用したパーティション分割

RabbitMQ はネイティブでのパーティショニングをサポートしていません。

特定のパーティションにデータを送信すると有利な場合があります。たとえば、メッセージ処理を厳密に並べ替える場合は、特定の顧客宛てのすべてのメッセージを同じパーティションに送信する必要があります。

RabbitMessageChannelBinder は、各パーティションのキューを宛先交換にバインドすることにより、パーティション化を提供します。

次の Java と YAML の例は、プロデューサーを構成する方法を示しています。

プロデューサー
@SpringBootApplication
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            generate-out-0:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前の例の構成では、デフォルトのパーティショニング(key.hashCode() % partitionCount)を使用しています。これは、キー値に応じて、適切にバランスの取れたアルゴリズムを提供する場合と提供しない場合があります。partitionSelectorExpression または partitionSelectorClass プロパティを使用して、このデフォルトをオーバーライドできます。

required-groups プロパティは、プロデューサーのデプロイ時にコンシューマーキューをプロビジョニングする必要がある場合にのみ必要です。そうしないと、対応するコンシューマーがデプロイされるまで、パーティションに送信されたメッセージはすべて失われます。

次の構成は、トピック交換をプロビジョニングします。

part exchange

次のキューがその交換にバインドされています。

part queues

次のバインディングは、キューを交換に関連付けます。

part bindings

次の Java と YAML の例は、前の例を継続し、コンシューマーを構成する方法を示しています。

コンシューマー
@SpringBootApplication
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
            System.out.println(in + " received from queue " + queue);
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            listen-in-0:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0
RabbitMessageChannelBinder は動的スケーリングをサポートしていません。パーティションごとに少なくとも 1 つのコンシューマーが必要です。コンシューマーの instanceIndex は、どのパーティションが消費されているかを示すために使用されます。Cloud Foundry などのプラットフォームは、instanceIndex を持つインスタンスを 1 つだけ持つことができます。