Kafka Streams アプリケーションでのイベント型ベースのルーティング

通常のメッセージチャネルベースのバインダーで使用可能なルーティング機能は、Kafka ストリームバインダーではサポートされていません。ただし、Kafka Streams バインダーは、受信レコードのイベント型レコードヘッダーを介したルーティング機能を引き続き提供します。

イベント型に基づくルーティングを有効にするには、アプリケーションは次のプロパティを提供する必要があります。

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.

これはコンマ区切りの値にすることができます。

例: この関数があると仮定しましょう:

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

また、受信レコードのイベント型が foo または bar の場合、この関数のビジネスロジックのみを実行する必要があると仮定します。これは、バインディングの eventTypes プロパティを使用して次のように表すことができます。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar

これで、アプリケーションの実行時に、バインダーはヘッダー event_type の各受信レコードをチェックし、値が foo または bar として設定されているかどうかを確認します。どちらも見つからない場合、関数の実行はスキップされます。

デフォルトでは、バインダーはレコードヘッダーキーが event_type であることを想定していますが、これはバインディングごとに変更できます。たとえば、このバインディングのヘッダーキーをデフォルトではなく my_event に変更する場合は、次のように変更できます。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.

Kafkfa Streams バインダーでイベントルーティング機能を使用する場合、バイト配列 Serde を使用してすべての受信レコードを逆直列化します。レコードヘッダーがイベント型と一致する場合は、実際の Serde のみを使用して、構成済みまたは推定済みの Serde を使用して適切な逆直列化を行います。これにより、バインドに逆直列化の例外ハンドラーを設定すると、予想される逆直列化がスタックでのみ発生し、予期しないエラーが発生するため、問題が発生します。この課題に対処するには、バインディングに次のプロパティを設定して、バインダがバイト配列 Serde の代わりに構成済みまたは推定済みの Serde を使用するようにします。

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents

このようにして、アプリケーションは、イベントルーティング機能を使用するときに逆直列化の課題をすぐに検出し、適切な処理の決定を下すことができます。