構成オプション
このセクションには、Kafka Streams バインダーで使用される構成オプションが含まれています。
バインダーに関連する一般的な構成オプションとプロパティについては、コアドキュメントを参照してください。
Kafka ストリームバインダーのプロパティ
次のプロパティはバインダーレベルで使用でき、spring.cloud.stream.kafka.streams.binder.
のプレフィックスを付ける必要があります。Kafka Streams バインダーで再利用されるプロパティを提供する Kafka バインダーには、spring.cloud.stream.kafka.binder
ではなく spring.cloud.stream.kafka.streams.binder
のプレフィックスを付ける必要があります。このルールの唯一の例外は、Kafka ブートストラップサーバープロパティを定義する場合です。この場合、どちらのプレフィックスも機能します。
- 構成
Apache Kafka Streams API に関連するプロパティを含むキー / 値のペアでマップします。このプロパティには、
spring.cloud.stream.kafka.streams.binder.
というプレフィックスを付ける必要があります。次に、このプロパティの使用例をいくつか示します。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms=1000
ストリーム構成に含まれる可能性のあるすべてのプロパティの詳細については、Apache Kafka ストリームドキュメントの StreamsConfig
JavaDocs を参照してください。StreamsConfig
から設定できるすべての構成は、これを通じて設定できます。このプロパティを使用する場合、これはバインダーレベルのプロパティであるため、アプリケーション全体に適用されます。アプリケーションに複数のプロセッサーがある場合は、それらすべてがこれらのプロパティを取得します。application.id
などのプロパティの場合、これは問題になるため、このバインダーレベルの configuration
プロパティを使用して StreamsConfig
のプロパティがどのようにマップされるかを慎重に調べる必要があります。
- 関数。<function-bean-name> .applicationId
機能スタイルのプロセッサーにのみ適用されます。これは、アプリケーションの機能ごとにアプリケーション ID を設定するために使用できます。複数の機能がある場合、これはアプリケーション ID を設定するための便利な方法です。
- 関数。<function-bean-name> .configuration
関数型プロセッサーにのみ適用されます。Apache Kafka ストリーム API に関連するプロパティを含むキー / 値ペアでマップします。これは、上記のバインダーレベルの
configuration
プロパティに似ていますが、このレベルのconfiguration
プロパティは、指定された関数に対してのみ制限されます。複数のプロセッサーがあり、特定の関数に基づいて構成へのアクセスを制限する場合は、これを使用できます。ここでは、すべてのStreamsConfig
プロパティを使用できます。- ブローカー
ブローカーの URL
デフォルト:
localhost
- zkNodes
Zookeeper URL
デフォルト:
localhost
- deserializationExceptionHandler
デシリアライズエラーハンドラー型。このハンドラーはバインダーレベルで適用されるため、アプリケーション内のすべての入力バインディングに対して適用されます。コンシューマーの拘束力のあるレベルで、よりきめ細かい方法でそれを制御する方法があります。可能な値は -
logAndContinue
、logAndFail
、skipAndContinue
またはsendToDlq
です。デフォルト:
logAndFail
- applicationId
Kafka Streams アプリケーションの application.id をバインダーレベルでグローバルに設定する便利な方法。アプリケーションに複数の機能が含まれている場合、アプリケーション ID は別の方法で設定する必要があります。アプリケーション ID の設定について詳しく説明されている上記を参照してください。
デフォルト: アプリケーションは静的アプリケーション ID を生成します。詳細については、アプリケーション ID のセクションを参照してください。
- stateStoreRetry.maxAttempts
Max は、状態ストアへの接続を試みます。
デフォルト: 1
- stateStoreRetry.backoffPeriod
再試行時に状態ストアに接続しようとしたときのバックオフ期間。
デフォルト: 1000 ミリ秒
- consumerProperties
バインダーレベルでの任意のコンシューマー特性。
- producerProperties
バインダーレベルでの任意のプロデューサープロパティ。
- includeStoppedProcessorsForHealthCheck
プロセッサーのバインディングがアクチュエーターを介して停止されると、このプロセッサーはデフォルトではヘルスチェックに参加しません。このプロパティを
true
に設定すると、バインディングアクチュエーターエンドポイントを介して現在停止しているプロセッサーを含むすべてのプロセッサーのヘルスチェックが有効になります。デフォルト: false
Kafka ストリームプロデューサーのプロパティ
次のプロパティは Kafka Streams プロデューサーでのみ使用可能であり、接頭辞 spring.cloud.stream.kafka.streams.bindings.<binding name>.producer.
を付ける必要があります。便宜上、複数の出力バインディングがあり、それらすべてに共通の値が必要な場合は、接頭辞 spring.cloud.stream.kafka.streams.default.producer.
を使用して構成できます。
- keySerde
使用するキー serde
デフォルト: メッセージの逆直列化 / 直列化に関する上記の説明を参照してください
- valueSerde
使用する値 serde
デフォルト: メッセージの逆直列化 / 直列化に関する上記の説明を参照してください
- useNativeEncoding
ネイティブエンコーディングを有効 / 無効にするフラグ
デフォルト:
true
.- streamPartitionerBeanName
コンシューマーで使用されるカスタム送信パーティショナー Bean 名。アプリケーションは、カスタム
StreamPartitioner
を Spring Bean として提供でき、この Bean の名前を、デフォルトの名前の代わりに使用するためにプロデューサーに提供できます。デフォルト: 送信パーティションのサポートについては、上記の説明を参照してください。
- producedAs
プロセッサーが生成するシンクコンポーネントのカスタム名。
デフォルト:
none
(Kafka ストリームによって生成されます)
Kafka はコンシューマーの特性をストリーミングします
次のプロパティは Kafka Streams コンシューマーで使用でき、接頭辞 spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.
を付ける必要があります。便宜上、複数の入力バインディングがあり、それらすべてに共通の値が必要な場合は、接頭辞 spring.cloud.stream.kafka.streams.default.consumer.
を使用して構成できます。
- applicationId
入力バインディングごとに application.id を設定します。
デフォルト: 上記を参照。
- keySerde
使用するキー serde
デフォルト: メッセージの逆直列化 / 直列化に関する上記の説明を参照してください
- valueSerde
使用する値 serde
デフォルト: メッセージの逆直列化 / 直列化に関する上記の説明を参照してください
- materializedAs
受信 KTable 型を使用するときにマテリアライズする状態ストア
デフォルト:
none
.- useNativeDecoding
ネイティブデコードを有効 / 無効にするフラグ
デフォルト:
true
.- dlqName
DLQ トピック名。
デフォルト: エラー処理と DLQ の説明については、上記を参照してください。
- startOffset
消費するコミットされたオフセットがない場合に開始するオフセット。これは主に、コンシューマーがトピックから初めて消費するときに使用されます。Kafka Streams はデフォルト戦略として
earliest
を使用し、バインダーは同じデフォルトを使用します。これは、このプロパティを使用してlatest
にオーバーライドできます。デフォルト:
earliest
.
メモ: コンシューマーで resetOffsets
を使用しても、Kafka Streams バインダーには影響しません。メッセージチャネルベースのバインダーとは異なり、Kafka Streams バインダーはオンデマンドで開始または終了しようとはしません。
- deserializationExceptionHandler
デシリアライズエラーハンドラー型。このハンドラーは、前述のバインダーレベルのプロパティとは対照的に、コンシューマーバインディングごとに適用されます。可能な値は -
logAndContinue
、logAndFail
、skipAndContinue
またはsendToDlq
です。デフォルト:
logAndFail
- timestampExtractorBeanName
コンシューマーで使用される特定のタイムスタンプ抽出機能 Bean 名。アプリケーションは
TimestampExtractor
を Spring Bean として提供でき、この Bean の名前は、デフォルトの名前の代わりに使用するためにコンシューマーに提供できます。デフォルト: タイムスタンプエクストラクターに関する上記の説明を参照してください。
- eventTypes
このバインディングでサポートされているイベント型のコンマ区切りリスト。
デフォルト:
none
- eventTypeHeaderKey
このバインディングを介した各受信レコードのイベント型ヘッダーキー。
デフォルト:
event_type
- consumedAs
プロセッサーが消費しているソースコンポーネントのカスタム名。
Deafult:
none
(Kafka ストリームによって生成されます)
並行性に関する特記事項
Kafka Streams では、num.stream.threads
プロパティを使用して、プロセッサーが作成できるスレッドの数を制御できます。これは、上記のバインダー、関数、プロデューサー、コンシューマーレベルで説明したさまざまな configuration
オプションを使用して行うことができます。この目的のためにコア Spring Cloud Stream が提供する concurrency
プロパティを使用することもできます。これを使用する場合は、コンシューマーで使用する必要があります。複数の入力バインディングがある場合は、これを最初の入力バインディングに設定します。たとえば、spring.cloud.stream.bindings.process-in-0.consumer.concurrency
を設定すると、バインダーによって num.stream.threads
として変換されます。複数のプロセッサーがあり、1 つのプロセッサーがバインディングレベルの同時実行を定義し、他のプロセッサーは定義しない場合、バインディングレベルの同時実行のないプロセッサーは、デフォルトで spring.cloud.stream.kafka.streams.binder.configuration.num.stream.threads
で指定されたバインダー全体のプロパティに戻ります。このバインダー構成が使用できない場合、アプリケーションは Kafka Streams によって設定されたデフォルトを使用します。