メッセージリスナコンテナー

2 つの MessageListenerContainer 実装が提供されています。

  • KafkaMessageListenerContainer

  • ConcurrentMessageListenerContainer

KafkaMessageListenerContainer は、単一のスレッド上のすべてのトピックまたはパーティションからすべてのメッセージを受信します。ConcurrentMessageListenerContainer は、1 つ以上の KafkaMessageListenerContainer インスタンスに委譲して、マルチスレッドの消費を提供します。

バージョン 2.2.7 以降、リスナーコンテナーに RecordInterceptor を追加できます。リスナーを呼び出す前に呼び出され、インスペクションまたはレコードの変更が許可されます。インターセプターが null を返す場合、リスナーは呼び出されません。バージョン 2.7 以降、リスナーの終了後に(通常、例外をスローすることによって)呼び出される追加のメソッドがあります。また、バージョン 2.7 以降、BatchInterceptor があり、バッチリスナーに同様の機能を提供します。さらに、ConsumerAwareRecordInterceptor (および BatchInterceptor)は Consumer<?, ?> へのアクセスを提供します。これは、たとえば、インターセプターのコンシューマーメトリクスにアクセスするために使用される場合があります。

これらのインターセプターでコンシューマーの位置やコミットされたオフセットに影響を与えるメソッドを実行しないでください。コンテナーはそのような情報を管理する必要があります。
インターセプターが(新しいレコードを作成することによって)レコードを変更する場合、レコードの損失などの予期しない副作用を回避するために、topicpartitionoffset は同じままである必要があります。

CompositeRecordInterceptor および CompositeBatchInterceptor は、複数のインターセプターを呼び出すために使用できます。

デフォルトでは、バージョン 2.8 以降、トランザクションを使用する場合、トランザクションが開始される前にインターセプターが呼び出されます。代わりに、リスナーコンテナーの interceptBeforeTx プロパティを false に設定して、トランザクションの開始後にインターセプターを呼び出すことができます。バージョン 2.9 以降、これは KafkaAwareTransactionManager だけでなく、すべてのトランザクションマネージャーに適用されます。これにより、たとえば、コンテナーによって開始された JDBC トランザクションにインターセプターが参加できるようになります。

バージョン 2.3.8、2.4.6 以降、同時実行性が 1 より大きい場合、ConcurrentMessageListenerContainer は静的メンバーシップ [Apache] (英語) をサポートするようになりました。group.instance.id の接尾辞は -n で、n は 1 で始まります。これは、session.timeout.ms の増加とともに、たとえばアプリケーションインスタンスが再起動されたときなど、リバランスイベントを減らすために使用できます。

KafkaMessageListenerContainer を使用する

次のコンストラクターを使用できます。

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                    ContainerProperties containerProperties)

ConsumerFactory と、トピックとパーティションに関する情報、およびその他の構成を ContainerProperties オブジェクトで受け取ります。ContainerProperties には、次のコンストラクターがあります。

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

最初のコンストラクターは、TopicPartitionOffset 引数の配列を受け取り、使用するパーティションについてコンテナーに明示的に指示し(コンシューマー assign() メソッドを使用)、オプションの初期オフセットを使用します。正の値は、デフォルトでは絶対オフセットです。負の値は、デフォルトでパーティション内の現在の最後のオフセットを基準にしています。追加の boolean 引数を取る TopicPartitionOffset のコンストラクターが提供されています。これが true の場合、初期オフセット(正または負)は、このコンシューマーの現在の位置を基準にしています。オフセットは、コンテナーの開始時に適用されます。2 つ目はトピックの配列を取り、Kafka は group.id プロパティに基づいてパーティションを割り当てます。グループ全体にパーティションを分散します。3 つ目は、正規表現 Pattern を使用してトピックを選択します。

MessageListener をコンテナーに割り当てるには、コンテナーの作成時に ContainerProps.setMessageListener メソッドを使用できます。次の例は、その方法を示しています。

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

DefaultKafkaConsumerFactory を作成するときに、上記のようにプロパティを取り込むコンストラクターを使用すると、キーと値の Deserializer クラスが構成から取得されることに注意してください。または、Deserializer インスタンスをキーや値の DefaultKafkaConsumerFactory コンストラクターに渡すこともできます。その場合、すべてのコンシューマーが同じインスタンスを共有します。別のオプションは、Consumer ごとに個別の Deserializer インスタンスを取得するために使用される Supplier<Deserializer> (バージョン 2.3 以降)を提供することです。

DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

設定できるさまざまなプロパティの詳細については、ContainerProperties の Javadoc を参照してください。

バージョン 2.1.1 以降、logContainerConfig と呼ばれる新しいプロパティが使用可能になりました。true および INFO ロギングが有効になっている場合、各リスナーコンテナーは、その構成プロパティを要約したログメッセージを書き込みます。

デフォルトでは、トピックオフセットコミットのロギングは DEBUG ロギングレベルで実行されます。バージョン 2.1.2 以降、commitLogLevel と呼ばれる ContainerProperties のプロパティを使用して、これらのメッセージのログレベルを指定できます。例: ログレベルを INFO に変更するには、containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO); を使用できます。

バージョン 2.2 以降、missingTopicsFatal と呼ばれる新しいコンテナープロパティが追加されました(デフォルト: 2.3.4 以降の false)。これにより、構成されたトピックのいずれかがブローカーに存在しない場合にコンテナーが開始されなくなります。コンテナーがトピックパターン(正規表現)をリッスンするように構成されている場合は適用されません。以前は、コンテナースレッドは consumer.poll() メソッド内でループし、多くのメッセージをログに記録している間、トピックが表示されるのを待っていました。ログを除いて、問題があったという兆候はありませんでした。

バージョン 2.8 の時点で、新しいコンテナープロパティ authExceptionRetryInterval が導入されました。これにより、コンテナーは KafkaConsumer から AuthenticationException または AuthorizationException を取得した後、メッセージのフェッチを再試行します。これは、たとえば、構成されたユーザーが特定のトピックを読み取るためのアクセスを拒否された場合、または資格情報が正しくない場合に発生する可能性があります。authExceptionRetryInterval を定義すると、適切な権限が付与されたときにコンテナーをリカバリできます。

デフォルトでは、間隔は構成されていません。認証および認可エラーは致命的と見なされ、コンテナーが停止します。

バージョン 2.8 以降、コンシューマーファクトリを作成するときに、デシリアライザーをオブジェクトとして(コンストラクター内または setter 経由で)提供すると、ファクトリは configure() メソッドを呼び出して、構成プロパティで構成します。

ConcurrentMessageListenerContainer を使用する

単一のコンストラクターは、KafkaListenerContainer コンストラクターに似ています。次のリストは、コンストラクターの署名を示しています。

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
                            ContainerProperties containerProperties)

また、concurrency プロパティもあります。例: container.setConcurrency(3) は 3 つの KafkaMessageListenerContainer インスタンスを作成します。

最初のコンストラクターの場合、Kafka は、グループ管理機能を使用して、コンシューマー全体にパーティションを分散します。

複数のトピックを聞いている場合、デフォルトのパーティション分散は期待したものではない可能性があります。例: それぞれ 5 つのパーティションを持つ 3 つのトピックがあり、concurrency=15 を使用する場合、アクティブなコンシューマーは 5 つだけで、それぞれが各トピックから 1 つのパーティションに割り当てられ、他の 10 のコンシューマーはアイドル状態です。これは、デフォルトの Kafka PartitionAssignor が RangeAssignor であるためです(Javadoc を参照)。このシナリオでは、代わりに RoundRobinAssignor の使用を検討することをお勧めします。これにより、すべてのコンシューマーにパーティションが分散されます。次に、各コンシューマーに 1 つのトピックまたはパーティションが割り当てられます。PartitionAssignor を変更するには、DefaultKafkaConsumerFactory に提供されているプロパティで partition.assignment.strategy コンシューマープロパティ(ConsumerConfigs.PARTITION_ASSIGNMENT_STRATEGY_CONFIG)を設定できます。

Spring Boot を使用する場合、次のように戦略を設定することができます。

spring.kafka.consumer.properties.partition.assignment.strategy=\
org.apache.kafka.clients.consumer.RoundRobinAssignor

コンテナーのプロパティが TopicPartitionOffset で構成されている場合、ConcurrentMessageListenerContainer は TopicPartitionOffset インスタンスをデリゲート KafkaMessageListenerContainer インスタンス全体に分散します。

たとえば、6 つの TopicPartitionOffset インスタンスが提供され、concurrency が 3 である場合。各コンテナーは 2 つのパーティションを取得します。5 つの TopicPartitionOffset インスタンスの場合、2 つのコンテナーが 2 つのパーティションを取得し、3 番目が 1 つのパーティションを取得します。concurrency が TopicPartitions の数より大きい場合、concurrency は、各コンテナーが 1 つのパーティションを取得するように調整されます。

client.id プロパティ(設定されている場合)には -n が追加されます。ここで、n は、同時実行性に対応するコンシューマーインスタンスです。これは、JMX が有効になっているときに MBean に一意の名前を指定するために必要です。

バージョン 1.3 以降、MessageListenerContainer は、基盤となる KafkaConsumer のメトリクスへのアクセスを提供します。ConcurrentMessageListenerContainer の場合、metrics() メソッドはすべてのターゲット KafkaMessageListenerContainer インスタンスのメトリクスを返します。メトリクスは、基礎となる KafkaConsumer に提供される client-id によって Map<MetricName, ? extends Metric> にグループ化されます。

バージョン 2.3 以降、ContainerProperties は idleBetweenPolls オプションを提供し、リスナーコンテナーのメインループを KafkaConsumer.poll() 呼び出し間でスリープさせます。実際のスリープ間隔は、提供されたオプションと max.poll.interval.ms コンシューマー構成と現在のレコードのバッチ処理時間の差から最小値として選択されます。

オフセットのコミット

オフセットをコミットするためのいくつかのオプションが提供されています。enable.auto.commit コンシューマープロパティが true の場合、Kafka はその構成に従ってオフセットを自動コミットします。false の場合、コンテナーはいくつかの AckMode 設定をサポートします(次のリストで説明)。デフォルトの AckMode は BATCH です。バージョン 2.3 以降、フレームワークは、構成で明示的に設定されていない限り、enable.auto.commit を false に設定します。以前は、プロパティが設定されていない場合、Kafka のデフォルト(true)が使用されていました。

コンシューマー poll() メソッドは、1 つ以上の ConsumerRecords を返します。MessageListener はレコードごとに呼び出されます。以下のリストは、各 AckMode に対してコンテナーによって実行されるアクションを説明しています(トランザクションが使用されていない場合)。

  • RECORD: レコードの処理後にリスナーが戻ったときにオフセットをコミットします。

  • BATCHpoll() によって返されたすべてのレコードが処理されたら、オフセットをコミットします。

  • TIME: 最後のコミット以降の ackTime を超えている限り、poll() によって返されたすべてのレコードが処理されたときにオフセットをコミットします。

  • COUNTpoll() によって返されたすべてのレコードが処理されたら、最後のコミット以降に ackCount レコードが受信されている限り、オフセットをコミットします。

  • COUNT_TIMETIME および COUNT に似ていますが、いずれかの条件が true の場合にコミットが実行されます。

  • MANUAL: メッセージリスナーは、acknowledge()Acknowledgment を担当します。その後、BATCH と同じセマンティクスが適用されます。

  • MANUAL_IMMEDIATEAcknowledgment.acknowledge() メソッドがリスナーによって呼び出されたら、すぐにオフセットをコミットします。

transactions を使用する場合、オフセットはトランザクションに送信され、セマンティクスはリスナーの型 (レコードまたはバッチ) に応じて RECORD または BATCH と同等です。

MANUAL および MANUAL_IMMEDIATE では、リスナーが AcknowledgingMessageListener または BatchAcknowledgingMessageListener である必要があります。メッセージリスナーを参照してください。

syncCommits コンテナーのプロパティに応じて、コンシューマーの commitSync() または commitAsync() メソッドが使用されます。syncCommits はデフォルトで true です。setSyncCommitTimeout も参照してください。非同期コミットの結果を取得するには、setCommitCallback を参照してください。デフォルトのコールバックは、エラー(およびデバッグレベルでの成功)をログに記録する LoggingCommitCallback です。

リスナーコンテナーにはオフセットをコミットするための独自のメカニズムがあるため、Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG よりも false が優先されます。バージョン 2.3 以降では、コンシューマーファクトリで特に設定されている場合やコンテナーのコンシューマープロパティがオーバーライドされている場合を除き、無条件に false に設定されます。

Acknowledgment には次の方法があります。

public interface Acknowledgment {

    void acknowledge();

}

このメソッドにより、リスナーはオフセットがコミットされるタイミングを制御できます。

バージョン 2.3 以降、Acknowledgment インターフェースには 2 つの追加メソッド nack(long sleep) および nack(int index, long sleep) があります。1 つ目はレコードリスナーで使用され、2 つ目はバッチリスナーで使用されます。リスナー型に対して間違ったメソッドを呼び出すと、IllegalStateException がスローされます。

nack() を使用して部分バッチをコミットする場合、トランザクションを使用する場合は、AckMode を MANUAL に設定します。nack() を呼び出すと、正常に処理されたレコードのオフセットがトランザクションに送信されます。
nack() は、リスナーを呼び出すコンシューマースレッドでのみ呼び出すことができます。
順不同のコミットを使用する場合、nack() は許可されません。

レコードリスナーを使用すると、nack() が呼び出されると、保留中のオフセットがコミットされ、最後のポーリングの残りのレコードが破棄され、パーティションでシークが実行され、失敗したレコードと未処理のレコードが次の poll() で再配信されます。sleep 引数を設定することにより、再配信の前にコンシューマーを一時停止できます。これは、コンテナーが DefaultErrorHandler で構成されている場合に例外をスローするのと同様の機能です。

nack() は、指定されたスリープ期間中、割り当てられたすべてのパーティションを含むリスナー全体を一時停止します。

バッチリスナーを使用する場合、障害が発生したバッチ内のインデックスを指定できます。nack() が呼び出されると、インデックスの前にレコードのオフセットがコミットされ、失敗したレコードと破棄されたレコードのパーティションでシークが実行され、次の poll() で再配信されます。

詳細については、コンテナーエラーハンドラーを参照してください。

スリープ中はコンシューマーが一時停止されるため、コンシューマーを存続させるためにブローカーのポーリングを続行します。実際のスリープ時間とその解決は、デフォルトで 5 秒に設定されているコンテナーの pollTimeout によって異なります。最小スリープ時間は pollTimeout と同じであり、すべてのスリープ時間はその倍数になります。スリープ時間が短い場合、または精度を上げるために、コンテナーの pollTimeout を減らすことを検討してください。

バージョン 3.0.10 以降、バッチリスナーは、Acknowledgment 引数で acknowledge(index) を使用して、バッチの一部のオフセットをコミットできます。このメソッドが呼び出されると、インデックスにあるレコードのオフセット (および以前のすべてのレコード) がコミットされます。部分的なバッチコミットの実行後に acknowledge() を呼び出すと、バッチの残りのオフセットがコミットされます。次の制限が適用されます。

  • AckMode.MANUAL_IMMEDIATE が必要です

  • メソッドはリスナースレッドで呼び出される必要があります

  • リスナーは生の ConsumerRecords ではなく List を使用する必要があります

  • インデックスはリストの要素の範囲内にある必要があります

  • インデックスは前の呼び出しで使用されたインデックスよりも大きくなければなりません

これらの制限が適用され、メソッドは違反に応じて IllegalArgumentException または IllegalStateException をスローします。

リスナーコンテナーの自動起動

リスナーコンテナーは SmartLifecycle を実装し、autoStartup はデフォルトで true です。コンテナーは後期フェーズ(Integer.MAX-VALUE - 100)で開始されます。リスナーからのデータを処理するために SmartLifecycle を実装する他のコンポーネントは、早い段階で開始する必要があります。- 100 は、後のフェーズの余地を残して、コンテナーの後でコンポーネントを自動起動できるようにします。