メッセージリスナコンテナー
2 つの MessageListenerContainer
実装が提供されています。
KafkaMessageListenerContainer
ConcurrentMessageListenerContainer
KafkaMessageListenerContainer
は、すべてのトピックまたはパーティションからのすべてのメッセージを単一のスレッドで受信します。ConcurrentMessageListenerContainer
は、1 つ以上の KafkaMessageListenerContainer
インスタンスに委譲して、マルチスレッド消費を提供します。
バージョン 2.2.7 以降、リスナーコンテナーに RecordInterceptor
を追加できます。リスナーを呼び出す前に呼び出され、インスペクションまたはレコードの変更が許可されます。インターセプターが null を返す場合、リスナーは呼び出されません。バージョン 2.7 以降、リスナーの終了後に(通常、例外をスローすることによって)呼び出される追加のメソッドがあります。また、バージョン 2.7 以降、BatchInterceptor
があり、バッチリスナーに同様の機能を提供します。さらに、ConsumerAwareRecordInterceptor
(および BatchInterceptor
)は Consumer<?, ?>
へのアクセスを提供します。これは、たとえば、インターセプターのコンシューマーメトリクスにアクセスするために使用される場合があります。
これらのインターセプターでコンシューマーの位置やコミットされたオフセットに影響を与えるメソッドを実行しないでください。コンテナーはそのような情報を管理する必要があります。 |
インターセプターが(新しいレコードを作成することによって)レコードを変更する場合、レコードの損失などの予期しない副作用を回避するために、topic 、partition 、offset は同じままである必要があります。 |
CompositeRecordInterceptor
および CompositeBatchInterceptor
は、複数のインターセプターを呼び出すために使用できます。
デフォルトでは、バージョン 2.8 以降、トランザクションを使用する場合、トランザクションが開始される前にインターセプターが呼び出されます。代わりに、リスナーコンテナーの interceptBeforeTx
プロパティを false
に設定して、トランザクションの開始後にインターセプターを呼び出すことができます。バージョン 2.9 以降、これは KafkaAwareTransactionManager
だけでなく、すべてのトランザクションマネージャーに適用されます。これにより、たとえば、コンテナーによって開始された JDBC トランザクションにインターセプターが参加できるようになります。
バージョン 2.3.8、2.4.6 以降、同時実行性が 1 より大きい場合、ConcurrentMessageListenerContainer
は静的メンバーシップ [Apache] (英語) をサポートするようになりました。group.instance.id
の接尾辞は -n
で、n
は 1
で始まります。これは、session.timeout.ms
の増加とともに、たとえばアプリケーションインスタンスが再起動されたときなど、リバランスイベントを減らすために使用できます。
KafkaMessageListenerContainer
を使用する
次のコンストラクターを使用できます。
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
ConsumerFactory
と、トピックとパーティションに関する情報、およびその他の構成を ContainerProperties
オブジェクトで受け取ります。ContainerProperties
には、次のコンストラクターがあります。
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
最初のコンストラクターは、TopicPartitionOffset
引数の配列を受け取り、使用するパーティションについてコンテナーに明示的に指示し(コンシューマー assign()
メソッドを使用)、オプションの初期オフセットを使用します。正の値は、デフォルトでは絶対オフセットです。負の値は、デフォルトでパーティション内の現在の最後のオフセットを基準にしています。追加の boolean
引数を取る TopicPartitionOffset
のコンストラクターが提供されています。これが true
の場合、初期オフセット(正または負)は、このコンシューマーの現在の位置を基準にしています。オフセットは、コンテナーの開始時に適用されます。2 つ目はトピックの配列を取り、Kafka は group.id
プロパティに基づいてパーティションを割り当てます。グループ全体にパーティションを分散します。3 つ目は、正規表現 Pattern
を使用してトピックを選択します。
MessageListener
をコンテナーに割り当てるには、コンテナーの作成時に ContainerProps.setMessageListener
メソッドを使用できます。次の例は、その方法を示しています。
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
DefaultKafkaConsumerFactory
を作成するときに、上記のようにプロパティを取り込むコンストラクターを使用すると、キーと値の Deserializer
クラスが構成から取得されることに注意してください。または、Deserializer
インスタンスをキーや値の DefaultKafkaConsumerFactory
コンストラクターに渡すこともできます。その場合、すべてのコンシューマーが同じインスタンスを共有します。別のオプションは、Consumer
ごとに個別の Deserializer
インスタンスを取得するために使用される Supplier<Deserializer>
(バージョン 2.3 以降)を提供することです。
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
設定できるさまざまなプロパティの詳細については、ContainerProperties
の Javadoc を参照してください。
バージョン 2.1.1 以降、logContainerConfig
と呼ばれる新しいプロパティが使用可能になりました。true
および INFO
ロギングが有効になっている場合、各リスナーコンテナーは、その構成プロパティを要約したログメッセージを書き込みます。
デフォルトでは、トピックオフセットコミットのロギングは DEBUG
ロギングレベルで実行されます。バージョン 2.1.2 以降、commitLogLevel
と呼ばれる ContainerProperties
のプロパティを使用して、これらのメッセージのログレベルを指定できます。例: ログレベルを INFO
に変更するには、containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
を使用できます。
バージョン 2.2 以降、missingTopicsFatal
と呼ばれる新しいコンテナープロパティが追加されました(デフォルト: 2.3.4 以降の false
)。これにより、構成されたトピックのいずれかがブローカーに存在しない場合にコンテナーが開始されなくなります。コンテナーがトピックパターン(正規表現)をリッスンするように構成されている場合は適用されません。以前は、コンテナースレッドは consumer.poll()
メソッド内でループし、多くのメッセージをログに記録している間、トピックが表示されるのを待っていました。ログを除いて、問題があったという兆候はありませんでした。
バージョン 2.8 の時点で、新しいコンテナープロパティ authExceptionRetryInterval
が導入されました。これにより、コンテナーは KafkaConsumer
から AuthenticationException
または AuthorizationException
を取得した後、メッセージのフェッチを再試行します。これは、たとえば、構成されたユーザーが特定のトピックを読み取るためのアクセスを拒否された場合、または資格情報が正しくない場合に発生する可能性があります。authExceptionRetryInterval
を定義すると、適切な権限が付与されたときにコンテナーをリカバリできます。
デフォルトでは、間隔は構成されていません。認証および認可エラーは致命的と見なされ、コンテナーが停止します。 |
バージョン 2.8 以降、コンシューマーファクトリを作成するときに、デシリアライザーをオブジェクトとして(コンストラクター内または setter 経由で)提供すると、ファクトリは configure()
メソッドを呼び出して、構成プロパティで構成します。
ConcurrentMessageListenerContainer
を使用する
単一のコンストラクターは、KafkaListenerContainer
コンストラクターに似ています。次のリストは、コンストラクターの署名を示しています。
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
また、concurrency
プロパティもあります。例: container.setConcurrency(3)
は 3 つの KafkaMessageListenerContainer
インスタンスを作成します。
最初のコンストラクターの場合、Kafka は、グループ管理機能を使用して、コンシューマー全体にパーティションを分散します。
複数のトピックを聞いている場合、デフォルトのパーティション分散は期待したものではない可能性があります。例: それぞれ 5 つのパーティションを持つ 3 つのトピックがあり、 Spring Boot を使用する場合、次のように戦略を設定することができます。
|
コンテナーのプロパティが TopicPartitionOffset
で構成されている場合、ConcurrentMessageListenerContainer
は TopicPartitionOffset
インスタンスをデリゲート KafkaMessageListenerContainer
インスタンス全体に分散します。
たとえば、6 つの TopicPartitionOffset
インスタンスが提供され、concurrency
が 3
である場合。各コンテナーは 2 つのパーティションを取得します。5 つの TopicPartitionOffset
インスタンスの場合、2 つのコンテナーが 2 つのパーティションを取得し、3 番目が 1 つのパーティションを取得します。concurrency
が TopicPartitions
の数より大きい場合、concurrency
は、各コンテナーが 1 つのパーティションを取得するように調整されます。
client.id プロパティ(設定されている場合)には -n が追加されます。ここで、n は、同時実行性に対応するコンシューマーインスタンスです。これは、JMX が有効になっているときに MBean に一意の名前を指定するために必要です。 |
バージョン 1.3 以降、MessageListenerContainer
は、基盤となる KafkaConsumer
のメトリクスへのアクセスを提供します。ConcurrentMessageListenerContainer
の場合、metrics()
メソッドはすべてのターゲット KafkaMessageListenerContainer
インスタンスのメトリクスを返します。メトリクスは、基礎となる KafkaConsumer
に提供される client-id
によって Map<MetricName, ? extends Metric>
にグループ化されます。
バージョン 2.3 以降、ContainerProperties
は idleBetweenPolls
オプションを提供し、リスナーコンテナーのメインループを KafkaConsumer.poll()
呼び出し間でスリープさせます。実際のスリープ間隔は、提供されたオプションと max.poll.interval.ms
コンシューマー構成と現在のレコードのバッチ処理時間の差から最小値として選択されます。
オフセットのコミット
オフセットをコミットするためのいくつかのオプションが提供されています。enable.auto.commit
コンシューマープロパティが true
の場合、Kafka はその構成に従ってオフセットを自動コミットします。false
の場合、コンテナーはいくつかの AckMode
設定をサポートします(次のリストで説明)。デフォルトの AckMode
は BATCH
です。バージョン 2.3 以降、フレームワークは、構成で明示的に設定されていない限り、enable.auto.commit
を false
に設定します。以前は、プロパティが設定されていない場合、Kafka のデフォルト(true
)が使用されていました。
コンシューマー poll()
メソッドは、1 つ以上の ConsumerRecords
を返します。MessageListener
はレコードごとに呼び出されます。以下のリストは、各 AckMode
に対してコンテナーによって実行されるアクションを説明しています(トランザクションが使用されていない場合)。
RECORD
: レコードの処理後にリスナーが戻ったときにオフセットをコミットします。BATCH
:poll()
によって返されたすべてのレコードが処理されたら、オフセットをコミットします。TIME
: 最後のコミット以降のackTime
を超えている限り、poll()
によって返されたすべてのレコードが処理されたときにオフセットをコミットします。COUNT
:poll()
によって返されたすべてのレコードが処理されたら、最後のコミット以降にackCount
レコードが受信されている限り、オフセットをコミットします。COUNT_TIME
:TIME
およびCOUNT
に似ていますが、いずれかの条件がtrue
の場合にコミットが実行されます。MANUAL
: メッセージリスナーは、acknowledge()
、Acknowledgment
を担当します。その後、BATCH
と同じセマンティクスが適用されます。MANUAL_IMMEDIATE
:Acknowledgment.acknowledge()
メソッドがリスナーによって呼び出されたら、すぐにオフセットをコミットします。
transactions を使用する場合、オフセットはトランザクションに送信され、セマンティクスはリスナーの型 (レコードまたはバッチ) に応じて RECORD
または BATCH
と同等です。
MANUAL および MANUAL_IMMEDIATE では、リスナーが AcknowledgingMessageListener または BatchAcknowledgingMessageListener である必要があります。メッセージリスナーを参照してください。 |
syncCommits
コンテナーのプロパティに応じて、コンシューマーの commitSync()
または commitAsync()
メソッドが使用されます。syncCommits
はデフォルトで true
です。setSyncCommitTimeout
も参照してください。非同期コミットの結果を取得するには、setCommitCallback
を参照してください。デフォルトのコールバックは、エラー(およびデバッグレベルでの成功)をログに記録する LoggingCommitCallback
です。
リスナーコンテナーにはオフセットをコミットするための独自のメカニズムがあるため、Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
よりも false
が優先されます。バージョン 2.3 以降では、コンシューマーファクトリで特に設定されている場合やコンテナーのコンシューマープロパティがオーバーライドされている場合を除き、無条件に false に設定されます。
Acknowledgment
には次の方法があります。
public interface Acknowledgment {
void acknowledge();
}
このメソッドにより、リスナーはオフセットがコミットされるタイミングを制御できます。
バージョン 2.3 以降、Acknowledgment
インターフェースには 2 つの追加メソッド nack(long sleep)
および nack(int index, long sleep)
があります。1 つ目はレコードリスナーで使用され、2 つ目はバッチリスナーで使用されます。リスナー型に対して間違ったメソッドを呼び出すと、IllegalStateException
がスローされます。
nack() を使用して部分バッチをコミットする場合、トランザクションを使用する場合は、AckMode を MANUAL に設定します。nack() を呼び出すと、正常に処理されたレコードのオフセットがトランザクションに送信されます。 |
nack() は、リスナーを呼び出すコンシューマースレッドでのみ呼び出すことができます。 |
順不同のコミットを使用する場合、nack() は許可されません。 |
レコードリスナーを使用すると、nack()
が呼び出されると、保留中のオフセットがコミットされ、最後のポーリングの残りのレコードが破棄され、パーティションでシークが実行され、失敗したレコードと未処理のレコードが次の poll()
で再配信されます。sleep
引数を設定することにより、再配信の前にコンシューマーを一時停止できます。これは、コンテナーが DefaultErrorHandler
で構成されている場合に例外をスローするのと同様の機能です。
nack() は、指定されたスリープ期間中、割り当てられたすべてのパーティションを含むリスナー全体を一時停止します。 |
バッチリスナーを使用する場合、障害が発生したバッチ内のインデックスを指定できます。nack()
が呼び出されると、インデックスの前にレコードのオフセットがコミットされ、失敗したレコードと破棄されたレコードのパーティションでシークが実行され、次の poll()
で再配信されます。
詳細については、コンテナーエラーハンドラーを参照してください。
スリープ中はコンシューマーが一時停止されるため、コンシューマーを存続させるためにブローカーのポーリングを続行します。実際のスリープ時間とその解決は、デフォルトで 5 秒に設定されているコンテナーの pollTimeout によって異なります。最小スリープ時間は pollTimeout と同じであり、すべてのスリープ時間はその倍数になります。スリープ時間が短い場合、または精度を上げるために、コンテナーの pollTimeout を減らすことを検討してください。 |
バージョン 3.0.10 以降、バッチリスナーは、Acknowledgment
引数で acknowledge(index)
を使用して、バッチの一部のオフセットをコミットできます。このメソッドが呼び出されると、インデックスにあるレコードのオフセット (および以前のすべてのレコード) がコミットされます。部分的なバッチコミットの実行後に acknowledge()
を呼び出すと、バッチの残りのオフセットがコミットされます。次の制限が適用されます。
AckMode.MANUAL_IMMEDIATE
が必要ですメソッドはリスナースレッドで呼び出される必要があります
リスナーは生の
ConsumerRecords
ではなくList
を使用する必要がありますインデックスはリストの要素の範囲内にある必要があります
インデックスは前の呼び出しで使用されたインデックスよりも大きくなければなりません
これらの制限が適用され、メソッドは違反に応じて IllegalArgumentException
または IllegalStateException
をスローします。