クラス KafkaProducerMessageHandler<K,V>

型パラメーター:
K - 鍵の型。
V - 値の型。
実装されたすべてのインターフェース:
org.reactivestreams.Subscriber<Message<?>>AwareBeanClassLoaderAwareBeanFactoryAwareBeanNameAwareDisposableBeanInitializingBeanApplicationContextAwareLifecycleOrderedExpressionCapableOrderableMessageProducerHeaderPropagationAwareIntegrationPatternNamedComponentIntegrationManagementManageableLifecycleTrackableComponentMessageHandlerreactor.core.CoreSubscriber<Message<?>>

public class KafkaProducerMessageHandler<K,V> extends AbstractReplyProducingMessageHandler implements ManageableLifecycle
Apache Kafka のメッセージハンドラー。ReplyingKafkaTemplate と一緒に提供される場合、送信ゲートウェイのハンドラーとして使用されます。シンプルな KafkaTemplate と一緒に提供される場合、送信チャネルアダプターのハンドラーとして使用されます。

ハンドラーは、事前に作成された ProducerRecord ペイロードの受信もサポートします。その場合、ほとんどの構成プロパティ(setTopicExpression(Expression) など)は無視されます。ハンドラーがゲートウェイとして使用される場合、ProducerRecord のヘッダーは、すでにそのようなヘッダーが含まれていない限り、KafkaHeaders.REPLY_TOPIC を追加するように拡張されます。ハンドラーは追加のヘッダーをマップしません。このようなペイロードを提供することは、ヘッダーがすでにマップされていることを前提としています。

導入:
5.4
作成者:
Soby Chacko, Artem Bilan, Gary Russell, Marius Bogoevici, Biju Kunjummen, Tom van den Berge
  • コンストラクターの詳細

    • KafkaProducerMessageHandler

      public KafkaProducerMessageHandler(org.springframework.kafka.core.KafkaTemplate<K,V> kafkaTemplate)
  • メソッドの詳細

    • setTopicExpression

      public void setTopicExpression(Expression topicExpression)
    • setMessageKeyExpression

      public void setMessageKeyExpression(Expression messageKeyExpression)
    • setPartitionIdExpression

      public void setPartitionIdExpression(Expression partitionIdExpression)
    • setTimestampExpression

      public void setTimestampExpression(Expression timestampExpression)
      SpEL 式を指定して、Kafka レコードに追加されるタイムスタンプを評価します。結果の値は、ミリ秒単位のエポック時間を表す LongSE 型である必要があります。
      パラメーター:
      timestampExpression - 送信操作の結果を待機するタイムスタンプの Expression
    • setFlushExpression

      public void setFlushExpression(Expression flushExpression)
      BooleanSE に評価される SpEL 式を指定して、送信後にプロデューサーをフラッシュする必要があるかどうかを判別します。デフォルトでは、KafkaIntegrationHeaders.FLUSH ヘッダーで BooleanSE 値を検索します。存在しない場合は false。
      パラメーター:
      flushExpression - Expression
    • setHeaderMapper

      public void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper)
      使用するヘッダーマッパーを設定します。
      パラメーター:
      headerMapper - マッパー ; null にして、ヘッダーマッピングを無効にすることができます。
    • getHeaderMapper

      public org.springframework.kafka.support.KafkaHeaderMapper getHeaderMapper()
    • getKafkaTemplate

      public org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()
    • setSync

      public void setSync(boolean sync)
      KafkaProducerMessageHandler が送信操作の結果を待機する必要があるかどうかを示す boolean。デフォルトは false です。sync モードでは、ダウンストリーム送信操作の例外が再スローされます。
      パラメーター:
      sync - 送信モード。デフォルトでは非同期です。
    • setSendTimeout

      public final void setSendTimeout(long sendTimeout)
      この KafkaProducerMessageHandler が操作結果の送信を待機する時間のタイムアウトをミリ秒単位で指定します。デフォルトは kafka delivery.timeout.ms プロパティ +5 秒です。タイムアウトが適用されます。成功または失敗チャネルに送信するときにも適用されます。
      オーバーライド:
      クラス AbstractMessageProducingHandlersetSendTimeout 
      パラメーター:
      sendTimeout - 送信操作の結果を待機するためのタイムアウト。
    • setSendTimeoutExpression

      public void setSendTimeoutExpression(Expression sendTimeoutExpression)
      SpEL 式を指定して、この KafkaProducerMessageHandler が操作結果の送信を待機する時間のタイムアウトをミリ秒単位で評価します。デフォルトは kafka delivery.timeout.ms プロパティ +5 秒です。タイムアウトは sync モードでのみ適用されます。この式の結果がその値よりも小さい場合は、より高い値が使用されます。
      パラメーター:
      sendTimeoutExpression - 送信操作の結果を待機するためのタイムアウト用の Expression
      関連事項:
    • setSendFailureChannel

      public void setSendFailureChannel(MessageChannel sendFailureChannel)
      障害チャネルを設定します。送信が失敗した後、ErrorMessage は、失敗したメッセージと原因を含む KafkaSendFailureException のペイロードとともにこのチャネルに送信されます。
      パラメーター:
      sendFailureChannel - 障害チャネル。
    • setSendFailureChannelName

      public void setSendFailureChannelName(StringSE sendFailureChannelName)
      障害チャネル名を設定します。送信が失敗した後、ErrorMessage は、失敗したメッセージと原因を含む KafkaSendFailureException のペイロードとともにこのチャネル名に送信されます。
      パラメーター:
      sendFailureChannelName - 障害チャネル名。
    • setSendSuccessChannel

      public void setSendSuccessChannel(MessageChannel sendSuccessChannel)
      成功チャネルを設定します。
      パラメーター:
      sendSuccessChannel - 成功チャネル。
    • setSendSuccessChannelName

      public void setSendSuccessChannelName(StringSE sendSuccessChannelName)
      成功チャンネル名を設定します。
      パラメーター:
      sendSuccessChannelName - 成功チャネル名。
    • setFuturesChannel

      public void setFuturesChannel(MessageChannel futuresChannel)
      フューチャーズチャンネルを設定します。
      パラメーター:
      futuresChannel - フューチャーズチャンネル。
    • setFuturesChannelName

      public void setFuturesChannelName(StringSE futuresChannelName)
      フューチャーズチャンネル名を設定します。
      パラメーター:
      futuresChannelName - フューチャーズチャンネル名。
    • setErrorMessageStrategy

      public void setErrorMessageStrategy(ErrorMessageStrategy errorMessageStrategy)
      送信失敗後にエラーメッセージを送信するときに使用するエラーメッセージ戦略の実装を設定します。null にすることはできません。
      パラメーター:
      errorMessageStrategy - 実装。
    • setReplyMessageConverter

      public void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      ゲートウェイ応答用のメッセージコンバーターを設定します。
      パラメーター:
      messageConverter - コンバーター。
      関連事項:
    • setReplyPayloadType

      public void setReplyPayloadType(TypeSE payloadType)
      型認識メッセージコンバーター(StringJsonMessageConverter など)を使用する場合、コンバーターが作成するペイロード型を設定します。デフォルトは ObjectSE です。
      パラメーター:
      payloadType - 型。
      関連事項:
    • setProducerRecordCreator

      public void setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K,V> producerRecordCreator)
      KafkaProducerMessageHandler.ProducerRecordCreator を設定して ProducerRecord を作成します。useTemplateConverter が true の場合は無視されます。
      パラメーター:
      producerRecordCreator - クリエイター。
      関連事項:
    • setTimeoutBuffer

      public void setTimeoutBuffer(int timeoutBuffer)
      構成済みの delivery.timeout.ms に追加されるバッファーをミリ秒単位で設定して、sync が true の場合に将来の送信完了を待機する最小時間を決定します。
      パラメーター:
      timeoutBuffer - バッファ。
      関連事項:
    • setUseTemplateConverter

      public void setUseTemplateConverter(boolean useTemplateConverter)
      テンプレートのメッセージコンバーターを使用して producerRecordCreator の代わりに ProducerRecord を作成するには、true に設定します。
      パラメーター:
      useTemplateConverter - コンバーターを使用する場合は true。
      導入:
      5.5.5
      関連事項:
    • setAssignmentDuration

      public void setAssignmentDuration(DurationSE assignmentDuration)
      デフォルトの返信先トピック / パーティションを決定するために、ゲートウェイとして使用される場合に、パーティションの割り当てを待機する時間を設定します。
      パラメーター:
      assignmentDuration - 設定する割り当て期間。
      導入:
      6.0
    • getComponentType

      public StringSE getComponentType()
      クラスからコピーされた説明: IntegrationObjectSupport
      サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。
      次で指定:
      インターフェース NamedComponentgetComponentType 
      オーバーライド:
      クラス MessageHandlerSupportgetComponentType 
    • getSendFailureChannel

      @Nullable protected MessageChannel getSendFailureChannel()
    • getSendSuccessChannel

      protected MessageChannel getSendSuccessChannel()
    • getFuturesChannel

      protected MessageChannel getFuturesChannel()
    • doInit

      protected void doInit()
      オーバーライド:
      クラス AbstractReplyProducingMessageHandlerdoInit 
    • start

      public void start()
      次で指定:
      インターフェース Lifecyclestart 
      次で指定:
      インターフェース ManageableLifecyclestart 
    • stop

      public void stop()
      次で指定:
      インターフェース Lifecyclestop 
      次で指定:
      インターフェース ManageableLifecyclestop 
    • isRunning

      public boolean isRunning()
      次で指定:
      インターフェース LifecycleisRunning 
      次で指定:
      インターフェース ManageableLifecycleisRunning 
    • handleRequestMessage

      protected ObjectSE handleRequestMessage(Message<?> message)
      クラスからコピーされた説明: AbstractReplyProducingMessageHandler
      サブクラスはこのメソッドを実装して、リクエストメッセージを処理する必要があります。戻り値は、メッセージ、MessageBuilder、任意のプレーンオブジェクトです。基本クラスは、これらの開始点のいずれかからの返信メッセージの最終的な作成を処理します。戻り値が null の場合、メッセージフローはここで終了します。
      次で指定:
      クラス AbstractReplyProducingMessageHandlerhandleRequestMessage 
      パラメーター:
      message - リクエストメッセージ。
      戻り値:
      メッセージを処理した結果、または null
    • processSendResult

      public void processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K,V> producerRecord, CompletableFutureSE<org.springframework.kafka.support.SendResult<K,V>> future, MessageChannel metadataChannel) throws InterruptedExceptionSE, ExecutionExceptionSE
      例外:
      InterruptedExceptionSE
      ExecutionExceptionSE