クラス KafkaMessageSource<K,​V>

型パラメーター:
K - 鍵の型。
V - 値の型。
実装されているすべてのインターフェース:
AwareBeanFactoryAwareBeanNameAwareDisposableBeanInitializingBeanLifecycleMessageSource<ObjectSE>PausableIntegrationPatternNamedComponentIntegrationInboundManagementIntegrationManagementManageableLifecycle

public class KafkaMessageSource<K,​V>
extends AbstractMessageSource<ObjectSE>
implements Pausable
Apache Kafka のポーリングされたメッセージソース。一度に 1 つのスレッドだけがデータをポーリング (またはメッセージを確認) できます。

NOTE: アプリケーションがメッセージの順序が正しくないことを確認した場合、オフセットの前のすべてのメッセージが確認応答されるまで、確認応答は延期されます。複数のレコードが取得され、以前のオフセットが再キューイングされた場合、後続のオフセットからのレコードは、正常に処理された場合でも再配信されます。アプリケーションはべき等性を実装する必要があります。

バージョン 3.1.2 以降、このソースは Pausable を実装しており、Consumer を一時停止および再開できます。コンシューマーが一時停止している間、リバランスを防ぐために、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出し続ける必要があります。

導入:
5.4
作成者:
Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra
  • フィールドの詳細

  • コンストラクターの詳細

    • KafkaMessageSource

      public KafkaMessageSource​(org.springframework.kafka.core.ConsumerFactory<K,​V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)
      指定されたパラメーターを使用してインスタンスを作成します。ポーリングごとに複数のレコードをフェッチすることは無効になります。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      consumerProperties - コンシューマーの特性。
      関連事項:
      KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
    • KafkaMessageSource

      public KafkaMessageSource​(org.springframework.kafka.core.ConsumerFactory<K,​V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)
      指定されたパラメーターを使用してインスタンスを構築します。各ポーリングで最大 max.poll.records を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリが DefaultKafkaConsumerFactory の場合は max.poll.records が 1 に強制変換され、それ以外の場合は IllegalArgumentExceptionSE で拒否されます。重要: true の場合、max.poll.interval.ms 内で受信したレコード数を消費するのに十分な速度で AbstractMessageSource.receive() を呼び出す必要があります。false の場合、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出す必要があります。pause() は、前回のポーリングのレコードが消費されるまで有効になりません。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      consumerProperties - コンシューマーの特性。
      allowMultiFetch - max.poll.records > 1 を許可する場合は true。
    • KafkaMessageSource

      public KafkaMessageSource​(org.springframework.kafka.core.ConsumerFactory<K,​V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,​V> ackCallbackFactory)
      指定されたパラメーターを使用してインスタンスを作成します。ポーリングごとに複数のレコードをフェッチすることは無効になります。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      consumerProperties - コンシューマーの特性。
      ackCallbackFactory - ack コールバックファクトリ。
      関連事項:
      KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
    • KafkaMessageSource

      public KafkaMessageSource​(org.springframework.kafka.core.ConsumerFactory<K,​V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,​V> ackCallbackFactory, boolean allowMultiFetch)
      指定されたパラメーターを使用してインスタンスを構築します。各ポーリングで最大 max.poll.records を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリが DefaultKafkaConsumerFactory の場合は max.poll.records が 1 に強制変換され、それ以外の場合は IllegalArgumentExceptionSE で拒否されます。重要: true の場合、max.poll.interval.ms 内で受信したレコード数を消費するのに十分な速度で AbstractMessageSource.receive() を呼び出す必要があります。false の場合、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出す必要があります。pause() は、前回のポーリングのレコードが消費されるまで有効になりません。
      パラメーター:
      consumerFactory - コンシューマーファクトリ。
      consumerProperties - コンシューマーの特性。
      ackCallbackFactory - ack コールバックファクトリ。
      allowMultiFetch - max.poll.records > 1 を許可する場合は true。
  • メソッドの詳細

    • getAssignedPartitions

      public CollectionSE<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
      現在割り当てられているパーティションを返します。
      戻り値:
      パーティション。
    • onInit

      protected void onInit()
      オーバーライド:
      クラス AbstractExpressionEvaluatoronInit 
    • getConsumerProperties

      public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()
      構成されたコンシューマープロパティへの参照を取得します。ソースを開始する前に、プロパティをさらにカスタマイズできます。
      戻り値:
      プロパティ。
    • getGroupId

      protected StringSE getGroupId()
    • getClientId

      protected StringSE getClientId()
    • getPollTimeout

      protected long getPollTimeout()
    • getMessageConverter

      protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
    • setMessageConverter

      public void setMessageConverter​(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
      デフォルトの MessagingMessageConverter を置き換えるようにメッセージコンバーターを設定します。
      パラメーター:
      messageConverter - コンバーター。
    • getPayloadType

      protected ClassSE<?> getPayloadType()
    • setPayloadType

      public void setPayloadType​(ClassSE<?> payloadType)
      ペイロード型を設定します。型対応のメッセージコンバーターが提供されている場合にのみ適用されます。
      パラメーター:
      payloadType - 変換する型。
    • getRebalanceListener

      protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
    • getComponentType

      public StringSE getComponentType()
      次で指定:
      インターフェース NamedComponentgetComponentType 
    • isRawMessageHeader

      protected boolean isRawMessageHeader()
    • setRawMessageHeader

      public void setRawMessageHeader​(boolean rawMessageHeader)
      true に設定すると、生の ConsumerRecord がキー KafkaHeaders.RAW_DATA および IntegrationMessageHeaderAccessor.SOURCE_DATA のヘッダーとして含まれます。呼び出し元がレコードにアクセスしてエラーを処理できるようにします。
      パラメーター:
      rawMessageHeader - ヘッダーを含める場合は true。
    • getCommitTimeout

      protected DurationSE getCommitTimeout()
    • setCloseTimeout

      public void setCloseTimeout​(DurationSE closeTimeout)
      クローズタイムアウトを設定します。デフォルトは 30 秒です。
      パラメーター:
      closeTimeout - クローズタイムアウト。
    • isRunning

      public boolean isRunning()
      次で指定:
      インターフェース LifecycleisRunning 
      次で指定:
      インターフェース ManageableLifecycleisRunning 
    • start

      public void start()
      次で指定:
      インターフェース Lifecyclestart 
      次で指定:
      インターフェース ManageableLifecyclestart 
    • stop

      public void stop()
      次で指定:
      インターフェース Lifecyclestop 
      次で指定:
      インターフェース ManageableLifecyclestop 
    • pause

      public void pause()
      インターフェースからコピーされた説明: Pausable
      エンドポイントを一時停止します。
      次で指定:
      インターフェース Pausablepause 
    • resume

      public void resume()
      インターフェースからコピーされた説明: Pausable
      一時停止した場合は、エンドポイントを再開します。
      次で指定:
      インターフェース Pausableresume 
    • isPaused

      public boolean isPaused()
      インターフェースからコピーされた説明: Pausable
      エンドポイントが一時停止しているかどうかを確認します。
      次で指定:
      インターフェース PausableisPaused 
      戻り値:
      一時停止した場合は true。
    • doReceive

      protected ObjectSE doReceive()
      クラスからコピーされた説明: AbstractMessageSource
      サブクラスはこのメソッドを実装する必要があります。通常、戻り値は T 型の payload ですが、戻り値はペイロードが T 型の Message インスタンスでもかまいません。追加のヘッダーの作成に使用される AbstractIntegrationMessageBuilder にすることもできます。
      次で指定:
      クラス AbstractMessageSource<ObjectSE>doReceive 
      戻り値:
      返された値。
    • createConsumer

      protected void createConsumer()
    • destroy

      public void destroy()
      次で指定:
      インターフェース DisposableBeandestroy 
      次で指定:
      インターフェース IntegrationManagementdestroy 
      オーバーライド:
      クラス AbstractMessageSource<ObjectSE>destroy