バッチの消費
バージョン 3.0 以降、spring.cloud.stream.bindings.<name>.consumer.batch-mode
が true
に設定されている場合、Kafka Consumer
のポーリングによって受信されたすべてのレコードは、List<?>
としてリスナーメソッドに提示されます。それ以外の場合、メソッドは一度に 1 つのレコードで呼び出されます。バッチのサイズは、Kafka コンシューマープロパティ max.poll.records
、fetch.min.bytes
、fetch.max.wait.ms
によって制御されます。詳細については、Kafka のドキュメントを参照してください。
バッチを受信するときは、次の型署名が許可されます。
List<Person>
Message<List<Person>>
List<Person>
の最初のオプションでは、リスナーはメッセージヘッダーを取得しません。2 番目の型 シグネチャー (Message<List<Person>>
) が使用される場合、ヘッダーにアクセスできます。ただし、ヘッダーはすべて Collection
の形式のままです。次の例を見てみましょう。
Message
に 10 個の Person
オブジェクトのリストが含まれているとします。Message
の MessageHeaders
には、キーがヘッダー名、値がリストであるヘッダーのマップが含まれています。このリストには、ペイロードリストと同じ順序でそのヘッダーのヘッダー値が含まれています。ペイロードリストの反復に基づいて MessageHeaders
マップからヘッダーに正しくアクセスするのは、アプリケーションの責任です。
バッチモードで使用する場合、List<Message<Person>>
形式の型署名は許可されないことに注意してください。
バージョン 4.0.2
以降、バインダーは、バッチモードでの消費時に DLQ 機能をサポートします。バッチモードのコンシューマーバインディングで DLQ を使用する場合、前のポーリングから受信したすべてのレコードが DLQ トピックに配信されることに注意してください。
バッチモードを使用する場合、バインダー内での再試行はサポートされないため、maxAttempts は 1 にオーバーライドされます。バインダーで再試行する同様の機能を実現するように DefaultErrorHandler を構成できます(ListenerContainerCustomizer を使用)。手動の AckMode を使用し、Ackowledgment.nack(index, sleep) を呼び出して、部分バッチのオフセットをコミットし、残りのレコードを再配信することもできます。これらの手法の詳細については、Spring for Apache Kafka ドキュメントを参照してください。 |
バッチモードで KafkaNull オブジェクトを受信すると、受信したリストには対応する KafkaNull オブジェクトの null 要素が含まれます。これは、List<Person> および Message<List<Person>> スタイルの両方の型署名に当てはまります。 |
バッチモードで消費する場合の可観測性
レコードをバッチで消費する場合、観測トレースの伝播機能は直接サポートされません。これは、Kafka バインダーによって使用される Spring for Apache Kafka ライブラリがバッチリスナーでのトレースをサポートしていないためです。これはレコードリスナーでのみサポートされます。バッチリスナーでは、受信したレコードは複数のトピック / パーティションおよび複数のプロデューサーからのものである可能性があり、トレース情報の追加はオプションでした。バッチ内のレコード間に相関関係がない場合があるため、フレームワークは、単一のトレース ID として提供するなど、それらのトレースについて想定することはできません。Message<List<String>>
の型シグネチャーを使用すると、ペイロードと同じ数のエントリを含むリストを含む kafka_batchConvertedHeaders
というヘッダーを取得できます。このリストには、トレースヘッダーを含む Map
があります。ただし、これを適切に反復処理して観測を開始するのはアプリケーションの責任です。