新着情報

3.1 以降の 3.2 の新機能

このセクションでは、バージョン 3.1 からバージョン 3.2 に加えられた変更について説明します。以前のバージョンでの変更については、変更履歴を参照してください。

Kafka クライアントバージョン

このバージョンには 3.7.0 kafka-clients が必要です。Kafka クライアントの 3.7.0 バージョンでは、新しいコンシューマーグループプロトコルが導入されています。詳細と制限については、KIP-848 [Apache] (英語) を参照してください。新しいコンシューマーグループプロトコルは早期アクセスリリースであり、本番環境での使用を意図したものではありません。このバージョンでは、テスト目的でのみ使用することをお勧めします。Spring for Apache Kafka は、kafka-client 自体で利用可能なテストレベルのサポートの範囲内でのみ、この新しいコンシューマーグループプロトコルをサポートします。デフォルトでは、Spring for Apache Kafka は従来のコンシューマーグループプロトコルを使用し、新しいコンシューマーグループプロトコルをテストする場合は、コンシューマーの group.protocol プロパティを介してオプトインする必要があります。

サポートの変更のテスト

kraft モードは、デフォルトでは EmbeddedKafka では無効になっており、kraft モードを使用するユーザーは有効にする必要があります。これは、EmbeddedKafka を kraft モードで使用する場合、特に新しいコンシューマーグループプロトコルをテストする場合に、特定の不安定性が見られるためです。新しいコンシューマーグループプロトコルは kraft モードでのみサポートされており、このため、新しいプロトコルをテストする場合は、実際の Kafka クラスターに対して行う必要があり、EmbeddedKafka のベースである KafkaClusterTestKit に基づくクラスターに対して行う必要はありません。さらに、kraft モードで複数の KafkaListener メソッドを EmbeddedKafka とともに実行しているときに、他の競合状態もいくつか見られました。これらの課題が解決されるまで、EmbeddedKafka の kraft のデフォルトは false のままになります。

Kafka ストリームの対話型クエリのサポート

Kafka Streams 対話型クエリで使用されるクエリ可能なストアにアクセスするための新しい API KafkaStreamsInteractiveQuerySupport。詳細については、Kafka ストリームの対話型サポートを参照してください。

TransactionIdSuffixStrategy

transactional.id サフィックスを管理するために、新しい TransactionIdSuffixStrategy インターフェースが導入されました。maxCache をゼロより大きく設定すると、特定の範囲内で transactional.id を再利用できる場合、デフォルトの実装は DefaultTransactionIdSuffixStrategy です。それ以外の場合は、カウンターをインクリメントすることによってサフィックスがオンザフライで生成されます。詳細については、"固定 TransactionIdSuffix" を参照してください。

非同期 @KafkaListener リターン

@KafkaListener (および @KafkaHandler) メソッドは、CompletableFuture<?>Mono<?>、Kotlin suspend 関数などの非同期戻り値の型を返すことができるようになりました。詳細については、"非同期の戻り値" を参照してください。

スローされた例外に基づいたカスタム DLT へのメッセージのルーティング

メッセージ処理中にスローされた例外の型に基づいて、メッセージをカスタム DLT にリダイレクトできるようになりました。リダイレクトのルールは、RetryableTopic.exceptionBasedDltRouting または RetryTopicConfigurationBuilder.dltRoutingRules を介して設定されます。カスタム DLT は、他の再試行トピックや配信不能トピックと同様に自動的に作成されます。詳細については、"スローされた例外に基づいたカスタム DLT へのメッセージのルーティング" を参照してください。

ContainerProperties transactionManager プロパティの廃止

ContainerProperties の transactionManager プロパティを非推奨にし、代わりに、一般的な PlatformTransactionManager に比べて狭い型である KafkaAwareTransactionManager を使用します。ContainerProperties およびトランザクションの同期を参照してください。

ロールバック処理後

新しい AfterRollbackProcessor API processBatch が提供されます。詳細については、"ロールバック後のプロセッサー" を参照してください。

@RetryableTopic SameIntervalTopicReuseStrategy のデフォルト値を変更する

@RetryableTopic プロパティ SameIntervalTopicReuseStrategy のデフォルト値を SINGLE_TOPIC に変更します。maxInterval Exponential Delay の単一トピックを参照してください。

ノンブロッキング再試行はクラスレベル @KafkaListener をサポートします

ノンブロッキング再試行は @KafkaListener のクラスをサポートします。ノンブロッキング再試行を参照してください。

RetryTopicConfigurationProvider のクラスでプロセス @RetryableTopic をサポートします。

RetryTopicConfiguration を見つけるための新しいパブリック API を提供します。RetryTopicConfiguration を探すを参照してください

RetryTopicConfigurer はプロセス MultiMethodKafkaListenerEndpoint をサポートします。

RetryTopicConfigurer はプロセスをサポートし、MultiMethodKafkaListenerEndpoint を登録します。MultiMethodKafkaListenerEndpoint は、プロパティ defaultMethod および methods に getter/setter を提供します。MethodKafkaListenerEndpoint 型専用の EndpointCustomizer を変更します。EndpointHandlerMethod は、提供された Bean のインスタンスを構築する新しいコンストラクターを追加します。エンドポイントを再試行するためのマルチメソッドをハンドラーに処理する新しいクラス EndpointHandlerMultiMethod を提供します。

ユーザーが指定した関数に基づいてオフセットをシークする新しい API メソッド

ConsumerCallback は、コンシューマー内の現在のオフセットを引数として受け取るユーザー定義関数に基づいてオフセットをシークするための新しい API を提供します。詳細については、API ドキュメントを探すを参照してください。

SeekPosition の @PartitionOffset サポート

TopicPartitionOffset.SeekPosition の @PartitionOffset サポートに seekPosition プロパティを追加します。詳細については、手動割り当てを参照してください。

TopicPartitionOffset の新しいコンストラクターは、シークするオフセットを計算する関数を受け入れます。

TopicPartitionOffset には、ユーザー提供の関数を受け取り、シークするオフセットを計算する新しいコンストラクターがあります。このコンストラクターを使用すると、フレームワークは、現在のコンシューマーオフセット位置の入力引数を使用して関数を呼び出します。詳細については、API ドキュメントを探すを参照してください。

デフォルトのクライアント ID プレフィックスとしての Spring Boot アプリケーション名

アプリケーション名を定義する Spring Boot アプリケーションの場合、この名前は、特定のクライアント型に対して自動生成されるクライアント ID のデフォルトのプレフィックスとして使用されるようになりました。詳細については、デフォルトのクライアント ID プレフィックスを参照してください。

MessageListenerContainers の強化された検索

ListenerContainerRegistry は、MessageListenerContainer インスタンスを動的に検索してフィルタリングする 2 つの新しい API を提供します。ID でフィルタリングする getListenerContainersMatching(Predicate<String> idMatcher) と、ID とコンテナープロパティでフィルタリングする getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher) です。

詳細については、@KafkaListener ライフサイクル管理の API ドキュメントを参照してください。

追跡タグの追加による観察の強化

KafkaTemplateObservation はより多くのトレースタグ (低カーディナリティ) を提供します。KafkaListenerObservation は、高カーディナリティのキー名と、より多くのトレースタグ (高カーディナリティまたは低カーディナリティ) を見つけるための新しい API を提供します。Micrometer Observation を参照してください。