クラス MessagingMessageListenerAdapter<K,V>

java.lang.ObjectSE
org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter<K,V>
型パラメーター:
K - 鍵の型。
V - 値の型。
実装されたすべてのインターフェース:
AsyncRepliesAwareConsumerSeekAware
既知の直属サブクラス
BatchMessagingMessageListenerAdapterRecordMessagingMessageListenerAdapter

public abstract class MessagingMessageListenerAdapter<K,V> extends ObjectSE implements ConsumerSeekAware, AsyncRepliesAware
Message のペイロードを抽出するために必要なインフラストラクチャを提供する抽象 MessageListener アダプター。
作成者:
Stephane Nicoll, Gary Russell, Artem Bilan, Venil Noronha, Nathan Xu, Wang ZhiYang, Huijin Hong
  • フィールドの詳細

    • NULL_MESSAGE

      protected static final Message<KafkaNull> NULL_MESSAGE
      変換が不要な場合に使用されるメッセージ。
    • logger

      protected final LogAccessor logger
  • コンストラクターの詳細

    • MessagingMessageListenerAdapter

      protected MessagingMessageListenerAdapter(ObjectSE bean, MethodSE method)
      提供された Bean とメソッドでインスタンスを作成します。
      パラメーター:
      bean - Bean。
      method - メソッド。
    • MessagingMessageListenerAdapter

      protected MessagingMessageListenerAdapter(ObjectSE bean, MethodSE method, @Nullable KafkaListenerErrorHandler errorHandler)
      提供された Bean メソッドと kafka リスナーエラーハンドラーを使用してインスタンスを作成します。
      パラメーター:
      bean - Bean。
      method - メソッド。
      errorHandler - kafka リスナーのエラーハンドラー。
  • メソッドの詳細

    • setCorrelationHeaderName

      public void setCorrelationHeaderName(StringSE correlationHeaderName)
      相関 ID のカスタムヘッダー名を設定します。デフォルト KafkaHeaders.CORRELATION_ID。このヘッダーは、返信メッセージでエコーバックされます。
      パラメーター:
      correlationHeaderName - ヘッダー名。
      導入:
      3.0
    • setMessageConverter

      public void setMessageConverter(RecordMessageConverter messageConverter)
      MessageConverter を設定します。
      パラメーター:
      messageConverter - コンバーター。
    • getMessageConverter

      protected final RecordMessageConverter getMessageConverter()
      Message を変換できるようにして、このリスナーの MessagingMessageConverter を返します。
      戻り値:
      このリスナーの MessagingMessageConverterMessage を変換できます。
    • setMessagingConverter

      public void setMessagingConverter(SmartMessageConverter messageConverter)
      デフォルトの MessagingMessageConverter で使用するように SmartMessageConverter を設定します。カスタム messageConverter が提供されている場合は許可されません。
      パラメーター:
      messageConverter - コンバーター。
      導入:
      2.7.1
    • getType

      protected TypeSE getType()
      変換のために推論された型を返します。null の場合は fallbackType を返します。
      戻り値:
      型。
    • setFallbackType

      public void setFallbackType(ClassSE<?> fallbackType)
      型認識メッセージコンバーターを使用するときに使用するフォールバック型を設定し、このアダプターはメソッドから推論された型を判別できません。型認識メッセージコンバーターの例は StringJsonMessageConverter です。デフォルトは ObjectSE です。
      パラメーター:
      fallbackType - 型。
    • setHandlerMethod

      public void setHandlerMethod(HandlerAdapter handlerMethod)
      受信 ConsumerRecord を処理するメソッドを呼び出すために使用する HandlerAdapter を設定します。
      パラメーター:
      handlerMethod - HandlerAdapter インスタンス。
    • isAsyncReplies

      public boolean isAsyncReplies()
      インターフェースからコピーされた説明: AsyncRepliesAware
      HandlerAdapter の戻り値の型が非同期の場合は true を返します。
      次で指定:
      インターフェース AsyncRepliesAwareisAsyncReplies 
      戻り値:
      非同期返信の場合は true。
    • isConsumerRecordList

      protected boolean isConsumerRecordList()
    • isConsumerRecords

      public boolean isConsumerRecords()
    • isConversionNeeded

      public boolean isConversionNeeded()
    • setReplyTopic

      public void setReplyTopic(StringSE replyTopicParam)
      メソッド呼び出しからの結果を送信するトピックを設定します。実行時に評価される SpEL 式 !{...} の可能性があります。
      パラメーター:
      replyTopicParam - トピックまたは表現。
      導入:
      2.0
    • setReplyTemplate

      public void setReplyTemplate(KafkaTemplate<?,?> replyTemplate)
      メソッド呼び出しからの結果を送信するために使用するテンプレートを設定します。
      パラメーター:
      replyTemplate - テンプレート。
      導入:
      2.0
    • setBeanResolver

      public void setBeanResolver(BeanResolver beanResolver)
      ランタイム SpEL 式の Bean リゾルバーを設定します。また、標準型のコンバーターとマップアクセサーを使用して評価コンテキストを構成します。
      パラメーター:
      beanResolver - リゾルバー。
      導入:
      2.0
    • isMessageList

      protected boolean isMessageList()
    • getReplyHeadersConfigurer

      protected ReplyHeadersConfigurer getReplyHeadersConfigurer()
      応答コンフィギュレータを返します。
      戻り値:
      コンフィギュレーター。
      導入:
      2.2
      関連事項:
    • setReplyHeadersConfigurer

      public void setReplyHeadersConfigurer(ReplyHeadersConfigurer replyHeadersConfigurer)
      応答メッセージを作成するときに呼び出される設定者を設定します。
      パラメーター:
      replyHeadersConfigurer - コンフィギュレーター。
      導入:
      2.2
    • isSplitIterables

      protected boolean isSplitIterables()
      true の場合、IterableSE の戻り結果は個別のレコードに分割されます。
      戻り値:
      true に分割します。
      導入:
      2.3.5
    • setSplitIterables

      public void setSplitIterables(boolean splitIterables)
      IterableSE 応答値を個別のレコードに分割しないようにするには、false に設定します。
      パラメーター:
      splitIterables - 無効にする場合は false。デフォルトは真。
      導入:
      2.3.5
    • registerSeekCallback

      public void registerSeekCallback(ConsumerSeekAware.ConsumerSeekCallback callback)
      インターフェースからコピーされた説明: ConsumerSeekAware
      任意のタイミングでシークする際に使用するコールバックを登録します。ConcurrentMessageListenerContainer または複数のコンテナー内の同じリスナーインスタンスで使用する場合、リスナーはコールバックを ThreadLocal またはスレッドによってキー設定されたマップに保存する必要があります。
      次で指定:
      インターフェース ConsumerSeekAwareregisterSeekCallback 
      パラメーター:
      callback - コールバック。
    • onPartitionsAssigned

      public void onPartitionsAssigned(MapSE<org.apache.kafka.common.TopicPartition,LongSE> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
      インターフェースからコピーされた説明: ConsumerSeekAware
      グループ管理を使用する場合。パーティションの割り当てが変更されたときに呼び出されます。
      次で指定:
      インターフェース ConsumerSeekAwareonPartitionsAssigned 
      パラメーター:
      assignments - 新しい割り当てとその現在のオフセット。
      callback - 割り当て後に最初のシークを実行するためのコールバック。
    • onPartitionsRevoked

      public void onPartitionsRevoked(CollectionSE<org.apache.kafka.common.TopicPartition> partitions)
      インターフェースからコピーされた説明: ConsumerSeekAware
      グループ管理を使用する場合。パーティションの割り当てが取り消されたときに呼び出されます。リスナーは、このスレッドで ConsumerSeekAware.registerSeekCallback(ConsumerSeekCallback) から保存されたコールバックを破棄する必要があります。
      次で指定:
      インターフェース ConsumerSeekAwareonPartitionsRevoked 
      パラメーター:
      partitions - 取り消されたパーティション。
    • onIdleContainer

      public void onIdleContainer(MapSE<org.apache.kafka.common.TopicPartition,LongSE> assignments, ConsumerSeekAware.ConsumerSeekCallback callback)
      インターフェースからコピーされた説明: ConsumerSeekAware
      コンテナーがアイドルコンテナーイベントを発行するように構成されている場合、このメソッドはコンテナーアイドルイベントが発行されたときに呼び出され、シーク操作を許可します。
      次で指定:
      インターフェース ConsumerSeekAwareonIdleContainer 
      パラメーター:
      assignments - 新しい割り当てとその現在のオフセット。
      callback - シークを実行するためのコールバック。
    • toMessagingMessage

      protected Message<?> toMessagingMessage(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> cRecord, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
    • invoke

      protected void invoke(ObjectSE records, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Message<?> message)
    • invokeHandler

      protected final ObjectSE invokeHandler(ObjectSE data, @Nullable Acknowledgment acknowledgment, Message<?> message, org.apache.kafka.clients.consumer.Consumer<?,?> consumer)
      ハンドラーを呼び出し、専用のエラーメッセージで ListenerExecutionFailedException への例外をラップします。
      パラメーター:
      data - 呼び出し中に処理するデータ。
      acknowledgment - 使用する確認応答(ある場合)。
      message - 処理するメッセージ。
      consumer - コンシューマー。
      戻り値:
      呼び出しの結果。
    • handleResult

      protected void handleResult(ObjectSE resultArg, ObjectSE request, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, @Nullable Message<?> source)
      リスナーメソッドから返された指定された結果オブジェクトを処理し、SendTo トピックにレスポンスメッセージを送信します。
      パラメーター:
      resultArg - 処理する結果オブジェクト (非 null)
      request - 元のリクエストメッセージ
      acknowledgment - 手動確認応答の確認
      consumer - コンシューマーからハンドラーへのエラー
      source - メソッド呼び出しのソースデータ - 例: o.s.messaging.Message<?>; null の可能性があります
    • sendResponse

      protected void sendResponse(ObjectSE result, @Nullable StringSE topic, @Nullable ObjectSE source, boolean returnTypeMessage)
      結果をトピックに送信します。
      パラメーター:
      result - 結果。
      topic - トピック。
      source - ソース(入力)。
      returnTypeMessage - メッセージを返す場合は true。
      導入:
      2.1.3
    • asyncSuccess

      protected void asyncSuccess(@Nullable ObjectSE result, StringSE replyTopic, Message<?> source, boolean returnTypeMessage)
    • acknowledge

      protected void acknowledge(@Nullable Acknowledgment acknowledgment)
    • asyncFailure

      protected void asyncFailure(ObjectSE request, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, ThrowableSE t, Message<?> source)
    • handleException

      protected void handleException(ObjectSE records, @Nullable Acknowledgment acknowledgment, org.apache.kafka.clients.consumer.Consumer<?,?> consumer, Message<?> message, ListenerExecutionFailedException e)
    • createMessagingErrorMessage

      protected final StringSE createMessagingErrorMessage(StringSE description, ObjectSE payload)
    • determineInferredType

      protected TypeSE determineInferredType(MethodSE method)
      サブクラスはこのメソッドをオーバーライドして、別のメカニズムを使用してペイロード変換のターゲット型を決定できます。
      パラメーター:
      method - メソッド。
      戻り値:
      型。