クラス DefaultKafkaProducerFactory<K,V>

java.lang.ObjectSE
org.springframework.kafka.core.KafkaResourceFactory
org.springframework.kafka.core.DefaultKafkaProducerFactory<K,V>
型パラメーター:
K - 鍵の型。
V - 値の型。
実装されたすべてのインターフェース:
EventListenerSEAwareBeanNameAwareDisposableBeanApplicationContextAwareApplicationListener<ContextStoppedEvent>LifecyclePhasedSmartLifecycleProducerFactory<K,V>

singleton 共有 Producer インスタンスの ProducerFactory 実装。

この実装は、各 createProducer() 呼び出しで、提供された MapSE configs およびオプションの Serializer 実装に対して、同じ Producer インスタンス (トランザクションが有効になっていない場合) を返します。

引数のないコンストラクターがあり、セットアップを必要としない Serializer を使用している場合は、DefaultKafkaProducerFactory コンストラクターに渡される configs の ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG および ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG キーに対して Serializer クラスを指定するのが最も簡単です。

それが不可能であるが、次のうち少なくとも 1 つが当てはまると確信している場合:

  • 1 つの Producer のみが Serializer を使用します
  • Producer インスタンス間で共有される可能性のある Serializer を使用している (具体的には、それらの close() メソッドはノーオペレーションです)
  • 同じ Serializer を持つ他の Producer インスタンスが使用されているときに、単一の Producer が閉じられるリスクがないことを確信しています。
次に、キーと値のシリアライザーの一方または両方の Serializer インスタンスを渡すことができます。

上記のいずれにも当てはまらない場合は、ファクトリで Producer が作成されるたびに Serializer を取得するために使用される Serializer の一方または両方に SupplierSE 関数を提供できます。

Producer はラップされ、基になる KafkaProducer インスタンスは、Producer.close() が呼び出されたときに実際には閉じられません。DisposableBean.destroy() が呼び出されるか、アプリケーションコンテキストが ContextStoppedEvent を発行すると、KafkaProducer は物理的に閉じられます。reset() を呼び出すこともできます。

setTransactionIdPrefix(String) を設定すると、トランザクションが有効になります。その場合、プロデューサーのキャッシュが維持されます。プロデューサーを閉じると、プロデューサーがキャッシュに返されます。ファクトリが破棄されるか、アプリケーションコンテキストが停止するか、reset() メソッドが呼び出されると、プロデューサーは閉じられ、キャッシュはクリアされます。

作成者:
Gary Russell, Murali Reddy, Nakul Mishra, Artem Bilan, Chris Gilbert, Thomas Strau ß , Adrian Gygax, Soby Chacko
  • コンストラクターの詳細

    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(MapSE<StringSE,ObjectSE> configs)
      提供された構成でファクトリを構築します。
      パラメーター:
      configs - 構成。
    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(MapSE<StringSE,ObjectSE> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
      提供された構成と Serializer でファクトリを構築します。また、transactionIdPrefix が指定されている場合は、ProducerConfig.TRANSACTIONAL_ID_CONFIG からの値として transactionIdPrefix を構成します。この構成は、ターゲット Producer インスタンスのサフィックスで上書きされます。シリアライザーの configure() メソッドは、構成マップで呼び出されます。
      パラメーター:
      configs - 構成。
      keySerializer - キー Serializer
      valueSerializer - 値 Serializer
    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(MapSE<StringSE,ObjectSE> configs, @Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer, @Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer, boolean configureSerializers)
      提供された構成と Serializer でファクトリを構築します。また、transactionIdPrefix が指定されている場合は、ProducerConfig.TRANSACTIONAL_ID_CONFIG からの値として transactionIdPrefix を構成します。この構成は、ターゲット Producer インスタンスのサフィックスで上書きされます。シリアライザーの configure() メソッドは、configureSerializers が false でない限り、構成マップで呼び出されます。
      パラメーター:
      configs - 構成。
      keySerializer - キー Serializer
      valueSerializer - 値 Serializer
      configureSerializers - シリアライザーがすでに完全に構成されている場合は、false に設定します。
      導入:
      2.8.7
    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(MapSE<StringSE,ObjectSE> configs, @Nullable SupplierSE<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable SupplierSE<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier)
      提供された構成と Serializer サプライヤーでファクトリを構築します。また、transactionIdPrefix が指定されている場合は、ProducerConfig.TRANSACTIONAL_ID_CONFIG からの値として transactionIdPrefix を構成します。この構成は、ターゲット Producer インスタンスのサフィックスで上書きされます。インスタンスを取得するためにサプライヤーが呼び出されると、シリアライザーの configure() メソッドが構成マップで呼び出されます。
      パラメーター:
      configs - 構成。
      keySerializerSupplier - キー Serializer サプライヤー関数。
      valueSerializerSupplier - 値 Serializer サプライヤー関数。
      導入:
      2.3
    • DefaultKafkaProducerFactory

      public DefaultKafkaProducerFactory(MapSE<StringSE,ObjectSE> configs, @Nullable SupplierSE<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier, @Nullable SupplierSE<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier, boolean configureSerializers)
      提供された構成と Serializer サプライヤーでファクトリを構築します。また、transactionIdPrefix が指定されている場合は、ProducerConfig.TRANSACTIONAL_ID_CONFIG からの値として transactionIdPrefix を構成します。この構成は、ターゲット Producer インスタンスのサフィックスで上書きされます。インスタンスを取得するためにサプライヤーが呼び出されると、configureSerializers が false でない限り、シリアライザーの configure() メソッドが構成マップで呼び出されます。
      パラメーター:
      configs - 構成。
      keySerializerSupplier - キー Serializer サプライヤー関数。
      valueSerializerSupplier - 値 Serializer サプライヤー関数。
      configureSerializers - シリアライザーがすでに完全に構成されている場合は、false に設定します。
      導入:
      2.8.7
  • メソッドの詳細

    • setApplicationContext

      public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
      次で指定:
      インターフェース ApplicationContextAwaresetApplicationContext 
      例外:
      BeansException
    • setBeanName

      public void setBeanName(StringSE name)
      次で指定:
      インターフェース BeanNameAwaresetBeanName 
    • setKeySerializer

      public void setKeySerializer(@Nullable org.apache.kafka.common.serialization.Serializer<K> keySerializer)
      キーシリアライザーを設定します。configureSerializers が false でない限り、シリアライザーはプロデューサー構成を使用して構成されます。
      パラメーター:
      keySerializer - キーシリアライザー。
      関連事項:
    • setValueSerializer

      public void setValueSerializer(@Nullable org.apache.kafka.common.serialization.Serializer<V> valueSerializer)
      値シリアライザーを設定します。configureSerializers が false でない限り、シリアライザーはプロデューサー構成を使用して構成されます。
      パラメーター:
      valueSerializer - 値シリアライザー。
      関連事項:
    • setKeySerializerSupplier

      public void setKeySerializerSupplier(SupplierSE<org.apache.kafka.common.serialization.Serializer<K>> keySerializerSupplier)
      キーシリアライザーのインスタンスを提供するようにサプライヤーを設定します。configureSerializers が false でない限り、シリアライザーはプロデューサー構成を使用して構成されます。
      パラメーター:
      keySerializerSupplier - サプライヤー。
      導入:
      2.8
      関連事項:
    • setValueSerializerSupplier

      public void setValueSerializerSupplier(SupplierSE<org.apache.kafka.common.serialization.Serializer<V>> valueSerializerSupplier)
      値シリアライザーのインスタンスを提供するようにサプライヤーを設定します。
      パラメーター:
      valueSerializerSupplier - サプライヤー。configureSerializers が false でない限り、シリアライザーはプロデューサー構成を使用して構成されます。
      導入:
      2.8
      関連事項:
    • setTransactionIdSuffixStrategy

      public void setTransactionIdSuffixStrategy(TransactionIdSuffixStrategy transactionIdSuffixStrategy)
      トランザクションサフィックス戦略を設定します。
      パラメーター:
      transactionIdSuffixStrategy - 戦略。
      導入:
      3.2
    • isConfigureSerializers

      public boolean isConfigureSerializers()
      true(デフォルト)の場合、プログラムで提供されるシリアライザー(コンストラクターまたは setter を介して)は、プロデューサー構成を使用して構成されます。シリアライザーがすでに完全に構成されている場合は、false に設定します。
      戻り値:
      構成する場合は true。
      導入:
      2.8.7
      関連事項:
    • setConfigureSerializers

      public void setConfigureSerializers(boolean configureSerializers)
      false(デフォルトは true)に設定して、プログラムで提供されたシリアライザー(コンストラクターまたは setter を介して)がプロデューサー構成を使用して構成されないようにします。シリアライザーがすでに完全に構成されている場合。
      パラメーター:
      configureSerializers - 構成しない場合は false。
      導入:
      2.8.7
      関連事項:
    • setPhysicalCloseTimeout

      public void setPhysicalCloseTimeout(int physicalCloseTimeout)
      プロデューサー自体を閉じるのではなく、ファクトリを介してプロデューサーを物理的に閉じるときに待機する時間(reset()#closeProducerFor(String)closeThreadBoundProducer() が呼び出されたとき)。秒単位で指定。デフォルトの ProducerFactory.DEFAULT_PHYSICAL_CLOSE_TIMEOUT
      パラメーター:
      physicalCloseTimeout - 秒単位のタイムアウト。
      導入:
      1.0.7
    • getPhysicalCloseTimeout

      public DurationSE getPhysicalCloseTimeout()
      物理的なクローズタイムアウトを取得します。
      次で指定:
      インターフェース ProducerFactory<K,V>getPhysicalCloseTimeout 
      戻り値:
      タイムアウト。
      導入:
      2.5
    • setTransactionIdPrefix

      public final void setTransactionIdPrefix(StringSE transactionIdPrefix)
      ProducerConfig.TRANSACTIONAL_ID_CONFIG 構成のプレフィックスを設定します。デフォルトでは、構成の ProducerConfig.TRANSACTIONAL_ID_CONFIG 値がターゲットプロデューサー構成のプレフィックスとして使用されます。
      パラメーター:
      transactionIdPrefix - プレフィックス。
      導入:
      1.3
    • getTransactionIdPrefix

      @Nullable public StringSE getTransactionIdPrefix()
      インターフェースからコピーされた説明: ProducerFactory
      トランザクション ID プレフィックスを返します。
      次で指定:
      インターフェース ProducerFactory<K,V>getTransactionIdPrefix 
      戻り値:
      プレフィックスまたは設定されていない場合は null。
    • setProducerPerThread

      public void setProducerPerThread(boolean producerPerThread)
      すべてのクライアントで共有されるシングルトンではなく、スレッドごとにプロデューサーを作成するには、true に設定します。クライアント は、closeThreadBoundProducer() を呼び出して、プロデューサーが不要になったときに物理的に閉じる必要があります。これらのプロデューサーは、destroy() または reset() によって閉鎖されることはありません。
      パラメーター:
      producerPerThread - スレッドごとのプロデューサーに当てはまります。
      導入:
      2.3
      関連事項:
    • isProducerPerThread

      public boolean isProducerPerThread()
      インターフェースからコピーされた説明: ProducerFactory
      スレッドごとにプロデューサーが存在する場合は true を返します。
      次で指定:
      インターフェース ProducerFactory<K,V>isProducerPerThread 
      戻り値:
      スレッドごとのプロデューサー。
    • getKeySerializer

      @Nullable public org.apache.kafka.common.serialization.Serializer<K> getKeySerializer()
      インターフェースからコピーされた説明: ProducerFactory
      構成されたキーシリアライザーを返します(プロパティでクラス名の代わりにオブジェクトとして提供されている場合)。
      次で指定:
      インターフェース ProducerFactory<K,V>getKeySerializer 
      戻り値:
      シリアライザー。
    • getValueSerializer

      @Nullable public org.apache.kafka.common.serialization.Serializer<V> getValueSerializer()
      インターフェースからコピーされた説明: ProducerFactory
      構成された値シリアライザーを返します(プロパティでクラス名の代わりにオブジェクトとして提供されている場合)。
      次で指定:
      インターフェース ProducerFactory<K,V>getValueSerializer 
      戻り値:
      シリアライザー。
    • getKeySerializerSupplier

      public SupplierSE<org.apache.kafka.common.serialization.Serializer<K>> getKeySerializerSupplier()
      インターフェースからコピーされた説明: ProducerFactory
      キーシリアライザーのサプライヤーを return してください。同様のファクトリを作成するためのクローン作成に役立ちます。
      次で指定:
      インターフェース ProducerFactory<K,V>getKeySerializerSupplier 
      戻り値:
      サプライヤー。
    • getValueSerializerSupplier

      public SupplierSE<org.apache.kafka.common.serialization.Serializer<V>> getValueSerializerSupplier()
      インターフェースからコピーされた説明: ProducerFactory
      バリューシリアライザーのサプライヤーを return してください。同様のファクトリを作成するためのクローン作成に役立ちます。
      次で指定:
      インターフェース ProducerFactory<K,V>getValueSerializerSupplier 
      戻り値:
      サプライヤー。
    • getConfigurationProperties

      public MapSE<StringSE,ObjectSE> getConfigurationProperties()
      このファクトリの構成マップへの変更不可能な参照を返します。類似のファクトリを作るためのクローン作成に便利です。
      次で指定:
      インターフェース ProducerFactory<K,V>getConfigurationProperties 
      戻り値:
      構成。
      導入:
      1.3
    • getListeners

      public ListSE<ProducerFactory.Listener<K,V>> getListeners()
      リスナーの現在のリストを取得します。
      次で指定:
      インターフェース ProducerFactory<K,V>getListeners 
      戻り値:
      リスナー。
      導入:
      2.5
    • getPostProcessors

      public ListSE<ProducerPostProcessor<K,V>> getPostProcessors()
      インターフェースからコピーされた説明: ProducerFactory
      ポストプロセッサーの現在のリストを取得します。
      次で指定:
      インターフェース ProducerFactory<K,V>getPostProcessors 
      戻り値:
      ポストプロセッサー。
    • setMaxAge

      public void setMaxAge(DurationSE maxAge)
      プロデューサーの最大年齢を設定します。トランザクションを使用するときに便利で、非アクティブのためにブローカーが transactional.id を期限切れにする可能性があります。
      パラメーター:
      maxAge - 設定する maxAge
      導入:
      2.5.8
    • start

      public void start()
      次で指定:
      インターフェース Lifecyclestart 
    • stop

      public void stop()
      次で指定:
      インターフェース Lifecyclestop 
    • isRunning

      public boolean isRunning()
      次で指定:
      インターフェース LifecycleisRunning 
    • getPhase

      public int getPhase()
      次で指定:
      インターフェース PhasedgetPhase 
      次で指定:
      インターフェース SmartLifecyclegetPhase 
    • copyWithConfigurationOverride

      public ProducerFactory<K,V> copyWithConfigurationOverride(MapSE<StringSE,ObjectSE> overrideProperties)
      インスタンスのプロパティと指定されたプロパティをコピーして、新しいプロデューサーファクトリを作成します。

      DefaultKafkaProducerFactory がそれ自体のコピーを作成する場合、トランザクション ID プレフィックスがプロパティからリカバリされます。ID 構成を変更する場合は、オーバーライド構成に新しい ProducerConfig.TRANSACTIONAL_ID_CONFIG キーを追加します。

      次で指定:
      インターフェース ProducerFactory<K,V>copyWithConfigurationOverride 
      パラメーター:
      overrideProperties - 新しいファクトリに適用されるプロパティ
      戻り値:
      プロパティが適用された DefaultKafkaProducerFactory
      関連事項:
    • addListener

      public void addListener(ProducerFactory.Listener<K,V> listener)
      リスナーを追加します。
      次で指定:
      インターフェース ProducerFactory<K,V>addListener 
      パラメーター:
      listener - リスナー。
      導入:
      2.5
    • addListener

      public void addListener(int index, ProducerFactory.Listener<K,V> listener)
      特定のインデックスにリスナーを追加します。
      次で指定:
      インターフェース ProducerFactory<K,V>addListener 
      パラメーター:
      index - インデックス (リストの位置)。
      listener - リスナー。
      導入:
      2.5
    • removeListener

      public boolean removeListener(ProducerFactory.Listener<K,V> listener)
      リスナーを削除します。
      次で指定:
      インターフェース ProducerFactory<K,V>removeListener 
      パラメーター:
      listener - リスナー。
      戻り値:
      削除された場合は true。
      導入:
      2.5
    • addPostProcessor

      public void addPostProcessor(ProducerPostProcessor<K,V> postProcessor)
      インターフェースからコピーされた説明: ProducerFactory
      ポストプロセッサーを追加します。
      次で指定:
      インターフェース ProducerFactory<K,V>addPostProcessor 
      パラメーター:
      postProcessor - ポストプロセッサー。
    • removePostProcessor

      public boolean removePostProcessor(ProducerPostProcessor<K,V> postProcessor)
      インターフェースからコピーされた説明: ProducerFactory
      ポストプロセッサーを取り外します。
      次で指定:
      インターフェース ProducerFactory<K,V>removePostProcessor 
      パラメーター:
      postProcessor - ポストプロセッサー。
      戻り値:
      削除された場合は true。
    • updateConfigs

      public void updateConfigs(MapSE<StringSE,ObjectSE> updates)
      インターフェースからコピーされた説明: ProducerFactory
      プロデューサー構成マップを更新します。クレデンシャルローテーションなどの状況で役立ちます。
      次で指定:
      インターフェース ProducerFactory<K,V>updateConfigs 
      パラメーター:
      updates - 更新する構成プロパティ。
    • removeConfig

      public void removeConfig(StringSE configKey)
      インターフェースからコピーされた説明: ProducerFactory
      指定されたキーを構成マップから削除します。
      次で指定:
      インターフェース ProducerFactory<K,V>removeConfig 
      パラメーター:
      configKey - 削除するキー。
    • transactionCapable

      public boolean transactionCapable()
      インターフェースからコピーされた説明: ProducerFactory
      ファクトリがトランザクションをサポートしている場合は true を返します。
      次で指定:
      インターフェース ProducerFactory<K,V>transactionCapable 
      戻り値:
      トランザクションの場合は true。
    • destroy

      public void destroy()
      次で指定:
      インターフェース DisposableBeandestroy 
    • onApplicationEvent

      public void onApplicationEvent(ContextStoppedEvent event)
      次で指定:
      インターフェース ApplicationListener<K>onApplicationEvent 
    • reset

      public void reset()
      Producer (s) を閉じ、トランザクション Producer (s) のキャッシュをクリアします。
      次で指定:
      インターフェース ProducerFactory<K,V>reset 
      導入:
      2.2
    • createProducer

      public org.apache.kafka.clients.producer.Producer<K,V> createProducer()
      インターフェースからコピーされた説明: ProducerFactory
      ファクトリがそのように構成されている場合にトランザクションになるプロデューサーを作成します。
      次で指定:
      インターフェース ProducerFactory<K,V>createProducer 
      戻り値:
      プロデューサー。
      関連事項:
    • createProducer

      public org.apache.kafka.clients.producer.Producer<K,V> createProducer(@Nullable StringSE txIdPrefixArg)
      インターフェースからコピーされた説明: ProducerFactory
      オーバーライドされたトランザクション ID プレフィックスを使用してプロデューサーを作成します。
      次で指定:
      インターフェース ProducerFactory<K,V>createProducer 
      パラメーター:
      txIdPrefixArg - トランザクション ID プレフィックス。
      戻り値:
      プロデューサー。
    • createNonTransactionalProducer

      public org.apache.kafka.clients.producer.Producer<K,V> createNonTransactionalProducer()
      インターフェースからコピーされた説明: ProducerFactory
      非トランザクションプロデューサーを作成します。
      次で指定:
      インターフェース ProducerFactory<K,V>createNonTransactionalProducer 
      戻り値:
      プロデューサー。
      関連事項:
    • createKafkaProducer

      protected org.apache.kafka.clients.producer.Producer<K,V> createKafkaProducer()
      サブクラスは、DefaultKafkaProducerFactory.CloseSafeProducer にラップされる生のプロデューサーを返す必要があります。
      戻り値:
      プロデューサー。
    • removeProducer

      protected final boolean removeProducer(DefaultKafkaProducerFactory.CloseSafeProducer<K,V> producerToRemove, DurationSE timeout)
      存在する場合は、単一の共有プロデューサーとスレッドバインドインスタンスを削除します。
      パラメーター:
      producerToRemove - プロデューサー。
      timeout - クローズタイムアウト。
      戻り値:
      プロデューサーが閉じられている場合は true。
      導入:
      2.2.13
    • createTransactionalProducer

      protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducer()
      サブクラスは、getCache() からのプロデューサー、または DefaultKafkaProducerFactory.CloseSafeProducer にラップされた新しい生のプロデューサーを返す必要があります。
      戻り値:
      プロデューサー -null にすることはできません。
      導入:
      1.3
    • createTransactionalProducer

      protected org.apache.kafka.clients.producer.Producer<K,V> createTransactionalProducer(StringSE txIdPrefix)
    • createRawProducer

      protected org.apache.kafka.clients.producer.Producer<K,V> createRawProducer(MapSE<StringSE,ObjectSE> rawConfigs)
    • getCache

    • getCache

    • closeThreadBoundProducer

      public void closeThreadBoundProducer()
      setProducerPerThread(boolean)(true)を使用する場合は、このメソッドを呼び出して、このスレッドのプロデューサーを閉じて解放します。スレッドにバインドされたプロデューサーは、destroy() または reset() メソッドによって閉じられません。
      次で指定:
      インターフェース ProducerFactory<K,V>closeThreadBoundProducer 
      導入:
      2.3
      関連事項:
    • getProducerConfigs

      protected MapSE<StringSE,ObjectSE> getProducerConfigs()
      プロデューサーの構成を返します。
      戻り値:
      プロデューサーの構成。
      導入:
      2.8.3
      関連事項:
    • getTxProducerConfigs

      protected MapSE<StringSE,ObjectSE> getTxProducerConfigs(StringSE transactionId)
      トランザクションプロデューサーの構成を返します。
      パラメーター:
      transactionId - transactionId。
      戻り値:
      トランザクションプロデューサーの構成。
      導入:
      2.8.3
      関連事項:
      • doCreateTxProducer(String, String, BiPredicate)