構成オプション

このセクションには、Kafka Streams バインダーで使用される構成オプションが含まれています。

バインダーに関連する一般的な構成オプションとプロパティについては、コアドキュメントを参照してください。

Kafka ストリームバインダーのプロパティ

次のプロパティはバインダーレベルで使用でき、spring.cloud.stream.kafka.streams.binder. のプレフィックスを付ける必要があります。Kafka Streams バインダーで再利用されるプロパティを提供する Kafka バインダーには、spring.cloud.stream.kafka.binder ではなく spring.cloud.stream.kafka.streams.binder のプレフィックスを付ける必要があります。このルールの唯一の例外は、Kafka ブートストラップサーバープロパティを定義する場合です。この場合、どちらのプレフィックスも機能します。

構成

Apache Kafka Streams API に関連するプロパティを含むキー / 値のペアでマップします。このプロパティには、spring.cloud.stream.kafka.streams.binder. というプレフィックスを付ける必要があります。次に、このプロパティの使用例をいくつか示します。

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000

ストリーム構成に含まれる可能性のあるすべてのプロパティの詳細については、Apache Kafka ストリームドキュメントの StreamsConfig JavaDocs を参照してください。StreamsConfig から設定できるすべての構成は、これを通じて設定できます。このプロパティを使用する場合、これはバインダーレベルのプロパティであるため、アプリケーション全体に適用されます。アプリケーションに複数のプロセッサーがある場合は、それらすべてがこれらのプロパティを取得します。application.id などのプロパティの場合、これは問題になるため、このバインダーレベルの configuration プロパティを使用して StreamsConfig のプロパティがどのようにマップされるかを慎重に調べる必要があります。

関数。<function-bean-name> .applicationId

機能スタイルのプロセッサーにのみ適用されます。これは、アプリケーションの機能ごとにアプリケーション ID を設定するために使用できます。複数の機能がある場合、これはアプリケーション ID を設定するための便利な方法です。

関数。<function-bean-name> .configuration

関数型プロセッサーにのみ適用されます。Apache Kafka ストリーム API に関連するプロパティを含むキー / 値ペアでマップします。これは、上記のバインダーレベルの configuration プロパティに似ていますが、このレベルの configuration プロパティは、指定された関数に対してのみ制限されます。複数のプロセッサーがあり、特定の関数に基づいて構成へのアクセスを制限する場合は、これを使用できます。ここでは、すべての StreamsConfig プロパティを使用できます。

ブローカー

ブローカーの URL

デフォルト: localhost

zkNodes

Zookeeper URL

デフォルト: localhost

deserializationExceptionHandler

デシリアライズエラーハンドラー型。このハンドラーはバインダーレベルで適用されるため、アプリケーション内のすべての入力バインディングに対して適用されます。コンシューマーの拘束力のあるレベルで、よりきめ細かい方法でそれを制御する方法があります。可能な値は - logAndContinuelogAndFailskipAndContinue または sendToDlq です。

デフォルト: logAndFail

applicationId

Kafka Streams アプリケーションの application.id をバインダーレベルでグローバルに設定する便利な方法。アプリケーションに複数の機能が含まれている場合、アプリケーション ID は別の方法で設定する必要があります。アプリケーション ID の設定について詳しく説明されている上記を参照してください。

デフォルト: アプリケーションは静的アプリケーション ID を生成します。詳細については、アプリケーション ID のセクションを参照してください。

stateStoreRetry.maxAttempts

Max は、状態ストアへの接続を試みます。

デフォルト: 1

stateStoreRetry.backoffPeriod

再試行時に状態ストアに接続しようとしたときのバックオフ期間。

デフォルト: 1000 ミリ秒

consumerProperties

バインダーレベルでの任意のコンシューマー特性。

producerProperties

バインダーレベルでの任意のプロデューサープロパティ。

includeStoppedProcessorsForHealthCheck

プロセッサーのバインディングがアクチュエーターを介して停止されると、このプロセッサーはデフォルトではヘルスチェックに参加しません。このプロパティを true に設定すると、バインディングアクチュエーターエンドポイントを介して現在停止しているプロセッサーを含むすべてのプロセッサーのヘルスチェックが有効になります。

デフォルト: false

Kafka ストリームプロデューサーのプロパティ

次のプロパティは Kafka Streams プロデューサーでのみ使用可能であり、接頭辞 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer. を付ける必要があります。便宜上、複数の出力バインディングがあり、それらすべてに共通の値が必要な場合は、接頭辞 spring.cloud.stream.kafka.streams.default.producer. を使用して構成できます。

keySerde

使用するキー serde

デフォルト: メッセージの逆直列化 / 直列化に関する上記の説明を参照してください

valueSerde

使用する値 serde

デフォルト: メッセージの逆直列化 / 直列化に関する上記の説明を参照してください

useNativeEncoding

ネイティブエンコーディングを有効 / 無効にするフラグ

デフォルト: true.

streamPartitionerBeanName

コンシューマーで使用されるカスタム送信パーティショナー Bean 名。アプリケーションは、カスタム StreamPartitioner を Spring Bean として提供でき、この Bean の名前を、デフォルトの名前の代わりに使用するためにプロデューサーに提供できます。

デフォルト: 送信パーティションのサポートについては、上記の説明を参照してください。

producedAs

プロセッサーが生成するシンクコンポーネントのカスタム名。

デフォルト: none (Kafka ストリームによって生成されます)

Kafka はコンシューマーの特性をストリーミングします

次のプロパティは Kafka Streams コンシューマーで使用でき、接頭辞 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer. を付ける必要があります。便宜上、複数の入力バインディングがあり、それらすべてに共通の値が必要な場合は、接頭辞 spring.cloud.stream.kafka.streams.default.consumer. を使用して構成できます。

applicationId

入力バインディングごとに application.id を設定します。

デフォルト: 上記を参照。

keySerde

使用するキー serde

デフォルト: メッセージの逆直列化 / 直列化に関する上記の説明を参照してください

valueSerde

使用する値 serde

デフォルト: メッセージの逆直列化 / 直列化に関する上記の説明を参照してください

materializedAs

受信 KTable 型を使用するときにマテリアライズする状態ストア

デフォルト: none.

useNativeDecoding

ネイティブデコードを有効 / 無効にするフラグ

デフォルト: true.

dlqName

DLQ トピック名。

デフォルト: エラー処理と DLQ の説明については、上記を参照してください。

startOffset

消費するコミットされたオフセットがない場合に開始するオフセット。これは主に、コンシューマーがトピックから初めて消費するときに使用されます。Kafka Streams はデフォルト戦略として earliest を使用し、バインダーは同じデフォルトを使用します。これは、このプロパティを使用して latest にオーバーライドできます。

デフォルト: earliest.

メモ: コンシューマーで resetOffsets を使用しても、Kafka Streams バインダーには影響しません。メッセージチャネルベースのバインダーとは異なり、Kafka Streams バインダーはオンデマンドで開始または終了しようとはしません。

deserializationExceptionHandler

デシリアライズエラーハンドラー型。このハンドラーは、前述のバインダーレベルのプロパティとは対照的に、コンシューマーバインディングごとに適用されます。可能な値は - logAndContinuelogAndFailskipAndContinue または sendToDlq です。

デフォルト: logAndFail

timestampExtractorBeanName

コンシューマーで使用される特定のタイムスタンプ抽出機能 Bean 名。アプリケーションは TimestampExtractor を Spring Bean として提供でき、この Bean の名前は、デフォルトの名前の代わりに使用するためにコンシューマーに提供できます。

デフォルト: タイムスタンプエクストラクターに関する上記の説明を参照してください。

eventTypes

このバインディングでサポートされているイベント型のコンマ区切りリスト。

デフォルト: none

eventTypeHeaderKey

このバインディングを介した各受信レコードのイベント型ヘッダーキー。

デフォルト: event_type

consumedAs

プロセッサーが消費しているソースコンポーネントのカスタム名。

Deafult: none (Kafka ストリームによって生成されます)

並行性に関する特記事項

Kafka Streams では、num.stream.threads プロパティを使用して、プロセッサーが作成できるスレッドの数を制御できます。これは、上記のバインダー、関数、プロデューサー、コンシューマーレベルで説明したさまざまな configuration オプションを使用して行うことができます。この目的のためにコア Spring Cloud Stream が提供する concurrency プロパティを使用することもできます。これを使用する場合は、コンシューマーで使用する必要があります。複数の入力バインディングがある場合は、これを最初の入力バインディングに設定します。たとえば、spring.cloud.stream.bindings.process-in-0.consumer.concurrency を設定すると、バインダーによって num.stream.threads として変換されます。複数のプロセッサーがあり、1 つのプロセッサーがバインディングレベルの同時実行を定義し、他のプロセッサーは定義しない場合、バインディングレベルの同時実行のないプロセッサーは、デフォルトで spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads で指定されたバインダー全体のプロパティに戻ります。このバインダー構成が使用できない場合、アプリケーションは Kafka Streams によって設定されたデフォルトを使用します。