クラス KafkaMessageSource<K,V>

型パラメーター:
K - 鍵の型。
V - 値の型。
実装されたすべてのインターフェース:
AwareBeanClassLoaderAwareBeanFactoryAwareBeanNameAwareDisposableBeanInitializingBeanLifecycleMessageSource<ObjectSE>PausableIntegrationPatternNamedComponentIntegrationInboundManagementIntegrationManagementManageableLifecycle

public class KafkaMessageSource<K,V> extends AbstractMessageSource<ObjectSE> implements Pausable, BeanClassLoaderAware
Apache Kafka のポーリングされたメッセージソース。一度に 1 つのスレッドだけがデータをポーリング (またはメッセージを確認) できます。

NOTE: アプリケーションがメッセージの順序が正しくないことを確認した場合、オフセットの前のすべてのメッセージが確認応答されるまで、確認応答は延期されます。複数のレコードが取得され、以前のオフセットが再キューイングされた場合、後続のオフセットからのレコードは、正常に処理された場合でも再配信されます。アプリケーションはべき等性を実装する必要があります。

バージョン 3.1.2 以降、このソースは Pausable を実装しており、Consumer を一時停止および再開できます。コンシューマーが一時停止している間、リバランスを防ぐために、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出し続ける必要があります。

導入:
5.4
作成者:
Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra, Christian Tzolov, Ngoc Nhan
  • フィールドの詳細

    • REMAINING_RECORDS

      public static final StringSE REMAINING_RECORDS
      前回のポーリングから残っているレコードの数。
      導入:
      3.2
      関連事項:
    • newAssignment

      public volatile boolean newAssignment
  • コンストラクターの詳細

    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)
      指定されたパラメーターを使用してインスタンスを作成します。ポーリングごとに複数のレコードをフェッチすることは無効になります。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      consumerProperties - コンシューマーの特性。
      関連事項:
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)
      指定されたパラメーターを使用してインスタンスを構築します。各ポーリングで最大 max.poll.records を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリが DefaultKafkaConsumerFactory の場合は max.poll.records が 1 に強制変換され、それ以外の場合は IllegalArgumentExceptionSE で拒否されます。重要: true の場合、max.poll.interval.ms 内で受信したレコード数を消費するのに十分な速度で AbstractMessageSource.receive() を呼び出す必要があります。false の場合、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出す必要があります。pause() は、前回のポーリングのレコードが消費されるまで有効になりません。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      consumerProperties - コンシューマーの特性。
      allowMultiFetch - max.poll.records > 1 を許可する場合は true。
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)
      指定されたパラメーターを使用してインスタンスを作成します。ポーリングごとに複数のレコードをフェッチすることは無効になります。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      consumerProperties - コンシューマーの特性。
      ackCallbackFactory - ack コールバックファクトリ。
      関連事項:
    • KafkaMessageSource

      public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)
      指定されたパラメーターを使用してインスタンスを構築します。各ポーリングで最大 max.poll.records を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリが DefaultKafkaConsumerFactory の場合は max.poll.records が 1 に強制変換され、それ以外の場合は IllegalArgumentExceptionSE で拒否されます。重要: true の場合、max.poll.interval.ms 内で受信したレコード数を消費するのに十分な速度で AbstractMessageSource.receive() を呼び出す必要があります。false の場合、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出す必要があります。pause() は、前回のポーリングのレコードが消費されるまで有効になりません。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      consumerProperties - コンシューマーの特性。
      ackCallbackFactory - ack コールバックファクトリ。
      allowMultiFetch - max.poll.records > 1 を許可する場合は true。
  • メソッドの詳細

    • getAssignedPartitions

      public CollectionSE<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
      現在割り当てられているパーティションを返します。
      戻り値:
      パーティション。
    • setBeanClassLoader

      public void setBeanClassLoader(ClassLoaderSE classLoader)
      次で指定:
      インターフェース BeanClassLoaderAwaresetBeanClassLoader 
    • onInit

      protected void onInit()
      オーバーライド:
      クラス AbstractExpressionEvaluatoronInit 
    • getConsumerProperties

      public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()
      構成されたコンシューマープロパティへの参照を取得します。ソースを開始する前に、プロパティをさらにカスタマイズできます。
      戻り値:
      プロパティ。
    • getGroupId

      protected StringSE getGroupId()
    • getClientId

      protected StringSE getClientId()
    • getPollTimeout

      protected long getPollTimeout()
    • getMessageConverter

      protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
    • setMessageConverter

      public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      デフォルトの MessagingMessageConverter を置き換えるようにメッセージコンバーターを設定します。
      パラメーター:
      messageConverter - コンバーター。
    • getPayloadType

      protected ClassSE<?> getPayloadType()
    • setPayloadType

      public void setPayloadType(ClassSE<?> payloadType)
      ペイロード型を設定します。型対応のメッセージコンバーターが提供されている場合にのみ適用されます。
      パラメーター:
      payloadType - 変換する型。
    • getRebalanceListener

      protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
    • getComponentType

      public StringSE getComponentType()
      次で指定:
      インターフェース NamedComponentgetComponentType 
    • isRawMessageHeader

      protected boolean isRawMessageHeader()
    • setRawMessageHeader

      public void setRawMessageHeader(boolean rawMessageHeader)
      true に設定すると、生の ConsumerRecord がキー KafkaHeaders.RAW_DATA および IntegrationMessageHeaderAccessor.SOURCE_DATA のヘッダーとして含まれます。呼び出し元がレコードにアクセスしてエラーを処理できるようにします。
      パラメーター:
      rawMessageHeader - ヘッダーを含める場合は true。
    • getCommitTimeout

      protected DurationSE getCommitTimeout()
    • setCloseTimeout

      public void setCloseTimeout(DurationSE closeTimeout)
      クローズタイムアウトを設定します。デフォルトは 30 秒です。
      パラメーター:
      closeTimeout - クローズタイムアウト。
    • isRunning

      public boolean isRunning()
      次で指定:
      インターフェース LifecycleisRunning 
      次で指定:
      インターフェース ManageableLifecycleisRunning 
    • start

      public void start()
      次で指定:
      インターフェース Lifecyclestart 
      次で指定:
      インターフェース ManageableLifecyclestart 
    • stop

      public void stop()
      次で指定:
      インターフェース Lifecyclestop 
      次で指定:
      インターフェース ManageableLifecyclestop 
    • pause

      public void pause()
      インターフェースからコピーされた説明: Pausable
      エンドポイントを一時停止します。
      次で指定:
      インターフェース Pausablepause 
    • resume

      public void resume()
      インターフェースからコピーされた説明: Pausable
      一時停止した場合は、エンドポイントを再開します。
      次で指定:
      インターフェース Pausableresume 
    • isPaused

      public boolean isPaused()
      インターフェースからコピーされた説明: Pausable
      エンドポイントが一時停止しているかどうかを確認します。
      次で指定:
      インターフェース PausableisPaused 
      戻り値:
      一時停止した場合は true。
    • doReceive

      protected ObjectSE doReceive()
      クラスからコピーされた説明: AbstractMessageSource
      サブクラスはこのメソッドを実装する必要があります。通常、返される値は型 T の payload になりますが、返される値はペイロードが型 T である Message インスタンスである場合もあります。また、追加のヘッダーの設定に使用される AbstractIntegrationMessageBuilder である場合もあります。
      次で指定:
      クラス AbstractMessageSource<ObjectSE>doReceive 
      戻り値:
      返された値。
    • createConsumer

      protected void createConsumer()
    • destroy

      public void destroy()
      次で指定:
      インターフェース DisposableBeandestroy 
      次で指定:
      インターフェース IntegrationManagementdestroy 
      オーバーライド:
      クラス AbstractMessageSource<ObjectSE>destroy