バッチの消費

バージョン 3.0 以降、spring.cloud.stream.bindings.<name>.consumer.batch-mode が true に設定されている場合、Kafka Consumer のポーリングによって受信されたすべてのレコードは、List<?> としてリスナーメソッドに提示されます。それ以外の場合、メソッドは一度に 1 つのレコードで呼び出されます。バッチのサイズは、Kafka コンシューマープロパティ max.poll.recordsfetch.min.bytesfetch.max.wait.ms によって制御されます。詳細については、Kafka のドキュメントを参照してください。

バッチを受信するときは、次の型署名が許可されます。

List<Person>
Message<List<Person>>

List<Person> の最初のオプションでは、リスナーはメッセージヘッダーを取得しません。2 番目の型 シグネチャー (Message<List<Person>>) が使用される場合、ヘッダーにアクセスできます。ただし、ヘッダーはすべて Collection の形式のままです。次の例を見てみましょう。

Message に 10 個の Person オブジェクトのリストが含まれているとします。Message の MessageHeaders には、キーがヘッダー名、値がリストであるヘッダーのマップが含まれています。このリストには、ペイロードリストと同じ順序でそのヘッダーのヘッダー値が含まれています。ペイロードリストの反復に基づいて MessageHeaders マップからヘッダーに正しくアクセスするのは、アプリケーションの責任です。

バッチモードで使用する場合、List<Message<Person>> 形式の型署名は許可されないことに注意してください。

バージョン 4.0.2 以降、バインダーは、バッチモードでの消費時に DLQ 機能をサポートします。バッチモードのコンシューマーバインディングで DLQ を使用する場合、前のポーリングから受信したすべてのレコードが DLQ トピックに配信されることに注意してください。

バッチモードを使用する場合、バインダー内での再試行はサポートされないため、maxAttempts は 1 にオーバーライドされます。バインダーで再試行する同様の機能を実現するように DefaultErrorHandler を構成できます(ListenerContainerCustomizer を使用)。手動の AckMode を使用し、Ackowledgment.nack(index, sleep) を呼び出して、部分バッチのオフセットをコミットし、残りのレコードを再配信することもできます。これらの手法の詳細については、Spring for Apache Kafka ドキュメントを参照してください。
バッチモードで KafkaNull オブジェクトを受信すると、受信したリストには対応する KafkaNull オブジェクトの null 要素が含まれます。これは、List<Person> および Message<List<Person>> スタイルの両方の型署名に当てはまります。

バッチモードで消費する場合の可観測性

レコードをバッチで消費する場合、観測トレースの伝播機能は直接サポートされません。これは、Kafka バインダーによって使用される Spring for Apache Kafka ライブラリがバッチリスナーでのトレースをサポートしていないためです。これはレコードリスナーでのみサポートされます。バッチリスナーでは、受信したレコードは複数のトピック / パーティションおよび複数のプロデューサーからのものである可能性があり、トレース情報の追加はオプションでした。バッチ内のレコード間に相関関係がない場合があるため、フレームワークは、単一のトレース ID として提供するなど、それらのトレースについて想定することはできません。Message<List<String>> の型シグネチャーを使用すると、ペイロードと同じ数のエントリを含むリストを含む kafka_batchConvertedHeaders というヘッダーを取得できます。このリストには、トレースヘッダーを含む Map があります。ただし、これを適切に反復処理して観測を開始するのはアプリケーションの責任です。