新着情報

3.3 以降の 4.0 の新機能

このセクションでは、バージョン 3.3 からバージョン 4.0 への変更点について説明します。以前のバージョンの変更点については、変更履歴を参照してください。

Apache Kafka 4.0 クライアントのアップグレード

Spring for Apache Kafka は、Apache Kafka クライアントバージョン 4.0.0 を使用するようにアップグレードされました。このアップグレードにより、いくつかの重要な変更がもたらされます。

  • Kafka 4.0 が KRaft モードに完全に移行したため、ZooKeeper ベースの機能はすべて削除されました。

  • ZooKeeper 依存関係がプロジェクトから削除されました

  • 組み込み Kafka テストフレームワークは KRaft モードのみを使用するようになりました

  • EmbeddedKafkaZKBroker クラスは削除され、すべての機能は EmbeddedKafkaKraftBroker によって処理されるようになりました。

埋め込み Kafka テストフレームワークの変更

テストインフラストラクチャが大幅に更新されました。

  • EmbeddedKafkaRule JUnit 4 ルールは削除されました

  • @EmbeddedKafka アノテーションは、ZooKeeper 関連のプロパティが削除されて簡素化されました。

  • KRaft モードが唯一のオプションになったため、kraft プロパティは削除されました。

  • zookeeperPortzkConnectionTimeoutzkSessionTimeout のような ZooKeeper 固有のプロパティは削除されました

  • KafkaClusterTestKit インポートは KRaft モード用の新しいパッケージを使用するようになりました

  • KRaft モードでの静的ポート割り当ての制限に対処するために、いくつかのテストが更新されました。

  • KRaft の要件を満たすために、テストの複製係数に調整が行われました。

ConsumerRecords コンストラクターの変更

ConsumerRecords コンストラクターに Map パラメーターが追加されましたが、これはフレームワーク全体で修正されています。このコンストラクターを直接使用するアプリケーションは、コードを更新する必要があります。

プロデューサーインターフェースの更新

Kafka プロデューサーインターフェースからの新しいメソッドが実装されました:

  • registerMetricForSubscription

  • unregisterMetricFromSubscription

非推奨となりた機能の削除

いくつかの非推奨項目が削除されました:

  • 非推奨の partitioner クラスはランタイムヒントから削除されました

  • String consumerGroupId を使用する非推奨の sendOffsetsToTransaction メソッドは削除されました

Kafka ストリーム API の変更

  • KafkaStreamBrancher は、非推奨となりた branch() メソッドの代わりに新しい split() および branch() メソッドを使用するように更新されました。

  • DeserializationExceptionHandler は新しい ErrorHandlerContext を使用するように更新されました

Apache Kafka 4.0.0 に関連する内部 API の更新

  • BrokerAddress クラスは、非推奨の kafka.cluster.BrokerEndPoint の代わりに org.apache.kafka.server.network.BrokerEndPoint を使用するようになりました。

  • GlobalEmbeddedKafkaTestExecutionListener は KRaft モードでのみ動作するように更新されました

新しいコンシューマー再調整プロトコル

Spring for Apache Kafka 4.0 は、Kafka 4.0 の新しいコンシューマーリバランスプロトコルである KIP-848 [Apache] (英語) をサポートしています。詳細については、New Consumer Rebalance Protocol docs を参照してください。

複数値ヘッダーをサポート

JsonKafkaHeaderMapper および SimpleKafkaHeaderMapper は、Kafka レコードの複数値ヘッダーマッピングをサポートします。詳細については、複数値ヘッダーマッピングをサポートを参照してください。

追加の RecordInterceptor を構成する

リスナーコンテナーは、getRecordInterceptor() によるインターセプターのカスタマイズをサポートするようになりました。詳細はメッセージリスナコンテナーセクションを参照してください。

バッチリスナーにおけるレコードごとの監視

バッチリスナーの使用時に、各レコードの観測値を取得できるようになりました。詳細については、Observability for Batch Listeners を参照してください。

Kafka キュー(共有コンシューマー)のサポート

Spring for Apache Kafka は、Apache Kafka 4.0.0 の一部であり KIP-932 を実装した共有コンシューマーを通じて、Kafka キューへの早期アクセスサポートを提供するようになりました。これにより、複数のコンシューマーが同じパーティションから同時に消費できる協調消費が可能になり、従来のコンシューマーグループと比較して優れた負荷分散が実現します。詳細については、Kafka キュー (シェアコンシューマー) を参照してください。

Jackson 3 サポート

Spring for Apache Kafka は、既存の Jackson 2 サポートに加え、Jackson 3 も包括的にサポートするようになりました。Jackson 3 は自動的に検出され、利用可能な場合は優先的にサポートされるため、パフォーマンスが向上し、最新の JSON 処理機能も提供されます。

すべての Jackson 2 クラスに、一貫した命名と型の安全性が向上した Jackson 3 の対応するクラスが追加されました。

  • JsonKafkaHeaderMapper は DefaultKafkaHeaderMapper に取って代わります

  • JacksonJsonSerializer/Deserializer は JsonSerializer/Deserializer に取って代わります

  • JacksonJsonSerde は JsonSerde に取って代わります

  • JacksonJsonMessageConverter ファミリーが JsonMessageConverter ファミリーに取って代わる

  • JacksonProjectingMessageConverter は ProjectingMessageConverter に取って代わります

  • DefaultJacksonJavaTypeMapper は DefaultJackson2JavaTypeMapper に取って代わります

新しい Jackson 3 クラスは、型の安全性を強化するために汎用 ObjectMapper の代わりに JsonMapper を使用し、Jackson 3 の改善されたモジュールシステムとパフォーマンスの最適化を活用します。

Migration Path : 既存のアプリケーションは Jackson 2 でも変更なく動作し続けます。Jackson 3 に移行するには、クラスパスに Jackson 3 を追加し、クラス参照を更新して新しい Jackson 3 相当のものを使用するだけです。フレームワークは両方のバージョンが存在する場合、自動的に Jackson 3 を検出し、優先的に使用します。

下位互換性 : すべての Jackson 2 クラスは非推奨ですが、完全に機能します。将来のメジャーバージョンで削除される予定です。

設定例については、直列化、逆直列化、メッセージ変換を参照してください。

Spring Retry 依存関係の削除

Spring for Apache Kafka は Spring Retry への依存を廃止し、Spring Framework 7 で導入されたコアリトライサポートを採用しました。これはフレームワーク全体のリトライ設定と API に影響を与える重大な変更です。

必要な BackOff 値を事前に生成する BackOffValuesGenerator は、BackOffPolicy ではなく Spring Framework の BackOff インターフェースと直接連携するようになりました。これらの値はリスナーインフラストラクチャによって管理され、Spring Retry は関与しなくなりました。

設定の観点から見ると、Spring、Kafka は Spring Retry の @Backoff アノテーションに大きく依存していました。Spring Framework には同等のアノテーションがないため、以下の改善を加えて、このアノテーションは Spring、Kafka に @BackOff として移行されました。

  • 統一された命名: 一貫性を保つために @Backoff ではなく @BackOff を使用する

  • 式の評価: すべての文字列属性は SpEL 式とプロパティプレースホルダーをサポートします

  • 期間形式のサポート: 文字列属性は java.util.Duration 形式を受け入れます (e.g., "2s", "500ms")

  • 強化されたドキュメント: より明確な説明を備えた Javadoc の改善

移行例:

// Before
@RetryableTopic(backoff = @Backoff(delay = 2000, maxDelay = 10000, multiplier = 2))

// After
@RetryableTopic(backOff = @BackOff(delay = 2000, maxDelay = 10000, multiplier = 2))

// With new duration format support
@RetryableTopic(backOff = @BackOff(delayString = "2s", maxDelayString = "10s", multiplier = 2))

// With property placeholders
@RetryableTopic(backOff = @BackOff(delayString = "${retry.delay}", multiplierString = "${retry.multiplier}"))

RetryingDeserializer no longer offers a RecoveryCallback but an equivalent function that takes RetryException as input. This contains the exceptions thrown as well as the number of retry attempts:

// Before
retryingDeserializer.setRecoveryCallback(context -> {
    return fallbackValue;
});

// After
retryingDeserializer.setRecoveryCallback(retryException -> {
    return fallbackValue;
});

The use of BinaryExceptionClassifier has been replaced by the newly introduced ExceptionMatcher, which provides a polished API.

Additional changes include:

  • DestinationTopicPropertiesFactory uses ExceptionMatcher instead of BinaryExceptionClassifier

  • The uniformRandomBackoff method in RetryTopicConfigurationBuilder has been deprecated in favor of jitter support

  • Error handling utilities have been updated to work with the new exception matching system

  • Kafka Streams retry templates now use Spring Framework’s retry support

Applications must update their configuration to use the new Spring Framework retry API, but the retry behavior and functionality remain the same.