変更履歴
3.0 以降の 3.1 の新機能
このセクションでは、バージョン 3.0 からバージョン 3.1 に加えられた変更について説明します。以前のバージョンでの変更については、変更履歴を参照してください。
EmbeddedKafkaBroker
Zookeeper の代わりに Kraft
を使用する追加の実装が提供されるようになりました。詳細については、"組み込み Kafka ブローカー" を参照してください。
JsonDeserializer
逆直列化例外が発生すると、SerializationException
メッセージには Can’t deserialize data [[123, 34, 98, 97, 122, …
形式のデータが含まれなくなります。各データバイトの数値の配列は役に立たず、大規模なデータの場合は冗長になる可能性があります。ErrorHandlingDeserializer
とともに使用すると、エラーハンドラーに送信される DeserializationException
には、逆直列化できなかった生データを含む data
プロパティが含まれます。ErrorHandlingDeserializer
とともに使用しない場合、KafkaConsumer
は同じレコードに対して例外を継続的に発行し、トピック / パーティション / オフセットと Jackson によってスローされた原因を示します。
ContainerPostProcessor
@KafkaListener
アノテーションで ContainerPostProcessor
の Bean 名を指定することにより、リスナーコンテナーに後処理を適用できます。これは、コンテナーが作成され、コンテナーファクトリで構成された ContainerCustomizer
が構成された後に発生します。詳細については、コンテナーファクトリを参照してください。
ErrorHandlingDeserializer
これで、このデシリアライザーに Validator
を追加できるようになりました。デリゲート Deserializer
がオブジェクトの逆直列化に成功したが、そのオブジェクトの検証に失敗した場合は、逆直列化例外が発生した場合と同様の例外がスローされます。これにより、元の生データをエラーハンドラーに渡すことができます。詳細については、"ErrorHandlingDeserializer
の使用 " を参照してください。
再試行可能なトピック
@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2", fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
の場合、サフィックス -retry-5000
を -retry
に変更します。サフィックス -retry-5000
を維持したい場合は、@RetryableTopic(backoff = @Backoff(delay = 5000), attempts = "2")
を使用してください。詳細については、"トピックの命名" を参照してください。
リスナーコンテナーの変更
null
コンシューマー group.id
を使用してパーティションを手動で割り当てる場合、AckMode
は自動的に MANUAL
に強制されるようになりました。詳細については、"すべてのパーティションを手動で割り当てる" を参照してください。
2.9 以降の 3.0 の新機能
正確に一度セマンティクス
EOSMode.V1
(別名 ALPHA
) はサポートされなくなりました。
トランザクションを使用する場合、ブローカーの最小バージョンは 2.5 です。 |
詳細については、正確に一度セマンティクスおよび KIP-447 [Apache] (英語) を参照してください。
観測
タイマーの監視と Micrometer を使用したトレースの有効化がサポートされるようになりました。詳細については、観測を参照してください。
ネイティブイメージ
ネイティブイメージの作成がサポートされます。詳細については、ネイティブイメージを参照してください。
グローバルシングルエンベデッド Kafka
組み込み Kafka (EmbeddedKafkaBroker
) は、テスト計画全体の単一のグローバルインスタンスとして開始できるようになりました。詳細については、複数のテストクラスに同じブローカーを使用するを参照してください。
再試行可能なトピックの変更
この機能は (API に関する限り) 実験的とは見なされなくなりました。機能自体は 2.7 以降サポートされていますが、API の変更が中断される可能性が通常よりも大きくなっています。
ノンブロッキング再試行インフラストラクチャ Bean のブートストラップは、アプリケーションの初期化に関して一部のアプリケーションで発生したタイミングの問題を回避するために、このリリースで変更されました。
再試行コンテナーに別の concurrency
を設定できるようになりました。デフォルトでは、並行性はメインコンテナーと同じです。
@RetryableTopic
は、@AliasFor
プロパティのサポートを含め、カスタムアノテーションのメタアノテーションとして使用できるようになりました。
詳細については、構成を参照してください。
再試行トピックのデフォルトのレプリケーション係数が -1
(ブローカーのデフォルトを使用) になりました。ブローカーがそのバージョン 2.4 より前の場合、プロパティを明示的に設定する必要があります。
同じアプリケーションコンテキスト内の同じトピックで複数の @RetryableTopic
リスナーを構成できるようになりました。以前は、これは不可能でした。詳細については、複数のリスナー、同じトピックを参照してください。
RetryTopicConfigurationSupport
には破壊的な API の変更があります。具体的には、destinationTopicResolver
、kafkaConsumerBackoffManager
、/ または retryTopicConfigurer
の Bean 定義メソッドをオーバーライドする場合。これらのメソッドには、ObjectProvider<RetryTopicComponentFactory>
パラメーターが必要になりました。
リスナーコンテナーの変更
コンシューマーの認証と認可の失敗に関連するイベントが、コンテナーによって発行されるようになりました。詳細については、アプリケーションイベントを参照してください。
コンシューマースレッドで使用されるスレッド名をカスタマイズできるようになりました。詳細については、コンテナースレッドの命名を参照してください。
コンテナープロパティ restartAfterAuthException
が追加されました。詳細については、リスナーコンテナーのプロパティを参照してください。
KafkaTemplate
の変更
このクラスによって返される先物は、ListenableFuture
ではなく CompletableFuture
になりました。KafkaTemplate
の使用を参照してください。
ReplyingKafkaTemplate
の変更
このクラスによって返される先物は、ListenableFuture
ではなく CompletableFuture
になりました。ReplyingKafkaTemplate
の使用および Message<?>
でのリクエスト / リプライを参照してください。
@KafkaListener
の変更
返信メッセージでエコーされるカスタム相関ヘッダーを使用できるようになりました。詳細については、ReplyingKafkaTemplate
を使用するの最後にある注を参照してください。
バッチ全体が処理される前に、バッチの一部を手動でコミットできるようになりました。詳細については、"オフセットのコミット" を参照してください。
KafkaHeaders
の変更
2.9.x で非推奨となった KafkaHeaders
の 4 つの定数が削除されました。
MESSAGE_KEY
の代わりにKEY
を使用します。PARTITION_ID
の代わりにPARTITION
を使用してください
同様に、RECEIVED_MESSAGE_KEY
は RECEIVED_KEY
に置き換えられ、RECEIVED_PARTITION_ID
は RECEIVED_PARTITION
に置き換えられます。
変更のテスト
バージョン 3.0.7 では、MockConsumerFactory
および MockProducerFactory
が導入されました。詳細については、"コンシューマーとプロデューサーのモックアップ" を参照してください。
バージョン 3.0.10 以降、組み込み Kafka ブローカーは、デフォルトで、Spring Boot プロパティ spring.kafka.bootstrap-servers
を組み込みブローカーのアドレスに設定します。
2.8 以降の 2.9 の新機能
エラーハンドラーの変更
DefaultErrorHandler
は、残りのレコードのオフセットをシークする代わりに、コンテナーを 1 つのポーリングで一時停止し、前回のポーリングからの残りの結果を使用するように構成できるようになりました。詳細については、DefaultErrorHandler を参照してください。
DefaultErrorHandler
に BackOffHandler
プロパティが追加されました。詳細については、バックオフハンドラーを参照してください。
リスナーコンテナーの変更
interceptBeforeTx
は、すべてのトランザクションマネージャーで動作するようになりました (以前は、KafkaAwareTransactionManager
が使用されている場合にのみ適用されていました)。[interceptBeforeTx] を参照してください。
以前のポーリングからのすべてのレコードが処理された後ではなく、現在のレコードが処理された後にコンテナーがコンシューマーを一時停止できるようにする、新しいコンテナープロパティ pauseImmediate
が提供されます。[ 一時停止 ] を参照してください。
コンシューマーの認証と認可に関連するイベント
ヘッダーマッパーの変更点
どの受信 ヘッダーをマップするかを構成できるようになりました。2.8.8 以降のバージョンでも利用できます。詳細については、メッセージヘッダーを参照してください。
KafkaTemplate
の変更
3.0 では、このクラスによって返される先物は ListenableFuture
ではなく CompletableFuture
になります。このリリースを使用する場合の移行の支援については、KafkaTemplate
の使用を参照してください。
ReplyingKafkaTemplate
の変更
テンプレートは、応答コンテナーが初期化される前にリクエストを送信する際の競合を回避するために、応答コンテナーでの割り当てを待機するメソッドを提供するようになりました。2.8.8 以降のバージョンでも利用できます。ReplyingKafkaTemplate
を使用するを参照してください。
3.0 では、このクラスによって返される先物は ListenableFuture
ではなく CompletableFuture
になります。このリリースを使用する場合の移行の支援については、ReplyingKafkaTemplate
の使用および Message<?>
でのリクエスト / リプライを参照してください。
2.7 以降の 2.8 の新機能
このセクションでは、バージョン 2.7 からバージョン 2.8 に加えられた変更について説明します。以前のバージョンでの変更については、変更履歴を参照してください。
パッケージの変更
型マッピングに関連するクラスとインターフェースは、…support.converter
から …support.mapping
に移動されました。
AbstractJavaTypeMapper
ClassMapper
DefaultJackson2JavaTypeMapper
Jackson2JavaTypeMapper
故障した手動コミット
リスナーコンテナーは、手動のオフセットコミットを順不同で(通常は非同期に)受け入れるように構成できるようになりました。コンテナーは、欠落しているオフセットが確認されるまでコミットを延期します。詳細については、手動でオフセットをコミットするを参照してください。
@KafkaListener
の変更
リスナーメソッドがメソッド自体のバッチリスナーであるかどうかを指定できるようになりました。これにより、レコードリスナーとバッチリスナーの両方に同じコンテナーファクトリを使用できます。
詳細については、[ バッチリスナー ] を参照してください。
バッチリスナーは、変換例外を処理できるようになりました。
詳細については、バッチエラーハンドラーによる変換エラーを参照してください。
RecordFilterStrategy
をバッチリスナーで使用すると、1 回の呼び出しでバッチ全体をフィルタリングできるようになりました。詳細については、[ バッチリスナー ] の最後にある注記を参照してください。
@KafkaListener
アノテーションに filter
属性が追加され、このリスナーのみのコンテナーファクトリの RecordFilterStrategy
をオーバーライドできるようになりました。
@KafkaListener
アノテーションに info
属性が追加されました。これは、新しいリスナーコンテナープロパティ listenerInfo
にデータを入力するために使用されます。次に、これを使用して、RecordInterceptor
、RecordFilterStrategy
、リスナー自体で使用できる各レコードの KafkaHeaders.LISTENER_INFO
ヘッダーにデータを入力します。詳細については、リスナー情報ヘッダーおよび抽象リスナーコンテナーのプロパティを参照してください。
KafkaTemplate
の変更
これで、トピック、パーティション、オフセットを指定して、単一のレコードを受け取ることができます。詳細については、KafkaTemplate
を使用した受信を参照してください。
CommonErrorHandler
が追加されました
バッチリスナーを記録するためのレガシー GenericErrorHandler
とそのサブインターフェース階層は、GenericErrorHandler
のほとんどのレガシー実装に対応する実装を持つ新しいシングルインターフェース CommonErrorHandler
に置き換えられました。詳細については、コンテナーエラーハンドラーおよびカスタムレガシーエラーハンドラーの実装を CommonErrorHandler
に移行するを参照してください。
リスナーコンテナーの変更
interceptBeforeTx
コンテナープロパティは、デフォルトで true
になりました。
authorizationExceptionRetryInterval
プロパティは authExceptionRetryInterval
に名前が変更され、以前の AuthorizationException
に加えて AuthenticationException
にも適用されるようになりました。このプロパティが設定されていない限り、両方の例外は致命的と見なされ、コンテナーはデフォルトで停止します。
詳細については、KafkaMessageListenerContainer
を使用するおよびリスナーコンテナーのプロパティを参照してください。
シリアライザー / デシリアライザーの変更
DelegatingByTopicSerializer
と DelegatingByTopicDeserializer
が提供されるようになりました。詳細については、シリアライザーとデシリアライザーの委譲を参照してください。
DeadLetterPublishingRecover
の変更
プロパティ stripPreviousExceptionHeaders
は、デフォルトで true
になりました。
現在、出力レコードに追加されるヘッダーをカスタマイズするためのいくつかの手法があります。
詳細については、デッドレターレコードヘッダーの管理を参照してください。
再試行可能なトピックの変更
これで、再試行可能なトピックと再試行不可能なトピックに同じファクトリを使用できます。詳細については、ListenerContainerFactory の指定を参照してください。
失敗したレコードを DLT に直接送信する、致命的な例外の管理可能なグローバルリストが追加されました。管理方法については、例外分類子を参照してください。
ブロッキングとノンブロッキングの再試行を組み合わせて使用できるようになりました。詳細については、ブロッキングとノンブロッキングの再試行の組み合わせを参照してください。
再試行可能なトピック機能を使用するときにスローされる KafkaBackOffException は、DEBUG レベルでログに記録されるようになりました。ロギングレベルを WARN に戻すか、他のレベルに設定する必要がある場合は、KafkaBackOffException ログレベルの変更を参照してください。
2.6 および 2.7 間の変更
Kafka クライアントバージョン
このバージョンには、2.7.0 kafka-clients
が必要です。バージョン 2.7.1 以降、2.8.0 クライアントとも互換性があります。Spring Boot の依存関係を上書きを参照してください。
トピックを使用したノンブロッキング遅延再試行
この重要な新機能は、このリリースで追加されています。厳密な順序付けが重要でない場合、失敗した配信を別のトピックに送信して、後で使用することができます。このような一連の再試行トピックは、遅延を増やしながら構成できます。詳細については、ノンブロッキング再試行を参照してください。
リスナーコンテナーの変更
onlyLogRecordMetadata
コンテナープロパティは、デフォルトで true
になりました。
新しいコンテナープロパティ stopImmediate
が利用可能になりました。
詳細については、リスナーコンテナーのプロパティを参照してください。
配信試行の間に BackOff
を使用するエラーハンドラー (例: SeekToCurrentErrorHandler
と DefaultAfterRollbackProcessor
) は、停止を遅らせるのではなく、コンテナーの停止後すぐにバックオフ間隔を終了します。
FailedRecordProcessor
を継承するエラーハンドラーおよびアフターロールバックプロセッサーを 1 つ以上の RetryListener
で構成して、再試行およびリカバリの進行状況に関する情報を受け取ることができるようになりました。
RecordInterceptor
には、リスナーが戻った後に(通常、または例外をスローすることによって)呼び出される追加のメソッドが含まれるようになりました。また、サブインターフェース ConsumerAwareRecordInterceptor
もあります。さらに、バッチリスナー用の BatchInterceptor
が追加されました。詳細については、メッセージリスナコンテナーを参照してください。
@KafkaListener
の変更
@KafkaHandler
メソッド(クラスレベルのリスナー)のペイロードパラメーターを検証できるようになりました。詳細については、@KafkaListener
@Payload
検証を参照してください。
これで、MessagingMessageConverter
および BatchMessagingMessageConverter
に rawRecordHeader
プロパティを設定できます。これにより、変換された Message<?>
に生の ConsumerRecord
が追加されます。これは、たとえば、リスナーエラーハンドラーで DeadLetterPublishingRecoverer
を使用する場合に役立ちます。詳細については、リスナーエラーハンドラーを参照してください。
アプリケーションの初期化中に @KafkaListener
アノテーションを変更できるようになりました。詳細については、@KafkaListener
属性の変更を参照してください。
DeadLetterPublishingRecover
の変更
これで、キーと値の両方が逆直列化に失敗した場合、元の値が DLT に公開されます。以前は、値が入力されていましたが、キー DeserializationException
はヘッダーに残っていました。リカバリ装置をサブクラス化し、createProducerRecord
メソッドをオーバーライドした場合、API に重大な変更があります。
さらに、リカバリ機能は、宛先リゾルバーによって選択されたパーティションが実際に存在することを確認してから公開します。
詳細については、デッドレターレコードの公開を参照してください。
ChainedKafkaTransactionManager
は非推奨
詳細については、トランザクションを参照してください。
ReplyingKafkaTemplate
の変更
現在、何らかの条件が存在する場合に、応答を調べて将来を例外的に失敗させるメカニズムがあります。
spring-messaging
の送受信のサポート Message<?>
が追加されました。
詳細については、ReplyingKafkaTemplate
を使用するを参照してください。
Kafka ストリームの変更
デフォルトでは、StreamsBuilderFactoryBean
はローカル状態をクリーンアップしないように構成されています。詳細については、構成を参照してください。
KafkaAdmin
の変更
新しいメソッド createOrModifyTopics
および describeTopics
が追加されました。KafkaAdmin.NewTopics
が追加され、単一の Bean で複数のトピックを簡単に構成できるようになりました。詳細については、[ トピックの構成 ] を参照してください。
MessageConverter
の変更
spring-messaging
SmartMessageConverter
を MessagingMessageConverter
に追加できるようになり、contentType
ヘッダーに基づいたコンテンツネゴシエーションが可能になりました。詳細については、Spring メッセージングメッセージ変換を参照してください。
@KafkaListener
のシーケンス
詳細については、@KafkaListener
を順番に開始するを参照してください。
ExponentialBackOffWithMaxRetries
新しい BackOff
実装が提供され、最大再試行回数の構成がより便利になります。詳細については、ExponentialBackOffWithMaxRetries
の実装を参照してください。
条件付き委譲エラーハンドラー
これらの新しいエラーハンドラーは、例外の種類に応じて、さまざまなエラーハンドラーに委譲するように構成できます。詳細については、エラーハンドラーの委譲を参照してください。
2.5 および 2.6 間の変更
リスナーコンテナーの変更
デフォルトの EOSMode
は BETA
になりました。詳細については、正確に一度セマンティクスを参照してください。
さまざまなエラーハンドラー ( FailedRecordProcessor
を継承する) と DefaultAfterRollbackProcessor
は、回復が失敗した場合に BackOff
をリセットするようになりました。さらに、失敗したレコードや例外に基づいて、使用する BackOff
を選択できるようになりました。
コンテナーのプロパティで adviceChain
を設定できるようになりました。詳細については、リスナーコンテナーのプロパティを参照してください。
コンテナーが ListenerContainerIdleEvent
を公開するように構成されている場合、アイドルイベントの公開後にレコードを受信すると、ListenerContainerNoLongerIdleEvent
を公開するようになりました。詳細については、アプリケーションイベントおよびアイドル状態のコンシューマーと無反応なコンシューマーの検出を参照してください。
@KafkaListener の変更
手動のパーティション割り当てを使用する場合、どのパーティションを初期オフセットにリセットするかを決定するためのワイルドカードを指定できるようになりました。さらに、リスナーが ConsumerSeekAware
を実装している場合、onPartitionsAssigned()
は手動割り当ての後に呼び出されます。(バージョン 2.5.5 でも追加されました)。詳細については、明示的なパーティション割り当てを参照してください。
AbstractConsumerSeekAware
に便利なメソッドが追加され、検索が簡単になりました。詳細については、[ 求める ] を参照してください。
ErrorHandler の変更
FailedRecordProcessor
のサブクラス (例: SeekToCurrentErrorHandler
、DefaultAfterRollbackProcessor
、RecoveringBatchErrorHandler
) は、例外がこのレコードで以前に発生したものとは異なる型である場合、再試行状態をリセットするように構成できるようになりました。
プロデューサーファクトリの変更
プロデューサーの最大年齢を設定できるようになりました。その後、プロデューサーは閉じられて再作成されます。詳細については、トランザクションを参照してください。
DefaultKafkaProducerFactory
の作成後に構成マップを更新できるようになりました。これは、たとえば、資格情報が変更された後に SSL キー / トラストストアの場所を更新する必要がある場合に役立ちます。詳細については、DefaultKafkaProducerFactory
を使用するを参照してください。
2.4 および 2.5 間の変更
このセクションでは、バージョン 2.4 からバージョン 2.5 に加えられた変更について説明します。以前のバージョンでの変更については、変更履歴を参照してください。
コンシューマー / プロデューサーファクトリの変更
デフォルトのコンシューマーおよびプロデューサーファクトリは、コンシューマーまたはプロデューサーが作成またはクローズされるたびにコールバックを呼び出すことができるようになりました。ネイティブ Micrometer メトリクスの実装が提供されます。詳細については、ファクトリリスナーを参照してください。
実行時にブートストラップサーバーのプロパティを変更できるようになり、別の Kafka クラスターへのフェイルオーバーが可能になりました。詳細については、Kafka への接続を参照してください。
StreamsBuilderFactoryBean
の変更
ファクトリ Bean は、KafkaStreams
が作成または破棄されるたびにコールバックを呼び出すことができるようになりました。ネイティブ Micrometer メトリクスの実装が提供されます。詳細については、KafkaStreams Micrometer サポートを参照してください。
配信試行ヘッダー
特定のエラーハンドラーの使用時およびロールバックプロセッサー後の配信試行を追跡するヘッダーを追加するオプションが追加されました。詳細については、配信試行ヘッダーを参照してください。
@KafkaListener の変更
@KafkaListener
の戻り値の型が Message<?>
の場合、必要に応じてデフォルトの応答ヘッダーが自動的に入力されるようになりました。詳細については、返信型 Message<?> を参照してください。
受信レコードに null
キーがある場合、KafkaHeaders.RECEIVED_MESSAGE_KEY
には null
値が設定されなくなりました。ヘッダーは完全に省略されます。
@KafkaListener
メソッドは、トピック、パーティションなどのメタデータに個別のヘッダーを使用する代わりに、ConsumerRecordMetadata
パラメーターを指定できるようになりました。詳細については、コンシューマーレコードのメタデータを参照してください。
リスナーコンテナーの変更
assignmentCommitOption
コンテナープロパティは、デフォルトで LATEST_ONLY_NO_TX
になりました。詳細については、リスナーコンテナーのプロパティを参照してください。
トランザクションを使用する場合、subBatchPerPartition
コンテナープロパティはデフォルトで true
になりました。詳細については、トランザクションを参照してください。
新しい RecoveringBatchErrorHandler
が提供されるようになりました。
静的グループメンバーシップがサポートされるようになりました。詳細については、メッセージリスナコンテナーを参照してください。
増分 / 協調リバランスが構成されている場合、オフセットが致命的でない RebalanceInProgressException
でコミットに失敗した場合、コンテナーはリバランスが完了した後、このインスタンスに割り当てられたままのパーティションのオフセットを再コミットしようとします。
デフォルトのエラーハンドラーは、レコードリスナーの場合は SeekToCurrentErrorHandler
、バッチリスナーの場合は RecoveringBatchErrorHandler
になりました。詳細については、コンテナーエラーハンドラーを参照してください。
標準エラーハンドラーによって意図的にスローされた例外がログに記録されるレベルを制御できるようになりました。詳細については、コンテナーエラーハンドラーを参照してください。
getAssignmentsByClientId()
メソッドが追加され、同時コンテナー内のどのコンシューマーにどのパーティションが割り当てられているかを簡単に判断できるようになりました。詳細については、リスナーコンテナーのプロパティを参照してください。
エラー、デバッグログなどの ConsumerRecord
全体のログ記録を抑制できるようになりました。リスナーコンテナーのプロパティの onlyLogRecordMetadata
を参照してください。
KafkaTemplate の変更
KafkaTemplate
は micrometer タイマーを維持できるようになりました。詳細については、モニターを参照してください。
KafkaTemplate
を ProducerConfig
プロパティで構成して、プロデューサーファクトリのプロパティをオーバーライドできるようになりました。詳細については、KafkaTemplate
の使用を参照してください。
RoutingKafkaTemplate
が提供されるようになりました。詳細については、RoutingKafkaTemplate
を使用するを参照してください。
ListenerFutureCallback
の代わりに KafkaSendCallback
を使用してより狭い例外を取得できるようになり、失敗した ProducerRecord
を抽出しやすくなりました。詳細については、KafkaTemplate
の使用を参照してください。
Kafka 文字列シリアライザー / デシリアライザー
新しい ToStringSerializer
/StringDeserializer
および関連する SerDe
が提供されるようになりました。詳細については、文字列の直列化を参照してください。
JsonDeserializer
JsonDeserializer
は、デシリアライゼーション型を決定するための柔軟性が向上しました。詳細については、メソッドを使用して型を決定するを参照してください。
Serializer/Deserializer の委譲
送信 レコードにヘッダーがない場合、DelegatingSerializer
は「標準」型を処理できるようになりました。詳細については、シリアライザーとデシリアライザーの委譲を参照してください。
変更のテスト
KafkaTestUtils.consumerProps()
ヘルパーレコードは、デフォルトで ConsumerConfig.AUTO_OFFSET_RESET_CONFIG
を earliest
に設定するようになりました。詳細については、JUnit を参照してください。
2.3 および 2.4 間の変更
ConsumerAwareRebalanceListener
ConsumerRebalanceListener
と同様に、このインターフェースには onPartitionsLost
メソッドが追加されました。詳細については、Apache Kafka のドキュメントを参照してください。
ConsumerRebalanceListener
とは異なり、デフォルトの実装では onPartitionsRevoked
を呼び出しません。代わりに、リスナーコンテナーは onPartitionsLost
を呼び出した後にそのメソッドを呼び出します。ConsumerAwareRebalanceListener
を実装するときに同じことをすべきではありません。
詳細については、リスナーのリバランスの最後にある重要な注記を参照してください。
KafkaTemplate
KafkaTemplate
は、トランザクションと共に非トランザクションパブリッシングをサポートするようになりました。詳細については、KafkaTemplate
トランザクションおよび非トランザクションパブリッシングを参照してください。
AggregatingReplyingKafkaTemplate
releaseStrategy
が BiConsumer
になりました。これは、タイムアウト後に (およびレコードが到着したときに) 呼び出されるようになりました。タイムアウト後の呼び出しの場合、2 番目のパラメーターは true
です。
詳細については、複数の返信を集約するを参照してください。
リスナーコンテナー
ContainerProperties
は、AuthorizationException
が KafkaConsumer
によってスローされた後にリスナーコンテナーが再試行できるようにする authorizationExceptionRetryInterval
オプションを提供します。詳細については、JavaDocs および KafkaMessageListenerContainer
を使用するを参照してください。
@KafkaListener
@KafkaListener
アノテーションには、新しいプロパティ splitIterables
があります。デフォルトは真。応答リスナーが Iterable
を返す場合、このプロパティは、返される結果を単一のレコードとして送信するか、各要素のレコードとして送信するかを制御します。詳細については、@SendTo
を使用したリスナー結果の転送を参照してください。
バッチリスナーを BatchToRecordAdapter
で構成できるようになりました。これにより、たとえば、リスナーが一度に 1 つのレコードを取得しながら、トランザクションでバッチを処理できます。デフォルトの実装では、バッチ全体の処理を停止することなく、ConsumerRecordRecoverer
を使用してバッチ内のエラーを処理できます。これは、トランザクションを使用する場合に便利です。詳細については、バッチリスナーとのトランザクションを参照してください。
Kafka ストリーム
StreamsBuilderFactoryBean
は、新しいプロパティ KafkaStreamsInfrastructureCustomizer
を受け入れます。これにより、ストリームが作成される前にビルダーおよび / またはトポロジーを構成できます。詳細については、Spring 管理を参照してください。
2.2 および 2.3 間の変更
このセクションでは、バージョン 2.2 からバージョン 2.3 に加えられた変更について説明します。
ヒント、コツ、例
新しい章ヒント、コツ、例が追加されました。GitHub の課題および / またはその章の追加エントリのプルリクエストを送信してください。
構成変更
バージョン 2.3.4 以降、missingTopicsFatal
コンテナープロパティはデフォルトで false です。これが true の場合、ブローカーがダウンしている場合、アプリケーションは起動に失敗します。多くのユーザーがこの変更の影響を受けました。Kafka が高可用性プラットフォームであることを考えると、アクティブなブローカーなしでアプリケーションを開始することが一般的な使用例になるとは予想していませんでした。
プロデューサーとコンシューマーファクトリの変更
DefaultKafkaProducerFactory
は、スレッドごとにプロデューサーを作成するように設定できるようになりました。構成されたクラス (引数なしのコンストラクターが必要) または Serializer
インスタンスを使用して構築する代わりに、コンストラクターに Supplier<Serializer>
インスタンスを提供することもできます。詳細については、DefaultKafkaProducerFactory
の使用を参照してください。
DefaultKafkaConsumerFactory
の Supplier<Deserializer>
インスタンスでも同じオプションを使用できます。詳細については、KafkaMessageListenerContainer
の使用を参照してください。
リスナーコンテナーの変更
以前は、リスナーがリスナーアダプター ( @KafkaListener
など) を使用して呼び出されたときに、エラーハンドラーは ListenerExecutionFailedException
(実際のリスナー例外は cause
) を受け取りました。ネイティブ GenericMessageListener
によってスローされた例外は、変更されずにエラーハンドラーに渡されました。ListenerExecutionFailedException
は常に引数であり (実際のリスナー例外は cause
として)、コンテナーの group.id
プロパティへのアクセスを提供します。
リスナーコンテナーにはオフセットをコミットするための独自のメカニズムがあるため、Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
が false
であることが優先されます。コンシューマーファクトリまたはコンテナーのコンシューマープロパティオーバーライドで特に設定されていない限り、自動的に false に設定されるようになりました。
ackOnError
プロパティは、デフォルトで false
になりました。
リスナーメソッドでコンシューマーの group.id
プロパティを取得できるようになりました。詳細については、コンシューマー group.id
の取得を参照してください。
コンテナーには新しいプロパティ recordInterceptor
があり、リスナーを呼び出す前にレコードをインスペクションまたは変更できます。複数のインターセプターを呼び出す必要がある場合に備えて、CompositeRecordInterceptor
も提供されます。詳細については、メッセージリスナコンテナーを参照してください。
ConsumerSeekAware
には、開始位置、終了位置、現在の位置を基準にしてシークを実行し、タイムスタンプ以上の最初のオフセットをシークできる新しいメソッドがあります。詳細については、[ 求める ] を参照してください。
シークを簡素化するために、便利なクラス AbstractConsumerSeekAware
が提供されるようになりました。詳細については、[ 求める ] を参照してください。
ContainerProperties
は、idleBetweenPolls
オプションを提供して、リスナーコンテナーのメインループを KafkaConsumer.poll()
呼び出し間でスリープさせます。詳細については、JavaDocs および KafkaMessageListenerContainer
の使用を参照してください。
AckMode.MANUAL
(または MANUAL_IMMEDIATE
) を使用する場合、Acknowledgment
で nack
を呼び出すことで再配信を行うことができるようになりました。詳細については、オフセットのコミットを参照してください。
リスナーのパフォーマンスは、Micrometer Timer
を使用して監視できるようになりました。詳細については、モニターを参照してください。
コンテナーは、起動に関連する追加のコンシューマーライフサイクルイベントを公開するようになりました。詳細については、アプリケーションイベントを参照してください。
トランザクションバッチリスナーがゾンビフェンシングをサポートできるようになりました。詳細については、トランザクションを参照してください。
リスナーコンテナーファクトリは、作成および設定後に各コンテナーをさらに設定するために ContainerCustomizer
を使用して設定できるようになりました。詳細については、コンテナーファクトリを参照してください。
ErrorHandler の変更
SeekToCurrentErrorHandler
は現在、特定の例外を致命的なものとして扱い、それらの再試行を無効にして、最初の失敗時に回復プログラムを呼び出します。
SeekToCurrentErrorHandler
および SeekToCurrentBatchErrorHandler
は、配信試行の間に BackOff
(スレッドスリープ) を適用するように設定できるようになりました。
バージョン 2.3.2 以降、失敗したレコードの回復後にエラーハンドラーが戻るときに、回復されたレコードのオフセットがコミットされます。
DeadLetterPublishingRecoverer
は、ErrorHandlingDeserializer
と組み合わせて使用されると、デッドレタートピックに送信されたメッセージのペイロードを、デシリアライズできなかった元の値に設定するようになりました。以前は、null
であり、メッセージヘッダーから DeserializationException
を抽出するために必要なユーザーコードでした。詳細については、デッドレターレコードの公開を参照してください。
TopicBuilder
自動トピックプロビジョニング用の NewTopic
@Bean
をより便利に作成するために、新しいクラス TopicBuilder
が提供されています。詳細については、[ トピックの構成 ] を参照してください。
Kafka ストリームの変更
@EnableKafkaStreams
によって作成された StreamsBuilderFactoryBean
の追加構成を実行できるようになりました。詳細については、ストリーム構成を参照してください。
逆直列化エラーのあるレコードを回復できる RecoveringDeserializationExceptionHandler
が提供されるようになりました。DeadLetterPublishingRecoverer
と組み合わせて使用して、これらのレコードをデッドレタートピックに送信できます。詳細については、デシリアライズ例外からの回復を参照してください。
SpEL を使用してヘッダー値を生成する HeaderEnricher
トランスフォーマーが提供されています。詳細については、ヘッダーエンリッチャーを参照してください。
MessagingTransformer
が提供されました。これにより、Kafka ストリームトポロジが Spring Integration フローなどの spring-messaging コンポーネントと対話できるようになります。詳細については、MessagingProcessor
および【 KStream
から Spring Integration フローを呼び出すを参照してください。
JSON コンポーネントの変更
現在、すべての JSON 対応コンポーネントは、デフォルトで JacksonUtils.enhancedObjectMapper()
によって生成された Jackson ObjectMapper
で構成されています。JsonDeserializer
は、ターゲットジェネリクスコンテナー型の処理を改善するために、TypeReference
ベースのコンストラクターを提供するようになりました。また、org.springframework.util.MimeType
をプレーン文字列にシリアライズするために JacksonMimeTypeModule
が導入されました。詳細については、JavaDocs および直列化、逆直列化、メッセージ変換を参照してください。
ByteArrayJsonMessageConverter
と、すべての Json コンバーター用の新しいスーパークラス JsonMessageConverter
が提供されています。また、StringOrBytesSerializer
が利用可能になりました。ProducerRecord
で byte[]
、Bytes
、String
値を直列化できます。詳細については、Spring メッセージングメッセージ変換を参照してください。
JsonSerializer
、JsonDeserializer
、JsonSerde
には、プログラムによる構成をより簡単にするための流れるような API が含まれるようになりました。詳細については、javadoc、直列化、逆直列化、メッセージ変換、JSON の直列化と逆直列化をストリーミングしますを参照してください。
ReplyingKafkaTemplate
応答がタイムアウトすると、フューチャーは例外的に KafkaException
ではなく KafkaReplyTimeoutException
で完了します。
また、オーバーロードされた sendAndReceive
メソッドが提供され、メッセージごとに応答タイムアウトを指定できるようになりました。
AggregatingReplyingKafkaTemplate
複数の受信者からの応答を集約することにより、ReplyingKafkaTemplate
を拡張します。詳細については、複数の返信を集約するを参照してください。
トランザクションの変更
KafkaTemplate
および KafkaTransactionManager
でプロデューサーファクトリの transactionIdPrefix
をオーバーライドできるようになりました。詳細については、transactionIdPrefix
を参照してください。
新しい委譲シリアライザー / デシリアライザー
フレームワークは、ヘッダーを使用して複数のキー / 値型でレコードを生成および使用できるようにする、委譲 Serializer
および Deserializer
を提供するようになりました。詳細については、シリアライザーとデシリアライザーの委譲を参照してください。
新しい再試行デシリアライザー
フレームワークは、ネットワークの問題などの一時的なエラーが発生した場合に直列化を再試行するために、委譲 RetryingDeserializer
を提供するようになりました。詳細については、デシリアライザーの再試行を参照してください。
2.1 および 2.2 間の変更
クラスとパッケージの変更
ContainerProperties
クラスは org.springframework.kafka.listener.config
から org.springframework.kafka.listener
に移動しました。
AckMode
列挙型が AbstractMessageListenerContainer
から ContainerProperties
に移動されました。
setBatchErrorHandler()
および setErrorHandler()
メソッドは、ContainerProperties
から AbstractMessageListenerContainer
および AbstractKafkaListenerContainerFactory
の両方に移動されました。
ロールバック処理後
新しい AfterRollbackProcessor
戦略が提供されます。詳細については、ロールバック後のプロセッサーを参照してください。
ConcurrentKafkaListenerContainerFactory
の変更
ConcurrentKafkaListenerContainerFactory
を使用して、@KafkaListener
アノテーションだけでなく、任意の ConcurrentMessageListenerContainer
を作成および構成できるようになりました。詳細については、コンテナーファクトリを参照してください。
リスナーコンテナーの変更
新しいコンテナープロパティ (missingTopicsFatal
) が追加されました。詳細については、KafkaMessageListenerContainer
を使用するを参照してください。
コンシューマーが停止したときに ConsumerStoppedEvent
が発行されるようになりました。詳細については、スレッドセーフを参照してください。
バッチリスナーは、オプションで List<ConsumerRecord<?, ?>
の代わりに完全な ConsumerRecords<?, ?>
オブジェクトを受け取ることができます。詳細については、[ バッチリスナー ] を参照してください。
DefaultAfterRollbackProcessor
および SeekToCurrentErrorHandler
は、失敗し続けるレコードを回復 (スキップ) できるようになりました。デフォルトでは、10 回の失敗後に回復します。失敗したレコードを配信不能トピックに公開するように構成できます。
バージョン 2.2.4 以降、デッドレターのトピック名を選択する際に、コンシューマーのグループ ID を使用できるようになりました。
ConsumerStoppingEvent
が追加されました。詳細については、アプリケーションイベントを参照してください。
コンテナーが AckMode.MANUAL_IMMEDIATE
で構成されている場合、SeekToCurrentErrorHandler
は、復元されたレコードのオフセットをコミットするように構成できるようになりました (2.2.4 以降)。
@KafkaListener の変更
アノテーションにプロパティを設定することで、リスナーコンテナーファクトリの concurrency
および autoStartup
プロパティをオーバーライドできるようになりました。構成を追加して、応答メッセージにコピーするヘッダー (存在する場合) を決定できるようになりました。詳細については、@KafkaListener
アノテーションを参照してください。
@KafkaListener
を独自のアノテーションのメタアノテーションとして使用できるようになりました。詳細については、メタアノテーションとしての @KafkaListener
を参照してください。
@Payload
検証用に Validator
を構成する方が簡単になりました。詳細については、@KafkaListener
@Payload
検証を参照してください。
アノテーションで kafka コンシューマープロパティを直接指定できるようになりました。これらは、コンシューマーファクトリで定義された同じ名前のプロパティをオーバーライドします (バージョン 2.2.4 以降)。詳細については、アノテーションプロパティを参照してください。
ヘッダーマッピングの変更
型 MimeType
および MediaType
のヘッダーは、RecordHeader
値の単純な文字列としてマップされるようになりました。以前は、これらは JSON としてマッピングされ、MimeType
のみがデコードされていました。MediaType
をデコードできませんでした。これらは、相互運用性のための単純な文字列になりました。
また、DefaultKafkaHeaderMapper
には新しい addToStringClasses
メソッドがあり、JSON の代わりに toString()
を使用してマップする必要のある型を指定できます。詳細については、メッセージヘッダーを参照してください。
埋め込まれた Kafka の変更
KafkaEmbedded
クラスとその KafkaRule
インターフェースは非推奨となり、EmbeddedKafkaBroker
とその JUnit 4 EmbeddedKafkaRule
ラッパーが優先されました。@EmbeddedKafka
アノテーションは、非推奨の KafkaEmbedded
の代わりに EmbeddedKafkaBroker
Bean を設定するようになりました。この変更により、JUnit 5 テストで @EmbeddedKafka
を使用できるようになりました。@EmbeddedKafka
アノテーションには、EmbeddedKafkaBroker
を取り込むポートを指定する属性 ports
が含まれるようになりました。詳細については、アプリケーションのテストを参照してください。
JsonSerializer/ デシリアライザーの機能強化
プロデューサーとコンシューマーのプロパティを使用して、型 マッピング情報を提供できるようになりました。
新しいコンストラクターがデシリアライザーで使用可能になり、提供されたターゲット型で型ヘッダー情報をオーバーライドできます。
JsonDeserializer
は、デフォルトで型情報ヘッダーを削除するようになりました。
Kafka プロパティ (2.2.3 以降) を使用して、型情報ヘッダーを無視するように JsonDeserializer
を構成できるようになりました。
詳細については、直列化、逆直列化、メッセージ変換を参照してください。
Kafka ストリームの変更
ストリーム構成 Bean は、StreamsConfig
オブジェクトではなく KafkaStreamsConfiguration
オブジェクトである必要があります。
StreamsBuilderFactoryBean
は、パッケージ …core
から …config
に移動されました。
KafkaStreamBrancher
は、条件付き ブランチ が KStream
インスタンス上に構築されている場合のエンドユーザーエクスペリエンスを向上させるために導入されました。
詳細については、Apache Kafka ストリームのサポートおよび構成を参照してください。
トランザクション ID
リスナーコンテナーによってトランザクションが開始されると、transactional.id
は transactionIdPrefix
に <group.id>.<topic>.<partition>
が付加されます。この変更により、ここで説明するよう (英語) に、ゾンビの適切なフェンシングが可能になります。
2.0 および 2.1 間の変更
JSON の改善
StringJsonMessageConverter
および JsonSerializer
は Headers
に型情報を追加するようになり、コンバーターと JsonDeserializer
は、固定された構成型ではなく、メッセージ自体に基づいて、受信時に特定の型を作成できるようになりました。詳細については、直列化、逆直列化、メッセージ変換を参照してください。
コンテナー停止エラーハンドラー
コンテナーエラーハンドラーは、レコードリスナーとバッチリスナーの両方に提供されるようになりました。リスナーによってスローされた例外を致命的なものとして処理します。/ コンテナーを停止します。詳細については、例外の処理を参照してください。
コンテナーの一時停止と再開
リスナーコンテナーに pause()
および resume()
メソッドが追加されました (バージョン 2.1.3 以降)。詳細については、リスナーコンテナーの一時停止と再開を参照してください。
ステートフルリトライ
バージョン 2.1.3 以降、ステートフル再試行を構成できます。詳細については、ステートフルリトライを参照してください。
クライアント ID
バージョン 2.1.1 以降、@KafkaListener
に client.id
プレフィックスを設定できるようになりました。以前は、クライアント ID をカスタマイズするには、リスナーごとに個別のコンシューマーファクトリ (およびコンテナーファクトリ) が必要でした。プレフィックスには -n
のサフィックスが付いており、同時実行を使用する場合に一意のクライアント ID を提供します。
オフセットコミットのロギング
デフォルトでは、トピックオフセットコミットのロギングは DEBUG
ロギングレベルで実行されます。バージョン 2.1.2 以降、commitLogLevel
と呼ばれる ContainerProperties
の新しいプロパティを使用して、これらのメッセージのログレベルを指定できます。詳細については、KafkaMessageListenerContainer
の使用を参照してください。
デフォルトの @KafkaHandler
バージョン 2.1.3 以降、クラスレベルの @KafkaListener
で @KafkaHandler
アノテーションの 1 つをデフォルトとして指定できます。詳細については、クラスでの @KafkaListener
を参照してください。
ReplyingKafkaTemplate
バージョン 2.1.3 以降、リクエスト / 応答セマンティクスをサポートするために KafkaTemplate
のサブクラスが提供されます。詳細については、ReplyingKafkaTemplate
を使用するを参照してください。
2.0 からの移行ガイド
2.0 から 2.1 への移行 [GitHub] (英語) ガイドを参照してください。
1.3 および 2.0 間の変更
Spring Framework および Java バージョン
Spring for Apache Kafka プロジェクトには、Spring Framework 5.0 および Java 8 が必要になりました。
@KafkaListener
の変更
@KafkaListener
メソッド (およびクラスと @KafkaHandler
メソッド) に @SendTo
でアノテーションを付けることができるようになりました。メソッドが結果を返す場合、指定されたトピックに転送されます。詳細については、@SendTo
を使用したリスナー結果の転送を参照してください。
メッセージリスナー
メッセージリスナーは、Consumer
オブジェクトを認識できるようになりました。詳細については、[ メッセージリスナー ] を参照してください。
ConsumerAwareRebalanceListener
を使用する
リバランスリスナーは、リバランス通知中に Consumer
オブジェクトにアクセスできるようになりました。詳細については、リスナーのリバランスを参照してください。
1.2 および 1.3 間の変更
トランザクションのサポート
0.11.0.0 クライアントライブラリでは、トランザクションのサポートが追加されました。KafkaTransactionManager
およびその他のトランザクションのサポートが追加されました。詳細については、"トランザクション" を参照してください。
ヘッダーのサポート
0.11.0.0 クライアントライブラリでは、メッセージヘッダーのサポートが追加されました。これらは、spring-messaging
MessageHeaders
との間でマッピングできるようになりました。詳細については、"メッセージヘッダー" を参照してください。
トピックの作成
0.11.0.0 クライアントライブラリは、トピックの作成に使用できる AdminClient
を提供します。KafkaAdmin
は、このクライアントを使用して、@Bean
インスタンスとして定義されたトピックを自動的に追加します。
Kafka タイムスタンプのサポート
KafkaTemplate
は、タイムスタンプ付きのレコードを追加する API をサポートするようになりました。timestamp
のサポートに関して、新しい KafkaHeaders
が導入されました。また、新しい KafkaConditions.timestamp()
および KafkaMatchers.hasTimestamp()
テストユーティリティが追加されました。詳細については、KafkaTemplate
の使用、@KafkaListener
アノテーション、アプリケーションのテストを参照してください。
@KafkaListener
の変更
例外を処理するように KafkaListenerErrorHandler
を構成できるようになりました。詳細については、例外の処理を参照してください。
デフォルトでは、@KafkaListener
id
プロパティが group.id
プロパティとして使用され、コンシューマーファクトリで設定されたプロパティ (存在する場合) をオーバーライドします。さらに、アノテーションで groupId
を明示的に構成できます。以前は、リスナーに異なる group.id
値を使用するには、個別のコンテナーファクトリ (およびコンシューマーファクトリ) が必要でした。提供時に構成された group.id
を使用する以前の動作を復元するには、アノテーションの idIsGroup
プロパティを false
に設定します。
@EmbeddedKafka
アノテーション
便宜上、KafkaEmbedded
を Bean として登録するために、テストクラスレベルの @EmbeddedKafka
アノテーションが提供されています。詳細については、アプリケーションのテストを参照してください。
Kerberos 構成
Kerberos の構成がサポートされるようになりました。詳細については、JAAS と Kerberos を参照してください。
1.0 および 1.1 間の変更
シーク
各トピックまたはパーティションの位置をシークできるようになりました。グループ管理を使用していて、Kafka がパーティションを割り当てるときに、これを使用して初期化中に初期位置を設定できます。アイドル状態のコンテナーが検出されたとき、またはアプリケーションの実行中の任意の時点でシークすることもできます。詳細については、[ 求める ] を参照してください。