クラス KafkaMessageDrivenChannelAdapter<K,V>

型パラメーター:
K - 鍵の型。
V - 値の型。
実装されたすべてのインターフェース:
AwareBeanFactoryAwareBeanNameAwareDisposableBeanInitializingBeanSmartInitializingSingletonApplicationContextAwareLifecyclePhasedSmartLifecycleExpressionCapableOrderlyShutdownCapableMessageProducerPausableIntegrationPatternKafkaInboundEndpointNamedComponentIntegrationInboundManagementIntegrationManagementManageableLifecycleManageableSmartLifecycleTrackableComponent

public class KafkaMessageDrivenChannelAdapter<K,V> extends MessageProducerSupport implements KafkaInboundEndpoint, OrderlyShutdownCapable, Pausable
メッセージ駆動型チャネルアダプター。
導入:
5.4
作成者:
Marius Bogoevici, Gary Russell, Artem Bilan, Urs Keller
  • コンストラクターの詳細

    • KafkaMessageDrivenChannelAdapter

      public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)
      モード KafkaMessageDrivenChannelAdapter.ListenerMode.record でインスタンスを作成します。
      パラメーター:
      messageListenerContainer - コンテナー。
    • KafkaMessageDrivenChannelAdapter

      public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)
      提供されたモードでインスタンスを構築します。
      パラメーター:
      messageListenerContainer - コンテナー。
      mode - モード。
  • メソッドの詳細

    • setMessageConverter

      public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)
      メッセージコンバーターを設定します。モードに応じて、RecordMessageConverter または BatchMessageConverter でなければなりません。
      パラメーター:
      messageConverter - コンバーター。
    • setRecordMessageConverter

      public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      レコードコンバーターを使用するようにメッセージコンバーターを設定します。
      パラメーター:
      messageConverter - コンバーター。
    • setBatchMessageConverter

      public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)
      メッセージコンバーターを設定して、バッチベースのコンシューマーで使用します。
      パラメーター:
      messageConverter - コンバーター。
    • setRecordFilterStrategy

      public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)
      KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener を FilteringMessageListenerAdapter にラップする RecordFilterStrategy を指定します。
      パラメーター:
      recordFilterStrategy - 使用する RecordFilterStrategy
    • setAckDiscarded

      public void setAckDiscarded(boolean ackDiscarded)
      FilteringMessageListenerAdapter が破棄されたレコードを確認する必要があるかどうかを示す boolean フラグ。setRecordFilterStrategy(RecordFilterStrategy) が指定されていない場合は意味がありません。
      パラメーター:
      ackDiscarded - 破棄されたメッセージに確認応答(コミットオフセット)する場合は true。
    • setRetryTemplate

      public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
      配信の再試行に使用する RetryTemplate インスタンスを指定します。

      IMPORTANT: この形式の再試行はブロックされており、ポーリングされたすべてのレコードの合計再試行遅延が max.poll.interval.ms を超える可能性がある場合、再調整が発生する可能性があります。代わりに、KafkaErrorSendingMessageRecoverer で構成されたリスナーコンテナーに DefaultErrorHandler を追加することを検討してください。

      パラメーター:
      retryTemplate - 使用する RetryTemplate
    • setRecoveryCallback

      public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback)
      再試行操作用の RecoveryCallback インスタンス。null の場合、再試行が使い果たされた後にコンテナーに例外がスローされます (エラーチャネルが構成されていない場合)。setRetryTemplate(RetryTemplate) が指定されている場合にのみ使用されます。エラーチャネルが指定されている場合、デフォルトは ErrorMessageSendingRecoverer です。再試行が使い果たされた後にコンテナーに例外をスローする場合は、null に設定します。
      パラメーター:
      recoveryCallback - リカバリコールバック。
    • setFilterInRetry

      public void setFilterInRetry(boolean filterInRetry)
      フィルター操作と再試行操作が実行される順序を指定する boolean フラグ。RetryTemplate または RecordFilterStrategy のいずれかのみが存在する場合、または存在しない場合は意味がありません。true の場合、フィルターは再試行のたびに呼び出されます。false の場合、フィルターはコンテナーからの配信ごとに 1 回だけ呼び出されます。
      パラメーター:
      filterInRetry - 再試行ごとにフィルター処理する場合は true。デフォルトは false です。
    • setPayloadType

      public void setPayloadType(ClassSE<?> payloadType)
      StringJsonMessageConverter などの型対応メッセージコンバーターを使用する場合は、コンバーターが作成するペイロード型を設定します。デフォルトは ObjectSE です。
      パラメーター:
      payloadType - 型。
    • setOnPartitionsAssignedSeekCallback

      public void setOnPartitionsAssignedSeekCallback(BiConsumerSE<MapSE<org.apache.kafka.common.TopicPartition,LongSE>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
      KafkaMessageListenerContainer からの ConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) 呼び出し中のシーク管理用に BiConsumerSE を指定します。これは、内部 MessagingMessageListenerAdapter 実装から呼び出されます。
      パラメーター:
      onPartitionsAssignedCallback - 使用する BiConsumerSE
      関連事項:
      • ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
    • setBindSourceRecord

      public void setBindSourceRecord(boolean bindSourceRecord)
      true に設定すると、IntegrationMessageHeaderAccessor.SOURCE_DATA という名前のヘッダーにソースコンシューマーレコードがバインドされます。バッチリスナーには適用されません。
      パラメーター:
      bindSourceRecord - バインドする場合は true。
    • getComponentType

      public StringSE getComponentType()
      クラスからコピーされた説明: IntegrationObjectSupport
      サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。
      次で指定:
      インターフェース NamedComponentgetComponentType 
      オーバーライド:
      クラス IntegrationObjectSupportgetComponentType 
    • onInit

      protected void onInit()
      クラスからコピーされた説明: IntegrationObjectSupport
      サブクラスは、初期化ロジック用にこれを実装できます。
      オーバーライド:
      クラス MessageProducerSupportonInit 
    • doStart

      protected void doStart()
      クラスからコピーされた説明: MessageProducerSupport
      デフォルトでは何もしません。サブクラスは、ライフサイクル管理の動作が必要な場合、これをオーバーライドできます。"lifecycleLock" によって保護されています。
      オーバーライド:
      クラス MessageProducerSupportdoStart 
    • doStop

      protected void doStop()
      クラスからコピーされた説明: MessageProducerSupport
      デフォルトでは何もしません。サブクラスは、ライフサイクル管理の動作が必要な場合、これをオーバーライドできます。
      オーバーライド:
      クラス MessageProducerSupportdoStop 
    • pause

      public void pause()
      インターフェースからコピーされた説明: Pausable
      エンドポイントを一時停止します。
      次で指定:
      インターフェース Pausablepause 
    • resume

      public void resume()
      インターフェースからコピーされた説明: Pausable
      一時停止した場合は、エンドポイントを再開します。
      次で指定:
      インターフェース Pausableresume 
    • isPaused

      public boolean isPaused()
      インターフェースからコピーされた説明: Pausable
      エンドポイントが一時停止しているかどうかを確認します。
      次で指定:
      インターフェース PausableisPaused 
      戻り値:
      一時停止した場合は true。
    • beforeShutdown

      public int beforeShutdown()
      インターフェースからコピーされた説明: OrderlyShutdownCapable
      シャットダウンが始まる前に呼び出されます。実装は、新しいメッセージの受け入れを停止する必要があります。オプションで、処理中のアクティブなメッセージの数を返すことができます。
      次で指定:
      インターフェース OrderlyShutdownCapablebeforeShutdown 
      戻り値:
      使用可能な場合、アクティブなメッセージの数。
    • afterShutdown

      public int afterShutdown()
      インターフェースからコピーされた説明: OrderlyShutdownCapable
      スケジューラー、エグゼキューターなどの通常のシャットダウンの後、シャットダウン遅延が経過した後、残りのアクティブなスケジューラー / エグゼキュータースレッドの強制シャットダウンの前に呼び出されます。オプションで、まだ処理中のアクティブなメッセージの数を返すことができます。
      次で指定:
      インターフェース OrderlyShutdownCapableafterShutdown 
      戻り値:
      使用可能な場合、アクティブなメッセージの数。
    • getErrorMessageAttributes

      protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
      クラスからコピーされた説明: MessageProducerSupport
      errorMessageStrategy でエラーメッセージを作成するときに使用する AttributeAccessor を設定します。
      オーバーライド:
      クラス MessageProducerSupportgetErrorMessageAttributes 
      パラメーター:
      message - メッセージ。
      戻り値:
      属性。