クラス ReplyingKafkaTemplate<K,V,R>

java.lang.ObjectSE
org.springframework.kafka.core.KafkaTemplate<K,V>
org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,R>
型パラメーター:
K - 鍵の型。
V - 送信データ型。
R - 応答データ型。
実装されたすべてのインターフェース:
EventListenerSEAwareBeanNameAwareDisposableBeanInitializingBeanSmartInitializingSingletonApplicationContextAwareApplicationListener<ContextStoppedEvent>LifecyclePhasedSmartLifecycleKafkaOperations<K,V>BatchMessageListener<K,R>ConsumerSeekAwareGenericMessageListener<ListSE<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>>>ReplyingKafkaOperations<K,V,R>
既知の直属サブクラス
AggregatingReplyingKafkaTemplate

public class ReplyingKafkaTemplate<K,V,R> extends KafkaTemplate<K,V> implements BatchMessageListener<K,R>, InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K,V,R>, ConsumerSeekAware
リクエスト / 応答セマンティクスを実装する KafkaTemplate。
導入:
2.1.3
作成者:
Gary Russell, Artem Bilan, Borahm Lee
  • コンストラクターの詳細

  • メソッドの詳細

    • setTaskScheduler

      public void setTaskScheduler(TaskScheduler scheduler)
    • getDefaultReplyTimeout

      protected DurationSE getDefaultReplyTimeout()
      sendAndReceive(ProducerRecord, Duration) 呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを返します。
      戻り値:
      タイムアウト。
      導入:
      2.3
    • setDefaultReplyTimeout

      public void setDefaultReplyTimeout(DurationSE defaultReplyTimeout)
      sendAndReceive(ProducerRecord, Duration) 呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを設定します。
      パラメーター:
      defaultReplyTimeout - タイムアウト。
      導入:
      2.3
    • isRunning

      public boolean isRunning()
      次で指定:
      インターフェース LifecycleisRunning 
    • getPhase

      public int getPhase()
      次で指定:
      インターフェース PhasedgetPhase 
      次で指定:
      インターフェース SmartLifecyclegetPhase 
    • setPhase

      public void setPhase(int phase)
    • isAutoStartup

      public boolean isAutoStartup()
      次で指定:
      インターフェース SmartLifecycleisAutoStartup 
    • setAutoStartup

      public void setAutoStartup(boolean autoStartup)
    • getAssignedReplyTopicPartitions

      public CollectionSE<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()
      応答するリスナーコンテナーに割り当てられたトピック / パーティションを返します。
      戻り値:
      トピック / パーティション。
    • setSharedReplyTopic

      public void setSharedReplyTopic(boolean sharedReplyTopic)
      複数のテンプレートが返信に同じトピックを使用している場合は、true に設定します。これは、予期しない応答のログをエラーではなくデバッグに変更するだけです。
      パラメーター:
      sharedReplyTopic - 共有トピックを使用する場合は true。
      導入:
      2.2
    • setCorrelationIdStrategy

      public void setCorrelationIdStrategy(FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<K,V>,CorrelationKey> correlationStrategy)
      リクエストレコードごとに一意の相関キーを確立するために呼び出される関数を設定します。
      パラメーター:
      correlationStrategy - 関数。
      導入:
      2.3
    • setCorrelationHeaderName

      public void setCorrelationHeaderName(StringSE correlationHeaderName)
      相関 ID のカスタムヘッダー名を設定します。デフォルト KafkaHeaders.CORRELATION_ID
      パラメーター:
      correlationHeaderName - ヘッダー名。
      導入:
      2.3
    • getCorrelationHeaderName

      protected StringSE getCorrelationHeaderName()
      相関ヘッダー名を返します。
      戻り値:
      ヘッダー名。
      導入:
      2.8.8
    • setReplyTopicHeaderName

      public void setReplyTopicHeaderName(StringSE replyTopicHeaderName)
      返信トピックのカスタムヘッダー名を設定します。デフォルト KafkaHeaders.REPLY_TOPIC
      パラメーター:
      replyTopicHeaderName - ヘッダー名。
      導入:
      2.3
    • setReplyPartitionHeaderName

      public void setReplyPartitionHeaderName(StringSE replyPartitionHeaderName)
      応答パーティションのカスタムヘッダー名を設定します。デフォルト KafkaHeaders.REPLY_PARTITION
      パラメーター:
      replyPartitionHeaderName - 応答パーティションのヘッダー名。
      導入:
      2.3
    • setReplyErrorChecker

      public void setReplyErrorChecker(FunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,ExceptionSE> replyErrorChecker)
      サーバーから返されたエラーの応答を調べる関数を設定します。
      パラメーター:
      replyErrorChecker - エラーチェッカー機能。
      導入:
      2.6.7
    • setBinaryCorrelation

      public void setBinaryCorrelation(boolean binaryCorrelation)
      バイナリ表現ではなく、correlationId として相関の文字列表現を使用するには、false に設定します。デフォルトは真です。
      パラメーター:
      binaryCorrelation - 文字列の場合は false。
      導入:
      3.0
    • isBinaryCorrelation

      protected boolean isBinaryCorrelation()
    • afterPropertiesSet

      public void afterPropertiesSet()
      次で指定:
      インターフェース InitializingBeanafterPropertiesSet 
    • start

      public void start()
      次で指定:
      インターフェース Lifecyclestart 
    • stop

      public void stop()
      次で指定:
      インターフェース Lifecyclestop 
    • stop

      public void stop(RunnableSE callback)
      次で指定:
      インターフェース SmartLifecyclestop 
    • onFirstPoll

      public void onFirstPoll()
      インターフェースからコピーされた説明: ConsumerSeekAware
      手動パーティション割り当てを使用する場合、最初のポーリングが完了したときに呼び出されます。auto.offset.reset=latest を使用していて、初期位置が確立されるまで待つ必要がある場合に便利です。
      次で指定:
      インターフェース ConsumerSeekAwareonFirstPoll 
    • waitForAssignment

      public boolean waitForAssignment(DurationSE duration) throws InterruptedExceptionSE
      インターフェースからコピーされた説明: ReplyingKafkaOperations
      パーティションが割り当てられるまで待ちます。例: auto.offset.reset=latest の場合。手動割り当てを使用する場合、期間はコンテナーの pollTimeout プロパティよりも長くする必要があります。
      次で指定:
      インターフェース ReplyingKafkaOperations<K,V,R>waitForAssignment 
      パラメーター:
      duration - どのぐらい待つのか。
      戻り値:
      パーティションが割り当てられている場合は true。
      例外:
      InterruptedExceptionSE - 待機中にスレッドが中断された場合。
    • sendAndReceive

      public RequestReplyMessageFuture<K,V> sendAndReceive(Message<?> message)
      インターフェースからコピーされた説明: ReplyingKafkaOperations
      リクエストメッセージを送信し、デフォルトのタイムアウトで応答メッセージを受信します。
      次で指定:
      インターフェース ReplyingKafkaOperations<K,V,R>sendAndReceive 
      パラメーター:
      message - 送信するメッセージ。
      戻り値:
      RequestReplyMessageFuture。
    • sendAndReceive

      public RequestReplyMessageFuture<K,V> sendAndReceive(Message<?> message, DurationSE replyTimeout)
      インターフェースからコピーされた説明: ReplyingKafkaOperations
      リクエストメッセージを送信し、応答メッセージを受信します。
      次で指定:
      インターフェース ReplyingKafkaOperations<K,V,R>sendAndReceive 
      パラメーター:
      message - 送信するメッセージ。
      replyTimeout - 応答タイムアウト。null の場合、デフォルトが使用されます。
      戻り値:
      RequestReplyMessageFuture。
    • sendAndReceive

      public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(Message<?> message, @Nullable ParameterizedTypeReference<P> returnType)
      インターフェースからコピーされた説明: ReplyingKafkaOperations
      リクエストメッセージを送信し、応答メッセージを受信します。
      次で指定:
      インターフェース ReplyingKafkaOperations<K,V,R>sendAndReceive 
      型パラメーター:
      P - 応答ペイロード型。
      パラメーター:
      message - 送信するメッセージ。
      returnType - 応答ペイロード型のメッセージコンバーターへのヒント。
      戻り値:
      RequestReplyMessageFuture。
    • sendAndReceive

      public <P> RequestReplyTypedMessageFuture<K,V,P> sendAndReceive(Message<?> message, @Nullable DurationSE replyTimeout, @Nullable ParameterizedTypeReference<P> returnType)
      インターフェースからコピーされた説明: ReplyingKafkaOperations
      リクエストメッセージを送信し、応答メッセージを受信します。
      次で指定:
      インターフェース ReplyingKafkaOperations<K,V,R>sendAndReceive 
      型パラメーター:
      P - 応答ペイロード型。
      パラメーター:
      message - 送信するメッセージ。
      replyTimeout - 応答タイムアウト。null の場合、デフォルトが使用されます。
      returnType - 応答ペイロード型のメッセージコンバーターへのヒント。
      戻り値:
      RequestReplyMessageFuture。
    • sendAndReceive

      public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record)
      インターフェースからコピーされた説明: ReplyingKafkaOperations
      リクエストを送信し、デフォルトのタイムアウトで応答を受信します。
      次で指定:
      インターフェース ReplyingKafkaOperations<K,V,R>sendAndReceive 
      パラメーター:
      record - 送信するレコード。
      戻り値:
      RequestReplyFuture。
    • sendAndReceive

      public RequestReplyFuture<K,V,R> sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K,V> record, @Nullable DurationSE replyTimeout)
      インターフェースからコピーされた説明: ReplyingKafkaOperations
      リクエストを送信し、返信を受け取ります。
      次で指定:
      インターフェース ReplyingKafkaOperations<K,V,R>sendAndReceive 
      パラメーター:
      record - 送信するレコード。
      replyTimeout - 応答タイムアウト。null の場合、デフォルトが使用されます。
      戻り値:
      RequestReplyFuture。
    • handleTimeout

      protected boolean handleTimeout(ObjectSE correlationId, RequestReplyFuture<K,V,R> future)
      リクエストがタイムアウトしたことをサブクラスに通知して、状態をクリーンアップし、オプションでフューチャーを完了することができるようにするために使用されます。
      パラメーター:
      correlationId - 相関 ID。
      future - 未来。
      戻り値:
      true は、将来が完了したことを示します。
      導入:
      2.3
    • isPending

      protected boolean isPending(ObjectSE correlationId)
      この相関 ID がまだアクティブな場合は true を返します。
      パラメーター:
      correlationId - 相関 ID。
      戻り値:
      保留中の場合は true。
      導入:
      2.3
    • destroy

      public void destroy()
      次で指定:
      インターフェース DisposableBeandestroy 
      オーバーライド:
      クラス KafkaTemplate<K,V>destroy 
    • onMessage

      public void onMessage(ListSE<org.apache.kafka.clients.consumer.ConsumerRecord<K,R>> data)
      インターフェースからコピーされた説明: GenericMessageListener
      kafka からのデータで呼び出されます。
      次で指定:
      インターフェース GenericMessageListener<K>onMessage 
      パラメーター:
      data - 処理されるデータ。
    • checkForErrors

      @Nullable protected ExceptionSE checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record)
      返信にエラーがないか確認してください。デフォルトの実装は DeserializationException をチェックし、replyErrorChecker 関数を呼び出します。
      パラメーター:
      record - レコード。
      戻り値:
      例外、または存在しない場合は null。
      導入:
      2.6.7
    • checkDeserialization

      @Nullable public static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, LogAccessor logger)
      キーまたは値のいずれかが逆直列化に失敗した場合は、DeserializationException を返します。それ以外の場合は null。それがキーであるか値であるかを判別する必要がある場合は、代わりに SerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER および SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER を使用して SerializationUtils.getExceptionFromHeader(ConsumerRecord, String, LogAccessor) を呼び出します。
      パラメーター:
      record - レコード。
      logger - LogAccessor
      戻り値:
      DeserializationException または null
      導入:
      2.2.15
    • logLateArrival

      protected void logLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K,R> record, ObjectSE correlationId)