新着情報
3.1 以降の 3.2 の新機能
このセクションでは、バージョン 3.1 からバージョン 3.2 に加えられた変更について説明します。以前のバージョンでの変更については、変更履歴を参照してください。
Kafka クライアントバージョン
このバージョンには 3.7.0 kafka-clients
が必要です。Kafka クライアントの 3.7.0 バージョンでは、新しいコンシューマーグループプロトコルが導入されています。詳細と制限については、KIP-848 [Apache] (英語) を参照してください。新しいコンシューマーグループプロトコルは早期アクセスリリースであり、本番環境での使用を意図したものではありません。このバージョンでは、テスト目的でのみ使用することをお勧めします。Spring for Apache Kafka は、kafka-client
自体で利用可能なテストレベルのサポートの範囲内でのみ、この新しいコンシューマーグループプロトコルをサポートします。デフォルトでは、Spring for Apache Kafka は従来のコンシューマーグループプロトコルを使用し、新しいコンシューマーグループプロトコルをテストする場合は、コンシューマーの group.protocol
プロパティを介してオプトインする必要があります。
サポートの変更のテスト
kraft
モードは、デフォルトでは EmbeddedKafka
では無効になっており、kraft
モードを使用するユーザーは有効にする必要があります。これは、EmbeddedKafka
を kraft
モードで使用する場合、特に新しいコンシューマーグループプロトコルをテストする場合に、特定の不安定性が見られるためです。新しいコンシューマーグループプロトコルは kraft
モードでのみサポートされており、このため、新しいプロトコルをテストする場合は、実際の Kafka クラスターに対して行う必要があり、EmbeddedKafka
のベースである KafkaClusterTestKit
に基づくクラスターに対して行う必要はありません。さらに、kraft
モードで複数の KafkaListener
メソッドを EmbeddedKafka
とともに実行しているときに、他の競合状態もいくつか見られました。これらの課題が解決されるまで、EmbeddedKafka
の kraft
のデフォルトは false
のままになります。
Kafka ストリームの対話型クエリのサポート
Kafka Streams 対話型クエリで使用されるクエリ可能なストアにアクセスするための新しい API KafkaStreamsInteractiveQuerySupport
。詳細については、Kafka ストリームの対話型サポートを参照してください。
TransactionIdSuffixStrategy
transactional.id
サフィックスを管理するために、新しい TransactionIdSuffixStrategy
インターフェースが導入されました。maxCache
をゼロより大きく設定すると、特定の範囲内で transactional.id
を再利用できる場合、デフォルトの実装は DefaultTransactionIdSuffixStrategy
です。それ以外の場合は、カウンターをインクリメントすることによってサフィックスがオンザフライで生成されます。詳細については、"固定 TransactionIdSuffix" を参照してください。
非同期 @KafkaListener リターン
@KafkaListener
(および @KafkaHandler
) メソッドは、CompletableFuture<?>
、Mono<?>
、Kotlin suspend
関数などの非同期戻り値の型を返すことができるようになりました。詳細については、"非同期の戻り値" を参照してください。
スローされた例外に基づいたカスタム DLT へのメッセージのルーティング
メッセージ処理中にスローされた例外の型に基づいて、メッセージをカスタム DLT にリダイレクトできるようになりました。リダイレクトのルールは、RetryableTopic.exceptionBasedDltRouting
または RetryTopicConfigurationBuilder.dltRoutingRules
を介して設定されます。カスタム DLT は、他の再試行トピックや配信不能トピックと同様に自動的に作成されます。詳細については、"スローされた例外に基づいたカスタム DLT へのメッセージのルーティング" を参照してください。
ContainerProperties transactionManager プロパティの廃止
ContainerProperties
の transactionManager
プロパティを非推奨にし、代わりに、一般的な PlatformTransactionManager
に比べて狭い型である KafkaAwareTransactionManager
を使用します。ContainerProperties およびトランザクションの同期を参照してください。
ロールバック処理後
新しい AfterRollbackProcessor
API processBatch
が提供されます。詳細については、"ロールバック後のプロセッサー" を参照してください。
@RetryableTopic SameIntervalTopicReuseStrategy のデフォルト値を変更する
@RetryableTopic
プロパティ SameIntervalTopicReuseStrategy
のデフォルト値を SINGLE_TOPIC
に変更します。maxInterval Exponential Delay の単一トピックを参照してください。
ノンブロッキング再試行はクラスレベル @KafkaListener をサポートします
ノンブロッキング再試行は @KafkaListener のクラスをサポートします。ノンブロッキング再試行を参照してください。
RetryTopicConfigurationProvider のクラスでプロセス @RetryableTopic をサポートします。
RetryTopicConfiguration
を見つけるための新しいパブリック API を提供します。RetryTopicConfiguration を探すを参照してください
RetryTopicConfigurer はプロセス MultiMethodKafkaListenerEndpoint をサポートします。
RetryTopicConfigurer
はプロセスをサポートし、MultiMethodKafkaListenerEndpoint
を登録します。MultiMethodKafkaListenerEndpoint
は、プロパティ defaultMethod
および methods
に getter/setter
を提供します。MethodKafkaListenerEndpoint
型専用の EndpointCustomizer
を変更します。EndpointHandlerMethod
は、提供された Bean のインスタンスを構築する新しいコンストラクターを追加します。エンドポイントを再試行するためのマルチメソッドをハンドラーに処理する新しいクラス EndpointHandlerMultiMethod
を提供します。
ユーザーが指定した関数に基づいてオフセットをシークする新しい API メソッド
ConsumerCallback
は、コンシューマー内の現在のオフセットを引数として受け取るユーザー定義関数に基づいてオフセットをシークするための新しい API を提供します。詳細については、API ドキュメントを探すを参照してください。
SeekPosition の @PartitionOffset サポート
TopicPartitionOffset.SeekPosition
の @PartitionOffset
サポートに seekPosition
プロパティを追加します。詳細については、手動割り当てを参照してください。
TopicPartitionOffset の新しいコンストラクターは、シークするオフセットを計算する関数を受け入れます。
TopicPartitionOffset
には、ユーザー提供の関数を受け取り、シークするオフセットを計算する新しいコンストラクターがあります。このコンストラクターを使用すると、フレームワークは、現在のコンシューマーオフセット位置の入力引数を使用して関数を呼び出します。詳細については、API ドキュメントを探すを参照してください。
デフォルトのクライアント ID プレフィックスとしての Spring Boot アプリケーション名
アプリケーション名を定義する Spring Boot アプリケーションの場合、この名前は、特定のクライアント型に対して自動生成されるクライアント ID のデフォルトのプレフィックスとして使用されるようになりました。詳細については、デフォルトのクライアント ID プレフィックスを参照してください。
MessageListenerContainers の強化された検索
ListenerContainerRegistry
は、MessageListenerContainer
インスタンスを動的に検索してフィルタリングする 2 つの新しい API を提供します。ID でフィルタリングする getListenerContainersMatching(Predicate<String> idMatcher)
と、ID とコンテナープロパティでフィルタリングする getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)
です。
詳細については、@KafkaListener
ライフサイクル管理の API ドキュメントを参照してください。
追跡タグの追加による観察の強化
KafkaTemplateObservation
はより多くのトレースタグ (低カーディナリティ) を提供します。KafkaListenerObservation
は、高カーディナリティのキー名と、より多くのトレースタグ (高カーディナリティまたは低カーディナリティ) を見つけるための新しい API を提供します。Micrometer Observation を参照してください。