最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。

レコードの直列化と逆直列化

Kafka Streams バインダーを使用すると、2 つの方法でレコードを直列化および逆直列化できます。1 つは Kafka によって提供されるネイティブの直列化および逆直列化機能であり、もう 1 つは Spring Cloud Stream フレームワークのメッセージ変換機能です。詳細を見てみましょう。

受信デシリアライズ

キーは常にネイティブ Serdes を使用して逆直列化されます。

値の場合、デフォルトでは、受信での逆直列化は Kafka によってネイティブに実行されます。これは、フレームワークによって逆直列化が行われた以前のバージョンの Kafka Streams バインダーからのデフォルトの動作に対する大きな変更であることに注意してください。

Kafka Streams バインダーは、java.util.function.Function|Consumer の型シグネチャーを見て、一致する Serde 型を推測しようとします。Serdes と一致する順序は次のとおりです。

  • アプリケーションが型 Serde の Bean を提供し、戻り値の型が受信キーまたは値型の実際の型でパラメーター化されている場合、アプリケーションはその Serde を受信逆直列化に使用します。たとえばアプリケーションに次のものがある場合、バインダーは、KStream の入力値型が Serde Bean でパラメーター化された型と一致することを検出します。これは、受信の逆直列化に使用されます。

@Bean
public Serde<Foo> customSerde() {
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 次に、型を調べて、それらが Kafka ストリームによって公開されている型の 1 つであるかどうかを確認します。もしそうなら、使用してください。バインダーが Kafka ストリームから一致させようとする Serde 型は次のとおりです。

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • Kafka ストリームによって提供される Serdes のいずれも型に一致しない場合、Spring Kafka によって提供される JsonSerde が使用されます。この場合、バインダーは型が JSON フレンドリーであると想定します。これは、バインダーが内部で正しい Java 型を推論するため、入力として複数の値オブジェクトがある場合に便利です。ただし、JsonSerde にフォールバックする前に、バインダーは Kafka Streams 構成に設定されているデフォルトの Serde をチェックして、受信する KStream の型と一致できる Serde かどうかを確認します。

上記の戦略のいずれも機能しなかった場合、アプリケーションは構成を通じて Serde を提供する必要があります。これは、バインディングまたはデフォルトの 2 つの方法で構成できます。

最初に、バインダーは、Serde がバインディングレベルで提供されているかどうかを確認します。たとえば次のプロセッサーを使用している場合

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

次に、以下を使用してバインディングレベル Serde を提供できます。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
入力バインディングごとに上記のように Serde を指定すると、優先順位が高くなり、バインダーは Serde 推論から遠ざかります。

デフォルトのキー / 値 Serdes を受信デシリアライズに使用する場合は、バインダーレベルで使用できます。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

Kafka が提供するネイティブデコードが必要ない場合は、Spring Cloud Stream が提供するメッセージ変換機能を利用できます。ネイティブデコードがデフォルトであるため、Spring Cloud Stream で受信値オブジェクトを逆直列化するには、ネイティブデコードを明示的に無効にする必要があります。

たとえば上記と同じ BiFunction プロセッサーを使用している場合は、spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false すべての入力のネイティブデコードを個別に無効にする必要があります。それ以外の場合は、無効にしないものにはネイティブデコードが引き続き適用されます。

デフォルトでは、Spring Cloud Stream はコンテンツ型として application/json を使用し、適切な json メッセージコンバーターを使用します。次のプロパティと適切な MessageConverter Bean を使用して、カスタムメッセージコンバーターを使用できます。

spring.cloud.stream.bindings.process-in-0.contentType

送信直列化

送信直列化は、受信デ直列化の上記とほぼ同じルールに従います。受信の逆直列化と同様に、Spring Cloud Stream の以前のバージョンからの大きな変更の 1 つは、送信での直列化が Kafka によってネイティブに処理されることです。バインダーの 3.0 バージョンの前は、これはフレームワーク自体によって行われていました。

送信のキーは、バインダーによって推測される一致する Serde を使用して、Kafka によって常に直列化されます。キーの型を推測できない場合は、構成を使用して指定する必要があります。

値 serdes は、受信の逆直列化に使用されるのと同じルールを使用して推測されます。最初に、送信型がアプリケーションで提供された Bean からのものであるかどうかを確認するために一致します。そうでない場合は、- IntegerLongShortDoubleFloatbyte[]UUIDString などの Kafka によって公開された Serde と一致するかどうかを確認します。それが機能しない場合は、Spring Kafka プロジェクトによって提供される JsonSerde にフォールバックしますが、最初にデフォルトの Serde 構成を調べて、一致するものがあるかどうかを確認します。これらはすべて、アプリケーションに対して透過的に行われることに注意してください。これらのいずれも機能しない場合、ユーザーは構成で使用する Serde を提供する必要があります。

上記と同じ BiFunction プロセッサーを使用しているとしましょう。次に、送信キー / 値 Serdes を次のように構成できます。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Serde 推論が失敗し、バインディングレベルの Serdes が提供されていない場合、バインダーは JsonSerde にフォールバックしますが、デフォルトの Serdes で一致するものを探します。

デフォルトの serdes は、デシリアライズで説明されている上記と同じ方法で構成されます。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

アプリケーションが分岐機能を使用し、複数の出力バインディングがある場合、これらはバインディングごとに構成する必要があります。繰り返しになりますが、バインダーが Serde 型を推測できる場合は、この構成を行う必要はありません。

Kafka が提供するネイティブエンコーディングは必要ないが、フレームワークが提供するメッセージ変換を使用する場合は、ネイティブエンコーディングがデフォルトであるため、ネイティブエンコーディングを明示的に無効にする必要があります。たとえば上記と同じ BiFunction プロセッサーを使用している場合は、spring.cloud.stream.bindings.process-out-0.producer.nativeEncoding: false 分岐の場合は、すべての出力のネイティブエンコーディングを個別に無効にする必要があります。それ以外の場合は、無効にしないものにはネイティブエンコーディングが引き続き適用されます。

Spring Cloud Stream によって変換が行われる場合、デフォルトでは、コンテンツ型として application/json が使用され、適切な json メッセージコンバーターが使用されます。次のプロパティと対応する MessageConverter Bean を使用して、カスタムメッセージコンバーターを使用できます。

spring.cloud.stream.bindings.process-out-0.contentType

ネイティブエンコーディング / デコーディングが無効になっている場合、バインダーはネイティブ Serdes の場合のように推論を行いません。アプリケーションは、すべての構成オプションを明示的に提供する必要があります。そのため、Spring Cloud Stream Kafka Streams アプリケーションを作成するときは、通常、逆直列化のデフォルトオプションを使用し、Kafka Streams が提供するネイティブの逆直列化を使用することをお勧めします。フレームワークによって提供されるメッセージ変換機能を使用する必要がある 1 つのシナリオは、アップストリームプロデューサーが特定の直列化戦略を使用している場合です。その場合、ネイティブメカニズムが失敗する可能性があるため、一致する逆直列化戦略を使用する必要があります。デフォルトの Serde メカニズムに依存する場合、アプリケーションは、バインダーが適切な Serde で受信と送信を正しくマップする方法を持っていることを確認する必要があります。そうしないと、問題が発生する可能性があります。

上で概説したデータの逆直列化アプローチは、プロセッサーのエッジ、つまり受信と送信にのみ適用可能であることに注意してください。ビジネスロジックでは、Serde オブジェクトを明示的に必要とする Kafka StreamsAPI を呼び出す必要がある場合があります。これらは依然としてアプリケーションの責任であり、開発者がそれに応じて処理する必要があります。