構成オプション
このセクションには、Apache Kafka バインダーで使用される構成オプションが含まれています。
バインダーに関連する一般的な構成オプションとプロパティについては、コアドキュメントのバインディングプロパティを参照してください。
Kafka バインダーのプロパティ
- spring.cloud.stream.kafka.binder.brokers
Kafka バインダーが接続するブローカーのリスト。
デフォルト:
localhost
.- spring.cloud.stream.kafka.binder.defaultBrokerPort
brokers
は、ポート情報の有無にかかわらず指定されたホストを許可します(たとえば、host1,host2:port2
)。これにより、ブローカーリストにポートが構成されていない場合のデフォルトポートが設定されます。デフォルト:
9092
.- spring.cloud.stream.kafka.binder.configuration
バインダーによって作成されたすべてのクライアントに渡されるクライアントプロパティ(プロデューサーとコンシューマーの両方)のキー / 値マップ。これらのプロパティはプロデューサーとコンシューマーの両方で使用されるため、使用はセキュリティ設定などの共通のプロパティに制限する必要があります。この構成を通じて提供される不明な Kafka プロデューサーまたはコンシューマープロパティは除外され、伝播は許可されません。ここでのプロパティは、Boot で設定されたすべてのプロパティに優先します。
デフォルト: 空の地図。
- spring.cloud.stream.kafka.binder.consumerProperties
任意の Kafka クライアントコンシューマープロパティのキー / 値マップ。既知の Kafka コンシューマープロパティをサポートすることに加えて、未知のコンシューマープロパティもここで許可されます。ここでのプロパティは、boot および上記の
configuration
プロパティで設定されたプロパティに優先します。デフォルト: 空の地図。
- spring.cloud.stream.kafka.binder.headers
バインダーによって転送されるカスタムヘッダーのリスト。
kafka-clients
バージョンが 0.11.0.0 未満の古いアプリケーション (⇐ 1.3.x) と通信する場合にのみ必要です。新しいバージョンでは、ヘッダーがネイティブにサポートされます。デフォルト: 空。
- spring.cloud.stream.kafka.binder.healthTimeout
パーティション情報の取得を待機する時間(秒単位)。このタイマーが期限切れになると、ヘルスはダウンとして報告します。
デフォルト: 60.
- spring.cloud.stream.kafka.binder.requiredAcks
ブローカーに必要な ack の数。プロデューサー
acks
プロパティについては、Kafka のドキュメントを参照してください。デフォルト:
1
.- spring.cloud.stream.kafka.binder.minPartitionCount
autoCreateTopics
またはautoAddPartitions
が設定されている場合にのみ有効です。バインダーがデータを生成または消費するトピックに構成するパーティションのグローバル最小数。これは、プロデューサーのpartitionCount
設定、またはプロデューサーのinstanceCount * concurrency
設定の値(どちらか大きい場合)に置き換えることができます。デフォルト:
1
.- spring.cloud.stream.kafka.binder.producerProperties
任意の Kafka クライアントプロデューサープロパティのキー / 値マップ。既知の Kafka プロデューサープロパティをサポートすることに加えて、未知のプロデューサープロパティもここで許可されます。ここでのプロパティは、boot および上記の
configuration
プロパティで設定されたプロパティに優先します。デフォルト: 空の地図。
- spring.cloud.stream.kafka.binder.replicationFactor
autoCreateTopics
がアクティブな場合、自動作成されたトピックの複製係数。各バインディングでオーバーライドできます。2.4 より前のバージョンの Kafka ブローカーを使用している場合、この値は少なくとも 1
に設定する必要があります。バージョン 3.0.8 以降、バインダーは-1
をデフォルト値として使用します。これは、ブローカーの "default.replication.factor" プロパティを使用してレプリカの数を決定することを示しています。Kafka ブローカーの管理者に問い合わせて、最小のレプリケーション係数を必要とするポリシーが設定されているかどうかを確認します。その場合、通常、default.replication.factor
はその値と一致するため、より大きなレプリケーション係数が必要な場合を除き、最小値の-1
を使用する必要があります。デフォルト:
-1
.- spring.cloud.stream.kafka.binder.autoCreateTopics
true
に設定すると、バインダーは新しいトピックを自動的に作成します。false
に設定されている場合、バインダーはすでに構成されているトピックに依存します。後者の場合、トピックが存在しないと、バインダーは開始できません。この設定は、ブローカーの auto.create.topics.enable
設定とは独立しており、影響を与えません。サーバーがトピックを自動作成するように設定されている場合、デフォルトのブローカー設定でメタデータ取得リクエストの一部として作成される場合があります。デフォルト:
true
.- spring.cloud.stream.kafka.binder.autoAddPartitions
true
に設定すると、バインダーは必要に応じて新しいパーティションを作成します。false
に設定されている場合、バインダーは、すでに構成されているトピックのパーティションサイズに依存します。ターゲットトピックのパーティション数が期待値よりも小さい場合、バインダーは開始できません。デフォルト:
false
.- spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
バインダーでのトランザクションを有効にします。Kafka ドキュメントの
transaction.id
およびspring-kafka
ドキュメントのトランザクションを参照してください。トランザクションが有効になっている場合、個々のproducer
プロパティは無視され、すべてのプロデューサーがspring.cloud.stream.kafka.binder.transaction.producer.*
プロパティを使用します。デフォルトの
null
(トランザクションなし)- spring.cloud.stream.kafka.binder.transaction.producer.*
トランザクションバインダー内のプロデューサーのグローバルプロデューサープロパティ。
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
と Kafka プロデューサーのプロパティ、およびすべてのバインダーでサポートされている一般的なプロデューサーのプロパティを参照してください。デフォルト: 個々のプロデューサーのプロパティを参照してください。
- spring.cloud.stream.kafka.binder.headerMapperBeanName
spring-messaging
ヘッダーと Kafka ヘッダーのマッピングに使用されるKafkaHeaderMapper
の Bean 名。たとえば、ヘッダーに JSON 逆直列化を使用するBinderHeaderMapper
Bean の信頼できるパッケージをカスタマイズする場合にこれを使用します。このカスタムBinderHeaderMapper
Bean がこのプロパティを使用してバインダーで使用可能になっていない場合、バインダーは、バインダーによって作成されたデフォルトのBinderHeaderMapper
にフォールバックする前に、型BinderHeaderMapper
のkafkaBinderHeaderMapper
という名前のヘッダーマッパー Bean を探します。デフォルト: なし。
- spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader
データを受信しているコンシューマーに関係なく、トピック上のパーティションがリーダーなしで見つかった場合、バインダーヘルスを
down
として設定するフラグ。デフォルト:
true
.- spring.cloud.stream.kafka.binder.certificateStoreDirectory
トラストストアまたはキーストア証明書の場所が非ローカルファイルシステムリソース (org.springframework.core.io.Resource でサポートされているリソース (CLASSPATH、HTTP など)) として指定されている場合、バインダーはリソースをパス (org.springframework.core.io.Resource に変換可能) から上の場所にコピーします。ファイルシステム。これは、ブローカーレベルの証明書 (
ssl.truststore.location
およびssl.keystore.location
) とスキーマレジストリ向けの証明書 (schema.registry.ssl.truststore.location
およびschema.registry.ssl.keystore.location
) の両方に当てはまります。トラストストアとキーストアの場所のパスはspring.cloud.stream.kafka.binder.configuration…
で指定する必要があることに注意してください。例:spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location
、spring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location
など。ファイルは、このプロパティの値として指定された場所にコピーされます。この場所は、アプリケーションを実行しているプロセスによって書き込み可能なファイルシステム上の既存のディレクトリである必要があります。この値が設定されておらず、証明書ファイルが非ローカルファイルシステムリソースである場合、証明書ファイルはSystem.getProperty("java.io.tmpdir")
によって返されるシステムの一時ディレクトリにコピーされます。これは、この値が存在するが、ディレクトリがファイルシステム上に見つからないか、書き込み可能でない場合にも当てはまります。デフォルト: なし。
- spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled
true に設定すると、各コンシューマートピックのオフセットラグメトリクスは、メトリクスにアクセスするたびに計算されます。false に設定すると、定期的に計算されるオフセットラグのみが使用されます。
デフォルト: true
- spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval
各コンシューマートピックのオフセットラグが計算される間隔。この値は、
metrics.defaultOffsetLagMetricsEnabled
が無効になっているか、その計算に時間がかかりすぎる場合に使用されます。デフォルト: 60 秒
- spring.cloud.stream.kafka.binder.enableObservation
このバインダー内のすべてのバインディングで Micrometer 監視レジストリを有効にします。
デフォルト: false
- spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup
KafkaHealthIndicator
メタデータコンシューマーgroup.id
。このコンシューマーは、使用中のトピックに関するメタデータを照会するためにHealthIndicator
によって使用されます。デフォルト: なし。
Kafka コンシューマープロパティ
次のプロパティは Kafka コンシューマーでのみ使用可能であり、接頭辞 spring.cloud.stream.kafka.bindings.<channelName>.consumer.
を付ける必要があります。
繰り返しを避けるために、Spring Cloud Stream は、spring.cloud.stream.kafka.default.consumer.<property>=<value> の形式ですべてのチャネルの値を設定することをサポートしています。 |
- admin.configuration
バージョン 2.1.1 以降、このプロパティは
topic.properties
を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。- admin.replicas- 割り当て
バージョン 2.1.1 以降、このプロパティは
topic.replicas-assignment
を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。- admin.replication-factor
バージョン 2.1.1 以降、このプロパティは
topic.replication-factor
を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。- autoRebalanceEnabled
true
の場合、トピックパーティションはコンシューマーグループのメンバー間で自動的に再調整されます。false
の場合、各コンシューマーには、spring.cloud.stream.instanceCount
およびspring.cloud.stream.instanceIndex
に基づいてパーティションの固定セットが割り当てられます。これには、spring.cloud.stream.instanceCount
プロパティとspring.cloud.stream.instanceIndex
プロパティの両方を、起動された各インスタンスで適切に設定する必要があります。この場合、spring.cloud.stream.instanceCount
プロパティの値は通常 1 より大きくなければなりません。デフォルト:
true
.- ackEachRecord
autoCommitOffset
がtrue
の場合、この設定は各レコードが処理された後にオフセットをコミットするかどうかを決定します。デフォルトでは、オフセットはconsumer.poll()
によって返されたレコードのバッチ内のすべてのレコードが処理された後にコミットされます。ポーリングによって返されるレコードの数は、コンシューマーconfiguration
プロパティを通じて設定されるmax.poll.records
Kafka プロパティで制御できます。これをtrue
に設定するとパフォーマンスが低下する可能性がありますが、そうすることで障害発生時にレコードが再配信される可能性が低くなります。また、バインダーrequiredAcks
プロパティも参照してください。これもオフセットのコミットのパフォーマンスに影響します。このプロパティは 3.1 以降は非推奨となり、代わりにackMode
を使用してください。ackMode
が設定されておらず、バッチモードが有効になっていない場合は、RECORD
ackMode が使用されます。デフォルト:
false
.- autoCommitOffset
バージョン 3.1 以降、このプロパティは非推奨になりました。代替案の詳細については、
ackMode
を参照してください。メッセージが処理されたときにオフセットを自動コミットするかどうか。false
に設定されている場合、型org.springframework.kafka.support.Acknowledgment
ヘッダーのキーkafka_acknowledgment
を持つヘッダーが受信メッセージに存在します。アプリケーションは、メッセージの確認にこのヘッダーを使用できます。詳細については、例のセクションを参照してください。このプロパティがfalse
に設定されている場合、Kafka バインダーは ack モードをorg.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL
に設定し、アプリケーションはレコードの確認を担当します。ackEachRecord
も参照してください。デフォルト:
true
.- ackMode
コンテナーの ack モードを指定します。これは、Spring Kafka で定義されている AckMode 列挙に基づいています。
ackEachRecord
プロパティがtrue
に設定されていて、コンシューマーがバッチモードでない場合、これはRECORD
の ack モードを使用します。それ以外の場合は、このプロパティを使用して提供された ack モードを使用します。- autoCommitOnError
ポーリング可能なコンシューマーでは、
true
に設定されている場合、エラー時に常に自動コミットします。設定されていない場合(デフォルト)または false の場合、ポーリング可能なコンシューマーで自動コミットされません。このプロパティは、ポーリング可能なコンシューマーにのみ適用されることに注意してください。デフォルト: 未設定。
- resetOffsets
コンシューマーのオフセットを startOffset によって提供された値にリセットするかどうか。
KafkaBindingRebalanceListener
が提供されている場合は false にする必要があります。再バランスリスナーを参照してください。このプロパティの詳細については、reset-offsets を参照してください。デフォルト:
false
.- startOffset
新しいグループの開始オフセット。許可される値:
earliest
およびlatest
。コンシューマーグループがコンシューマー 'binding' に対して明示的に設定されている場合 (spring.cloud.stream.bindings.<channelName>.group
経由)、"startOffset" はearliest
に設定されます。それ以外の場合は、anonymous
コンシューマーグループに対してlatest
に設定されます。このプロパティの詳細については、reset-offsets を参照してください。デフォルト: null(
earliest
と同等)。- enableDlq
true に設定すると、コンシューマーの DLQ 動作が有効になります。デフォルトでは、エラーが発生したメッセージは
error.<destination>.<group>
という名前のトピックに転送されます。DLQ トピック名は、dlqName
プロパティを設定するか、型DlqDestinationResolver
の@Bean
を定義することによって構成できます。これは、エラーの数が比較的少なく、元のトピック全体を再生するのが面倒すぎる場合に、より一般的な Kafka 再生シナリオの代替オプションを提供します。詳細については、「kafka dlq 処理」を参照してください。バージョン 2.0 以降、DLQ トピックに送信されるメッセージは、ヘッダーx-original-topic
、x-exception-message
、x-exception-stacktrace
asbyte[]
で拡張されています。デフォルトでは、失敗したレコードは、DLQ トピック内の元のレコードと同じパーティション番号に送信されます。その動作を変更する方法については、dlq パーティションの選択を参照してください。destinationIsPattern
がtrue
の場合は許可されません。デフォルト:
false
.- dlqPartitions
enableDlq
が true で、このプロパティが設定されていない場合、プライマリトピックと同じ数のパーティションを持つデッドレタートピックが作成されます。通常、配信不能レコードは、配信不能トピック内の元のレコードと同じパーティションに送信されます。この動作は変更できます。「dlq パーティションの選択」を参照してください。このプロパティが1
に設定されており、DqlPartitionFunction
Bean が存在しない場合、すべての配信不能レコードがパーティション0
に書き込まれます。このプロパティが1
より大きい場合、MUST はDlqPartitionFunction
Bean を提供します。実際のパーティション数は、バインダーのminPartitionCount
プロパティの影響を受けることに注意してください。デフォルト:
none
- 構成
一般的な Kafka コンシューマープロパティを含むキーと値のペアを使用してマップします。Kafka コンシューマープロパティに加えて、他の構成プロパティをここに渡すことができます。たとえば、
spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar
などのアプリケーションに必要ないくつかのプロパティ。ここではbootstrap.servers
プロパティを設定できません。複数のクラスターに接続する必要がある場合は、マルチバインダーサポートを使用してください。デフォルト: 空の地図。
- dlqName
エラーメッセージを受信する DLQ トピックの名前。
デフォルト: null(指定されていない場合、エラーが発生するメッセージは
error.<destination>.<group>
という名前のトピックに転送されます)。- dlqProducerProperties
これを使用して、DLQ 固有のプロデューサープロパティを設定できます。Kafka プロデューサープロパティを通じて使用できるすべてのプロパティは、このプロパティを通じて設定できます。コンシューマーでネイティブデコードが有効になっている場合 (つまり、useNativeDecoding: true)、アプリケーションは DLQ に対応するキー / 値シリアライザーを提供する必要があります。これは、
dlqProducerProperties.configuration.key.serializer
およびdlqProducerProperties.configuration.value.serializer
の形式で提供する必要があります。デフォルト: デフォルトの Kafka プロデューサープロパティ。
- standardHeaders
受信チャネルアダプターによって入力される標準ヘッダーを示します。許可される値:
none
、id
、timestamp
、またはboth
。ネイティブの逆直列化を使用していて、メッセージを受信する最初のコンポーネントにid
(JDBC メッセージストアを使用するように構成されたアグリゲーターなど)が必要な場合に便利です。デフォルト:
none
- converterBeanName
RecordMessageConverter
を実装する Bean の名前。デフォルトのMessagingMessageConverter
を置き換えるために、受信チャネルアダプターで使用されます。デフォルト:
null
- idleEventInterval
最近メッセージを受信していないことを示すイベント間の間隔 (ミリ秒単位)。これらのイベントを受信するには、
ApplicationListener<ListenerContainerIdleEvent>
を使用します。使用例については、「一時停止と再開」を参照してください。デフォルト:
30000
- destinationIsPattern
true の場合、宛先は、ブローカーによってトピック名を照合するために使用される正規表現
Pattern
として扱われます。true の場合、トピックはプロビジョニングされず、enableDlq
は許可されません。これは、バインダーがプロビジョニングフェーズ中にトピック名を認識しないためです。パターンに一致する新しいトピックを検出するのにかかる時間は、コンシューマープロパティmetadata.max.age.ms
によって制御されることに注意してください。これは、(執筆時点では)デフォルトで 300,000ms(5 分)です。これは、上記のconfiguration
プロパティを使用して構成できます。デフォルト:
false
- topic.properties
新しいトピックをプロビジョニングするときに使用される Kafka トピックプロパティの
Map
(たとえば、spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0
)デフォルト: なし。
- topic.replicas- 割り当て
レプリカ割り当ての Map <Integer、List <Integer >>。キーはパーティション、値は割り当てです。新しいトピックをプロビジョニングするときに使用されます。
kafka-clients
jar のNewTopic
Javadoc を参照してください。デフォルト: なし。
- topic.replication-factor
トピックをプロビジョニングするときに使用するレプリケーション係数。バインダー全体の設定を上書きします。
replicas-assignments
が存在する場合は無視されます。デフォルト: none(バインダー全体のデフォルトである -1 が使用されます)。
- pollTimeout
ポーリング可能なコンシューマーでのポーリングに使用されるタイムアウト。
デフォルト: 5 秒。
- transactionManager
このバインディングのバインダーのトランザクションマネージャーをオーバーライドするために使用される
KafkaAwareTransactionManager
の Bean 名。通常、ChainedKafkaTransactionManaager
を使用して、別のトランザクションを Kafka トランザクションと同期する場合に必要です。レコードの消費と生成を 1 回だけ実行するには、コンシューマーとプロデューサーのバインディングをすべて同じトランザクションマネージャーで構成する必要があります。デフォルト: なし。
- txCommitRecovered
トランザクションバインダーを使用する場合、復元されたレコードのオフセット(たとえば、再試行が終了し、レコードがデッドレタートピックに送信された場合)は、デフォルトで新しいトランザクションを介してコミットされます。このプロパティを
false
に設定すると、リカバリされたレコードのオフセットのコミットが抑制されます。デフォルト: true。
- commonErrorHandlerBeanName
CommonErrorHandler
Bean 名前。コンシューマーバインディングごとに使用します。存在する場合、このユーザーが提供したCommonErrorHandler
は、バインダーによって定義された他のエラーハンドラーよりも優先されます。これは、アプリケーションがListenerContainerCustomizer
を使用したくない場合に、エラーハンドラーを表現するための便利な方法であり、宛先とグループの組み合わせをチェックしてエラーハンドラーを設定します。デフォルト: なし。
Kafka プロデューサーのプロパティ
次のプロパティは Kafka プロデューサーのみが使用でき、接頭辞 spring.cloud.stream.kafka.bindings.<channelName>.producer.
を付ける必要があります。
繰り返しを避けるために、Spring Cloud Stream は、spring.cloud.stream.kafka.default.producer.<property>=<value> の形式ですべてのチャネルの値を設定することをサポートしています。 |
- admin.configuration
バージョン 2.1.1 以降、このプロパティは
topic.properties
を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。- admin.replicas- 割り当て
バージョン 2.1.1 以降、このプロパティは
topic.replicas-assignment
を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。- admin.replication-factor
バージョン 2.1.1 以降、このプロパティは
topic.replication-factor
を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。- bufferSize
Kafka プロデューサーが送信する前にバッチ処理を試みるデータ量の上限(バイト単位)。
デフォルト:
16384
.- 同期
プロデューサーが同期しているかどうか。
デフォルト:
false
.- sendTimeoutExpression
同期公開が有効になっている場合に ack を待機する時間を評価するために使用される送信メッセージに対して評価される SpEL 式(たとえば、
headers['mySendTimeout']
)。タイムアウトの値はミリ秒単位です。3.0 より前のバージョンでは、ネイティブエンコーディングが使用されていない限り、ペイロードを使用できませんでした。この式が評価されるまでに、ペイロードはすでにbyte[]
の形式であったためです。これで、ペイロードが変換される前に式が評価されます。デフォルト:
none
.- batchTimeout
メッセージを送信する前に、プロデューサーが同じバッチにさらに多くのメッセージが蓄積されるのを待機する時間。(通常、プロデューサーはまったく待機せず、前の送信の進行中に蓄積されたすべてのメッセージを送信するだけです)ゼロ以外の値は、待ち時間を犠牲にしてスループットを向上させる可能性があります。
デフォルト:
0
.- messageKeyExpression
生成された Kafka メッセージのキーを設定するために使用される発信メッセージ(たとえば、
headers['myKey']
)に対して評価された SpEL 式。3.0 より前のバージョンでは、ネイティブエンコーディングが使用されていない限り、ペイロードを使用できませんでした。この式が評価されるまでに、ペイロードはすでにbyte[]
の形式であったためです。これで、ペイロードが変換される前に式が評価されます。通常のプロセッサー(Function<String, String>
またはFunction<Message<?>, Message<?>
)の場合、生成されたキーがトピックからの受信キーと同じである必要がある場合、このプロパティは次のように設定できます。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey']
リアクティブ機能について覚えておくべき重要な警告があります。その場合、受信メッセージから発信メッセージにヘッダーを手動でコピーするのはアプリケーションの責任です。ヘッダーを設定できます。例:myKey
を使用し、上記のようにheaders['myKey']
を使用するか、便宜上、KafkaHeaders.MESSAGE_KEY
ヘッダーを設定するだけで、このプロパティを設定する必要はまったくありません。デフォルト:
none
.- headerPatterns
ProducerRecord
の KafkaHeaders
にマップされる Spring メッセージングヘッダーに一致する単純なパターンのコンマ区切りリスト。パターンは、ワイルドカード文字(アスタリスク)で開始または終了できます。パターンは、接頭辞!
を付けることで無効にできます。一致は最初の一致(正または負)の後に停止します。たとえば、!ask,as*
はash
を渡しますが、ask
は渡しません。id
とtimestamp
はマップされません。デフォルト:
*
(すべてのヘッダー -id
とtimestamp
を除く)- 構成
一般的な Kafka プロデューサープロパティを含むキー / 値ペアを使用してマップします。ここでは
bootstrap.servers
プロパティを設定できません。複数のクラスターに接続する必要がある場合は、マルチバインダーサポートを使用してください。デフォルト: 空の地図。
- topic.properties
新しいトピックをプロビジョニングするときに使用される Kafka トピックプロパティの
Map
(たとえば、spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0
)- topic.replicas- 割り当て
レプリカ割り当ての Map <Integer、List <Integer >>。キーはパーティション、値は割り当てです。新しいトピックをプロビジョニングするときに使用されます。
kafka-clients
jar のNewTopic
Javadoc を参照してください。デフォルト: なし。
- topic.replication-factor
トピックをプロビジョニングするときに使用するレプリケーション係数。バインダー全体の設定を上書きします。
replicas-assignments
が存在する場合は無視されます。デフォルト: none(バインダー全体のデフォルトである -1 が使用されます)。
- useTopicHeader
true
に設定すると、デフォルトのバインディング宛先(トピック名)が送信メッセージのKafkaHeaders.TOPIC
メッセージヘッダーの値で上書きされます。ヘッダーが存在しない場合は、デフォルトのバインディング宛先が使用されます。デフォルト:
false
.- recordMetadataChannel
正常な送信結果の送信先となる
MessageChannel
の Bean 名。Bean はアプリケーションコンテキストに存在する必要があります。チャネルに送信されるメッセージは、追加のヘッダーKafkaHeaders.RECORD_METADATA
を含む送信済みメッセージ(変換後)です。ヘッダーには、Kafka クライアントによって提供されるRecordMetadata
オブジェクトが含まれています。これには、トピックでレコードが書き込まれたパーティションとオフセットが含まれます。ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)
失敗した送信は、プロデューサーエラーチャネルに送信されます(構成されている場合)。Kafka エラーチャネルを参照してください。
デフォルト: null。
Kafka バインダーは、プロデューサーの partitionCount 設定をヒントとして使用して、指定されたパーティション数でトピックを作成します(minPartitionCount と組み合わせて、2 つのうち最大のものが使用される値です)。大きい方の値が使用されるため、バインダー用の minPartitionCount とアプリケーション用の partitionCount の両方を構成する場合は注意が必要です。パーティション数が少ないトピックがすでに存在し、autoAddPartitions が無効(デフォルト)の場合、バインダーは開始に失敗します。パーティション数が少ないトピックがすでに存在し、autoAddPartitions が有効になっている場合は、新しいパーティションが追加されます。最大数(minPartitionCount または partitionCount )よりも多くのパーティションを持つトピックがすでに存在する場合は、既存のパーティション数が使用されます。 |
- 圧縮
compression.type
プロデューサープロパティを設定します。サポートされている値はnone
、gzip
、snappy
、lz4
、zstd
です。Spring for Apache Kafka ドキュメントで説明したように、kafka-clients
jar を 2.1.0(またはそれ以降)にオーバーライドし、zstd
圧縮を使用する場合は、spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd
を使用します。デフォルト:
none
.- transactionManager
このバインディングのバインダーのトランザクションマネージャーをオーバーライドするために使用される
KafkaAwareTransactionManager
の Bean 名。通常、ChainedKafkaTransactionManaager
を使用して、別のトランザクションを Kafka トランザクションと同期する場合に必要です。レコードの消費と生成を 1 回だけ実行するには、コンシューマーとプロデューサーのバインディングをすべて同じトランザクションマネージャーで構成する必要があります。デフォルト: なし。
- closeTimeout
プロデューサーを閉じるときに待機する秒数のタイムアウト。
デフォルト:
30
- allowNonTransactional
通常、トランザクションバインダーに関連付けられているすべての出力バインディングは、まだ処理されていない場合は、新しいトランザクションで公開されます。このプロパティを使用すると、その動作をオーバーライドできます。true に設定すると、この出力バインディングに公開されたレコードは、すでに処理中でない限り、トランザクションで実行されません。
デフォルト:
false