4.0.5
リファレンスガイド
このガイドでは、Spring Cloud Stream バインダーの RabbitMQ 実装について説明します。設計、使用箇所、構成オプションに関する情報、StreamCloudStream の概念が RabbitMQ 固有の構成にどのようにマッピングされるかに関する情報が含まれています。
1. 使用方法
RabbitMQ バインダーを使用するには、次の Maven 座標を使用して、Spring Cloud Stream アプリケーションに追加できます。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
または、次のように Spring Cloud Stream RabbitMQ スターターを使用することもできます。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
2. RabbitMQ バインダーの概要
次の簡略図は、RabbitMQ バインダーがどのように動作するかを示しています。
デフォルトでは、RabbitMQ バインダーの実装は各宛先を TopicExchange
にマップします。コンシューマーグループごとに、Queue
はその TopicExchange
にバインドされます。各コンシューマーインスタンスには、そのグループの Queue
に対応する RabbitMQ Consumer
インスタンスがあります。パーティション化されたプロデューサーとコンシューマーの場合、キューにはパーティションインデックスが接尾辞として付けられ、ルーティングキーとしてパーティションインデックスが使用されます。匿名のコンシューマー(group
プロパティを持たないコンシューマー)の場合、自動削除キュー(ランダム化された一意の名前)が使用されます。
オプションの autoBindDlq
オプションを使用することにより、デッドレターキュー(DLQ)(およびデッドレター交換 DLX
、およびルーティングインフラストラクチャ)を作成および構成するようにバインダーを構成できます。デフォルトでは、デッドレターキューには宛先の名前があり、.dlq
が追加されています。再試行が有効になっている場合(maxAttempts > 1
)、再試行が終了した後、失敗したメッセージが DLQ に配信されます。再試行が無効になっている場合(maxAttempts = 1
)、requeueRejected
を false
(デフォルト)に設定して、失敗したメッセージが再キューイングされるのではなく、DLQ にルーティングされるようにする必要があります。さらに、republishToDlq
により、バインダーは失敗したメッセージを(拒否するのではなく)DLQ に発行します。この機能により、追加情報(x-exception-stacktrace
ヘッダーのスタックトレースなど)をヘッダーのメッセージに追加できます。切り捨てられたスタックトレースについては、frameMaxHeadroom
プロパティを参照してください。このオプションでは、再試行を有効にする必要はありません。失敗したメッセージは、1 回試行するだけで再公開できます。バージョン 1.2 以降、再発行されたメッセージの配信モードを構成できます。republishDeliveryMode
プロパティを参照してください。
ストリームリスナーが ImmediateAcknowledgeAmqpException
をスローした場合、DLQ はバイパスされ、メッセージは単に破棄されます。バージョン 2.1 以降、これは republishToDlq
の設定に関係なく当てはまります。以前は、republishToDlq
が false
の場合のみでした。
requeueRejected を true に設定すると(republishToDlq=false を使用)、メッセージは継続的に再キューイングおよび再配信されます。これは、失敗の理由が一時的なものでない限り、意図したものではない可能性があります。一般に、maxAttempts を 1 より大きい値に設定するか、republishToDlq を true に設定することにより、バインダー内で再試行を有効にする必要があります。 |
バージョン 3.1.2 以降、コンシューマーが transacted
としてマークされている場合、DLQ への公開はトランザクションに参加します。これにより、何らかの理由で公開が失敗した場合(たとえば、ユーザーがデッドレターエクスチェンジへの公開を認可されていない場合)にトランザクションをロールバックできます。さらに、接続ファクトリが発行者の確認または返送用に構成されている場合、DLQ への発行は確認を待機し、返されたメッセージを確認します。否定応答または返されたメッセージが受信された場合、バインダーは AmqpRejectAndDontRequeueException
をスローし、ブローカーが republishToDlq
プロパティが false
であるかのように DLQ への公開を処理できるようにします。
これらのプロパティの詳細については、RabbitMQ バインダーのプロパティを参照してください。
フレームワークは、デッドレターメッセージを消費する(またはそれらをプライマリキューに再ルーティングする)ための標準的なメカニズムを提供していません。いくつかのオプションはデッドレターキュー処理で説明されています。
Spring Cloud Stream アプリケーションで複数の RabbitMQ バインダーが使用されている場合、2 つのバインダーに RabbitAutoConfiguration の同じ構成が適用されないように、"RabbitAutoConfiguration" を無効にすることが重要です。@SpringBootApplication アノテーションを使用して、クラスを除外できます。 |
バージョン 2.0 以降、RabbitMessageChannelBinder
は RabbitTemplate.userPublisherConnection
プロパティを true
に設定して、非トランザクションのプロデューサーがコンシューマーのデッドロックを回避するようにします。これは、ブローカーのメモリアラーム (英語) のためにキャッシュされた接続がブロックされた場合に発生する可能性があります。
現在、multiplex コンシューマー(複数のキューをリッスンしている単一のコンシューマー)は、メッセージ駆動型のコンシューマーに対してのみサポートされています。ポーリングされたコンシューマーは、単一のキューからのみメッセージを取得できます。 |
3. 構成オプション
このセクションには、RabbitMQ バインダーとバインドされたチャネルに固有の設定が含まれています。
一般的なバインディング構成オプションとプロパティについては、Spring Cloud Stream コアドキュメント (英語) を参照してください。
3.1. RabbitMQ バインダーのプロパティ
デフォルトでは、RabbitMQ バインダーは Spring Boot の ConnectionFactory
を使用します。その結果、RabbitMQ のすべての Spring Boot 構成オプションをサポートします。(参考までに、Spring Boot ドキュメントを参照してください)。RabbitMQ 構成オプションは、spring.rabbitmq
プレフィックスを使用します。
Spring Boot オプションに加えて、RabbitMQ バインダーは次のプロパティをサポートします。
- spring.cloud.stream.rabbit.binder.adminAddresses
RabbitMQ 管理プラグインの URL のコンマ区切りのリスト。
nodes
に複数のエントリが含まれている場合にのみ使用されます。このリストの各エントリには、spring.rabbitmq.addresses
に対応するエントリが必要です。RabbitMQ クラスターを使用していて、キューをホストするノードから消費したい場合にのみ必要です。詳細については、キューアフィニティと LocalizedQueueConnectionFactory を参照してください。デフォルト: 空。
- spring.cloud.stream.rabbit.binder.nodes
RabbitMQ ノード名のコンマ区切りのリスト。複数のエントリがある場合、キューが配置されているサーバーアドレスを見つけるために使用されます。このリストの各エントリには、
spring.rabbitmq.addresses
に対応するエントリが必要です。RabbitMQ クラスターを使用していて、キューをホストするノードから消費したい場合にのみ必要です。詳細については、キューアフィニティと LocalizedQueueConnectionFactory を参照してください。デフォルト: 空。
- spring.cloud.stream.rabbit.binder.compressionLevel
圧縮されたバインディングの圧縮レベル。
java.util.zip.Deflater
を参照してください。デフォルト:
1
(BEST_LEVEL)。- spring.cloud.stream.binder.connection-name-prefix
このバインダーによって作成された接続に名前を付けるために使用される接続名プレフィックス。名前はこのプレフィックスの後に
#n
が続きます。ここで、n
は、新しい接続が開かれるたびに増分します。デフォルト: none(Spring AMQP デフォルト)。
3.2. RabbitMQ コンシューマープロパティ
次のプロパティは Rabbit コンシューマーでのみ使用可能であり、接頭辞 spring.cloud.stream.rabbit.bindings.<channelName>.consumer.
を付ける必要があります。
ただし、同じプロパティのセットをほとんどのバインディングに適用する必要がある場合は、繰り返しを避けるために、Spring Cloud Stream はすべてのチャネルの値を spring.cloud.stream.rabbit.default.<property>=<value>
の形式で設定することをサポートしています。
また、特定のプロパティをバインドすると、デフォルトで同等のプロパティが上書きされることに注意してください。
- acknowledgeMode
確認応答モード。
デフォルト:
AUTO
.- anonymousGroupPrefix
バインディングに
group
プロパティがない場合、匿名の自動削除キューが宛先交換にバインドされます。このようなキューのデフォルトの命名戦略により、anonymous.<base64 representation of a UUID>
という名前のキューが作成されます。このプロパティを設定して、プレフィックスをデフォルト以外のものに変更します。デフォルト:
anonymous.
.- autoBindDlq
DLQ を自動的に宣言し、それをバインダー DLX にバインドするかどうか。
デフォルト:
false
.- bindingRoutingKey
キューを交換にバインドするためのルーティングキー(
bindQueue
がtrue
の場合)。複数のキーにすることができます。bindingRoutingKeyDelimiter
を参照してください。パーティション化された宛先の場合、-<instanceIndex>
が各キーに追加されます。デフォルト:
#
.- bindingRoutingKeyDelimiter
これが null でない場合、"bindingRoutingKey" はこの値で区切られたキーのリストであると見なされ、多くの場合、コンマが使用されます。
デフォルト:
null
.- bindQueue
キューを宣言して宛先交換にバインドするかどうか。独自のインフラストラクチャをセットアップし、以前にキューを作成してバインドしたことがある場合は、
false
に設定します。デフォルト:
true
.- consumerTagPrefix
コンシューマータグを作成するために使用されます。
#n
が追加されます。ここで、n
は、作成されたコンシューマーごとに増分します。例:${spring.application.name}-${spring.cloud.stream.bindings.input.group}-${spring.cloud.stream.instance-index}
。デフォルト: なし - ブローカーはランダムなコンシューマータグを生成します。
- containerType
使用するリスナーコンテナーの種類を選択します。詳細については、Spring AMQP ドキュメントのコンテナーの選択を参照してください。[rabbitmq-stream] も参照してください。
デフォルト:
simple
- deadLetterQueueName
DLQ の名前
デフォルト:
prefix+destination.dlq
- deadLetterExchange
キューに割り当てる DLX。
autoBindDlq
がtrue
の場合にのみ関連します。デフォルト: 「プレフィックス +DLX」
- deadLetterExchangeType
キューに割り当てる DLX の型。
autoBindDlq
がtrue
の場合にのみ関連します。デフォルト: ' 直接 '
- deadLetterRoutingKey
キューに割り当てるデッドレタールーティングキー。
autoBindDlq
がtrue
の場合にのみ関連します。デフォルト:
destination
- declareDlx
宛先のデッドレター交換を宣言するかどうか。
autoBindDlq
がtrue
の場合にのみ関連します。DLX が事前構成されている場合は、false
に設定します。デフォルト:
true
.- declareExchange
宛先の交換を宣言するかどうか。
デフォルト:
true
.- delayedExchange
交換を
Delayed Message Exchange
として宣言するかどうか。ブローカーに遅延メッセージ交換プラグインが必要です。x-delayed-type
引数はexchangeType
に設定されます。デフォルト:
false
.- dlqBindingArguments
dlq をデッドレター交換にバインドするときに適用される引数。
headers
deadLetterExchangeType
とともに使用して、一致するヘッダーを指定します。たとえば、…dlqBindingArguments.x-match=any
、…dlqBindingArguments.someHeader=someValue
。デフォルト: 空
- dlqDeadLetterExchange
DLQ が宣言されている場合、そのキューに割り当てる DLX。
デフォルト:
none
- dlqDeadLetterRoutingKey
DLQ が宣言されている場合、そのキューに割り当てるデッドレタールーティングキー。
デフォルト:
none
- dlqExpires
未使用のデッドレターキューが削除されるまでの時間(ミリ秒単位)。
デフォルト:
no expiration
- dlqLazy
x-queue-mode=lazy
引数を使用してデッドレターキューを宣言します。“怠惰なキュー” (英語) を参照してください。ポリシーを使用すると、キューを削除せずに設定を変更できるため、この設定の代わりにポリシーを使用することを検討してください。デフォルト:
false
.- dlqMaxLength
デッドレターキュー内のメッセージの最大数。
デフォルト:
no limit
- dlqMaxLengthBytes
すべてのメッセージからのデッドレターキューの合計バイト数の最大数。
デフォルト:
no limit
- dlqMaxPriority
デッドレターキュー(0-255)内のメッセージの最大優先度。
デフォルト:
none
- dlqOverflowBehavior
dlqMaxLength
またはdlqMaxLengthBytes
を超えたときに実行するアクション。現在drop-head
またはreject-publish
ですが、RabbitMQ のドキュメントを参照してください。デフォルト:
none
- dlqQuorum.deliveryLimit
quorum.enabled=true
の場合、配信制限を設定してから、メッセージがドロップまたはデッドレターになります。デフォルト: なし - ブローカーのデフォルトが適用されます。
- dlqQuorum.enabled
true の場合、従来のキューの代わりにクォーラムデッドレターキューを作成します。
デフォルト: false
- dlqQuorum.initialGroupSize
quorum.enabled=true
の場合、初期クォーラムサイズを設定します。デフォルト: なし - ブローカーのデフォルトが適用されます。
- dlqSingleActiveConsumer
true に設定すると、
x-single-active-consumer
キュープロパティが true に設定されます。デフォルト:
false
- dlqTtl
宣言されたときにデッドレターキューに適用されるデフォルトの存続時間(ミリ秒単位)。
デフォルト:
no limit
- durableSubscription
サブスクリプションが永続的である必要があるかどうか。
group
も設定されている場合にのみ有効です。デフォルト:
true
.- exchangeAutoDelete
declareExchange
が true の場合、交換を自動削除する(つまり、最後のキューが削除された後に削除する)かどうか。デフォルト:
true
.- exchangeDurable
declareExchange
が真の場合、交換が永続的であるかどうか(つまり、ブローカーの再起動後も存続するかどうか)。デフォルト:
true
.- exchangeType
交換型: パーティション化されていない宛先の場合は
direct
、fanout
、headers
またはtopic
、パーティション化された宛先の場合はdirect
、ヘッダー、topic
。デフォルト:
topic
.- exclusive
排他的なコンシューマーを作成するかどうか。これが
true
の場合、同時実行性は 1 である必要があります。厳密な順序付けが必要な場合によく使用されますが、障害後にホットスタンバイインスタンスが引き継ぐことができます。スタンバイインスタンスが消費を試みる頻度を制御するrecoveryInterval
を参照してください。RabbitMQ 3.8 以降を使用する場合は、代わりにsingleActiveConsumer
の使用を検討してください。デフォルト:
false
.- expires
未使用のキューが削除されるまでの時間(ミリ秒単位)。
デフォルト:
no expiration
- failedDeclarationRetryInterval
キューが欠落している場合に、キューからの消費を試行する間隔(ミリ秒単位)。
デフォルト: 5000
- frameMaxHeadroom
スタックトレースを DLQ メッセージヘッダーに追加するときに他のヘッダー用に予約するバイト数。すべてのヘッダーは、ブローカーで構成された
frame_max
サイズ内に収まる必要があります。スタックトレースは大きくなる可能性があります。サイズにこのプロパティを加えたものがframe_max
を超える場合、スタックトレースは切り捨てられます。警告ログが書き込まれます。例外をキャッチし、スタックトレースが小さいものをスローすることにより、frame_max
を増やすか、スタックトレースを減らすことを検討してください。デフォルト: 20000
- headerPatterns
受信メッセージからマッピングされるヘッダーのパターン。
デフォルト:
['*']
(すべてのヘッダー)。- lazy
x-queue-mode=lazy
引数を使用してキューを宣言します。“怠惰なキュー” (英語) を参照してください。ポリシーを使用すると、キューを削除せずに設定を変更できるため、この設定の代わりにポリシーを使用することを検討してください。デフォルト:
false
.- maxConcurrency
コンシューマーの最大数。
containerType
がdirect
の場合はサポートされません。デフォルト:
1
.- maxLength
キュー内のメッセージの最大数。
デフォルト:
no limit
- maxLengthBytes
すべてのメッセージからのキュー内の合計バイト数の最大数。
デフォルト:
no limit
- maxPriority
キュー内のメッセージの最大優先度(0-255)。
デフォルト:
none
- missingQueuesFatal
キューが見つからない場合、その状態を致命的なものとして扱い、リスナーコンテナーを停止するかどうか。デフォルトは
false
であるため、コンテナーはキューからの消費を継続します。たとえば、クラスターを使用していて、非 HA キューをホストしているノードがダウンしている場合です。デフォルト:
false
- overflowBehavior
maxLength
またはmaxLengthBytes
を超えたときに実行するアクション。現在drop-head
またはreject-publish
ですが、RabbitMQ のドキュメントを参照してください。デフォルト:
none
- prefetch
プリフェッチカウント。
デフォルト:
1
.- prefix
destination
およびキューの名前に追加されるプレフィックス。デフォルト: "".
- queueBindingArguments
キューを取引所にバインドするときに適用される引数。
headers
exchangeType
とともに使用して、一致するヘッダーを指定します。たとえば、…queueBindingArguments.x-match=any
、…queueBindingArguments.someHeader=someValue
。デフォルト: 空
- queueDeclarationRetries
キューが欠落している場合に、キューからの消費を再試行する回数。
missingQueuesFatal
がtrue
の場合にのみ関連します。それ以外の場合、コンテナーは無期限に再試行し続けます。containerType
がdirect
の場合はサポートされません。デフォルト:
3
- queueNameGroupOnly
true の場合、
group
と等しい名前のキューから消費します。それ以外の場合、キュー名はdestination.group
です。これは、たとえば、Spring Cloud Stream を使用して既存の RabbitMQ キューから消費する場合に役立ちます。デフォルト: false。
- quorum.deliveryLimit
quorum.enabled=true
の場合、配信制限を設定してから、メッセージがドロップまたはデッドレターになります。デフォルト: なし - ブローカーのデフォルトが適用されます。
- quorum.enabled
true の場合、従来のキューの代わりにクォーラムキューを作成します。
デフォルト: false
- quorum.initialGroupSize
quorum.enabled=true
の場合、初期クォーラムサイズを設定します。デフォルト: なし - ブローカーのデフォルトが適用されます。
- recoveryInterval
接続回復の試行間隔(ミリ秒単位)。
デフォルト:
5000
.- requeueRejected
再試行が無効になっている場合、または
republishToDlq
がfalse
の場合に、配信の失敗を再キューイングする必要があるかどうか。デフォルト:
false
.
- republishDeliveryMode
republishToDlq
がtrue
の場合、再発行されたメッセージの配信モードを指定します。デフォルト:
DeliveryMode.PERSISTENT
- republishToDlq
デフォルトでは、再試行回数の上限に達した後に失敗したメッセージは拒否されます。デッドレターキュー (DLQ) が設定されている場合、RabbitMQ は失敗したメッセージ (変更なし) を DLQ にルーティングします。
true
に設定されている場合、バインダーは、最終的な失敗の原因からの例外メッセージとスタックトレースを含む追加ヘッダーとともに、失敗したメッセージを DLQ に再発行します。frameMaxHeadroom プロパティも参照してください。デフォルト:
true
- singleActiveConsumer
true に設定すると、
x-single-active-consumer
キュープロパティが true に設定されます。デフォルト:
false
- transacted
トランザクションチャネルを使用するかどうか。
デフォルト:
false
.- ttl
宣言されたときにキューに適用するためのデフォルトの存続時間(ミリ秒単位)。
デフォルト:
no limit
- txSize
ラック間の配信数。
containerType
がdirect
の場合はサポートされません。デフォルト:
1
.
3.3. RabbitMQ ストリームプラグインの初期コンシューマーサポート
RabbitMQ ストリームプラグイン (英語) の基本的なサポートが提供されるようになりました。この機能を有効にするには、spring-rabbit-stream
jar をクラスパスに追加する必要があります。これは spring-amqp
および spring-rabbit
と同じバージョンである必要があります。
containerType プロパティを stream に設定した場合、上記のコンシューマープロパティはサポートされません。concurrency は、スーパーストリームでのみサポートされます。各バインドで使用できるストリームキューは 1 つだけです。 |
containerType=stream
を使用するようにバインダーを構成するために、Spring Boot はアプリケーションのプロパティから Environment
@Bean
を自動的に構成します。オプションで、カスタマイザーを追加して、リスナーコンテナーをカスタマイズできます。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
カスタマイザーに渡される name
引数は destination + '.' + group + '.container'
です。
ストリーム name()
(オフセット追跡の目的で)は、バインディング destination + '.' + group
に設定されます。上記の ConsumerCustomizer
を使用して変更できます。手動オフセット追跡を使用する場合は、Context
をメッセージヘッダーとして使用できます。
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
環境およびコンシューマービルダーの構成については、RabbitMQStreamJava クライアントのドキュメント (英語) を参照してください。
3.3.1. RabbitMQ Super Streams のコンシューマーサポート
スーパーストリームについては、スーパーストリーム (英語) を参照してください。
スーパーストリームを使用すると、スーパーストリームの各パーティションで 1 つのアクティブなコンシューマーを使用して、自動的にスケールアップ / スケールダウンできます。
構成例:
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
フレームワークは、9 つのパーティションを持つ super
という名前のスーパーストリームを作成します。このアプリケーションの最大 3 つのインスタンスをデプロイできます。
3.4. 高度なリスナーコンテナー構成
バインダーまたはバインディングプロパティとして公開されていないリスナーコンテナープロパティを設定するには、型 ListenerContainerCustomizer
の単一の Bean をアプリケーションコンテキストに追加します。バインダーとバインディングのプロパティが設定され、カスタマイザーが呼び出されます。カスタマイザー(configure()
メソッド)には、キュー名とコンシューマーグループが引数として提供されます。
3.5. 高度なキュー / 交換 / バインディング構成
時々、RabbitMQ チームは、キューなどを宣言するときに引数を設定することで有効になる新機能を追加します。通常、このような機能は適切なプロパティを追加することでバインダーで有効になりますが、現在のバージョンではすぐに利用できない場合があります。バージョン 3.0.1 以降、宣言が実行される直前に DeclarableCustomizer
Bean をアプリケーションコンテキストに追加して、Declarable
(Queue
、Exchange
、Binding
)を変更できるようになりました。これにより、現在バインダーで直接サポートされていない引数を追加できます。
3.6. バッチメッセージの受信
RabbitMQ バインダーでは、コンシューマーバインディングによって処理されるバッチには次の 2 つの型があります。
3.6.1. プロデューサーによって作成されたバッチ
通常、プロデューサーバーインディングに batch-enabled=true
(Rabbit プロデューサーのプロパティを参照)がある場合、またはメッセージが BatchingRabbitTemplate
によって作成される場合、バッチの要素はリスナーメソッドへの個別の呼び出しとして返されます。バージョン 3.0 以降、spring.cloud.stream.bindings.<name>.consumer.batch-mode
が true
に設定されている場合、そのようなバッチは List<?>
としてリスナーメソッドに提示できます。
3.6.2. コンシューマー側のバッチ処理
バージョン 3.1 以降、コンシューマーは、変換されたペイロードの List<?>
としてアプリケーションに提示されるバッチに複数の受信メッセージをアセンブルするように構成できます。次の簡単なアプリケーションは、この手法の使用方法を示しています。
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<List<Thing>> input() {
return list -> {
System.out.println("Received " + list.size());
list.forEach(thing -> {
System.out.println(thing);
// ...
});
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1]
Thing [field=value2]
バッチ内のメッセージの数は、batch-size
および receive-timeout
プロパティによって指定されます。receive-timeout
が新しいメッセージなしで経過した場合、「短い」バッチが配信されます。
コンシューマー側のバッチ処理は、container-type=simple (デフォルト)でのみサポートされます。 |
コンシューマー側のバッチメッセージのヘッダーを調べたい場合は、Message<List<?>>
を使用する必要があります。ヘッダーは、ヘッダー AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS
内の List<Map<String, Object>>
であり、対応するインデックス内の各ペイロード要素のヘッダーがあります。繰り返しますが、ここに簡単な例があります:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<Message<List<Thing>>> input() {
return msg -> {
List<Thing> things = msg.getPayload();
System.out.println("Received " + things.size());
List<Map<String, Object>> headers =
(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
for (int i = 0; i < things.size(); i++) {
System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));
// ...
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue1");
return msg;
});
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue2");
return msg;
});
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getfield() {
return this.field;
}
public void setfield(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2
3.7. Rabbit プロデューサーのプロパティ
次のプロパティは Rabbit プロデューサーのみが使用でき、接頭辞 spring.cloud.stream.rabbit.bindings.<channelName>.producer.
を付ける必要があります。
ただし、同じプロパティのセットをほとんどのバインディングに適用する必要がある場合は、繰り返しを避けるために、Spring Cloud Stream はすべてのチャネルの値を spring.cloud.stream.rabbit.default.<property>=<value>
の形式で設定することをサポートしています。
また、特定のプロパティをバインドすると、デフォルトで同等のプロパティが上書きされることに注意してください。
- altermateExchange.binding.queue
交換がまだ存在せず、
name
が提供されている場合は、このキューを代替交換にバインドします。引数のない単純な永続キューがプロビジョニングされます。より高度な構成が必要な場合は、自分でキューを構成してバインドする必要があります。デフォルト:
null
alternateExchange.binding.routingKey 交換がまだ存在せず、name
とqueue
が提供されている場合は、このルーティングキーを使用してキューを代替交換にバインドします。デフォルト:
#
(デフォルトのtopic
代替交換用)- alternateExchange.exists
代替交換が存在するかどうか、プロビジョニングが必要かどうか。
デフォルト:
false
- alternateExchange.type
代替取引所がまだ存在しない場合は、プロビジョニングする取引所の型。
デフォルト:
topic
- alternateExchange.name
宛先交換機で代替交換機を構成します。
デフォルト:
null
- autoBindDlq
DLQ を自動的に宣言し、それをバインダー DLX にバインドするかどうか。
デフォルト:
false
.- batchingEnabled
プロデューサーによるメッセージのバッチ処理を有効にするかどうか。メッセージは、次のプロパティ (このリストの次の 3 つのエントリで説明) に従って 1 つのメッセージにバッチ処理されます: 'batchSize'、
batchBufferLimit
、batchTimeout
。詳細については、バッチ処理を参照してください。また、バッチメッセージの受信も参照してください。デフォルト:
false
.- batchSize
バッチ処理が有効になっているときにバッファリングするメッセージの数。
デフォルト:
100
.- batchBufferLimit
バッチ処理が有効になっている場合の最大バッファサイズ。
デフォルト:
10000
.- batchTimeout
バッチ処理が有効になっている場合のバッチタイムアウト。
デフォルト:
5000
.- bindingRoutingKey
キューを交換にバインドするためのルーティングキー(
bindQueue
がtrue
の場合)。複数のキーにすることができます。bindingRoutingKeyDelimiter
を参照してください。パーティション化された宛先の場合、-n
が各キーに追加されます。requiredGroups
が提供されている場合にのみ適用され、それらのグループにのみ適用されます。デフォルト:
#
.- bindingRoutingKeyDelimiter
これが null でない場合、"bindingRoutingKey" はこの値で区切られたキーのリストであると見なされます。多くの場合、コンマが使用されます。
requiredGroups
が提供されている場合にのみ適用され、その場合はそれらのグループにのみ適用されます。デフォルト:
null
.- bindQueue
キューを宣言して宛先交換にバインドするかどうか。独自のインフラストラクチャをセットアップし、以前にキューを作成してバインドしたことがある場合は、
false
に設定します。requiredGroups
が提供されている場合にのみ適用され、それらのグループにのみ適用されます。デフォルト:
true
.- compress
送信時にデータを圧縮する必要があるかどうか。
デフォルト:
false
.- confirmAckChannel
errorChannelEnabled
が true の場合、肯定的な配信確認を送信するチャネル(別名パブリッシャーが確認)。チャネルが存在しない場合は、DirectChannel
がこの名前で登録されます。接続ファクトリは、発行者が確認できるように構成する必要があります。useConfirmHeader
と相互に排他的です。デフォルト:
nullChannel
(ack は破棄されます)。- deadLetterQueueName
DLQ の名前は、
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
prefix+destination.dlq
- deadLetterExchange
キューに割り当てる DLX。
autoBindDlq
がtrue
の場合にのみ関連します。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: 「プレフィックス +DLX」
- deadLetterExchangeType
キューに割り当てる DLX の型。
autoBindDlq
がtrue
の場合にのみ関連します。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: ' 直接 '
- deadLetterRoutingKey
キューに割り当てるデッドレタールーティングキー。
autoBindDlq
がtrue
の場合にのみ関連します。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
destination
- declareDlx
宛先のデッドレター交換を宣言するかどうか。
autoBindDlq
がtrue
の場合にのみ関連します。DLX が事前構成されている場合は、false
に設定します。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
true
.- declareExchange
宛先の交換を宣言するかどうか。
デフォルト:
true
.- delayExpression
メッセージに適用する遅延を評価するための SpEL 式(
x-delay
ヘッダー)。交換が遅延メッセージ交換でない場合は効果がありません。デフォルト:
x-delay
ヘッダーは設定されていません。- delayedExchange
交換を
Delayed Message Exchange
として宣言するかどうか。ブローカーに遅延メッセージ交換プラグインが必要です。x-delayed-type
引数はexchangeType
に設定されます。デフォルト:
false
.- deliveryMode
配信モード。
デフォルト:
PERSISTENT
.- dlqBindingArguments
dlq をデッドレター交換にバインドするときに適用される引数。
headers
deadLetterExchangeType
とともに使用して、一致するヘッダーを指定します。たとえば、…dlqBindingArguments.x-match=any
、…dlqBindingArguments.someHeader=someValue
。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: 空
- dlqDeadLetterExchange
DLQ が宣言されると、そのキューに割り当てる DLX。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
none
- dlqDeadLetterRoutingKey
DLQ が宣言されると、そのキューに割り当てるデッドレタールーティングキー。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
none
- dlqExpires
未使用のデッドレターキューが削除されるまでの時間(ミリ秒単位)。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
no expiration
- dlqLazy
x-queue-mode=lazy
引数を使用してデッドレターキューを宣言します。“怠惰なキュー” (英語) を参照してください。ポリシーを使用すると、キューを削除せずに設定を変更できるため、この設定の代わりにポリシーを使用することを検討してください。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。- dlqMaxLength
デッドレターキュー内のメッセージの最大数。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
no limit
- dlqMaxLengthBytes
すべてのメッセージからのデッドレターキューの合計バイト数の最大数。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
no limit
- dlqMaxPriority
デッドレターキュー内のメッセージの最大優先度(0-255)
requiredGroups
が提供されている場合にのみ適用され、次にそれらのグループにのみ適用されます。デフォルト:
none
- dlqQuorum.deliveryLimit
quorum.enabled=true
の場合、配信制限を設定してから、メッセージがドロップまたはデッドレターになります。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: なし - ブローカーのデフォルトが適用されます。
- dlqQuorum.enabled
true の場合、従来のキューの代わりにクォーラムデッドレターキューを作成します。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: false
- dlqQuorum.initialGroupSize
quorum.enabled=true
の場合、初期クォーラムサイズを設定します。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: なし - ブローカーのデフォルトが適用されます。
- dlqSingleActiveConsumer
true に設定すると、
x-single-active-consumer
キュープロパティが true に設定されます。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
false
- dlqTtl
宣言されたときにデッドレターキューに適用されるデフォルトの存続時間(ミリ秒単位)。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
no limit
- exchangeAutoDelete
declareExchange
がtrue
の場合、交換を自動削除するかどうか(最後のキューが削除された後に削除されます)。デフォルト:
true
.- exchangeDurable
declareExchange
がtrue
の場合、交換が永続的である必要があるかどうか(ブローカーの再起動後も存続します)。デフォルト:
true
.- exchangeType
交換型: パーティション化されていない宛先の場合は
direct
、fanout
、headers
またはtopic
、パーティション化された宛先の場合はdirect
、headers
またはtopic
。デフォルト:
topic
.- expires
未使用のキューが削除されるまでの時間(ミリ秒単位)。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
no expiration
- headerPatterns
送信メッセージにマップされるヘッダーのパターン。
デフォルト:
['*']
(すべてのヘッダー)。- lazy
x-queue-mode=lazy
引数を使用してキューを宣言します。“怠惰なキュー” (英語) を参照してください。ポリシーを使用すると、キューを削除せずに設定を変更できるため、この設定の代わりにポリシーを使用することを検討してください。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
false
.- maxLength
キュー内のメッセージの最大数。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
no limit
- maxLengthBytes
すべてのメッセージからのキュー内の合計バイト数の最大数。
requiredGroups
が提供されている場合にのみ適用され、それらのグループにのみ適用されます。デフォルト:
no limit
- maxPriority
キュー内のメッセージの最大優先度(0-255)。
requiredGroups
が提供されている場合にのみ適用され、それらのグループにのみ適用されます。デフォルト:
none
- prefix
destination
交換の名前に追加されるプレフィックス。デフォルト: "".
- producerType
プロデューサーの型。
クラシックキューおよびクォーラムキュー用の
AMQP
AMQP クライアントSTREAM_SYNC
RabbitMQ Streams プラグインクライアント、確認を受信するまでブロックSTREAM_ASYNC
RabbitMQ ストリームプラグインクライアント、ブロックしないデフォルト: "".
- queueBindingArguments
キューを取引所にバインドするときに適用される引数。
headers
exchangeType
とともに使用して、一致するヘッダーを指定します。たとえば、…queueBindingArguments.x-match=any
、…queueBindingArguments.someHeader=someValue
。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: 空
- queueNameGroupOnly
true
の場合、group
と等しい名前のキューから消費します。それ以外の場合、キュー名はdestination.group
です。これは、たとえば、Spring Cloud Stream を使用して既存の RabbitMQ キューから消費する場合に役立ちます。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: false。
- quorum.deliveryLimit
quorum.enabled=true
の場合、配信制限を設定してから、メッセージがドロップまたはデッドレターになります。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: なし - ブローカーのデフォルトが適用されます。
- quorum.enabled
true の場合、従来のキューの代わりにクォーラムキューを作成します。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: false
- quorum.initialGroupSize
quorum.enabled=true
の場合、初期クォーラムサイズを設定します。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト: なし - ブローカーのデフォルトが適用されます。
- routingKeyExpression
メッセージのパブリッシュ時に使用するルーティングキーを決定する SpEL 式。固定ルーティングキーの場合は、
routingKey
を使用します。デフォルト: パーティション化された宛先の場合は
destination
またはdestination-<partition>
。- routingKey
メッセージの発行時に使用する固定ルーティングキーを定義する文字列。
デフォルト:
routingKeyExpression
を参照- singleActiveConsumer
true に設定すると、
x-single-active-consumer
キュープロパティが true に設定されます。requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
false
- transacted
トランザクションチャネルを使用するかどうか。
デフォルト:
false
.- ttl
宣言されたときにキューに適用するために存続するデフォルトの時間(ミリ秒単位)。
requiredGroups
が提供されている場合にのみ適用され、その後、それらのグループにのみ適用されます。デフォルト:
no limit
- useConfirmHeader
パブリッシャーが確認を参照してください。
confirmAckChannel
と相互に排他的です。RabbitMQ の場合、コンテンツ型ヘッダーは外部アプリケーションによって設定できます。Spring Cloud Stream は、ヘッダーをネイティブにサポートしない Kafka(0.11 より前)などのトランスポートを含む、あらゆる型のトランスポートに使用される拡張内部プロトコルの一部としてサポートします。
3.8. パブリッシャーが確認
メッセージの公開結果を取得するには、2 つのメカニズムがあります。いずれの場合も、接続ファクトリには publisherConfirmType
セット ConfirmType.CORRELATED
が必要です。「レガシー」メカニズムは、confirmAckChannel
をメッセージチャネルの Bean 名に設定し、そこから非同期で確認を取得できるようにすることです。負の ack がエラーチャネルに送信されます(有効になっている場合)- エラーチャネルを参照してください。
バージョン 3.1 で追加された推奨メカニズムは、相関データヘッダーを使用し、その Future<Confirm>
プロパティを介して結果を待機することです。これは、結果を待つ前に複数のメッセージを送信できるため、バッチリスナーで特に役立ちます。この手法を使用するには、useConfirmHeader
プロパティを true に設定します。次の簡単なアプリケーションは、この手法の使用例です。
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.source=output
spring.cloud.stream.bindings.output-out-0.producer.error-channel-enabled=true
spring.cloud.stream.rabbit.bindings.output-out-0.producer.useConfirmHeader=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
@SpringBootApplication
public class Application {
private static final Logger log = LoggerFactory.getLogger(Application.class);
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Autowired
private StreamBridge bridge;
@Bean
Consumer<List<String>> input() {
return list -> {
List<MyCorrelationData> results = new ArrayList<>();
list.forEach(str -> {
log.info("Received: " + str);
MyCorrelationData corr = new MyCorrelationData(UUID.randomUUID().toString(), str);
results.add(corr);
this.bridge.send("output-out-0", MessageBuilder.withPayload(str.toUpperCase())
.setHeader(AmqpHeaders.PUBLISH_CONFIRM_CORRELATION, corr)
.build());
});
results.forEach(correlation -> {
try {
Confirm confirm = correlation.getFuture().get(10, TimeUnit.SECONDS);
log.info(confirm + " for " + correlation.getPayload());
if (correlation.getReturnedMessage() != null) {
log.error("Message for " + correlation.getPayload() + " was returned ");
// throw some exception to invoke binder retry/error handling
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}
catch (ExecutionException | TimeoutException e) {
throw new IllegalStateException(e);
}
});
};
}
@Bean
public ApplicationRunner runner(BatchingRabbitTemplate template) {
return args -> IntStream.range(0, 10).forEach(i ->
template.convertAndSend("input-in-0", "input-in-0.rbgh303", "foo" + i));
}
@Bean
public BatchingRabbitTemplate template(CachingConnectionFactory cf, TaskScheduler taskScheduler) {
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(10, 1000000, 1000);
return new BatchingRabbitTemplate(cf, batchingStrategy, taskScheduler);
}
}
class MyCorrelationData extends CorrelationData {
private final String payload;
MyCorrelationData(String id, String payload) {
super(id);
this.payload = payload;
}
public String getPayload() {
return this.payload;
}
}
ご覧のとおり、各メッセージを送信してから、公開結果を待ちます。メッセージをルーティングできない場合は、将来が完了する前に、返されたメッセージが相関データに入力されます。
フレームワークが相関を実行できるように、相関データには一意の id を提供する必要があります。 |
useConfirmHeader
と confirmAckChannel
の両方を設定することはできませんが、useConfirmHeader
が true の場合でも、エラーチャネルで返されたメッセージを受信できますが、相関ヘッダーを使用する方が便利です。
3.9. RabbitMQ ストリームプラグインの初期プロデューサーサポート
RabbitMQ ストリームプラグイン (英語) の基本的なサポートが提供されるようになりました。この機能を有効にするには、spring-rabbit-stream
jar をクラスパスに追加する必要があります。これは spring-amqp
および spring-rabbit
と同じバージョンである必要があります。
上記のプロデューサープロパティは、producerType プロパティを STREAM_SYNC または STREAM_ASYNC に設定した場合はサポートされません。 |
ストリーム ProducerType
を使用するようにバインダーを構成するために、Spring Boot はアプリケーションのプロパティから Environment
@Bean
を構成します。オプションで、カスタマイザーを追加して、メッセージハンドラーをカスタマイズできます。
@Bean
ProducerMessageHandlerCustomizer<MessageHandler> handlerCustomizer() {
return (hand, dest) -> {
RabbitStreamMessageHandler handler = (RabbitStreamMessageHandler) hand;
handler.setConfirmTimeout(5000);
((RabbitStreamTemplate) handler.getStreamOperations()).setProducerCustomizer(
(name, builder) -> {
...
});
};
}
環境およびプロデューサービルダーの構成については、RabbitMQStreamJava クライアントのドキュメント (英語) を参照してください。
3.9.1. RabbitMQ スーパーストリームのプロデューサーサポート
スーパーストリームについては、スーパーストリーム (英語) を参照してください。
スーパーストリームを使用すると、スーパーストリームの各パーティションで 1 つのアクティブなコンシューマーを使用して、自動的にスケールアップ / スケールダウンできます。Spring Cloud Stream を使用すると、AMQP またはストリームクライアントを使用してスーパーストリームに公開できます。
スーパーストリームはすでに存在している必要があります。スーパーストリームの作成は、プロデューサーバーインディングではサポートされていません。 |
AMQP を介したスーパーストリームへのパブリッシュ:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
ストリームクライアントを使用してスーパーストリームに公開する:
spring.cloud.stream.bindings.output.destination=super
spring.cloud.stream.bindings.output.producer.partition-count=3
spring.cloud.stream.bindings.output.producer.partition-key-expression=headers['cust-no']
spring.cloud.stream.rabbit.bindings.output.producer.producer-type=stream-async
spring.cloud.stream.rabbit.bindings.output.producer.super-stream=true
spring.cloud.stream.rabbit.bindings.output.producer.declare-exchange=false
ストリームクライアントを使用する場合、confirmAckChannel
を設定すると、正常に送信されたメッセージのコピーがそのチャネルに送信されます。
4. 既存のキュー / 交換の使用
デフォルトでは、バインダーは、宛先バインディングプロパティ <prefix><destination>
の値から派生した名前でトピック交換を自動的にプロビジョニングします。指定しない場合、宛先はデフォルトでバインディング名になります。コンシューマーをバインドする場合、キューは <prefix><destination>.<group>
という名前で自動的にプロビジョニングされます(group
バインディングプロパティが指定されている場合)。または、group
がない場合は、匿名の自動削除キューがプロビジョニングされます。キューは、非パーティションバインディングの場合は "match-all" ワイルドカードルーティングキー(#
)、パーティションバインディングの場合は <destination>-<instanceIndex>
を使用して交換にバインドされます。プレフィックスは、デフォルトでは空の String
です。出力バインディングが requiredGroups
で指定されている場合、キュー / バインディングはグループごとにプロビジョニングされます。
このデフォルトの動作を変更できるウサギ固有のバインディングプロパティがいくつかあります。
使用したい既存の交換 / キューがある場合、交換の名前が myExchange
で、キューの名前が myQueue
であると仮定すると、次のように自動プロビジョニングを完全に無効にできます。
spring.cloud.stream.bindings.<binding name>.destination=myExchange
spring.cloud.stream.bindings.<binding name>.group=myQueue
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindQueue=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.declareExchange=false
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.queueNameGroupOnly=true
バインダーにキュー / 交換をプロビジョニングさせたいが、ここで説明したデフォルト以外のものを使用してプロビジョニングしたい場合は、次のプロパティを使用します。詳細については、上記のプロパティドキュメントを参照してください。
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.bindingRoutingKey=myRoutingKey
spring.cloud.stream.rabbit.bindings.<binding name>.consumer.exchangeType=<type>
spring.cloud.stream.rabbit.bindings.<binding name>.producer.routingKeyExpression='myRoutingKey'
autoBindDlq
が true
の場合、デッドレター交換 / キューを宣言するときに使用される同様のプロパティがあります。
5. RabbitMQ バインダーで再試行する
バインダー内で再試行が有効になっている場合、リスナーコンテナースレッドは、構成されているバックオフ期間の間中断されます。これは、単一のコンシューマーで厳密なオーダーが必要な場合に重要になる可能性があります。ただし、他のユースケースでは、他のメッセージがそのスレッドで処理されるのを防ぎます。バインダーの再試行を使用する代わりに、デッドレターキュー(DLQ)に存続する時間のあるデッドレタリングと、DLQ 自体のデッドレター構成を設定することもできます。ここで説明するプロパティの詳細については、"RabbitMQ バインダーのプロパティ" を参照してください。次の設定例を使用して、この機能を有効にできます。
autoBindDlq
をtrue
に設定します。バインダーは DLQ を作成します。オプションで、deadLetterQueueName
で名前を指定できます。dlqTtl
を、再配信の間に待機するバックオフ時間に設定します。dlqDeadLetterExchange
をデフォルトの交換に設定します。デフォルトのdeadLetterRoutingKey
はキュー名(destination.group
)であるため、DLQ からの期限切れのメッセージは元のキューにルーティングされます。次の例に示すように、デフォルトの交換に設定するには、値を指定せずにプロパティを設定します。
メッセージを強制的にデッドレターにするには、AmqpRejectAndDontRequeueException
をスローするか、requeueRejected
を false
(デフォルト)に設定して例外をスローします。
ループは終わりなく続きます。これは一時的な問題には問題ありませんが、何度か試行した後はあきらめたい場合があります。幸い、RabbitMQ は x-death
ヘッダーを提供します。これにより、発生したサイクル数を判別できます。
諦めた後にメッセージを確認するには、ImmediateAcknowledgeAmqpException
をスローします。
5.1. すべてまとめる
次の構成では、ワイルドカードルーティングキー #
を使用してトピック交換にバインドされたキュー myDestination.consumerGroup
を使用して交換 myDestination
を作成します。
---
spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
#disable binder retries
spring.cloud.stream.bindings.input.consumer.max-attempts=1
#dlx/dlq setup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=
---
この構成では、myDestination.consumerGroup
のルーティングキーを使用して直接交換(DLX
)にバインドされた DLQ が作成されます。メッセージが拒否されると、メッセージは DLQ にルーティングされます。次の例に示すように、5 秒後、メッセージは期限切れになり、キュー名をルーティングキーとして使用して元のキューにルーティングされます。
@SpringBootApplication
public class XDeathApplication {
public static void main(String[] args) {
SpringApplication.run(XDeathApplication.class, args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
Map<?,?> death = message.getHeaders().get("x-death");
if (death != null && death.get("count").equals(3L)) {
// giving up - don't send to DLX
throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
}
throw new AmqpRejectAndDontRequeueException("failed");
};
}
}
x-death
ヘッダーの count プロパティが Long
であることに注意してください。
6. エラーチャネル
バージョン 1.3 以降、バインダーは各コンシューマー宛先のエラーチャネルに無条件に例外を送信し、非同期プロデューサー送信エラーをエラーチャネルに送信するように構成することもできます。詳細については、"[Spring- クラウド - ストリーム - 概要 - エラー処理 ]" を参照してください。
RabbitMQ には、2 種類の送信失敗があります。
返されたメッセージ
否定的に認められたパブリッシャーが確認 (英語) 。
後者はまれです。RabbitMQ のドキュメントによると、「[A nack] は、キューを担当する Erlang プロセスで内部エラーが発生した場合にのみ配信されます。」 reject-publish
キューオーバーフロー動作を使用して制限付きキューに公開する場合も、否定応答を受け取る可能性があります。
( "[Spring- クラウド - ストリーム - 概要 - エラー処理 ]" に従って)プロデューサーエラーチャネルを有効にするだけでなく、RabbitMQ バインダーは、接続ファクトリが次のように適切に構成されている場合にのみ、チャネルにメッセージを送信します。
ccf.setPublisherConfirms(true);
ccf.setPublisherReturns(true);
接続ファクトリに Spring Boot 構成を使用する場合は、次のプロパティを設定します。
spring.rabbitmq.publisher-confirms
spring.rabbitmq.publisher-returns
返されたメッセージの ErrorMessage
のペイロードは、次のプロパティを持つ ReturnedAmqpMessageException
です。
failedMessage
: 送信に失敗した spring-messagingMessage<?>
。amqpMessage
: 生の spring-amqpMessage
。replyCode
: 失敗の理由を示す整数値(たとえば、312- ルートなし)。replyText
: 失敗の理由を示すテキスト値(たとえば、NO_ROUTE
)。exchange
: メッセージが公開された交換。routingKey
: メッセージが公開されたときに使用されたルーティングキー。
返されたメッセージを受信するための代替メカニズムについては、パブリッシャーが確認も参照してください。
否定的に確認された確認の場合、ペイロードは次のプロパティを持つ NackedAmqpMessageException
です。
failedMessage
: 送信に失敗した spring-messagingMessage<?>
。nackReason
: 理由(利用可能な場合 - 詳細については、ブローカーのログを調べる必要がある場合があります)。
これらの例外(デッドレターキューへの送信など)の自動処理はありません。これらの例外は、独自の Spring Integration フローで使用できます。
7. Rabbit バインダーヘルスインジケーター
Rabbit バインダーのヘルスインジケーターは、Spring Boot から提供されたものに委譲されます。詳細については、こちらを参照してください。
プロパティ - management.health.binders.enabled
を使用してバインダーレベルでこのヘルスインジケーターを無効にし、これを false
に設定できます。マルチバインダー環境の場合、これはバインダーの環境プロパティで設定する必要があります。
ヘルスインジケーターが無効になっている場合、ヘルスアクチュエーターのエンドポイントに次のようなものが表示されます。
"rabbit": {
"status": "UNKNOWN"
}
Spring Boot レベルで、Rabbit ヘルスインジケーターを無効にする場合は、プロパティ management.health.rabbit.enabled
を使用し、false
に設定する必要があります。
8. デッドレターキュー処理
ユーザーがデッドレターメッセージをどのように処理するかを予測できないため、フレームワークはそれらを処理するための標準的なメカニズムを提供しません。デッドレタリングの理由が一時的なものである場合は、メッセージを元のキューに戻すことをお勧めします。ただし、課題が永続的な課題である場合は、無限ループが発生する可能性があります。次の Spring Boot アプリケーションは、これらのメッセージを元のキューに戻す方法の例を示していますが、3 回試行した後、3 番目の「駐車場」キューに移動します。2 番目の例では、RabbitMQ 遅延メッセージ交換 (英語) を使用して、再キューイングされたメッセージに遅延を導入します。この例では、試行ごとに遅延が増加します。これらの例では、@RabbitListener
を使用して DLQ からメッセージを受信します。バッチプロセスで RabbitTemplate.receive()
を使用することもできます。
例では、元の宛先が so8400in
であり、コンシューマーグループが so8400
であると想定しています。
8.1. パーティション化されていない宛先
最初の 2 つの例は、宛先がパーティション化されていない場合のものです。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public DirectExchange delayExchange() {
DirectExchange exchange = new DirectExchange(DELAY_EXCHANGE);
exchange.setDelayed(true);
return exchange;
}
@Bean
public Binding bindOriginalToDelay() {
return BindingBuilder.bind(new Queue(ORIGINAL_QUEUE)).to(delayExchange()).with(ORIGINAL_QUEUE);
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8.2. パーティション化された宛先
パーティション化された宛先では、すべてのパーティションに 1 つの DLQ があります。ヘッダーから元のキューを決定します。
8.2.1. republishToDlq=false
republishToDlq
が false
の場合、次の例に示すように、RabbitMQ は元の宛先に関する情報を含む x-death
ヘッダーを使用してメッセージを DLX/DLQ に公開します。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_DEATH_HEADER = "x-death";
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
List<Map<String, ?>> xDeath = (List<Map<String, ?>>) headers.get(X_DEATH_HEADER);
String exchange = (String) xDeath.get(0).get("exchange");
List<String> routingKeys = (List<String>) xDeath.get(0).get("routing-keys");
this.rabbitTemplate.send(exchange, routingKeys.get(0), failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
8.2.2. republishToDlq=true
republishToDlq
が true
の場合、次の例に示すように、再発行リカバリは元の交換キーとルーティングキーをヘッダーに追加します。
@SpringBootApplication
public class ReRouteDlqApplication {
private static final String ORIGINAL_QUEUE = "so8400in.so8400";
private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private static final String X_RETRIES_HEADER = "x-retries";
private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
System.in.read();
context.close();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
if (retriesHeader == null) {
retriesHeader = Integer.valueOf(0);
}
if (retriesHeader < 3) {
headers.put(X_RETRIES_HEADER, retriesHeader + 1);
String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER);
String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER);
this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage);
}
else {
this.rabbitTemplate.send(PARKING_LOT, failedMessage);
}
}
@Bean
public Queue parkingLot() {
return new Queue(PARKING_LOT);
}
}
9. RabbitMQ バインダーを使用したパーティション分割
RabbitMQ はネイティブでのパーティショニングをサポートしていません。
特定のパーティションにデータを送信すると有利な場合があります。たとえば、メッセージ処理を厳密に並べ替える場合は、特定の顧客宛てのすべてのメッセージを同じパーティションに送信する必要があります。
RabbitMessageChannelBinder
は、各パーティションのキューを宛先交換にバインドすることにより、パーティション化を提供します。
次の Java と YAML の例は、プロデューサーを構成する方法を示しています。
@SpringBootApplication
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
前の例の構成では、デフォルトのパーティショニング(
|
次の構成は、トピック交換をプロビジョニングします。
次のキューがその交換にバインドされています。
次のバインディングは、キューを交換に関連付けます。
次の Java と YAML の例は、前の例を継続し、コンシューマーを構成する方法を示しています。
@SpringBootApplication
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}
}
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
RabbitMessageChannelBinder は動的スケーリングをサポートしていません。パーティションごとに少なくとも 1 つのコンシューマーが必要です。コンシューマーの instanceIndex は、どのパーティションが消費されているかを示すために使用されます。Cloud Foundry などのプラットフォームは、instanceIndex を持つインスタンスを 1 つだけ持つことができます。 |