リスナーコンテナーのプロパティ

表 1: ContainerProperties プロパティ
プロパティ デフォルト 説明

ackCount

1

ackMode が COUNT または COUNT_TIME の場合に、保留中のオフセットをコミットする前のレコード数。

adviceChain

null

メッセージリスナーをラップする Advice オブジェクトのチェーン(アドバイスの周囲の MethodInterceptor など)。順番に呼び出されます。

ackMode

BATCH

オフセットがコミットされる頻度を制御します。オフセットのコミットを参照してください。

ackTime

5000

ackMode が TIME または COUNT_TIME の場合に、保留中のオフセットがコミットされるまでのミリ秒単位の時間。

assignmentCommitOption

LATEST_ONLY _NO_TX

割り当て時に初期位置をコミットするかどうか。デフォルトでは、初期オフセットは ConsumerConfig.AUTO_OFFSET_RESET_CONFIG が latest の場合にのみコミットされ、トランザクションマネージャーが存在する場合でもトランザクション内で実行されません。利用可能なオプションの詳細については、ContainerProperties.AssignmentCommitOption の JavaDocs を参照してください。

asyncAcks

false

アウトオブオーダーコミットを有効にします ( 手動でオフセットをコミットするを参照)。コンシューマーは一時停止され、ギャップが埋まるまでコミットは延期されます。

authExceptionRetryInterval

null

null でない場合、AuthenticationException または AuthorizationException が Kafka クライアントによってスローされたときに、ポーリング間でスリープする Duration。null の場合、そのような例外は致命的と見なされ、コンテナーは停止します。

clientId

(空の文字列)

client.id コンシューマープロパティのプレフィックス。コンシューマーファクトリの client.id プロパティをオーバーライドします。並行コンテナーでは、-n が各コンシューマーインスタンスのサフィックスとして追加されます。

checkDeserExWhenKeyNull

false

nullkey を受信したときに、常に DeserializationException ヘッダーをチェックするには、true に設定します。委譲デシリアライザーを使用している場合など、コンシューマーコードが ErrorHandlingDeserializer が構成されていることを判別できない場合に役立ちます。

checkDeserExWhenValueNull

false

nullvalue を受信したときに、常に DeserializationException ヘッダーをチェックするには、true に設定します。委譲デシリアライザーを使用している場合など、コンシューマーコードが ErrorHandlingDeserializer が構成されていることを判別できない場合に役立ちます。

commitCallback

null

存在し、syncCommits が false の場合、コミットの補完後にコールバックが呼び出されます。

offsetAndMetadataProvider

null

OffsetAndMetadata のプロバイダー ; デフォルトでは、プロバイダーは空のメタデータを使用してオフセットとメタデータを作成します。プロバイダーは、メタデータをカスタマイズする方法を提供します。

commitLogLevel

DEBUG

オフセットのコミットに関連するログのログレベル。

consumerRebalanceListener

null

リバランスリスナー。リスナーのリバランスを参照してください。

consumerStartTimout

30 代

エラーをログに記録する前に、コンシューマーが開始するのを待つ時間。これは、たとえば、スレッドが不十分なタスクエグゼキュータを使用している場合に発生する可能性があります。

consumerTaskExecutor

SimpleAsyncTaskExecutor

コンシューマースレッドを実行するタスクエグゼキュータ。デフォルトのエグゼキュータは、<name>-C-n という名前のスレッドを作成します。KafkaMessageListenerContainer の場合、名前は Bean 名です。ConcurrentMessageListenerContainer の場合、名前は -n が付加された Bean 名であり、n は子コンテナーごとに増分されます。

deliveryAttemptHeader

false

配信試行ヘッダーを参照してください。

eosMode

V2

正確に一度のセマンティクスモード。正確に一度セマンティクスを参照してください。

fixTxOffsets

false

トランザクションプロデューサーによって作成されたレコードを消費するときに、コンシューマーがパーティションの最後に位置する場合、トランザクションのコミット / ロールバックを示すために使用される疑似レコードと、おそらくロールバックされたレコードの存在により、ラグが誤ってゼロより大きく報告されることがあります。これは関数にはコンシューマーに影響を与えませんが、一部のユーザーは「ラグ」がゼロでないことに関心事を示しています。このプロパティを true に設定すると、コンテナーはこのようなオフセットの誤報を修正します。このチェックは、コミット処理が著しく複雑にならないように、次のポーリングの前に実行されます。執筆時点では、コンシューマーが isolation.level=read_committed で構成されており、max.poll.records が 1 より大きい場合にのみラグが修正されます。詳細は KAFKA-10683 [Apache] (英語) を参照してください。

groupId

null

コンシューマーの group.id プロパティをオーバーライドします。@KafkaListenerid または groupId プロパティによって自動的に設定されます。

idleBeforeDataMultiplier

5.0

レコードを受信する前に適用される idleEventInterval の乗数。レコードを受信すると、乗数は適用されなくなります。バージョン 2.8 以降で使用できます。

idleBetweenPolls

0

ポーリングの間にスレッドをスリープさせることにより、配信を遅くするために使用されます。レコードのバッチとこの値を処理する時間は、max.poll.interval.ms コンシューマープロパティよりも短くする必要があります。

idleEventInterval

null

設定すると、ListenerContainerIdleEvent の公開が有効になります。アプリケーションイベントおよびアイドル状態のコンシューマーと無反応なコンシューマーの検出を参照してください。idleBeforeDataMultiplier も参照してください。

idlePartitionEventInterval

null

設定すると、ListenerContainerIdlePartitionEvent の公開が有効になります。アプリケーションイベントおよびアイドル状態のコンシューマーと無反応なコンシューマーの検出を参照してください。

kafkaConsumerProperties

なし

コンシューマーファクトリで構成された任意のコンシューマープロパティをオーバーライドするために使用されます。

logContainerConfig

false

true に設定すると、すべてのコンテナープロパティが INFO レベルでログに記録されます。

messageListener

null

メッセージリスナー。

micrometerEnabled

true

コンシューマースレッドの Micrometer タイマーを維持するかどうか。

micrometerTags

micrometer メトリクスに追加される静的タグのマップ。

micrometerTagsProvider

null

コンシューマーの記録に基づいて動的タグを提供する機能。

missingTopicsFatal

false

true の場合、構成されたトピックがブローカーに存在しない場合にコンテナーが開始されないようにします。

monitorInterval

30 代

NonResponsiveConsumerEvent のコンシューマースレッドの状態をチェックする頻度。noPollThreshold および pollTimeout を参照してください。

noPollThreshold

3.0

pollTimeOut を掛けて、NonResponsiveConsumerEvent を公開するかどうかを決定します。monitorInterval を参照してください。

onlyLogRecordMetadata

false

topic-partition@offset だけでなく、完全なコンシューマーレコード(エラー、デバッグログなど)をログに記録するには、false に設定します。

pauseImmediate

false

コンテナーが一時停止すると、前のポーリングからのすべてのレコードを処理した後ではなく、現在のレコードの後に処理を停止します。残りのレコードはメモリに保持され、コンテナーが再開されたときにリスナーに渡されます。

pollTimeout

5000

ミリ秒単位で Consumer.poll() に渡されたタイムアウト。

pollTimeoutWhilePaused

100

コンテナーが一時停止状態のときに Consumer.poll() に渡されるタイムアウト (ミリ秒単位)。

restartAfterAuthExceptions

false

認可 / 認証の例外が原因でコンテナーが停止した場合にコンテナーを再起動するには、True。

scheduler

ThreadPoolTaskScheduler

コンシューマーモニタータスクを実行するスケジューラー。

shutdownTimeout

10000

すべてのコンシューマーが停止し、コンテナー停止イベントを公開するまでに stop() メソッドをブロックする最大時間(ミリ秒単位)。

stopContainerWhenFenced

false

ProducerFencedException がスローされた場合は、リスナーコンテナーを停止します。詳細については、ロールバック後のプロセッサーを参照してください。

stopImmediate

false

コンテナーが停止したら、前のポーリングのすべてのレコードを処理した後ではなく、現在のレコードの後で処理を停止します。

subBatchPerPartition

説明を参照してください。

バッチリスナーを使用する場合、これが true の場合、リスナーはポーリングの結果を使用して呼び出され、パーティションごとに 1 つのサブバッチに分割されます。デフォルト false

syncCommitTimeout

null

syncCommits が true の場合に使用するタイムアウト。設定されていない場合、コンテナーは default.api.timeout.ms コンシューマープロパティを決定し、それを使用しようとします。それ以外の場合は 60 秒を使用します。

syncCommits

true

オフセットに同期コミットと非同期コミットのどちらを使用するか。commitCallback を参照してください。

topicstopicPatterntopicPartitions

なし

構成されたトピック、トピックパターン、明示的に割り当てられたトピック / パーティション。相互に排他的。少なくとも 1 つ提供する必要があります。ContainerProperties コンストラクターによって実施されます。

transactionManager

null

トランザクションを参照してください。

表 2: AbstractListenerContainer プロパティ
プロパティ デフォルト 説明

afterRollbackProcessor

DefaultAfterRollbackProcessor

トランザクションがロールバックされた後に呼び出す AfterRollbackProcessor

applicationEventPublisher

アプリケーションコンテキスト

イベント発行者。

batchErrorHandler

説明を参照してください。

非推奨 - commonErrorHandler を参照してください。

batchInterceptor

null

バッチリスナーを呼び出す前に呼び出すように BatchInterceptor を設定します。レコードリスナーには適用されません。interceptBeforeTx も参照してください。

beanName

Bean 名

コンテナーの Bean 名。子コンテナーの接尾辞は -n です。

commonErrorHandler

説明を参照してください。

DefaultErrorHandler or null when a transactionManager is provided when a DefaultAfterRollbackProcessor is used. See Container Error Handlers.

containerProperties

ContainerProperties

コンテナープロパティインスタンス。

errorHandler

説明を参照してください。

非推奨 - commonErrorHandler を参照してください。

genericErrorHandler

説明を参照してください。

非推奨 - commonErrorHandler を参照してください。

groupId

説明を参照してください。

containerProperties.groupId が存在する場合は、それ以外の場合は、コンシューマーファクトリからの group.id プロパティ。

interceptBeforeTx

true

トランザクションの開始前または開始後に recordInterceptor を呼び出すかどうかを決定します。

listenerId

説明を参照してください。

ユーザー構成コンテナーの Bean 名、または @KafkaListener の id 属性。

listenerInfo

null

KafkaHeaders.LISTENER_INFO ヘッダーに入力する値。@KafkaListener の場合、この値は info 属性から取得されます。このヘッダーは、RecordInterceptorRecordFilterStrategy などのさまざまな場所や、リスナーコード自体で使用できます。

pauseRequested

(読み取り専用)

コンシューマーの一時停止がリクエストされた場合は True。

recordInterceptor

null

レコードリスナーを呼び出す前に呼び出すように RecordInterceptor を設定します。バッチリスナーには適用されません。interceptBeforeTx も参照してください。

topicCheckTimeout

30 代

missingTopicsFatal コンテナーのプロパティが true の場合、describeTopics 操作が完了するまでの待機時間(秒単位)。

表 3: KafkaMessageListenerContainer プロパティ
プロパティ デフォルト 説明

assignedPartitions

(読み取り専用)

このコンテナーに現在割り当てられているパーティション(明示的かどうかに関係なく)。

assignedPartitionsByClientId

(読み取り専用)

このコンテナーに現在割り当てられているパーティション(明示的かどうかに関係なく)。

clientIdSuffix

null

並行コンテナーによって使用され、各子コンテナーのコンシューマーに一意の client.id を提供します。

containerPaused

なし

一時停止がリクエストされ、コンシューマーが実際に一時停止した場合は True。

表 4: ConcurrentMessageListenerContainer プロパティ
プロパティ デフォルト 説明

alwaysClientIdSuffix

true

false に設定すると、concurrency が 1 しかない場合に、client.id コンシューマープロパティへのサフィックスの追加が抑制されます。

assignedPartitions

(読み取り専用)

このコンテナーの子 KafkaMessageListenerContainer に現在割り当てられているパーティションの集合体(明示的かどうかに関係なく)。

assignedPartitionsByClientId

(読み取り専用)

このコンテナーの子 KafkaMessageListenerContainer に現在割り当てられているパーティション(明示的かどうかに関係なく)。子コンテナーのコンシューマーの client.id プロパティによってキー設定されます。

concurrency

1

管理する子 KafkaMessageListenerContainer の数。

containerPaused

なし

一時停止がリクエストされ、すべての子コンテナーのコンシューマーが実際に一時停止した場合は True。

containers

なし

すべての子 KafkaMessageListenerContainer への参照。