クラス KafkaTemplate<K,V>

java.lang.ObjectSE
org.springframework.kafka.core.KafkaTemplate<K,V>
型パラメーター:
K - 鍵の型。
V - 値の型。
実装されたすべてのインターフェース:
EventListenerSEAwareBeanNameAwareDisposableBeanSmartInitializingSingletonApplicationContextAwareApplicationListener<ContextStoppedEvent>KafkaOperations<K,V>
既知の直属サブクラス
ReplyingKafkaTemplateRoutingKafkaTemplate

高レベルの操作を実行するためのテンプレート。DefaultKafkaProducerFactory とともに使用する場合、テンプレートはスレッドセーフです。プロデューサーファクトリと KafkaProducer はこれを保証します。それぞれの javadoc を参照してください。
作成者:
Marius Bogoevici, Gary Russell, Igor Stepanov, Artem Bilan, Biju Kunjummen, Endika Gutierrez, Thomas Strau ß , Soby Chacko, Gurps Bassi
  • フィールドの詳細

  • コンストラクターの詳細

    • KafkaTemplate

      public KafkaTemplate(ProducerFactory<K,V> producerFactory)
      提供されたプロデューサーファクトリと autoFlush false を使用してインスタンスを作成します。
      パラメーター:
      producerFactory - プロデューサーファクトリ。
    • KafkaTemplate

      public KafkaTemplate(ProducerFactory<K,V> producerFactory, @Nullable MapSE<StringSE,ObjectSE> configOverrides)
      autoFlush を false にして、提供されたプロデューサーファクトリとプロパティを使用してインスタンスを作成します。configOverrides が null または空でない場合、新しい DefaultKafkaProducerFactory は、マージされたプロデューサープロパティを使用して作成され、指定されたファクトリのプロパティの後にオーバーライドが適用されます。
      パラメーター:
      producerFactory - プロデューサーファクトリ。
      configOverrides - オーバーライドするプロデューサー構成プロパティ。
      導入:
      2.5
    • KafkaTemplate

      public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush)
      提供されたプロデューサーファクトリと autoFlush 設定を使用してインスタンスを作成します。

      linger.ms または batch.size プロパティの値に関係なく、このテンプレートの送信操作をすぐに実行する場合は、autoFlush を true に設定します。これも、ブローカがプロデューサーの acks プロパティに従って受信を確認するまでブロックされます。

      パラメーター:
      producerFactory - プロデューサーファクトリ。
      autoFlush - 各送信後にフラッシュする場合は true。
      関連事項:
      • Producer.flush()
    • KafkaTemplate

      public KafkaTemplate(ProducerFactory<K,V> producerFactory, boolean autoFlush, @Nullable MapSE<StringSE,ObjectSE> configOverrides)
      提供されたプロデューサーファクトリと autoFlush 設定を使用してインスタンスを作成します。

      linger.ms または batch.size プロパティの値に関係なく、このテンプレートの送信操作をすぐに実行する場合は、autoFlush を true に設定します。これも、ブローカがプロデューサーの acks プロパティに従って受信を確認するまでブロックされます。configOverrides が null または空でない場合、ProducerFactory.copyWithConfigurationOverride(java.util.Map) を使用して新しい ProducerFactory が作成されます。ファクトリは、提供されたファクトリのプロパティの後にオーバーライドを適用する必要があります。元のファクトリからの ProducerPostProcessor がコピーされて、計装が維持されます。登録された ProducerFactory.Listener も新しいファクトリに追加されます。ファクトリ実装がコピー操作をサポートしていない場合、DefaultKafkaProducerFactory 型の ProducerFactory の汎用コピーが作成されます。

      パラメーター:
      producerFactory - プロデューサーファクトリ。
      autoFlush - 各送信後にフラッシュする場合は true。
      configOverrides - オーバーライドするプロデューサー構成プロパティ。
      導入:
      2.5
      関連事項:
      • Producer.flush()
  • メソッドの詳細

    • setBeanName

      public void setBeanName(StringSE name)
      次で指定:
      インターフェース BeanNameAwaresetBeanName 
    • setApplicationContext

      public void setApplicationContext(ApplicationContext applicationContext)
      次で指定:
      インターフェース ApplicationContextAwaresetApplicationContext 
    • getDefaultTopic

      public StringSE getDefaultTopic()
      トピックが提供されない送信メソッドのデフォルトのトピック。
      戻り値:
      トピック。
    • setDefaultTopic

      public void setDefaultTopic(StringSE defaultTopic)
      トピックが提供されていない送信メソッドのデフォルトのトピックを設定します。
      パラメーター:
      defaultTopic - トピック。
    • setProducerListener

      public void setProducerListener(@Nullable ProducerListener<K,V> producerListener)
      Kafka が送信操作を確認したときに呼び出される ProducerListener を設定します。デフォルトでは、エラーのみを記録する LoggingProducerListener が設定されています。
      パラメーター:
      producerListener - リスナー ; null の可能性があります。
    • getMessageConverter

      public RecordMessageConverter getMessageConverter()
      メッセージコンバーターを返します。
      戻り値:
      メッセージコンバーター。
    • setMessageConverter

      public void setMessageConverter(RecordMessageConverter messageConverter)
      使用するメッセージコンバーターを設定します。
      パラメーター:
      messageConverter - メッセージコンバーター。
    • setMessagingConverter

      public void setMessagingConverter(SmartMessageConverter messageConverter)
      デフォルトの MessagingMessageConverter で使用するように SmartMessageConverter を設定します。カスタム messageConverter が提供されている場合は許可されません。
      パラメーター:
      messageConverter - コンバーター。
      導入:
      2.7.1
    • isTransactional

      public boolean isTransactional()
      インターフェースからコピーされた説明: KafkaOperations
      実装がトランザクションをサポートしている (トランザクション対応のプロデューサーファクトリがある) 場合は true を返します。
      次で指定:
      インターフェース KafkaOperations<K,V>isTransactional 
      戻り値:
      正しいか間違っているか。
    • getTransactionIdPrefix

      public StringSE getTransactionIdPrefix()
    • setTransactionIdPrefix

      public void setTransactionIdPrefix(StringSE transactionIdPrefix)
      トランザクション ID プレフィックスを設定して、プロデューサーファクトリのプレフィックスをオーバーライドします。
      パラメーター:
      transactionIdPrefix - プレフィックス。
      導入:
      2.3
    • setCloseTimeout

      public void setCloseTimeout(DurationSE closeTimeout)
      プロデューサーを閉じるときに待機する最大時間を設定します。デフォルトは 5 秒です。
      パラメーター:
      closeTimeout - クローズタイムアウト。
      導入:
      2.1.14
    • setAllowNonTransactional

      public void setAllowNonTransactional(boolean allowNonTransactional)
      テンプレートがトランザクションの場合に非トランザクション送信を許可するには、true に設定します。
      パラメーター:
      allowNonTransactional - 許可する場合は true。
      導入:
      2.4.3
    • isAllowNonTransactional

      public boolean isAllowNonTransactional()
      インターフェースからコピーされた説明: KafkaOperations
      このテンプレートがトランザクションの場合、トランザクション以外の操作を許可する場合は true を返します。
      次で指定:
      インターフェース KafkaOperations<K,V>isAllowNonTransactional 
      戻り値:
      許可する場合は true。
    • setMicrometerEnabled

      public void setMicrometerEnabled(boolean micrometerEnabled)
      micrometer がクラスパス上にある場合、false に設定すると、micrometer タイマーが無効になります。
      パラメーター:
      micrometerEnabled - 無効にする場合は false。
      導入:
      2.5
    • setMicrometerTags

      public void setMicrometerTags(@Nullable MapSE<StringSE,StringSE> tags)
      Micrometer リスナータイマーに追加のタグを設定します。
      パラメーター:
      tags - タグ。
      導入:
      2.5
    • setMicrometerTagsProvider

      public void setMicrometerTagsProvider(@Nullable FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<?,?>,MapSE<StringSE,StringSE>> micrometerTagsProvider)
      プロデューサーレコードに基づいて動的タグを提供する関数を設定します。これらのタグは、micrometerTags で提供されるすべての静的タグに追加されます。レコードリスナーにのみ適用され、バッチリスナーでは無視されます。監視が有効になっている場合は適用されません。
      パラメーター:
      micrometerTagsProvider - micrometerTagsProvider。
      導入:
      2.9.8
      関連事項:
    • getMicrometerTagsProvider

      @Nullable public FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<?,?>,MapSE<StringSE,StringSE>> getMicrometerTagsProvider()
      Micrometer タグプロバイダーを返します。
      戻り値:
      micrometerTagsProvider。
      導入:
      2.9.8
    • getProducerFactory

      public ProducerFactory<K,V> getProducerFactory()
      このテンプレートで使用されるプロデューサーファクトリを返します。
      次で指定:
      インターフェース KafkaOperations<K,V>getProducerFactory 
      戻り値:
      ファクトリ。
      導入:
      2.2.5
    • getProducerFactory

      protected ProducerFactory<K,V> getProducerFactory(StringSE topic)
      トピックに基づいて、このテンプレートで使用されるプロデューサーファクトリを返します。デフォルトの実装は、唯一のプロデューサーファクトリを返します。
      パラメーター:
      topic - トピック。
      戻り値:
      ファクトリ。
      導入:
      2.5
    • setConsumerFactory

      public void setConsumerFactory(ConsumerFactory<K,V> consumerFactory)
      受信操作用のコンシューマーファクトリを設定します。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      導入:
      2.8
    • setProducerInterceptor

      public void setProducerInterceptor(org.apache.kafka.clients.producer.ProducerInterceptor<K,V> producerInterceptor)
      このテンプレートにプロデューサーインターセプターを設定します。
      パラメーター:
      producerInterceptor - プロデューサーインターセプター
      導入:
      3.0
    • setObservationEnabled

      public void setObservationEnabled(boolean observationEnabled)
      Micrometer 経由の観測を有効にするには、true に設定します。
      パラメーター:
      observationEnabled - 有効にする場合は true。
      導入:
      3.0
      関連事項:
    • setObservationConvention

      public void setObservationConvention(KafkaTemplateObservationConvention observationConvention)
      カスタム KafkaTemplateObservationConvention を設定します。
      パラメーター:
      observationConvention - 大会。
      導入:
      3.0
    • getKafkaAdmin

      @Nullable public KafkaAdmin getKafkaAdmin()
      存在する場合、観測用のクラスター ID を見つけるために使用される KafkaAdmin を返します。
      戻り値:
      カフカ管理者
      導入:
      3.0.5
    • setKafkaAdmin

      public void setKafkaAdmin(KafkaAdmin kafkaAdmin)
      存在する場合、観測用のクラスター ID を見つけるために使用される KafkaAdmin を設定します。
      パラメーター:
      kafkaAdmin - 管理者。
    • afterSingletonsInstantiated

      public void afterSingletonsInstantiated()
      次で指定:
      インターフェース SmartInitializingSingletonafterSingletonsInstantiated 
    • onApplicationEvent

      public void onApplicationEvent(ContextStoppedEvent event)
      次で指定:
      インターフェース ApplicationListener<K>onApplicationEvent 
    • sendDefault

      public CompletableFutureSE<SendResult<K,V>> sendDefault(@Nullable V data)
      インターフェースからコピーされた説明: KafkaOperations
      キーまたはパーティションなしでデータをデフォルトトピックに送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>sendDefault 
      パラメーター:
      data - データ。
      戻り値:
      SendResult の未来。
    • sendDefault

      public CompletableFutureSE<SendResult<K,V>> sendDefault(K key, @Nullable V data)
      インターフェースからコピーされた説明: KafkaOperations
      提供されたキーを使用してパーティションなしでデータをデフォルトのトピックに送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>sendDefault 
      パラメーター:
      key - キー。
      data - データ。
      戻り値:
      SendResult の未来。
    • sendDefault

      public CompletableFutureSE<SendResult<K,V>> sendDefault(IntegerSE partition, K key, @Nullable V data)
      インターフェースからコピーされた説明: KafkaOperations
      提供されたキーとパーティションを使用して、データをデフォルトのトピックに送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>sendDefault 
      パラメーター:
      partition - パーティション。
      key - キー。
      data - データ。
      戻り値:
      SendResult の未来。
    • sendDefault

      public CompletableFutureSE<SendResult<K,V>> sendDefault(IntegerSE partition, LongSE timestamp, K key, @Nullable V data)
      インターフェースからコピーされた説明: KafkaOperations
      提供されたキーとパーティションを使用して、データをデフォルトのトピックに送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>sendDefault 
      パラメーター:
      partition - パーティション。
      timestamp - レコードのタイムスタンプ。
      key - キー。
      data - データ。
      戻り値:
      SendResult の未来。
    • send

      public CompletableFutureSE<SendResult<K,V>> send(StringSE topic, @Nullable V data)
      インターフェースからコピーされた説明: KafkaOperations
      キーまたはパーティションなしで、提供されたトピックにデータを送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>send 
      パラメーター:
      topic - トピック。
      data - データ。
      戻り値:
      SendResult の未来。
    • send

      public CompletableFutureSE<SendResult<K,V>> send(StringSE topic, K key, @Nullable V data)
      インターフェースからコピーされた説明: KafkaOperations
      提供されたキーを使用して、パーティションなしで、提供されたトピックにデータを送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>send 
      パラメーター:
      topic - トピック。
      key - キー。
      data - データ。
      戻り値:
      SendResult の未来。
    • send

      public CompletableFutureSE<SendResult<K,V>> send(StringSE topic, IntegerSE partition, K key, @Nullable V data)
      インターフェースからコピーされた説明: KafkaOperations
      提供されたキーとパーティションを使用して、提供されたトピックにデータを送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>send 
      パラメーター:
      topic - トピック。
      partition - パーティション。
      key - キー。
      data - データ。
      戻り値:
      SendResult の未来。
    • send

      public CompletableFutureSE<SendResult<K,V>> send(StringSE topic, IntegerSE partition, LongSE timestamp, K key, @Nullable V data)
      インターフェースからコピーされた説明: KafkaOperations
      提供されたキーとパーティションを使用して、提供されたトピックにデータを送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>send 
      パラメーター:
      topic - トピック。
      partition - パーティション。
      timestamp - レコードのタイムスタンプ。
      key - キー。
      data - データ。
      戻り値:
      SendResult の未来。
    • send

      public CompletableFutureSE<SendResult<K,V>> send(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
      インターフェースからコピーされた説明: KafkaOperations
      提供された ProducerRecord を送信します。
      次で指定:
      インターフェース KafkaOperations<K,V>send 
      パラメーター:
      record - レコード。
      戻り値:
      SendResult の未来。
    • send

      public CompletableFutureSE<SendResult<K,V>> send(Message<?> message)
      インターフェースからコピーされた説明: KafkaOperations
      メッセージヘッダーにルーティング情報を含むメッセージを送信します。メッセージペイロードは、送信前に変換される場合があります。
      次で指定:
      インターフェース KafkaOperations<K,V>send 
      パラメーター:
      message - 送信するメッセージ。
      戻り値:
      SendResult の未来。
      関連事項:
    • partitionsFor

      public ListSE<org.apache.kafka.common.PartitionInfo> partitionsFor(StringSE topic)
      インターフェースからコピーされた説明: KafkaOperations
      Producer.partitionsFor(String) を参照してください。
      次で指定:
      インターフェース KafkaOperations<K,V>partitionsFor 
      パラメーター:
      topic - トピック。
      戻り値:
      パーティション情報。
    • metrics

      public MapSE<org.apache.kafka.common.MetricName,? extends org.apache.kafka.common.Metric> metrics()
      インターフェースからコピーされた説明: KafkaOperations
      Producer.metrics() を参照してください。
      次で指定:
      インターフェース KafkaOperations<K,V>metrics 
      戻り値:
      メトリクス。
    • execute

      public <T> T execute(KafkaOperations.ProducerCallback<K,V,T> callback)
      インターフェースからコピーされた説明: KafkaOperations
      プロデューサーで任意の操作を実行し、結果を返します。
      次で指定:
      インターフェース KafkaOperations<K,V>execute 
      型パラメーター:
      T - 結果の型。
      パラメーター:
      callback - コールバック。
      戻り値:
      結果。
    • executeInTransaction

      public <T> T executeInTransaction(KafkaOperations.OperationsCallback<K,V,T> callback)
      インターフェースからコピーされた説明: KafkaOperations
      操作に対して任意の操作を実行し、結果を返します。操作はローカルトランザクション内で呼び出され、グローバルトランザクション (存在する場合) には参加しません。
      次で指定:
      インターフェース KafkaOperations<K,V>executeInTransaction 
      型パラメーター:
      T - 結果の型。
      パラメーター:
      callback - コールバック。
      戻り値:
      結果。
    • flush

      public void flush()
      プロデューサーをフラッシュします。

      ProducerFactory がシングルトンプロデューサー ( DefaultKafkaProducerFactory など) を提供する場合にのみ、このメソッドを呼び出すことが理にかなっています。

      次で指定:
      インターフェース KafkaOperations<K,V>flush 
    • sendOffsetsToTransaction

      public void sendOffsetsToTransaction(MapSE<org.apache.kafka.common.TopicPartition,org.apache.kafka.clients.consumer.OffsetAndMetadata> offsets, org.apache.kafka.clients.consumer.ConsumerGroupMetadata groupMetadata)
      インターフェースからコピーされた説明: KafkaOperations
      トランザクションで実行する場合、コンシューマーオフセットをトランザクションに送信します。操作がリスナーコンテナースレッドで呼び出された場合 (およびリスナーコンテナーが KafkaAwareTransactionManager で構成されている場合) は、コンテナーがトランザクションへのオフセットの送信を処理するため、このメソッドを呼び出す必要はありません。2.5 ブローカー以降で使用します。
      次で指定:
      インターフェース KafkaOperations<K,V>sendOffsetsToTransaction 
      パラメーター:
      offsets - オフセット。
      groupMetadata - コンシューマーグループのメタデータ。
      関連事項:
      • Producer.sendOffsetsToTransaction(Map, ConsumerGroupMetadata)
    • receive

      @Nullable public org.apache.kafka.clients.consumer.ConsumerRecord<K,V> receive(StringSE topic, int partition, long offset, DurationSE pollTimeout)
      インターフェースからコピーされた説明: KafkaOperations
      単一のレコードを受け取ります。
      次で指定:
      インターフェース KafkaOperations<K,V>receive 
      パラメーター:
      topic - トピック。
      partition - パーティション。
      offset - オフセット。
      pollTimeout - タイムアウト。
      戻り値:
      レコードまたは null。
    • receive

      public org.apache.kafka.clients.consumer.ConsumerRecords<K,V> receive(CollectionSE<TopicPartitionOffset> requested, DurationSE pollTimeout)
      インターフェースからコピーされた説明: KafkaOperations
      複数のレコードを受信します。絶対的な正のオフセットのみがサポートされます。
      次で指定:
      インターフェース KafkaOperations<K,V>receive 
      パラメーター:
      requested - レコードリクエストのコレクション(トピック / パーティション / オフセット)。
      pollTimeout - タイムアウト。
      戻り値:
      レコードまたは null。
    • closeProducer

      protected void closeProducer(org.apache.kafka.clients.producer.Producer<K,V> producer, boolean inTx)
    • doSend

      protected CompletableFutureSE<SendResult<K,V>> doSend(org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, io.micrometer.observation.Observation observation)
      プロデューサーレコードを送信します。
      パラメーター:
      producerRecord - プロデューサーの記録。
      observation - 観察。
      戻り値:
      RecordMetadata の未来。
    • inTransaction

      public boolean inTransaction()
      テンプレートが呼び出しスレッドのトランザクションで現在実行されている場合は true を返します。
      次で指定:
      インターフェース KafkaOperations<K,V>inTransaction 
      戻り値:
      トランザクションが実行されている場合は true。
      導入:
      2.2.1
    • getTheProducer

      protected org.apache.kafka.clients.producer.Producer<K,V> getTheProducer(@Nullable StringSE topic)
    • destroy

      public void destroy()
      次で指定:
      インターフェース DisposableBeandestroy