メッセージリスナーコンテナーの設定

SimpleMessageListenerContainer (SMLC) と DirectMessageListenerContainer (DMLC) の構成には、トランザクションとサービスの品質に関連するかなりの数のオプションがあり、それらのいくつかは相互に対話します。SMLC、DMLC、StreamListenerContainer (StLC) (RabbitMQ ストリームプラグインの使用を参照) に適用されるプロパティは、該当する列のチェックマークで示されます。アプリケーションに適したコンテナーを決定するのに役立つ情報については、コンテナーの選択を参照してください。

次の表は、名前空間を使用して <rabbit:listener-container/> を構成する場合のコンテナープロパティ名とそれに相当する属性名 (括弧内) を示しています。その要素の type 属性は、simple (デフォルト) または direct で、それぞれ SMLC または DMLC を指定できます。一部のプロパティは名前空間によって公開されません。これらは、属性の N/A によって示されます。

表 1: メッセージリスナーコンテナーの構成オプション
プロパティ (属性) 説明 SMLCDMLCStLC

ackTimeout
(なし)

messagesPerAck が設定されている場合、このタイムアウトは ack を送信する代わりに使用されます。新しいメッセージが到着すると、確認応答されていないメッセージの数が messagesPerAck と比較され、最後の確認応答からの時間がこの値と比較されます。いずれかの条件が true の場合、メッセージは確認されます。新しいメッセージが到着せず、確認応答されていないメッセージがある場合、状態は monitorInterval ごとにのみチェックされるため、このタイムアウトは概算です。この表の messagesPerAck および monitorInterval も参照してください。

tickmark

acknowledgeMode
(認める)

  • NONE: ack は送信されません ( channelTransacted=true と互換性がありません)。RabbitMQ はこれを "autoack" と呼びます。これは、ブローカーがすべてのメッセージがコンシューマーからのアクションなしで ack されると想定しているためです。

  • MANUAL: リスナーは、Channel.basicAck() を呼び出してすべてのメッセージを確認する必要があります。

  • AUTO: コンテナーは、MessageListener が例外をスローしない限り、メッセージを自動的に確認します。acknowledgeMode は channelTransacted を補完するものであることに注意してください — チャネルが処理される場合、ブローカーは ack に加えてコミット通知を必要とします。これがデフォルトのモードです。batchSize も参照してください。

tickmark
tickmark

adviceChain
(advice-chain)

リスナーの実行に適用する AOP アドバイスの配列。これは、ブローカーが停止した場合の自動再試行など、追加の横断的関心事を適用するために使用できます。AMQP エラー後の単純な再接続は、ブローカーがまだ有効である限り、CachingConnectionFactory によって処理されることに注意してください。

tickmark
tickmark

afterReceivePostProcessors
(なし)

リスナーを呼び出す前に呼び出される MessagePostProcessor インスタンスの配列。ポストプロセッサーは PriorityOrdered または Ordered を実装できます。配列は、最後に呼び出された順不同のメンバーでソートされます。ポストプロセッサーが null を返す場合、メッセージは破棄されます (必要に応じて確認されます)。

tickmark
tickmark

alwaysRequeueWithTxManagerRollback
(なし)

true に設定すると、トランザクションマネージャーが構成されている場合に、ロールバック時にメッセージを常に再キューイングします。

tickmark
tickmark

autoDeclare
(auto-declare)

true (デフォルト) に設定すると、コンテナーは、おそらく auto-delete または期限切れであるために、少なくとも 1 つのキューが欠落していることを起動時に検出した場合、RabbitAdmin を使用してすべての AMQP オブジェクト (キュー、交換、バインディング) を再宣言します。ただし、何らかの理由でキューが欠落している場合は、再宣言が続行されます。この動作を無効にするには、このプロパティを false に設定します。すべてのキューが欠落している場合、コンテナーは起動に失敗することに注意してください。

バージョン 1.6 より前では、コンテキストに複数の管理者がいる場合、コンテナーはランダムに 1 つを選択していました。管理者がいない場合は、内部で作成されます。いずれの場合も、これにより予期しない結果が生じる可能性があります。バージョン 1.6 以降、autoDeclare が機能するには、コンテキスト内に RabbitAdmin が 1 つだけ存在するか、rabbitAdmin プロパティを使用して特定のインスタンスへの参照がコンテナーに構成されている必要があります。
tickmark
tickmark

autoStartup
(auto-startup)

ApplicationContext が開始するときにコンテナーを開始する必要があることを示すフラグ (すべての Bean が初期化された後に発生する SmartLifecycle コールバックの一部として)。デフォルトは true ですが、ブローカーが起動時に使用できない可能性がある場合は false に設定し、後でブローカーの準備が整ったときに手動で start() を呼び出すことができます。

tickmark
tickmark
tickmark

batchSize
(transaction-size) (batch-size)

acknowledgeMode を AUTO に設定して使用すると、コンテナーは ack を送信する前に、この数までのメッセージを処理しようとします (受信タイムアウト設定までそれぞれを待機します)。これは、トランザクションチャネルがコミットされるときでもあります。prefetchCount が batchSize より小さい場合は、batchSize と一致するように増加されます。

tickmark

batchingStrategy
(なし)

メッセージをデバッチするときに使用される戦略。デフォルト SimpleDebatchingStrategyバッチ処理およびバッチ処理を伴う @RabbitListener を参照してください。

tickmark
tickmark

channelTransacted
(channel-transacted)

トランザクションですべてのメッセージを (手動または自動で) 確認する必要があることを示すブール値フラグ。

tickmark
tickmark

concurrency
(なし)

m-n The range of concurrent consumers for each listener (min, max). If only n is provided, n is a fixed number of consumers. See Listener Concurrency.

tickmark

concurrentConsumers
(concurrency)

The number of concurrent consumers to initially start for each listener. See Listener Concurrency. For the StLC, concurrency is controlled via an overloaded superStream method; see Consuming Super Streams with Single Active Consumers.

tickmark
tickmark

connectionFactory
(connection-factory)

A reference to the ConnectionFactory. When configuring by using the XML namespace, the default referenced bean name is rabbitConnectionFactory.

tickmark
tickmark

consecutiveActiveTrigger
(min-consecutive-active)

The minimum number of consecutive messages received by a consumer, without a receive timeout occurring, when considering starting a new consumer. Also impacted by 'batchSize'. See Listener Concurrency. Default: 10.

tickmark

consecutiveIdleTrigger
(min-consecutive-idle)

The minimum number of receive timeouts a consumer must experience before considering stopping a consumer. Also impacted by 'batchSize'. See Listener Concurrency. Default: 10.

tickmark

consumerBatchEnabled
(batch-enabled)

If the MessageListener supports it, setting this to true enables batching of discrete messages, up to batchSize; a partial batch will be delivered if no new messages arrive in receiveTimeout or gathering batch messages time exceeded batchReceiveTimeout. When this is false, batching is only supported for batches created by a producer; see Batching.

tickmark

consumerCustomizer
(N/A)

A ConsumerCustomizer bean used to modify stream consumers created by the container.

tickmark

consumerStartTimeout
(N/A)

The time in milliseconds to wait for a consumer thread to start. If this time elapses, an error log is written. An example of when this might happen is if a configured taskExecutor has insufficient threads to support the container concurrentConsumers.

See Threading and Asynchronous Consumers. Default: 60000 (one minute).

tickmark

consumerTagStrategy
(consumer-tag-strategy)

Set an implementation of ConsumerTagStrategy, enabling the creation of a (unique) tag for each consumer.

tickmark
tickmark

consumersPerQueue
(consumers-per-queue)

The number of consumers to create for each configured queue. See Listener Concurrency.

tickmark

consumeDelay
(N/A)

When using the RabbitMQ Sharding Plugin [GitHub] (英語) with concurrentConsumers > 1, there is a race condition that can prevent even distribution of the consumers across the shards. Use this property to add a small delay between consumer starts to avoid this race condition. You should experiment with values to determine the suitable delay for your environment.

tickmark
tickmark

debatchingEnabled
(N/A)

When true, the listener container will debatch batched messages and invoke the listener with each message from the batch. Starting with version 2.2.7, producer created batches will be debatched as a List<Message> if the listener is a BatchMessageListener or ChannelAwareBatchMessageListener. Otherwise messages from the batch are presented one-at-a-time. Default true. See Batching and @RabbitListener with Batching.

tickmark
tickmark

declarationRetries
(declaration-retries)

The number of retry attempts when passive queue declaration fails. Passive queue declaration occurs when the consumer starts or, when consuming from multiple queues, when not all queues were available during initialization. When none of the configured queues can be passively declared (for any reason) after the retries are exhausted, the container behavior is controlled by the 'missingQueuesFatal` property, described earlier. Default: Three retries (for a total of four attempts).

tickmark

defaultRequeueRejected
(requeue-rejected)

Determines whether messages that are rejected because the listener threw an exception should be requeued or not. Default: true.

tickmark
tickmark

errorHandler
(error-handler)

A reference to an ErrorHandler strategy for handling any uncaught exceptions that may occur during the execution of the MessageListener. Default: ConditionalRejectingErrorHandler

tickmark
tickmark

exclusive
(排他的)

このコンテナー内の単一のコンシューマーがキューに排他的にアクセスできるかどうかを決定します。これが true の場合、コンテナーの同時実行数は 1 でなければなりません。別のコンシューマーが排他的アクセス権を持っている場合、コンテナーは recovery-interval または recovery-back-off に従ってコンシューマーを回復しようとします。名前空間を使用する場合、この属性はキュー名とともに <rabbit:listener/> 要素に表示されます。デフォルト: false

tickmark
tickmark

exclusiveConsumerExceptionLogger
(なし)

排他的なコンシューマーがキューにアクセスできない場合に使用される例外ロガー。デフォルトでは、これは WARN レベルでログに記録されます。

tickmark
tickmark

failedDeclarationRetryInterval
(失敗宣言 -retry-interval)

パッシブキュー宣言の再試行の間隔。パッシブキューの宣言は、コンシューマーが開始するとき、または複数のキューから消費するとき、初期化中にすべてのキューが使用可能でなかったときに発生します。デフォルト: 5000 (5 秒)。

tickmark
tickmark

forceCloseChannel
(なし)

コンシューマーが shutdownTimeout 内のシャットダウンに応答しない場合、これが true の場合、チャネルは閉じられ、確認応答されていないメッセージが再キューイングされます。2.0 以降のデフォルトは true です。false に設定して、以前の動作に戻すことができます。

tickmark
tickmark

forceStop
(なし)

現在のレコードが処理された後に (コンテナーの停止時に) 停止するには true に設定します。これにより、プリフェッチされたすべてのメッセージが再度キューに入れられます。デフォルトでは、コンテナーは停止する前にコンシューマーをキャンセルし、プリフェッチされたすべてのメッセージを処理します。バージョン 2.4.14、3.0.6 以降、デフォルトは false になります。

tickmark
tickmark

globalQos
(global-qos)

true の場合、prefetchCount は、チャネル上の各コンシューマーではなく、チャネルにグローバルに適用されます。詳細については、basicQos.global (英語) を参照してください。

tickmark
tickmark

(グループ)

これは、名前空間を使用する場合にのみ使用できます。指定すると、型 Collection<MessageListenerContainer> の Bean がこの名前で登録され、各 <listener/> 要素のコンテナーがコレクションに追加されます。これにより、たとえば、コレクションを反復処理してコンテナーのグループを開始および停止できます。複数の <listener-container/> 要素が同じグループ値を持つ場合、コレクション内のコンテナーは、そのように指定されたすべてのコンテナーの集合体を形成します。

tickmark
tickmark

idleEventInterval
(idle-event-interval)

アイドル状態の非同期コンシューマーの検出を参照してください。

tickmark
tickmark

javaLangErrorHandler
(なし)

コンテナースレッドが Error をキャッチしたときに呼び出される AbstractMessageListenerContainer.JavaLangErrorHandler 実装。デフォルトの実装は System.exit(99) を呼び出します。以前の動作に戻す (何もしない) には、no-op ハンドラーを追加します。

tickmark
tickmark

maxConcurrentConsumers
(max-concurrency)

必要に応じてオンデマンドで開始する同時コンシューマーの最大数。"concurrentConsumers" 以上である必要があります。リスナーの同時実行を参照してください。

tickmark

messagesPerAck
(なし)

ACK 間で受信するメッセージの数。これを使用して、ブローカーに送信される ack の数を減らします (ただし、メッセージが再配信される可能性が高くなります)。通常、このプロパティは、大量のリスナーコンテナーでのみ設定する必要があります。これが設定されていて、メッセージが拒否された (例外がスローされた) 場合、保留中の ack が確認され、失敗したメッセージは拒否されます。取引チャネルでは許可されていません。prefetchCount が messagesPerAck より小さい場合は、messagesPerAck と一致するように増加されます。デフォルト: すべてのメッセージを確認します。この表の ackTimeout も参照してください。

tickmark

mismatchedQueuesFatal
(mismatched-queues-fatal)

コンテナーの開始時に、このプロパティが true (デフォルト: false) の場合、コンテナーは、コンテキストで宣言されたすべてのキューがすでにブローカーにあるキューと互換性があることを確認します。一致しないプロパティ ( auto-delete など) または引数 ( x-message-ttl など) が存在する場合、コンテナー (およびアプリケーションコンテキスト) は致命的な例外で起動に失敗します。

リカバリ中に問題が検出された場合 (たとえば、接続が失われた後)、コンテナーは停止します。

アプリケーションコンテキスト (または rabbitAdmin プロパティを使用してコンテナーで具体的に構成されたもの) に単一の RabbitAdmin が存在する必要があります。それ以外の場合、このプロパティは false でなければなりません。

最初の起動時にブローカーが使用できない場合、コンテナーが起動し、接続が確立されたときに条件がチェックされます。
特定のリスナーが使用するように構成されているキューだけでなく、コンテキスト内のすべてのキューに対してチェックが行われます。コンテナーが使用するキューのみにチェックを制限したい場合は、コンテナー用に別の RabbitAdmin を設定し、rabbitAdmin プロパティを使用してそれへの参照を提供する必要があります。詳細については、条件宣言を参照してください。
@Lazy とマークされた Bean で @RabbitListener のコンテナーを開始している間、不一致のキュー引数の検出が無効になります。これは、そのようなコンテナーの開始を最大 60 秒遅らせる可能性のある潜在的なデッドロックを回避するためです。遅延リスナー Bean を使用するアプリケーションは、遅延 Bean への参照を取得する前にキュー引数をチェックする必要があります。
tickmark
tickmark

missingQueuesFatal
(missing-queues-fatal)

true (デフォルト) に設定すると、設定されたキューがブローカで使用できない場合、致命的と見なされます。これにより、起動時にアプリケーションコンテキストの初期化が失敗します。また、コンテナーの実行中にキューが削除されると、デフォルトでは、コンシューマーはキューへの接続を 3 回 (5 秒間隔で) 再試行し、これらの試行が失敗した場合はコンテナーを停止します。

これは、以前のバージョンでは構成できませんでした。

false に設定すると、3 回再試行した後、ブローカーのダウンなどの他の問題と同様に、コンテナーは回復モードになります。コンテナーは、recoveryInterval プロパティに従って回復を試みます。各回復の試行中に、各コンシューマーは 5 秒間隔でキューを受動的に宣言することを 4 回試みます。このプロセスは無期限に続きます。

次のように、プロパティ Bean を使用して、すべてのコンテナーのプロパティをグローバルに設定することもできます。

<util:properties
        id="spring.amqp.global.properties">
    <prop key="mlc.missing.queues.fatal">
        false
    </prop>
</util:properties>

このグローバルプロパティは、明示的な missingQueuesFatal プロパティセットを持つコンテナーには適用されません。

デフォルトの再試行プロパティ (5 秒間隔で 3 回の再試行) は、以下のプロパティを設定することでオーバーライドできます。

@Lazy とマークされた Bean で @RabbitListener のコンテナーを開始している間、欠落しているキューの検出が無効になります。これは、そのようなコンテナーの開始を最大 60 秒遅らせる可能性のある潜在的なデッドロックを回避するためです。遅延リスナー Bean を使用するアプリケーションは、遅延 Bean への参照を取得する前にキューをチェックする必要があります。
tickmark
tickmark

monitorInterval
(monitor-interval)

DMLC を使用すると、この間隔でタスクが実行され、コンシューマーの状態を監視し、失敗したものを回復するようにスケジュールされます。

tickmark

noLocal
(なし)

true に設定すると、同じチャネルの接続でパブリッシュされたサーバーからコンシューマーへのメッセージの配信が無効になります。

tickmark
tickmark

phase
(段階)

autoStartup が true の場合、このコンテナーが開始および停止するライフサイクルフェーズ。値が小さいほど、このコンテナーは早く開始し、遅く停止します。デフォルトは Integer.MAX_VALUE です。これは、コンテナーができるだけ遅く開始し、できるだけ早く停止することを意味します。

tickmark
tickmark

possibleAuthenticationFailureFatal
(possible-authentication-failure-fatal)

true (SMLC のデフォルト) に設定すると、接続中に PossibleAuthenticationFailureException がスローされた場合、致命的と見なされます。これにより、起動時にアプリケーションコンテキストの初期化が失敗します (コンテナーが自動起動で構成されている場合)。

バージョン 2.0 以降。

DirectMessageListenerContainer

false (デフォルト) に設定すると、各コンシューマーは monitorInterval に従って再接続を試みます。

SimpleMessageListenerContainer

false に設定すると、3 回再試行した後、ブローカーのダウンなどの他の問題と同様に、コンテナーは回復モードになります。コンテナーは、recoveryInterval プロパティに従って回復を試みます。各回復の試行中に、各コンシューマーは再び開始を 4 回試行します。このプロセスは無期限に継続されます。

次のように、プロパティ Bean を使用して、すべてのコンテナーのプロパティをグローバルに設定することもできます。

<util:properties
    id="spring.amqp.global.properties">
  <prop
    key="mlc.possible.authentication.failure.fatal">
     false
  </prop>
</util:properties>

このグローバルプロパティは、明示的な missingQueuesFatal プロパティセットを持つコンテナーには適用されません。

デフォルトの再試行プロパティ (5 秒間隔で 3 回の再試行) は、この後のプロパティを使用してオーバーライドできます。

tickmark
tickmark

prefetchCount
(プリフェッチ)

各コンシューマーで未解決のままにしておくことができる未確認メッセージの数。この値が高いほど、メッセージをより速く配信できますが、非順次処理のリスクが高くなります。acknowledgeMode が NONE の場合は無視されます。これは、必要に応じて、batchSize または messagePerAck と一致するように増加されます。2.0 以降のデフォルトは 250 です。1 に設定すると、以前の動作に戻すことができます。

プリフェッチ値を低くする必要があるシナリオがあります。たとえば、大きなメッセージで、特に処理が遅い場合 (メッセージがクライアントプロセスで大量のメモリを追加する可能性がある場合)、および厳密なメッセージの順序付けが必要な場合 (この場合、プリフェッチ値を 1 に戻す必要があります)。また、少量のメッセージングと複数のコンシューマー (単一のリスナーコンテナーインスタンス内での同時実行を含む) では、プリフェッチを減らして、コンシューマー間でメッセージをより均等に分散させたい場合があります。

globalQos も参照してください。

tickmark
tickmark

rabbitAdmin
(admin)

リスナーコンテナーが少なくとも 1 つの自動削除キューをリッスンし、起動時に欠落していることが判明した場合、コンテナーは RabbitAdmin を使用してキューと関連するバインディングおよび交換を宣言します。そのような要素が条件宣言を使用するように構成されている場合 ( 条件宣言を参照)、コンテナーは、それらの要素を宣言するように構成された admin を使用する必要があります。その管理者をここで指定します。条件付き宣言で自動削除キューを使用する場合にのみ必要です。コンテナーが開始されるまで自動削除キューを宣言したくない場合は、管理者で auto-startup を false に設定します。デフォルトは、すべての非条件要素を宣言する RabbitAdmin です。

tickmark
tickmark

receiveTimeout
(receive-timeout)

各メッセージを待機する最大時間。acknowledgeMode=NONE の場合、これはほとんど効果がありません — コンテナーはぐるぐる回って別のメッセージを要求します。トランザクション Channel と batchSize > 1 で最大の影響があります。これは、すでに消費されたメッセージがタイムアウトになるまで確認されない可能性があるためです。consumerBatchEnabled が true の場合、バッチが完了する前にこのタイムアウトが発生すると、部分的なバッチが配信されます。

tickmark

batchReceiveTimeout
(batch-receive-timeout)

バッチメッセージを収集するためのタイムアウトのミリ秒数。これにより、batchSize がいっぱいになるまでの待ち時間が制限されます。batchSize > 1 とバッチメッセージの収集時間が batchReceiveTime よりも長い場合、バッチは配信されます。デフォルトは 0 (タイムアウトなし) です。

tickmark

recoveryBackOff
(recovery-back-off)

致命的でない理由で開始に失敗した場合に、コンシューマーの開始を試行する間隔の BackOff を指定します。デフォルトは FixedBackOff で、5 秒ごとに無制限に再試行されます。recoveryInterval とは相互に排他的です。

tickmark
tickmark

recoveryInterval
(recovery-interval)

致命的でない理由で開始に失敗した場合に、コンシューマーの開始を試行する間隔をミリ秒単位で決定します。デフォルト: 5000。recoveryBackOff とは相互に排他的です。

tickmark
tickmark

retryDeclarationInterval
(欠落しているキューの再試行間隔)

コンシューマーの初期化中に構成されたキューのサブセットが使用可能な場合、コンシューマーはそれらのキューから消費を開始します。コンシューマーは、この間隔を使用して不足しているキューを受動的に宣言しようとします。この間隔が経過すると、"declarationRetries" と "failedDeclarationRetryInterval" が再び使用されます。欠落しているキューがまだある場合、コンシューマーはこの間隔を待ってから再試行します。このプロセスは、すべてのキューが使用可能になるまで無限に続きます。デフォルト: 60000 (1 分)。

tickmark

shutdownTimeout
(なし)

コンテナーがシャットダウンすると (たとえば、コンテナーを囲んでいる ApplicationContext が閉じられている場合)、処理中のメッセージがこの制限まで処理されるのを待機します。デフォルトは 5 秒です。

tickmark
tickmark

startConsumerMinInterval
(min-start-interval)

新しい各コンシューマーがオンデマンドで開始されるまでに経過しなければならないミリ秒単位の時間。リスナーの同時実行を参照してください。デフォルト: 10000 (10 秒)。

tickmark

statefulRetryFatal
WithNullMessageId (なし)

ステートフルな再試行アドバイスを使用する場合、messageId プロパティが欠落しているメッセージを受信すると、デフォルトでコンシューマーにとって致命的と見なされます (停止されます)。このようなメッセージを破棄 (または配信不能キューにルーティング) するには、これを false に設定します。

tickmark
tickmark

stopConsumerMinInterval
(min-stop-interval)

アイドル状態のコンシューマーが検出されたときに最後のコンシューマーが停止されてから、コンシューマーが停止されるまでに経過する必要があるミリ秒単位の時間。リスナーの同時実行を参照してください。デフォルト: 60000 (1 分)。

tickmark

streamConverter
(なし)

ネイティブ Stream メッセージを Spring AMQP メッセージに変換する StreamMessageConverter

tickmark

taskExecutor
(task-executor)

リスナー呼び出し元を実行するための Spring TaskExecutor (または標準 JDK 1.5+ Executor) への参照。デフォルトは、内部管理スレッドを使用する SimpleAsyncTaskExecutor です。

tickmark
tickmark

taskScheduler
(task-scheduler)

DMLC では、スケジューラは "monitorInterval" で監視タスクを実行するために使用されます。

tickmark

transactionManager
(transaction-manager)

リスナーの操作のための外部トランザクションマネージャー。また、channelTransacted を補完します — Channel が取引される場合、その取引は外部取引と同期されます。

tickmark
tickmark