変更履歴

3.0 以降の 3.1 の新機能

このセクションでは、バージョン 3.0 からバージョン 3.1 に加えられた変更について説明します。以前のバージョンでの変更については、変更履歴を参照してください。

Kafka クライアントバージョン

このバージョンには、3.6.0 kafka-clients が必要です。

EmbeddedKafkaBroker

Zookeeper の代わりに Kraft を使用する追加の実装が提供されるようになりました。詳細については、"組み込み Kafka ブローカー" を参照してください。

JsonDeserializer

逆直列化例外が発生すると、SerializationException メッセージには Can’t deserialize data [[123, 34, 98, 97, 122, …​ 形式のデータが含まれなくなります。各データバイトの数値の配列は役に立たず、大規模なデータの場合は冗長になる可能性があります。ErrorHandlingDeserializer とともに使用すると、エラーハンドラーに送信される DeserializationException には、逆直列化できなかった生データを含む data プロパティが含まれます。ErrorHandlingDeserializer とともに使用しない場合、KafkaConsumer は同じレコードに対して例外を継続的に発行し、トピック / パーティション / オフセットと Jackson によってスローされた原因を示します。

ContainerPostProcessor

@KafkaListener アノテーションで ContainerPostProcessor の Bean 名を指定することにより、リスナーコンテナーに後処理を適用できます。これは、コンテナーが作成され、コンテナーファクトリで構成された ContainerCustomizer が構成された後に発生します。詳細については、コンテナーファクトリを参照してください。

ErrorHandlingDeserializer

これで、このデシリアライザーに Validator を追加できるようになりました。デリゲート Deserializer がオブジェクトの逆直列化に成功したが、そのオブジェクトの検証に失敗した場合は、逆直列化例外が発生した場合と同様の例外がスローされます。これにより、元の生データをエラーハンドラーに渡すことができます。詳細については、"ErrorHandlingDeserializer の使用 " を参照してください。

再試行可能なトピック

@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC) の場合、サフィックス -retry-5000 を -retry に変更します。サフィックス -retry-5000 を維持したい場合は、@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2") を使用してください。詳細については、"トピックの命名" を参照してください。

リスナーコンテナーの変更

null コンシューマー group.id を使用してパーティションを手動で割り当てる場合、AckMode は自動的に MANUAL に強制されるようになりました。詳細については、"すべてのパーティションを手動で割り当てる" を参照してください。

2.9 以降の 3.0 の新機能

Kafka クライアントバージョン

このバージョンには、3.3.1 kafka-clients が必要です。

正確に一度セマンティクス

EOSMode.V1 (別名 ALPHA) はサポートされなくなりました。

トランザクションを使用する場合、ブローカーの最小バージョンは 2.5 です。

詳細については、正確に一度セマンティクスおよび KIP-447 [Apache] (英語) を参照してください。

観測

タイマーの監視と Micrometer を使用したトレースの有効化がサポートされるようになりました。詳細については、観測を参照してください。

ネイティブイメージ

ネイティブイメージの作成がサポートされます。詳細については、ネイティブイメージを参照してください。

グローバルシングルエンベデッド Kafka

組み込み Kafka (EmbeddedKafkaBroker) は、テスト計画全体の単一のグローバルインスタンスとして開始できるようになりました。詳細については、複数のテストクラスに同じブローカーを使用するを参照してください。

再試行可能なトピックの変更

この機能は (API に関する限り) 実験的とは見なされなくなりました。機能自体は 2.7 以降サポートされていますが、API の変更が中断される可能性が通常よりも大きくなっています。

ノンブロッキング再試行インフラストラクチャ Bean のブートストラップは、アプリケーションの初期化に関して一部のアプリケーションで発生したタイミングの問題を回避するために、このリリースで変更されました。

再試行コンテナーに別の concurrency を設定できるようになりました。デフォルトでは、並行性はメインコンテナーと同じです。

@RetryableTopic は、@AliasFor プロパティのサポートを含め、カスタムアノテーションのメタアノテーションとして使用できるようになりました。

詳細については、構成を参照してください。

再試行トピックのデフォルトのレプリケーション係数が -1 (ブローカーのデフォルトを使用) になりました。ブローカーがそのバージョン 2.4 より前の場合、プロパティを明示的に設定する必要があります。

同じアプリケーションコンテキスト内の同じトピックで複数の @RetryableTopic リスナーを構成できるようになりました。以前は、これは不可能でした。詳細については、複数のリスナー、同じトピックを参照してください。

RetryTopicConfigurationSupport には破壊的な API の変更があります。具体的には、destinationTopicResolverkafkaConsumerBackoffManager、/ または retryTopicConfigurer の Bean 定義メソッドをオーバーライドする場合。これらのメソッドには、ObjectProvider<RetryTopicComponentFactory> パラメーターが必要になりました。

リスナーコンテナーの変更

コンシューマーの認証と認可の失敗に関連するイベントが、コンテナーによって発行されるようになりました。詳細については、アプリケーションイベントを参照してください。

コンシューマースレッドで使用されるスレッド名をカスタマイズできるようになりました。詳細については、コンテナースレッドの命名を参照してください。

コンテナープロパティ restartAfterAuthException が追加されました。詳細については、リスナーコンテナーのプロパティを参照してください。

KafkaTemplate の変更

このクラスによって返される先物は、ListenableFuture ではなく CompletableFuture になりました。KafkaTemplate の使用を参照してください。

ReplyingKafkaTemplate の変更

このクラスによって返される先物は、ListenableFuture ではなく CompletableFuture になりました。ReplyingKafkaTemplate の使用および Message<?> でのリクエスト / リプライを参照してください。

@KafkaListener の変更

返信メッセージでエコーされるカスタム相関ヘッダーを使用できるようになりました。詳細については、ReplyingKafkaTemplate を使用するの最後にある注を参照してください。

バッチ全体が処理される前に、バッチの一部を手動でコミットできるようになりました。詳細については、"オフセットのコミット" を参照してください。

KafkaHeaders の変更

2.9.x で非推奨となった KafkaHeaders の 4 つの定数が削除されました。

  • MESSAGE_KEY の代わりに KEY を使用します。

  • PARTITION_ID の代わりに PARTITION を使用してください

同様に、RECEIVED_MESSAGE_KEY は RECEIVED_KEY に置き換えられ、RECEIVED_PARTITION_ID は RECEIVED_PARTITION に置き換えられます。

変更のテスト

バージョン 3.0.7 では、MockConsumerFactory および MockProducerFactory が導入されました。詳細については、"コンシューマーとプロデューサーのモックアップ" を参照してください。

バージョン 3.0.10 以降、組み込み Kafka ブローカーは、デフォルトで、Spring Boot プロパティ spring.kafka.bootstrap-servers を組み込みブローカーのアドレスに設定します。

2.8 以降の 2.9 の新機能

Kafka クライアントバージョン

このバージョンには、3.2.0 kafka-clients が必要です。

エラーハンドラーの変更

DefaultErrorHandler は、残りのレコードのオフセットをシークする代わりに、コンテナーを 1 つのポーリングで一時停止し、前回のポーリングからの残りの結果を使用するように構成できるようになりました。詳細については、DefaultErrorHandler を参照してください。

DefaultErrorHandler に BackOffHandler プロパティが追加されました。詳細については、バックオフハンドラーを参照してください。

リスナーコンテナーの変更

interceptBeforeTx は、すべてのトランザクションマネージャーで動作するようになりました (以前は、KafkaAwareTransactionManager が使用されている場合にのみ適用されていました)。[interceptBeforeTx] を参照してください。

以前のポーリングからのすべてのレコードが処理された後ではなく、現在のレコードが処理された後にコンテナーがコンシューマーを一時停止できるようにする、新しいコンテナープロパティ pauseImmediate が提供されます。[ 一時停止 ] を参照してください。

コンシューマーの認証と認可に関連するイベント

ヘッダーマッパーの変更点

どの受信 ヘッダーをマップするかを構成できるようになりました。2.8.8 以降のバージョンでも利用できます。詳細については、メッセージヘッダーを参照してください。

KafkaTemplate の変更

3.0 では、このクラスによって返される先物は ListenableFuture ではなく CompletableFuture になります。このリリースを使用する場合の移行の支援については、KafkaTemplate の使用を参照してください。

ReplyingKafkaTemplate の変更

テンプレートは、応答コンテナーが初期化される前にリクエストを送信する際の競合を回避するために、応答コンテナーでの割り当てを待機するメソッドを提供するようになりました。2.8.8 以降のバージョンでも利用できます。ReplyingKafkaTemplate を使用するを参照してください。

3.0 では、このクラスによって返される先物は ListenableFuture ではなく CompletableFuture になります。このリリースを使用する場合の移行の支援については、ReplyingKafkaTemplate の使用および Message<?> でのリクエスト / リプライを参照してください。

2.7 以降の 2.8 の新機能

このセクションでは、バージョン 2.7 からバージョン 2.8 に加えられた変更について説明します。以前のバージョンでの変更については、変更履歴を参照してください。

Kafka クライアントバージョン

このバージョンには 3.0.0 kafka-clients が必要です

パッケージの変更

型マッピングに関連するクラスとインターフェースは、…​support.converter から …​support.mapping に移動されました。

  • AbstractJavaTypeMapper

  • ClassMapper

  • DefaultJackson2JavaTypeMapper

  • Jackson2JavaTypeMapper

故障した手動コミット

リスナーコンテナーは、手動のオフセットコミットを順不同で(通常は非同期に)受け入れるように構成できるようになりました。コンテナーは、欠落しているオフセットが確認されるまでコミットを延期します。詳細については、手動でオフセットをコミットするを参照してください。

@KafkaListener の変更

リスナーメソッドがメソッド自体のバッチリスナーであるかどうかを指定できるようになりました。これにより、レコードリスナーとバッチリスナーの両方に同じコンテナーファクトリを使用できます。

詳細については、[ バッチリスナー ] を参照してください。

バッチリスナーは、変換例外を処理できるようになりました。

詳細については、バッチエラーハンドラーによる変換エラーを参照してください。

RecordFilterStrategy をバッチリスナーで使用すると、1 回の呼び出しでバッチ全体をフィルタリングできるようになりました。詳細については、[ バッチリスナー ] の最後にある注記を参照してください。

@KafkaListener アノテーションに filter 属性が追加され、このリスナーのみのコンテナーファクトリの RecordFilterStrategy をオーバーライドできるようになりました。

@KafkaListener アノテーションに info 属性が追加されました。これは、新しいリスナーコンテナープロパティ listenerInfo にデータを入力するために使用されます。次に、これを使用して、RecordInterceptorRecordFilterStrategy、リスナー自体で使用できる各レコードの KafkaHeaders.LISTENER_INFO ヘッダーにデータを入力します。詳細については、リスナー情報ヘッダーおよび抽象リスナーコンテナーのプロパティを参照してください。

KafkaTemplate の変更

これで、トピック、パーティション、オフセットを指定して、単一のレコードを受け取ることができます。詳細については、KafkaTemplate を使用した受信を参照してください。

CommonErrorHandler が追加されました

バッチリスナーを記録するためのレガシー GenericErrorHandler とそのサブインターフェース階層は、GenericErrorHandler のほとんどのレガシー実装に対応する実装を持つ新しいシングルインターフェース CommonErrorHandler に置き換えられました。詳細については、コンテナーエラーハンドラーおよびカスタムレガシーエラーハンドラーの実装を CommonErrorHandler に移行するを参照してください。

リスナーコンテナーの変更

interceptBeforeTx コンテナープロパティは、デフォルトで true になりました。

authorizationExceptionRetryInterval プロパティは authExceptionRetryInterval に名前が変更され、以前の AuthorizationException に加えて AuthenticationException にも適用されるようになりました。このプロパティが設定されていない限り、両方の例外は致命的と見なされ、コンテナーはデフォルトで停止します。

詳細については、KafkaMessageListenerContainer を使用するおよびリスナーコンテナーのプロパティを参照してください。

シリアライザー / デシリアライザーの変更

DelegatingByTopicSerializer と DelegatingByTopicDeserializer が提供されるようになりました。詳細については、シリアライザーとデシリアライザーの委譲を参照してください。

DeadLetterPublishingRecover の変更

プロパティ stripPreviousExceptionHeaders は、デフォルトで true になりました。

現在、出力レコードに追加されるヘッダーをカスタマイズするためのいくつかの手法があります。

詳細については、デッドレターレコードヘッダーの管理を参照してください。

再試行可能なトピックの変更

これで、再試行可能なトピックと再試行不可能なトピックに同じファクトリを使用できます。詳細については、ListenerContainerFactory の指定を参照してください。

失敗したレコードを DLT に直接送信する、致命的な例外の管理可能なグローバルリストが追加されました。管理方法については、例外分類子を参照してください。

ブロッキングとノンブロッキングの再試行を組み合わせて使用できるようになりました。詳細については、ブロッキングとノンブロッキングの再試行の組み合わせを参照してください。

再試行可能なトピック機能を使用するときにスローされる KafkaBackOffException は、DEBUG レベルでログに記録されるようになりました。ロギングレベルを WARN に戻すか、他のレベルに設定する必要がある場合は、KafkaBackOffException ログレベルの変更を参照してください。

2.6 および 2.7 間の変更

Kafka クライアントバージョン

このバージョンには、2.7.0 kafka-clients が必要です。バージョン 2.7.1 以降、2.8.0 クライアントとも互換性があります。Spring Boot の依存関係を上書きを参照してください。

トピックを使用したノンブロッキング遅延再試行

この重要な新機能は、このリリースで追加されています。厳密な順序付けが重要でない場合、失敗した配信を別のトピックに送信して、後で使用することができます。このような一連の再試行トピックは、遅延を増やしながら構成できます。詳細については、ノンブロッキング再試行を参照してください。

リスナーコンテナーの変更

onlyLogRecordMetadata コンテナープロパティは、デフォルトで true になりました。

新しいコンテナープロパティ stopImmediate が利用可能になりました。

詳細については、リスナーコンテナーのプロパティを参照してください。

配信試行の間に BackOff を使用するエラーハンドラー (例: SeekToCurrentErrorHandler と DefaultAfterRollbackProcessor) は、停止を遅らせるのではなく、コンテナーの停止後すぐにバックオフ間隔を終了します。

FailedRecordProcessor を継承するエラーハンドラーおよびアフターロールバックプロセッサーを 1 つ以上の RetryListener で構成して、再試行およびリカバリの進行状況に関する情報を受け取ることができるようになりました。

RecordInterceptor には、リスナーが戻った後に(通常、または例外をスローすることによって)呼び出される追加のメソッドが含まれるようになりました。また、サブインターフェース ConsumerAwareRecordInterceptor もあります。さらに、バッチリスナー用の BatchInterceptor が追加されました。詳細については、メッセージリスナコンテナーを参照してください。

@KafkaListener の変更

@KafkaHandler メソッド(クラスレベルのリスナー)のペイロードパラメーターを検証できるようになりました。詳細については、@KafkaListener@Payload 検証を参照してください。

これで、MessagingMessageConverter および BatchMessagingMessageConverter に rawRecordHeader プロパティを設定できます。これにより、変換された Message<?> に生の ConsumerRecord が追加されます。これは、たとえば、リスナーエラーハンドラーで DeadLetterPublishingRecoverer を使用する場合に役立ちます。詳細については、リスナーエラーハンドラーを参照してください。

アプリケーションの初期化中に @KafkaListener アノテーションを変更できるようになりました。詳細については、@KafkaListener 属性の変更を参照してください。

DeadLetterPublishingRecover の変更

これで、キーと値の両方が逆直列化に失敗した場合、元の値が DLT に公開されます。以前は、値が入力されていましたが、キー DeserializationException はヘッダーに残っていました。リカバリ装置をサブクラス化し、createProducerRecord メソッドをオーバーライドした場合、API に重大な変更があります。

さらに、リカバリ機能は、宛先リゾルバーによって選択されたパーティションが実際に存在することを確認してから公開します。

詳細については、デッドレターレコードの公開を参照してください。

ChainedKafkaTransactionManager は非推奨

詳細については、トランザクションを参照してください。

ReplyingKafkaTemplate の変更

現在、何らかの条件が存在する場合に、応答を調べて将来を例外的に失敗させるメカニズムがあります。

spring-messaging の送受信のサポート Message<?> が追加されました。

詳細については、ReplyingKafkaTemplate を使用するを参照してください。

Kafka ストリームの変更

デフォルトでは、StreamsBuilderFactoryBean はローカル状態をクリーンアップしないように構成されています。詳細については、構成を参照してください。

KafkaAdmin の変更

新しいメソッド createOrModifyTopics および describeTopics が追加されました。KafkaAdmin.NewTopics が追加され、単一の Bean で複数のトピックを簡単に構成できるようになりました。詳細については、[ トピックの構成 ] を参照してください。

MessageConverter の変更

spring-messagingSmartMessageConverter を MessagingMessageConverter に追加できるようになり、contentType ヘッダーに基づいたコンテンツネゴシエーションが可能になりました。詳細については、Spring メッセージングメッセージ変換を参照してください。

@KafkaListener のシーケンス

詳細については、@KafkaListener を順番に開始するを参照してください。

ExponentialBackOffWithMaxRetries

新しい BackOff 実装が提供され、最大再試行回数の構成がより便利になります。詳細については、ExponentialBackOffWithMaxRetries の実装を参照してください。

条件付き委譲エラーハンドラー

これらの新しいエラーハンドラーは、例外の種類に応じて、さまざまなエラーハンドラーに委譲するように構成できます。詳細については、エラーハンドラーの委譲を参照してください。

2.5 および 2.6 間の変更

Kafka クライアントバージョン

このバージョンには、2.6.0 kafka-clients が必要です。

リスナーコンテナーの変更

デフォルトの EOSMode は BETA になりました。詳細については、正確に一度セマンティクスを参照してください。

さまざまなエラーハンドラー ( FailedRecordProcessor を継承する) と DefaultAfterRollbackProcessor は、回復が失敗した場合に BackOff をリセットするようになりました。さらに、失敗したレコードや例外に基づいて、使用する BackOff を選択できるようになりました。

コンテナーのプロパティで adviceChain を設定できるようになりました。詳細については、リスナーコンテナーのプロパティを参照してください。

コンテナーが ListenerContainerIdleEvent を公開するように構成されている場合、アイドルイベントの公開後にレコードを受信すると、ListenerContainerNoLongerIdleEvent を公開するようになりました。詳細については、アプリケーションイベントおよびアイドル状態のコンシューマーと無反応なコンシューマーの検出を参照してください。

@KafkaListener の変更

手動のパーティション割り当てを使用する場合、どのパーティションを初期オフセットにリセットするかを決定するためのワイルドカードを指定できるようになりました。さらに、リスナーが ConsumerSeekAware を実装している場合、onPartitionsAssigned() は手動割り当ての後に呼び出されます。(バージョン 2.5.5 でも追加されました)。詳細については、明示的なパーティション割り当てを参照してください。

AbstractConsumerSeekAware に便利なメソッドが追加され、検索が簡単になりました。詳細については、[ 求める ] を参照してください。

ErrorHandler の変更

FailedRecordProcessor のサブクラス (例: SeekToCurrentErrorHandlerDefaultAfterRollbackProcessorRecoveringBatchErrorHandler) は、例外がこのレコードで以前に発生したものとは異なる型である場合、再試行状態をリセットするように構成できるようになりました。

プロデューサーファクトリの変更

プロデューサーの最大年齢を設定できるようになりました。その後、プロデューサーは閉じられて再作成されます。詳細については、トランザクションを参照してください。

DefaultKafkaProducerFactory の作成後に構成マップを更新できるようになりました。これは、たとえば、資格情報が変更された後に SSL キー / トラストストアの場所を更新する必要がある場合に役立ちます。詳細については、DefaultKafkaProducerFactory を使用するを参照してください。

2.4 および 2.5 間の変更

このセクションでは、バージョン 2.4 からバージョン 2.5 に加えられた変更について説明します。以前のバージョンでの変更については、変更履歴を参照してください。

コンシューマー / プロデューサーファクトリの変更

デフォルトのコンシューマーおよびプロデューサーファクトリは、コンシューマーまたはプロデューサーが作成またはクローズされるたびにコールバックを呼び出すことができるようになりました。ネイティブ Micrometer メトリクスの実装が提供されます。詳細については、ファクトリリスナーを参照してください。

実行時にブートストラップサーバーのプロパティを変更できるようになり、別の Kafka クラスターへのフェイルオーバーが可能になりました。詳細については、Kafka への接続を参照してください。

StreamsBuilderFactoryBean の変更

ファクトリ Bean は、KafkaStreams が作成または破棄されるたびにコールバックを呼び出すことができるようになりました。ネイティブ Micrometer メトリクスの実装が提供されます。詳細については、KafkaStreams Micrometer サポートを参照してください。

Kafka クライアントバージョン

このバージョンには、2.5.0 kafka-clients が必要です。

クラス / パッケージの変更

SeekUtils が o.s.k.support パッケージから o.s.k.listener に移動しました。

配信試行ヘッダー

特定のエラーハンドラーの使用時およびロールバックプロセッサー後の配信試行を追跡するヘッダーを追加するオプションが追加されました。詳細については、配信試行ヘッダーを参照してください。

@KafkaListener の変更

@KafkaListener の戻り値の型が Message<?> の場合、必要に応じてデフォルトの応答ヘッダーが自動的に入力されるようになりました。詳細については、返信型 Message<?> を参照してください。

受信レコードに null キーがある場合、KafkaHeaders.RECEIVED_MESSAGE_KEY には null 値が設定されなくなりました。ヘッダーは完全に省略されます。

@KafkaListener メソッドは、トピック、パーティションなどのメタデータに個別のヘッダーを使用する代わりに、ConsumerRecordMetadata パラメーターを指定できるようになりました。詳細については、コンシューマーレコードのメタデータを参照してください。

リスナーコンテナーの変更

assignmentCommitOption コンテナープロパティは、デフォルトで LATEST_ONLY_NO_TX になりました。詳細については、リスナーコンテナーのプロパティを参照してください。

トランザクションを使用する場合、subBatchPerPartition コンテナープロパティはデフォルトで true になりました。詳細については、トランザクションを参照してください。

新しい RecoveringBatchErrorHandler が提供されるようになりました。

静的グループメンバーシップがサポートされるようになりました。詳細については、メッセージリスナコンテナーを参照してください。

増分 / 協調リバランスが構成されている場合、オフセットが致命的でない RebalanceInProgressException でコミットに失敗した場合、コンテナーはリバランスが完了した後、このインスタンスに割り当てられたままのパーティションのオフセットを再コミットしようとします。

デフォルトのエラーハンドラーは、レコードリスナーの場合は SeekToCurrentErrorHandler、バッチリスナーの場合は RecoveringBatchErrorHandler になりました。詳細については、コンテナーエラーハンドラーを参照してください。

標準エラーハンドラーによって意図的にスローされた例外がログに記録されるレベルを制御できるようになりました。詳細については、コンテナーエラーハンドラーを参照してください。

getAssignmentsByClientId() メソッドが追加され、同時コンテナー内のどのコンシューマーにどのパーティションが割り当てられているかを簡単に判断できるようになりました。詳細については、リスナーコンテナーのプロパティを参照してください。

エラー、デバッグログなどの ConsumerRecord 全体のログ記録を抑制できるようになりました。リスナーコンテナーのプロパティの onlyLogRecordMetadata を参照してください。

KafkaTemplate の変更

KafkaTemplate は micrometer タイマーを維持できるようになりました。詳細については、モニターを参照してください。

KafkaTemplate を ProducerConfig プロパティで構成して、プロデューサーファクトリのプロパティをオーバーライドできるようになりました。詳細については、KafkaTemplate の使用を参照してください。

RoutingKafkaTemplate が提供されるようになりました。詳細については、RoutingKafkaTemplate を使用するを参照してください。

ListenerFutureCallback の代わりに KafkaSendCallback を使用してより狭い例外を取得できるようになり、失敗した ProducerRecord を抽出しやすくなりました。詳細については、KafkaTemplate の使用を参照してください。

Kafka 文字列シリアライザー / デシリアライザー

新しい ToStringSerializer/StringDeserializer および関連する SerDe が提供されるようになりました。詳細については、文字列の直列化を参照してください。

JsonDeserializer

JsonDeserializer は、デシリアライゼーション型を決定するための柔軟性が向上しました。詳細については、メソッドを使用して型を決定するを参照してください。

Serializer/Deserializer の委譲

送信 レコードにヘッダーがない場合、DelegatingSerializer は「標準」型を処理できるようになりました。詳細については、シリアライザーとデシリアライザーの委譲を参照してください。

変更のテスト

KafkaTestUtils.consumerProps() ヘルパーレコードは、デフォルトで ConsumerConfig.AUTO_OFFSET_RESET_CONFIG を earliest に設定するようになりました。詳細については、JUnit を参照してください。

2.3 および 2.4 間の変更

Kafka クライアントバージョン

このバージョンには 2.4.0 kafka-clients 以降が必要であり、新しい増分リバランス機能をサポートしています。

ConsumerAwareRebalanceListener

ConsumerRebalanceListener と同様に、このインターフェースには onPartitionsLost メソッドが追加されました。詳細については、Apache Kafka のドキュメントを参照してください。

ConsumerRebalanceListener とは異なり、デフォルトの実装では onPartitionsRevoked を呼び出しません。代わりに、リスナーコンテナーは onPartitionsLost を呼び出した後にそのメソッドを呼び出します。ConsumerAwareRebalanceListener を実装するときに同じことをすべきではありません。

詳細については、リスナーのリバランスの最後にある重要な注記を参照してください。

GenericErrorHandler

isAckAfterHandle() のデフォルトの実装は、デフォルトで true を返すようになりました。

KafkaTemplate

KafkaTemplate は、トランザクションと共に非トランザクションパブリッシングをサポートするようになりました。詳細については、KafkaTemplate トランザクションおよび非トランザクションパブリッシングを参照してください。

AggregatingReplyingKafkaTemplate

releaseStrategy が BiConsumer になりました。これは、タイムアウト後に (およびレコードが到着したときに) 呼び出されるようになりました。タイムアウト後の呼び出しの場合、2 番目のパラメーターは true です。

詳細については、複数の返信を集約するを参照してください。

リスナーコンテナー

ContainerProperties は、AuthorizationException が KafkaConsumer によってスローされた後にリスナーコンテナーが再試行できるようにする authorizationExceptionRetryInterval オプションを提供します。詳細については、JavaDocs および KafkaMessageListenerContainer を使用するを参照してください。

@KafkaListener

@KafkaListener アノテーションには、新しいプロパティ splitIterables があります。デフォルトは真。応答リスナーが Iterable を返す場合、このプロパティは、返される結果を単一のレコードとして送信するか、各要素のレコードとして送信するかを制御します。詳細については、@SendTo を使用したリスナー結果の転送を参照してください。

バッチリスナーを BatchToRecordAdapter で構成できるようになりました。これにより、たとえば、リスナーが一度に 1 つのレコードを取得しながら、トランザクションでバッチを処理できます。デフォルトの実装では、バッチ全体の処理を停止することなく、ConsumerRecordRecoverer を使用してバッチ内のエラーを処理できます。これは、トランザクションを使用する場合に便利です。詳細については、バッチリスナーとのトランザクションを参照してください。

Kafka ストリーム

StreamsBuilderFactoryBean は、新しいプロパティ KafkaStreamsInfrastructureCustomizer を受け入れます。これにより、ストリームが作成される前にビルダーおよび / またはトポロジーを構成できます。詳細については、Spring 管理を参照してください。

2.2 および 2.3 間の変更

このセクションでは、バージョン 2.2 からバージョン 2.3 に加えられた変更について説明します。

ヒント、コツ、例

新しい章ヒント、コツ、例が追加されました。GitHub の課題および / またはその章の追加エントリのプルリクエストを送信してください。

Kafka クライアントバージョン

このバージョンには 2.3.0 kafka-clients 以降が必要です。

クラス / パッケージの変更

TopicPartitionInitialOffset は TopicPartitionOffset を推奨して非推奨になりました。

構成変更

バージョン 2.3.4 以降、missingTopicsFatal コンテナープロパティはデフォルトで false です。これが true の場合、ブローカーがダウンしている場合、アプリケーションは起動に失敗します。多くのユーザーがこの変更の影響を受けました。Kafka が高可用性プラットフォームであることを考えると、アクティブなブローカーなしでアプリケーションを開始することが一般的な使用例になるとは予想していませんでした。

プロデューサーとコンシューマーファクトリの変更

DefaultKafkaProducerFactory は、スレッドごとにプロデューサーを作成するように設定できるようになりました。構成されたクラス (引数なしのコンストラクターが必要) または Serializer インスタンスを使用して構築する代わりに、コンストラクターに Supplier<Serializer> インスタンスを提供することもできます。詳細については、DefaultKafkaProducerFactory の使用を参照してください。

DefaultKafkaConsumerFactory の Supplier<Deserializer> インスタンスでも同じオプションを使用できます。詳細については、KafkaMessageListenerContainer の使用を参照してください。

リスナーコンテナーの変更

以前は、リスナーがリスナーアダプター ( @KafkaListener など) を使用して呼び出されたときに、エラーハンドラーは ListenerExecutionFailedException (実際のリスナー例外は cause) を受け取りました。ネイティブ GenericMessageListener によってスローされた例外は、変更されずにエラーハンドラーに渡されました。ListenerExecutionFailedException は常に引数であり (実際のリスナー例外は cause として)、コンテナーの group.id プロパティへのアクセスを提供します。

リスナーコンテナーにはオフセットをコミットするための独自のメカニズムがあるため、Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG が false であることが優先されます。コンシューマーファクトリまたはコンテナーのコンシューマープロパティオーバーライドで特に設定されていない限り、自動的に false に設定されるようになりました。

ackOnError プロパティは、デフォルトで false になりました。

リスナーメソッドでコンシューマーの group.id プロパティを取得できるようになりました。詳細については、コンシューマー group.id の取得を参照してください。

コンテナーには新しいプロパティ recordInterceptor があり、リスナーを呼び出す前にレコードをインスペクションまたは変更できます。複数のインターセプターを呼び出す必要がある場合に備えて、CompositeRecordInterceptor も提供されます。詳細については、メッセージリスナコンテナーを参照してください。

ConsumerSeekAware には、開始位置、終了位置、現在の位置を基準にしてシークを実行し、タイムスタンプ以上の最初のオフセットをシークできる新しいメソッドがあります。詳細については、[ 求める ] を参照してください。

シークを簡素化するために、便利なクラス AbstractConsumerSeekAware が提供されるようになりました。詳細については、[ 求める ] を参照してください。

ContainerProperties は、idleBetweenPolls オプションを提供して、リスナーコンテナーのメインループを KafkaConsumer.poll() 呼び出し間でスリープさせます。詳細については、JavaDocs および KafkaMessageListenerContainer の使用を参照してください。

AckMode.MANUAL (または MANUAL_IMMEDIATE) を使用する場合、Acknowledgment で nack を呼び出すことで再配信を行うことができるようになりました。詳細については、オフセットのコミットを参照してください。

リスナーのパフォーマンスは、Micrometer Timer を使用して監視できるようになりました。詳細については、モニターを参照してください。

コンテナーは、起動に関連する追加のコンシューマーライフサイクルイベントを公開するようになりました。詳細については、アプリケーションイベントを参照してください。

トランザクションバッチリスナーがゾンビフェンシングをサポートできるようになりました。詳細については、トランザクションを参照してください。

リスナーコンテナーファクトリは、作成および設定後に各コンテナーをさらに設定するために ContainerCustomizer を使用して設定できるようになりました。詳細については、コンテナーファクトリを参照してください。

ErrorHandler の変更

SeekToCurrentErrorHandler は現在、特定の例外を致命的なものとして扱い、それらの再試行を無効にして、最初の失敗時に回復プログラムを呼び出します。

SeekToCurrentErrorHandler および SeekToCurrentBatchErrorHandler は、配信試行の間に BackOff (スレッドスリープ) を適用するように設定できるようになりました。

バージョン 2.3.2 以降、失敗したレコードの回復後にエラーハンドラーが戻るときに、回復されたレコードのオフセットがコミットされます。

DeadLetterPublishingRecoverer は、ErrorHandlingDeserializer と組み合わせて使用されると、デッドレタートピックに送信されたメッセージのペイロードを、デシリアライズできなかった元の値に設定するようになりました。以前は、null であり、メッセージヘッダーから DeserializationException を抽出するために必要なユーザーコードでした。詳細については、デッドレターレコードの公開を参照してください。

TopicBuilder

自動トピックプロビジョニング用の NewTopic@Bean をより便利に作成するために、新しいクラス TopicBuilder が提供されています。詳細については、[ トピックの構成 ] を参照してください。

Kafka ストリームの変更

@EnableKafkaStreams によって作成された StreamsBuilderFactoryBean の追加構成を実行できるようになりました。詳細については、ストリーム構成を参照してください。

逆直列化エラーのあるレコードを回復できる RecoveringDeserializationExceptionHandler が提供されるようになりました。DeadLetterPublishingRecoverer と組み合わせて使用して、これらのレコードをデッドレタートピックに送信できます。詳細については、デシリアライズ例外からの回復を参照してください。

SpEL を使用してヘッダー値を生成する HeaderEnricher トランスフォーマーが提供されています。詳細については、ヘッダーエンリッチャーを参照してください。

MessagingTransformer が提供されました。これにより、Kafka ストリームトポロジが Spring Integration フローなどの spring-messaging コンポーネントと対話できるようになります。詳細については、MessagingProcessor および【 KStream から Spring Integration フローを呼び出すを参照してください。

JSON コンポーネントの変更

現在、すべての JSON 対応コンポーネントは、デフォルトで JacksonUtils.enhancedObjectMapper() によって生成された Jackson ObjectMapper で構成されています。JsonDeserializer は、ターゲットジェネリクスコンテナー型の処理を改善するために、TypeReference ベースのコンストラクターを提供するようになりました。また、org.springframework.util.MimeType をプレーン文字列にシリアライズするために JacksonMimeTypeModule が導入されました。詳細については、JavaDocs および直列化、逆直列化、メッセージ変換を参照してください。

ByteArrayJsonMessageConverter と、すべての Json コンバーター用の新しいスーパークラス JsonMessageConverter が提供されています。また、StringOrBytesSerializer が利用可能になりました。ProducerRecord で byte[]BytesString 値を直列化できます。詳細については、Spring メッセージングメッセージ変換を参照してください。

JsonSerializerJsonDeserializerJsonSerde には、プログラムによる構成をより簡単にするための流れるような API が含まれるようになりました。詳細については、javadoc、直列化、逆直列化、メッセージ変換JSON の直列化と逆直列化をストリーミングしますを参照してください。

ReplyingKafkaTemplate

応答がタイムアウトすると、フューチャーは例外的に KafkaException ではなく KafkaReplyTimeoutException で完了します。

また、オーバーロードされた sendAndReceive メソッドが提供され、メッセージごとに応答タイムアウトを指定できるようになりました。

AggregatingReplyingKafkaTemplate

複数の受信者からの応答を集約することにより、ReplyingKafkaTemplate を拡張します。詳細については、複数の返信を集約するを参照してください。

トランザクションの変更

KafkaTemplate および KafkaTransactionManager でプロデューサーファクトリの transactionIdPrefix をオーバーライドできるようになりました。詳細については、transactionIdPrefix を参照してください。

新しい委譲シリアライザー / デシリアライザー

フレームワークは、ヘッダーを使用して複数のキー / 値型でレコードを生成および使用できるようにする、委譲 Serializer および Deserializer を提供するようになりました。詳細については、シリアライザーとデシリアライザーの委譲を参照してください。

新しい再試行デシリアライザー

フレームワークは、ネットワークの問題などの一時的なエラーが発生した場合に直列化を再試行するために、委譲 RetryingDeserializer を提供するようになりました。詳細については、デシリアライザーの再試行を参照してください。

2.1 および 2.2 間の変更

Kafka クライアントバージョン

このバージョンには 2.0.0 kafka-clients 以降が必要です。

クラスとパッケージの変更

ContainerProperties クラスは org.springframework.kafka.listener.config から org.springframework.kafka.listener に移動しました。

AckMode 列挙型が AbstractMessageListenerContainer から ContainerProperties に移動されました。

setBatchErrorHandler() および setErrorHandler() メソッドは、ContainerProperties から AbstractMessageListenerContainer および AbstractKafkaListenerContainerFactory の両方に移動されました。

ロールバック処理後

新しい AfterRollbackProcessor 戦略が提供されます。詳細については、ロールバック後のプロセッサーを参照してください。

ConcurrentKafkaListenerContainerFactory の変更

ConcurrentKafkaListenerContainerFactory を使用して、@KafkaListener アノテーションだけでなく、任意の ConcurrentMessageListenerContainer を作成および構成できるようになりました。詳細については、コンテナーファクトリを参照してください。

リスナーコンテナーの変更

新しいコンテナープロパティ (missingTopicsFatal) が追加されました。詳細については、KafkaMessageListenerContainer を使用するを参照してください。

コンシューマーが停止したときに ConsumerStoppedEvent が発行されるようになりました。詳細については、スレッドセーフを参照してください。

バッチリスナーは、オプションで List<ConsumerRecord<?, ?> の代わりに完全な ConsumerRecords<?, ?> オブジェクトを受け取ることができます。詳細については、[ バッチリスナー ] を参照してください。

DefaultAfterRollbackProcessor および SeekToCurrentErrorHandler は、失敗し続けるレコードを回復 (スキップ) できるようになりました。デフォルトでは、10 回の失敗後に回復します。失敗したレコードを配信不能トピックに公開するように構成できます。

バージョン 2.2.4 以降、デッドレターのトピック名を選択する際に、コンシューマーのグループ ID を使用できるようになりました。

ConsumerStoppingEvent が追加されました。詳細については、アプリケーションイベントを参照してください。

コンテナーが AckMode.MANUAL_IMMEDIATE で構成されている場合、SeekToCurrentErrorHandler は、復元されたレコードのオフセットをコミットするように構成できるようになりました (2.2.4 以降)。

@KafkaListener の変更

アノテーションにプロパティを設定することで、リスナーコンテナーファクトリの concurrency および autoStartup プロパティをオーバーライドできるようになりました。構成を追加して、応答メッセージにコピーするヘッダー (存在する場合) を決定できるようになりました。詳細については、@KafkaListener アノテーションを参照してください。

@KafkaListener を独自のアノテーションのメタアノテーションとして使用できるようになりました。詳細については、メタアノテーションとしての @KafkaListener  を参照してください。

@Payload 検証用に Validator を構成する方が簡単になりました。詳細については、@KafkaListener@Payload 検証を参照してください。

アノテーションで kafka コンシューマープロパティを直接指定できるようになりました。これらは、コンシューマーファクトリで定義された同じ名前のプロパティをオーバーライドします (バージョン 2.2.4 以降)。詳細については、アノテーションプロパティを参照してください。

ヘッダーマッピングの変更

型 MimeType および MediaType のヘッダーは、RecordHeader 値の単純な文字列としてマップされるようになりました。以前は、これらは JSON としてマッピングされ、MimeType のみがデコードされていました。MediaType をデコードできませんでした。これらは、相互運用性のための単純な文字列になりました。

また、DefaultKafkaHeaderMapper には新しい addToStringClasses メソッドがあり、JSON の代わりに toString() を使用してマップする必要のある型を指定できます。詳細については、メッセージヘッダーを参照してください。

埋め込まれた Kafka の変更

KafkaEmbedded クラスとその KafkaRule インターフェースは非推奨となり、EmbeddedKafkaBroker とその JUnit 4 EmbeddedKafkaRule ラッパーが優先されました。@EmbeddedKafka アノテーションは、非推奨の KafkaEmbedded の代わりに EmbeddedKafkaBroker Bean を設定するようになりました。この変更により、JUnit 5 テストで @EmbeddedKafka を使用できるようになりました。@EmbeddedKafka アノテーションには、EmbeddedKafkaBroker を取り込むポートを指定する属性 ports が含まれるようになりました。詳細については、アプリケーションのテストを参照してください。

JsonSerializer/ デシリアライザーの機能強化

プロデューサーとコンシューマーのプロパティを使用して、型 マッピング情報を提供できるようになりました。

新しいコンストラクターがデシリアライザーで使用可能になり、提供されたターゲット型で型ヘッダー情報をオーバーライドできます。

JsonDeserializer は、デフォルトで型情報ヘッダーを削除するようになりました。

Kafka プロパティ (2.2.3 以降) を使用して、型情報ヘッダーを無視するように JsonDeserializer を構成できるようになりました。

詳細については、直列化、逆直列化、メッセージ変換を参照してください。

Kafka ストリームの変更

ストリーム構成 Bean は、StreamsConfig オブジェクトではなく KafkaStreamsConfiguration オブジェクトである必要があります。

StreamsBuilderFactoryBean は、パッケージ …​core から …​config に移動されました。

KafkaStreamBrancher は、条件付き ブランチ が KStream インスタンス上に構築されている場合のエンドユーザーエクスペリエンスを向上させるために導入されました。

詳細については、Apache Kafka ストリームのサポートおよび構成を参照してください。

トランザクション ID

リスナーコンテナーによってトランザクションが開始されると、transactional.id は transactionIdPrefix に <group.id>.<topic>.<partition> が付加されます。この変更により、ここで説明するよう (英語) に、ゾンビの適切なフェンシングが可能になります。

2.0 および 2.1 間の変更

Kafka クライアントバージョン

このバージョンには 1.0.0 kafka-clients 以降が必要です。

1.1.x クライアントは、バージョン 2.2 でネイティブにサポートされています。

JSON の改善

StringJsonMessageConverter および JsonSerializer は Headers に型情報を追加するようになり、コンバーターと JsonDeserializer は、固定された構成型ではなく、メッセージ自体に基づいて、受信時に特定の型を作成できるようになりました。詳細については、直列化、逆直列化、メッセージ変換を参照してください。

コンテナー停止エラーハンドラー

コンテナーエラーハンドラーは、レコードリスナーとバッチリスナーの両方に提供されるようになりました。リスナーによってスローされた例外を致命的なものとして処理します。/ コンテナーを停止します。詳細については、例外の処理を参照してください。

コンテナーの一時停止と再開

リスナーコンテナーに pause() および resume() メソッドが追加されました (バージョン 2.1.3 以降)。詳細については、リスナーコンテナーの一時停止と再開を参照してください。

ステートフルリトライ

バージョン 2.1.3 以降、ステートフル再試行を構成できます。詳細については、ステートフルリトライを参照してください。

クライアント ID

バージョン 2.1.1 以降、@KafkaListener に client.id プレフィックスを設定できるようになりました。以前は、クライアント ID をカスタマイズするには、リスナーごとに個別のコンシューマーファクトリ (およびコンテナーファクトリ) が必要でした。プレフィックスには -n のサフィックスが付いており、同時実行を使用する場合に一意のクライアント ID を提供します。

オフセットコミットのロギング

デフォルトでは、トピックオフセットコミットのロギングは DEBUG ロギングレベルで実行されます。バージョン 2.1.2 以降、commitLogLevel と呼ばれる ContainerProperties の新しいプロパティを使用して、これらのメッセージのログレベルを指定できます。詳細については、KafkaMessageListenerContainer の使用を参照してください。

デフォルトの @KafkaHandler

バージョン 2.1.3 以降、クラスレベルの @KafkaListener で @KafkaHandler アノテーションの 1 つをデフォルトとして指定できます。詳細については、クラスでの @KafkaListener  を参照してください。

ReplyingKafkaTemplate

バージョン 2.1.3 以降、リクエスト / 応答セマンティクスをサポートするために KafkaTemplate のサブクラスが提供されます。詳細については、ReplyingKafkaTemplate を使用するを参照してください。

ChainedKafkaTransactionManager

バージョン 2.1.3 は ChainedKafkaTransactionManager を導入しました。(現在は非推奨です)。

2.0 からの移行ガイド

2.0 から 2.1 への移行 [GitHub] (英語) ガイドを参照してください。

1.3 および 2.0 間の変更

Spring Framework および Java バージョン

Spring for Apache Kafka プロジェクトには、Spring Framework 5.0 および Java 8 が必要になりました。

@KafkaListener の変更

@KafkaListener メソッド (およびクラスと @KafkaHandler メソッド) に @SendTo でアノテーションを付けることができるようになりました。メソッドが結果を返す場合、指定されたトピックに転送されます。詳細については、@SendTo を使用したリスナー結果の転送を参照してください。

メッセージリスナー

メッセージリスナーは、Consumer オブジェクトを認識できるようになりました。詳細については、[ メッセージリスナー ] を参照してください。

ConsumerAwareRebalanceListener を使用する

リバランスリスナーは、リバランス通知中に Consumer オブジェクトにアクセスできるようになりました。詳細については、リスナーのリバランスを参照してください。

1.2 および 1.3 間の変更

トランザクションのサポート

0.11.0.0 クライアントライブラリでは、トランザクションのサポートが追加されました。KafkaTransactionManager およびその他のトランザクションのサポートが追加されました。詳細については、"トランザクション" を参照してください。

ヘッダーのサポート

0.11.0.0 クライアントライブラリでは、メッセージヘッダーのサポートが追加されました。これらは、spring-messagingMessageHeaders との間でマッピングできるようになりました。詳細については、"メッセージヘッダー" を参照してください。

トピックの作成

0.11.0.0 クライアントライブラリは、トピックの作成に使用できる AdminClient を提供します。KafkaAdmin は、このクライアントを使用して、@Bean インスタンスとして定義されたトピックを自動的に追加します。

Kafka タイムスタンプのサポート

KafkaTemplate は、タイムスタンプ付きのレコードを追加する API をサポートするようになりました。timestamp のサポートに関して、新しい KafkaHeaders が導入されました。また、新しい KafkaConditions.timestamp() および KafkaMatchers.hasTimestamp() テストユーティリティが追加されました。詳細については、KafkaTemplate の使用@KafkaListener アノテーションアプリケーションのテストを参照してください。

@KafkaListener の変更

例外を処理するように KafkaListenerErrorHandler を構成できるようになりました。詳細については、例外の処理を参照してください。

デフォルトでは、@KafkaListenerid プロパティが group.id プロパティとして使用され、コンシューマーファクトリで設定されたプロパティ (存在する場合) をオーバーライドします。さらに、アノテーションで groupId を明示的に構成できます。以前は、リスナーに異なる group.id 値を使用するには、個別のコンテナーファクトリ (およびコンシューマーファクトリ) が必要でした。提供時に構成された group.id を使用する以前の動作を復元するには、アノテーションの idIsGroup プロパティを false に設定します。

@EmbeddedKafka アノテーション

便宜上、KafkaEmbedded を Bean として登録するために、テストクラスレベルの @EmbeddedKafka アノテーションが提供されています。詳細については、アプリケーションのテストを参照してください。

Kerberos 構成

Kerberos の構成がサポートされるようになりました。詳細については、JAAS と Kerberos を参照してください。

1.1 および 1.2 間の変更

このバージョンでは、0.10.2.x クライアントが使用されます。

1.0 および 1.1 間の変更

Kafka クライアント

このバージョンでは、Apache Kafka 0.10.x.x クライアントを使用します。

バッチリスナー

リスナーは、一度に 1 つずつではなく、consumer.poll() 操作によって返されるメッセージのバッチ全体を受信するように構成できます。

null ペイロード

Null ペイロードは、ログ圧縮を使用するときにキーを「削除」するために使用されます。

初期オフセット

パーティションを明示的に割り当てるときに、現在のエンドからの絶対または相対ではなく、コンシューマーグループの現在の位置に対して相対的な初期オフセットを構成できるようになりました。

シーク

各トピックまたはパーティションの位置をシークできるようになりました。グループ管理を使用していて、Kafka がパーティションを割り当てるときに、これを使用して初期化中に初期位置を設定できます。アイドル状態のコンテナーが検出されたとき、またはアプリケーションの実行中の任意の時点でシークすることもできます。詳細については、[ 求める ] を参照してください。