構成オプション

このセクションには、Apache Kafka バインダーで使用される構成オプションが含まれています。

バインダーに関連する一般的な構成オプションとプロパティについては、コアドキュメントのバインディングプロパティを参照してください。

Kafka バインダーのプロパティ

spring.cloud.stream.kafka.binder.brokers

Kafka バインダーが接続するブローカーのリスト。

デフォルト: localhost.

spring.cloud.stream.kafka.binder.defaultBrokerPort

brokers は、ポート情報の有無にかかわらず指定されたホストを許可します(たとえば、host1,host2:port2)。これにより、ブローカーリストにポートが構成されていない場合のデフォルトポートが設定されます。

デフォルト: 9092.

spring.cloud.stream.kafka.binder.configuration

バインダーによって作成されたすべてのクライアントに渡されるクライアントプロパティ(プロデューサーとコンシューマーの両方)のキー / 値マップ。これらのプロパティはプロデューサーとコンシューマーの両方で使用されるため、使用はセキュリティ設定などの共通のプロパティに制限する必要があります。この構成を通じて提供される不明な Kafka プロデューサーまたはコンシューマープロパティは除外され、伝播は許可されません。ここでのプロパティは、Boot で設定されたすべてのプロパティに優先します。

デフォルト: 空の地図。

spring.cloud.stream.kafka.binder.consumerProperties

任意の Kafka クライアントコンシューマープロパティのキー / 値マップ。既知の Kafka コンシューマープロパティをサポートすることに加えて、未知のコンシューマープロパティもここで許可されます。ここでのプロパティは、boot および上記の configuration プロパティで設定されたプロパティに優先します。

デフォルト: 空の地図。

spring.cloud.stream.kafka.binder.headers

バインダーによって転送されるカスタムヘッダーのリスト。kafka-clients バージョンが 0.11.0.0 未満の古いアプリケーション (⇐ 1.3.x) と通信する場合にのみ必要です。新しいバージョンでは、ヘッダーがネイティブにサポートされます。

デフォルト: 空。

spring.cloud.stream.kafka.binder.healthTimeout

パーティション情報の取得を待機する時間(秒単位)。このタイマーが期限切れになると、ヘルスはダウンとして報告します。

デフォルト: 60.

spring.cloud.stream.kafka.binder.requiredAcks

ブローカーに必要な ack の数。プロデューサー acks プロパティについては、Kafka のドキュメントを参照してください。

デフォルト: 1.

spring.cloud.stream.kafka.binder.minPartitionCount

autoCreateTopics または autoAddPartitions が設定されている場合にのみ有効です。バインダーがデータを生成または消費するトピックに構成するパーティションのグローバル最小数。これは、プロデューサーの partitionCount 設定、またはプロデューサーの instanceCount * concurrency 設定の値(どちらか大きい場合)に置き換えることができます。

デフォルト: 1.

spring.cloud.stream.kafka.binder.producerProperties

任意の Kafka クライアントプロデューサープロパティのキー / 値マップ。既知の Kafka プロデューサープロパティをサポートすることに加えて、未知のプロデューサープロパティもここで許可されます。ここでのプロパティは、boot および上記の configuration プロパティで設定されたプロパティに優先します。

デフォルト: 空の地図。

spring.cloud.stream.kafka.binder.replicationFactor

autoCreateTopics がアクティブな場合、自動作成されたトピックの複製係数。各バインディングでオーバーライドできます。

2.4 より前のバージョンの Kafka ブローカーを使用している場合、この値は少なくとも 1 に設定する必要があります。バージョン 3.0.8 以降、バインダーは -1 をデフォルト値として使用します。これは、ブローカーの "default.replication.factor" プロパティを使用してレプリカの数を決定することを示しています。Kafka ブローカーの管理者に問い合わせて、最小のレプリケーション係数を必要とするポリシーが設定されているかどうかを確認します。その場合、通常、default.replication.factor はその値と一致するため、より大きなレプリケーション係数が必要な場合を除き、最小値の -1 を使用する必要があります。

デフォルト: -1.

spring.cloud.stream.kafka.binder.autoCreateTopics

true に設定すると、バインダーは新しいトピックを自動的に作成します。false に設定されている場合、バインダーはすでに構成されているトピックに依存します。後者の場合、トピックが存在しないと、バインダーは開始できません。

この設定は、ブローカーの auto.create.topics.enable 設定とは独立しており、影響を与えません。サーバーがトピックを自動作成するように設定されている場合、デフォルトのブローカー設定でメタデータ取得リクエストの一部として作成される場合があります。

デフォルト: true.

spring.cloud.stream.kafka.binder.autoAddPartitions

true に設定すると、バインダーは必要に応じて新しいパーティションを作成します。false に設定されている場合、バインダーは、すでに構成されているトピックのパーティションサイズに依存します。ターゲットトピックのパーティション数が期待値よりも小さい場合、バインダーは開始できません。

デフォルト: false.

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix

バインダーでのトランザクションを有効にします。Kafka ドキュメントの transaction.id および spring-kafka ドキュメントのトランザクションを参照してください。トランザクションが有効になっている場合、個々の producer プロパティは無視され、すべてのプロデューサーが spring.cloud.stream.kafka.binder.transaction.producer.* プロパティを使用します。

デフォルトの null (トランザクションなし)

spring.cloud.stream.kafka.binder.transaction.producer.*

トランザクションバインダー内のプロデューサーのグローバルプロデューサープロパティ。spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix と Kafka プロデューサーのプロパティ、およびすべてのバインダーでサポートされている一般的なプロデューサーのプロパティを参照してください。

デフォルト: 個々のプロデューサーのプロパティを参照してください。

spring.cloud.stream.kafka.binder.headerMapperBeanName

spring-messaging ヘッダーと Kafka ヘッダーのマッピングに使用される KafkaHeaderMapper の Bean 名。たとえば、ヘッダーに JSON 逆直列化を使用する BinderHeaderMapper Bean の信頼できるパッケージをカスタマイズする場合にこれを使用します。このカスタム BinderHeaderMapper Bean がこのプロパティを使用してバインダーで使用可能になっていない場合、バインダーは、バインダーによって作成されたデフォルトの BinderHeaderMapper にフォールバックする前に、型 BinderHeaderMapper の kafkaBinderHeaderMapper という名前のヘッダーマッパー Bean を探します。

デフォルト: なし。

spring.cloud.stream.kafka.binder.considerDownWhenAnyPartitionHasNoLeader

データを受信しているコンシューマーに関係なく、トピック上のパーティションがリーダーなしで見つかった場合、バインダーヘルスを down として設定するフラグ。

デフォルト: true.

spring.cloud.stream.kafka.binder.certificateStoreDirectory

トラストストアまたはキーストア証明書の場所が非ローカルファイルシステムリソース (org.springframework.core.io.Resource でサポートされているリソース (CLASSPATH、HTTP など)) として指定されている場合、バインダーはリソースをパス (org.springframework.core.io.Resource に変換可能) から上の場所にコピーします。ファイルシステム。これは、ブローカーレベルの証明書 (ssl.truststore.location および ssl.keystore.location) とスキーマレジストリ向けの証明書 (schema.registry.ssl.truststore.location および schema.registry.ssl.keystore.location) の両方に当てはまります。トラストストアとキーストアの場所のパスは spring.cloud.stream.kafka.binder.configuration…​ で指定する必要があることに注意してください。例: spring.cloud.stream.kafka.binder.configuration.ssl.truststore.locationspring.cloud.stream.kafka.binder.configuration.schema.registry.ssl.truststore.location など。ファイルは、このプロパティの値として指定された場所にコピーされます。この場所は、アプリケーションを実行しているプロセスによって書き込み可能なファイルシステム上の既存のディレクトリである必要があります。この値が設定されておらず、証明書ファイルが非ローカルファイルシステムリソースである場合、証明書ファイルは System.getProperty("java.io.tmpdir") によって返されるシステムの一時ディレクトリにコピーされます。これは、この値が存在するが、ディレクトリがファイルシステム上に見つからないか、書き込み可能でない場合にも当てはまります。

デフォルト: なし。

spring.cloud.stream.kafka.binder.metrics.defaultOffsetLagMetricsEnabled

true に設定すると、各コンシューマートピックのオフセットラグメトリクスは、メトリクスにアクセスするたびに計算されます。false に設定すると、定期的に計算されるオフセットラグのみが使用されます。

デフォルト: true

spring.cloud.stream.kafka.binder.metrics.offsetLagMetricsInterval

各コンシューマートピックのオフセットラグが計算される間隔。この値は、metrics.defaultOffsetLagMetricsEnabled が無効になっているか、その計算に時間がかかりすぎる場合に使用されます。

デフォルト: 60 秒

spring.cloud.stream.kafka.binder.enableObservation

このバインダー内のすべてのバインディングで Micrometer 監視レジストリを有効にします。

デフォルト: false

spring.cloud.stream.kafka.binder.healthIndicatorConsumerGroup

KafkaHealthIndicator メタデータコンシューマー group.id。このコンシューマーは、使用中のトピックに関するメタデータを照会するために HealthIndicator によって使用されます。

デフォルト: なし。

Kafka コンシューマープロパティ

次のプロパティは Kafka コンシューマーでのみ使用可能であり、接頭辞 spring.cloud.stream.kafka.bindings.<channelName>.consumer. を付ける必要があります。

繰り返しを避けるために、Spring Cloud Stream は、spring.cloud.stream.kafka.default.consumer.<property>=<value> の形式ですべてのチャネルの値を設定することをサポートしています。
admin.configuration

バージョン 2.1.1 以降、このプロパティは topic.properties を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。

admin.replicas- 割り当て

バージョン 2.1.1 以降、このプロパティは topic.replicas-assignment を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。

admin.replication-factor

バージョン 2.1.1 以降、このプロパティは topic.replication-factor を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。

autoRebalanceEnabled

true の場合、トピックパーティションはコンシューマーグループのメンバー間で自動的に再調整されます。false の場合、各コンシューマーには、spring.cloud.stream.instanceCount および spring.cloud.stream.instanceIndex に基づいてパーティションの固定セットが割り当てられます。これには、spring.cloud.stream.instanceCount プロパティと spring.cloud.stream.instanceIndex プロパティの両方を、起動された各インスタンスで適切に設定する必要があります。この場合、spring.cloud.stream.instanceCount プロパティの値は通常 1 より大きくなければなりません。

デフォルト: true.

ackEachRecord

autoCommitOffset が true の場合、この設定は各レコードが処理された後にオフセットをコミットするかどうかを決定します。デフォルトでは、オフセットは consumer.poll() によって返されたレコードのバッチ内のすべてのレコードが処理された後にコミットされます。ポーリングによって返されるレコードの数は、コンシューマー configuration プロパティを通じて設定される max.poll.records Kafka プロパティで制御できます。これを true に設定するとパフォーマンスが低下する可能性がありますが、そうすることで障害発生時にレコードが再配信される可能性が低くなります。また、バインダー requiredAcks プロパティも参照してください。これもオフセットのコミットのパフォーマンスに影響します。このプロパティは 3.1 以降は非推奨となり、代わりに ackMode を使用してください。ackMode が設定されておらず、バッチモードが有効になっていない場合は、RECORD ackMode が使用されます。

デフォルト: false.

autoCommitOffset

バージョン 3.1 以降、このプロパティは非推奨になりました。代替案の詳細については、ackMode を参照してください。メッセージが処理されたときにオフセットを自動コミットするかどうか。false に設定されている場合、型 org.springframework.kafka.support.Acknowledgment ヘッダーのキー kafka_acknowledgment を持つヘッダーが受信メッセージに存在します。アプリケーションは、メッセージの確認にこのヘッダーを使用できます。詳細については、例のセクションを参照してください。このプロパティが false に設定されている場合、Kafka バインダーは ack モードを org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL に設定し、アプリケーションはレコードの確認を担当します。ackEachRecord も参照してください。

デフォルト: true.

ackMode

コンテナーの ack モードを指定します。これは、Spring Kafka で定義されている AckMode 列挙に基づいています。ackEachRecord プロパティが true に設定されていて、コンシューマーがバッチモードでない場合、これは RECORD の ack モードを使用します。それ以外の場合は、このプロパティを使用して提供された ack モードを使用します。

autoCommitOnError

ポーリング可能なコンシューマーでは、true に設定されている場合、エラー時に常に自動コミットします。設定されていない場合(デフォルト)または false の場合、ポーリング可能なコンシューマーで自動コミットされません。このプロパティは、ポーリング可能なコンシューマーにのみ適用されることに注意してください。

デフォルト: 未設定。

resetOffsets

コンシューマーのオフセットを startOffset によって提供された値にリセットするかどうか。KafkaBindingRebalanceListener が提供されている場合は false にする必要があります。再バランスリスナーを参照してください。このプロパティの詳細については、reset-offsets を参照してください。

デフォルト: false.

startOffset

新しいグループの開始オフセット。許可される値: earliest および latest。コンシューマーグループがコンシューマー 'binding' に対して明示的に設定されている場合 ( spring.cloud.stream.bindings.<channelName>.group 経由)、"startOffset" は earliest に設定されます。それ以外の場合は、anonymous コンシューマーグループに対して latest に設定されます。このプロパティの詳細については、reset-offsets を参照してください。

デフォルト: null(earliest と同等)。

enableDlq

true に設定すると、コンシューマーの DLQ 動作が有効になります。デフォルトでは、エラーが発生したメッセージは error.<destination>.<group> という名前のトピックに転送されます。DLQ トピック名は、dlqName プロパティを設定するか、型 DlqDestinationResolver の @Bean を定義することによって構成できます。これは、エラーの数が比較的少なく、元のトピック全体を再生するのが面倒すぎる場合に、より一般的な Kafka 再生シナリオの代替オプションを提供します。詳細については、「kafka dlq 処理」を参照してください。バージョン 2.0 以降、DLQ トピックに送信されるメッセージは、ヘッダー x-original-topicx-exception-messagex-exception-stacktrace as byte[] で拡張されています。デフォルトでは、失敗したレコードは、DLQ トピック内の元のレコードと同じパーティション番号に送信されます。その動作を変更する方法については、dlq パーティションの選択を参照してください。destinationIsPattern が true の場合は許可されません

デフォルト: false.

dlqPartitions

enableDlq が true で、このプロパティが設定されていない場合、プライマリトピックと同じ数のパーティションを持つデッドレタートピックが作成されます。通常、配信不能レコードは、配信不能トピック内の元のレコードと同じパーティションに送信されます。この動作は変更できます。「dlq パーティションの選択」を参照してください。このプロパティが 1 に設定されており、DqlPartitionFunction Bean が存在しない場合、すべての配信不能レコードがパーティション 0 に書き込まれます。このプロパティが 1 より大きい場合、MUST は DlqPartitionFunction Bean を提供します。実際のパーティション数は、バインダーの minPartitionCount プロパティの影響を受けることに注意してください。

デフォルト: none

構成

一般的な Kafka コンシューマープロパティを含むキーと値のペアを使用してマップします。Kafka コンシューマープロパティに加えて、他の構成プロパティをここに渡すことができます。たとえば、spring.cloud.stream.kafka.bindings.input.consumer.configuration.foo=bar などのアプリケーションに必要ないくつかのプロパティ。ここでは bootstrap.servers プロパティを設定できません。複数のクラスターに接続する必要がある場合は、マルチバインダーサポートを使用してください。

デフォルト: 空の地図。

dlqName

エラーメッセージを受信する DLQ トピックの名前。

デフォルト: null(指定されていない場合、エラーが発生するメッセージは error.<destination>.<group> という名前のトピックに転送されます)。

dlqProducerProperties

これを使用して、DLQ 固有のプロデューサープロパティを設定できます。Kafka プロデューサープロパティを通じて使用できるすべてのプロパティは、このプロパティを通じて設定できます。コンシューマーでネイティブデコードが有効になっている場合 (つまり、useNativeDecoding: true)、アプリケーションは DLQ に対応するキー / 値シリアライザーを提供する必要があります。これは、dlqProducerProperties.configuration.key.serializer および dlqProducerProperties.configuration.value.serializer の形式で提供する必要があります。

デフォルト: デフォルトの Kafka プロデューサープロパティ。

standardHeaders

受信チャネルアダプターによって入力される標準ヘッダーを示します。許可される値: noneidtimestamp、または both。ネイティブの逆直列化を使用していて、メッセージを受信する最初のコンポーネントに id (JDBC メッセージストアを使用するように構成されたアグリゲーターなど)が必要な場合に便利です。

デフォルト: none

converterBeanName

RecordMessageConverter を実装する Bean の名前。デフォルトの MessagingMessageConverter を置き換えるために、受信チャネルアダプターで使用されます。

デフォルト: null

idleEventInterval

最近メッセージを受信していないことを示すイベント間の間隔 (ミリ秒単位)。これらのイベントを受信するには、ApplicationListener<ListenerContainerIdleEvent> を使用します。使用例については、「一時停止と再開」を参照してください。

デフォルト: 30000

destinationIsPattern

true の場合、宛先は、ブローカーによってトピック名を照合するために使用される正規表現 Pattern として扱われます。true の場合、トピックはプロビジョニングされず、enableDlq は許可されません。これは、バインダーがプロビジョニングフェーズ中にトピック名を認識しないためです。パターンに一致する新しいトピックを検出するのにかかる時間は、コンシューマープロパティ metadata.max.age.ms によって制御されることに注意してください。これは、(執筆時点では)デフォルトで 300,000ms(5 分)です。これは、上記の configuration プロパティを使用して構成できます。

デフォルト: false

topic.properties

新しいトピックをプロビジョニングするときに使用される Kafka トピックプロパティの Map (たとえば、spring.cloud.stream.kafka.bindings.input.consumer.topic.properties.message.format.version=0.9.0.0

デフォルト: なし。

topic.replicas- 割り当て

レプリカ割り当ての Map <Integer、List <Integer >>。キーはパーティション、値は割り当てです。新しいトピックをプロビジョニングするときに使用されます。kafka-clients jar の NewTopic Javadoc を参照してください。

デフォルト: なし。

topic.replication-factor

トピックをプロビジョニングするときに使用するレプリケーション係数。バインダー全体の設定を上書きします。replicas-assignments が存在する場合は無視されます。

デフォルト: none(バインダー全体のデフォルトである -1 が使用されます)。

pollTimeout

ポーリング可能なコンシューマーでのポーリングに使用されるタイムアウト。

デフォルト: 5 秒。

transactionManager

このバインディングのバインダーのトランザクションマネージャーをオーバーライドするために使用される KafkaAwareTransactionManager の Bean 名。通常、ChainedKafkaTransactionManaager を使用して、別のトランザクションを Kafka トランザクションと同期する場合に必要です。レコードの消費と生成を 1 回だけ実行するには、コンシューマーとプロデューサーのバインディングをすべて同じトランザクションマネージャーで構成する必要があります。

デフォルト: なし。

txCommitRecovered

トランザクションバインダーを使用する場合、復元されたレコードのオフセット(たとえば、再試行が終了し、レコードがデッドレタートピックに送信された場合)は、デフォルトで新しいトランザクションを介してコミットされます。このプロパティを false に設定すると、リカバリされたレコードのオフセットのコミットが抑制されます。

デフォルト: true。

commonErrorHandlerBeanName

CommonErrorHandler Bean 名前。コンシューマーバインディングごとに使用します。存在する場合、このユーザーが提供した CommonErrorHandler は、バインダーによって定義された他のエラーハンドラーよりも優先されます。これは、アプリケーションが ListenerContainerCustomizer を使用したくない場合に、エラーハンドラーを表現するための便利な方法であり、宛先とグループの組み合わせをチェックしてエラーハンドラーを設定します。

デフォルト: なし。

Kafka プロデューサーのプロパティ

次のプロパティは Kafka プロデューサーのみが使用でき、接頭辞 spring.cloud.stream.kafka.bindings.<channelName>.producer. を付ける必要があります。

繰り返しを避けるために、Spring Cloud Stream は、spring.cloud.stream.kafka.default.producer.<property>=<value> の形式ですべてのチャネルの値を設定することをサポートしています。
admin.configuration

バージョン 2.1.1 以降、このプロパティは topic.properties を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。

admin.replicas- 割り当て

バージョン 2.1.1 以降、このプロパティは topic.replicas-assignment を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。

admin.replication-factor

バージョン 2.1.1 以降、このプロパティは topic.replication-factor を優先して非推奨になり、将来のバージョンでサポートが削除される予定です。

bufferSize

Kafka プロデューサーが送信する前にバッチ処理を試みるデータ量の上限(バイト単位)。

デフォルト: 16384.

同期

プロデューサーが同期しているかどうか。

デフォルト: false.

sendTimeoutExpression

同期公開が有効になっている場合に ack を待機する時間を評価するために使用される送信メッセージに対して評価される SpEL 式(たとえば、headers['mySendTimeout'])。タイムアウトの値はミリ秒単位です。3.0 より前のバージョンでは、ネイティブエンコーディングが使用されていない限り、ペイロードを使用できませんでした。この式が評価されるまでに、ペイロードはすでに byte[] の形式であったためです。これで、ペイロードが変換される前に式が評価されます。

デフォルト: none.

batchTimeout

メッセージを送信する前に、プロデューサーが同じバッチにさらに多くのメッセージが蓄積されるのを待機する時間。(通常、プロデューサーはまったく待機せず、前の送信の進行中に蓄積されたすべてのメッセージを送信するだけです)ゼロ以外の値は、待ち時間を犠牲にしてスループットを向上させる可能性があります。

デフォルト: 0.

messageKeyExpression

生成された Kafka メッセージのキーを設定するために使用される発信メッセージ(たとえば、headers['myKey'])に対して評価された SpEL 式。3.0 より前のバージョンでは、ネイティブエンコーディングが使用されていない限り、ペイロードを使用できませんでした。この式が評価されるまでに、ペイロードはすでに byte[] の形式であったためです。これで、ペイロードが変換される前に式が評価されます。通常のプロセッサー(Function<String, String> または Function<Message<?>, Message<?>)の場合、生成されたキーがトピックからの受信キーと同じである必要がある場合、このプロパティは次のように設定できます。spring.cloud.stream.kafka.bindings.<output-binding-name>.producer.messageKeyExpression: headers['kafka_receivedMessageKey'] リアクティブ機能について覚えておくべき重要な警告があります。その場合、受信メッセージから発信メッセージにヘッダーを手動でコピーするのはアプリケーションの責任です。ヘッダーを設定できます。例: myKey を使用し、上記のように headers['myKey'] を使用するか、便宜上、KafkaHeaders.MESSAGE_KEY ヘッダーを設定するだけで、このプロパティを設定する必要はまったくありません。

デフォルト: none.

headerPatterns

ProducerRecord の Kafka Headers にマップされる Spring メッセージングヘッダーに一致する単純なパターンのコンマ区切りリスト。パターンは、ワイルドカード文字(アスタリスク)で開始または終了できます。パターンは、接頭辞 ! を付けることで無効にできます。一致は最初の一致(正または負)の後に停止します。たとえば、!ask,as* は ash を渡しますが、ask は渡しません。id と timestamp はマップされません。

デフォルト: * (すべてのヘッダー - id と timestamp を除く)

構成

一般的な Kafka プロデューサープロパティを含むキー / 値ペアを使用してマップします。ここでは bootstrap.servers プロパティを設定できません。複数のクラスターに接続する必要がある場合は、マルチバインダーサポートを使用してください。

デフォルト: 空の地図。

topic.properties

新しいトピックをプロビジョニングするときに使用される Kafka トピックプロパティの Map (たとえば、spring.cloud.stream.kafka.bindings.output.producer.topic.properties.message.format.version=0.9.0.0

topic.replicas- 割り当て

レプリカ割り当ての Map <Integer、List <Integer >>。キーはパーティション、値は割り当てです。新しいトピックをプロビジョニングするときに使用されます。kafka-clients jar の NewTopic Javadoc を参照してください。

デフォルト: なし。

topic.replication-factor

トピックをプロビジョニングするときに使用するレプリケーション係数。バインダー全体の設定を上書きします。replicas-assignments が存在する場合は無視されます。

デフォルト: none(バインダー全体のデフォルトである -1 が使用されます)。

useTopicHeader

true に設定すると、デフォルトのバインディング宛先(トピック名)が送信メッセージの KafkaHeaders.TOPIC メッセージヘッダーの値で上書きされます。ヘッダーが存在しない場合は、デフォルトのバインディング宛先が使用されます。

デフォルト: false.

recordMetadataChannel

正常な送信結果の送信先となる MessageChannel の Bean 名。Bean はアプリケーションコンテキストに存在する必要があります。チャネルに送信されるメッセージは、追加のヘッダー KafkaHeaders.RECORD_METADATA を含む送信済みメッセージ(変換後)です。ヘッダーには、Kafka クライアントによって提供される RecordMetadata オブジェクトが含まれています。これには、トピックでレコードが書き込まれたパーティションとオフセットが含まれます。

ResultMetadata meta = sendResultMsg.getHeaders().get(KafkaHeaders.RECORD_METADATA, RecordMetadata.class)

失敗した送信は、プロデューサーエラーチャネルに送信されます(構成されている場合)。Kafka エラーチャネルを参照してください。

デフォルト: null。

Kafka バインダーは、プロデューサーの partitionCount 設定をヒントとして使用して、指定されたパーティション数でトピックを作成します(minPartitionCount と組み合わせて、2 つのうち最大のものが使用される値です)。大きい方の値が使用されるため、バインダー用の minPartitionCount とアプリケーション用の partitionCount の両方を構成する場合は注意が必要です。パーティション数が少ないトピックがすでに存在し、autoAddPartitions が無効(デフォルト)の場合、バインダーは開始に失敗します。パーティション数が少ないトピックがすでに存在し、autoAddPartitions が有効になっている場合は、新しいパーティションが追加されます。最大数(minPartitionCount または partitionCount)よりも多くのパーティションを持つトピックがすでに存在する場合は、既存のパーティション数が使用されます。
圧縮

compression.type プロデューサープロパティを設定します。サポートされている値は nonegzipsnappylz4zstd です。Spring for Apache Kafka ドキュメントで説明したように、kafka-clients jar を 2.1.0(またはそれ以降)にオーバーライドし、zstd 圧縮を使用する場合は、spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.compression.type=zstd を使用します。

デフォルト: none.

transactionManager

このバインディングのバインダーのトランザクションマネージャーをオーバーライドするために使用される KafkaAwareTransactionManager の Bean 名。通常、ChainedKafkaTransactionManaager を使用して、別のトランザクションを Kafka トランザクションと同期する場合に必要です。レコードの消費と生成を 1 回だけ実行するには、コンシューマーとプロデューサーのバインディングをすべて同じトランザクションマネージャーで構成する必要があります。

デフォルト: なし。

closeTimeout

プロデューサーを閉じるときに待機する秒数のタイムアウト。

デフォルト: 30

allowNonTransactional

通常、トランザクションバインダーに関連付けられているすべての出力バインディングは、まだ処理されていない場合は、新しいトランザクションで公開されます。このプロパティを使用すると、その動作をオーバーライドできます。true に設定すると、この出力バインディングに公開されたレコードは、すでに処理中でない限り、トランザクションで実行されません。

デフォルト: false