クラス KafkaMessageSource<K,V>
java.lang.ObjectSE
org.springframework.integration.util.AbstractExpressionEvaluator
org.springframework.integration.endpoint.AbstractMessageSource<ObjectSE>
org.springframework.integration.kafka.inbound.KafkaMessageSource<K,V>
- 型パラメーター:
K- 鍵の型。V- 値の型。
- 実装されているすべてのインターフェース:
Aware、BeanFactoryAware、BeanNameAware、DisposableBean、InitializingBean、Lifecycle、MessageSource<ObjectSE>、Pausable、IntegrationPattern、NamedComponent、IntegrationInboundManagement、IntegrationManagement、ManageableLifecycle
public class KafkaMessageSource<K,V> extends AbstractMessageSource<ObjectSE> implements Pausable
Apache Kafka のポーリングされたメッセージソース。一度に 1 つのスレッドだけがデータをポーリング (またはメッセージを確認) できます。
NOTE: アプリケーションがメッセージの順序が正しくないことを確認した場合、オフセットの前のすべてのメッセージが確認応答されるまで、確認応答は延期されます。複数のレコードが取得され、以前のオフセットが再キューイングされた場合、後続のオフセットからのレコードは、正常に処理された場合でも再配信されます。アプリケーションはべき等性を実装する必要があります。
バージョン 3.1.2 以降、このソースは Pausable を実装しており、Consumer を一時停止および再開できます。コンシューマーが一時停止している間、リバランスを防ぐために、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出し続ける必要があります。
- 導入:
- 5.4
- 作成者:
- Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra
ネストされたクラスの要約
ネストされたクラス 修飾子と型 クラス 説明 static classKafkaMessageSource.KafkaAckCallback<K,V>Kafka の場合は AcknowledgmentCallback。static classKafkaMessageSource.KafkaAckCallbackFactory<K,V>KafkaAckInfo の場合は AcknowledgmentCallbackFactory。static interfaceKafkaMessageSource.KafkaAckInfo<K,V>KafkaAckCallback を構築するための情報。classKafkaMessageSource.KafkaAckInfoImplKafkaAckCallback を構築するための情報。インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたネストクラス / インターフェース
IntegrationManagement.ManagementOverridesフィールドのサマリー
フィールド 修飾子と型 フィールド 説明 booleannewAssignmentstatic StringSEREMAINING_RECORDS前回のポーリングから残っているレコードの数。クラス org.springframework.integration.util.AbstractExpressionEvaluator から継承されたフィールド
EXPRESSION_PARSER, loggerインターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたフィールド
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEコンストラクターの概要
コンストラクター コンストラクター 説明 KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)指定されたパラメーターを使用してインスタンスを作成します。KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)指定されたパラメーターを使用してインスタンスを作成します。KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)指定されたパラメーターを使用してインスタンスを作成します。KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)指定されたパラメーターを使用してインスタンスを作成します。メソッドのサマリー
修飾子と型 メソッド 説明 protected voidcreateConsumer()voiddestroy()protected ObjectSEdoReceive()サブクラスはこのメソッドを実装する必要があります。CollectionSE<org.apache.kafka.common.TopicPartition>getAssignedPartitions()現在割り当てられているパーティションを返します。protected StringSEgetClientId()protected DurationSEgetCommitTimeout()StringSEgetComponentType()org.springframework.kafka.listener.ConsumerPropertiesgetConsumerProperties()構成されたコンシューマープロパティへの参照を取得します。ソースを開始する前に、プロパティをさらにカスタマイズできます。protected StringSEgetGroupId()protected org.springframework.kafka.support.converter.RecordMessageConvertergetMessageConverter()protected ClassSE<?>getPayloadType()protected longgetPollTimeout()protected org.apache.kafka.clients.consumer.ConsumerRebalanceListenergetRebalanceListener()booleanisPaused()エンドポイントが一時停止しているかどうかを確認します。protected booleanisRawMessageHeader()booleanisRunning()protected voidonInit()voidpause()エンドポイントを一時停止します。voidresume()一時停止した場合は、エンドポイントを再開します。voidsetCloseTimeout(DurationSE closeTimeout)クローズタイムアウトを設定します。デフォルトは 30 秒です。voidsetMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)デフォルトのMessagingMessageConverterを置き換えるようにメッセージコンバーターを設定します。voidsetPayloadType(ClassSE<?> payloadType)ペイロード型を設定します。voidsetRawMessageHeader(boolean rawMessageHeader)true に設定すると、生のConsumerRecordがキーKafkaHeaders.RAW_DATAおよびIntegrationMessageHeaderAccessor.SOURCE_DATAのヘッダーとして含まれます。voidstart()voidstop()クラス org.springframework.integration.endpoint.AbstractMessageSource から継承されたメソッド
buildMessage, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedTypeクラス org.springframework.integration.util.AbstractExpressionEvaluator から継承されたメソッド
afterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionServiceクラス java.lang.ObjectSE から継承されたメソッド
clone, equalsSE, finalize, getClass, hashCode, notify, notifyAll, toString, wait, waitSE, waitSEインターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたメソッド
getThisAs
フィールドの詳細
REMAINING_RECORDS
前回のポーリングから残っているレコードの数。- 導入:
- 3.2
- 関連事項:
- 定数フィールド値
newAssignment
public volatile boolean newAssignment
コンストラクターの詳細
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)指定されたパラメーターを使用してインスタンスを作成します。ポーリングごとに複数のレコードをフェッチすることは無効になります。- パラメーター:
consumerFactory- コンシューマーファクトリ。consumerProperties- コンシューマーの特性。- 関連事項:
KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)指定されたパラメーターを使用してインスタンスを構築します。各ポーリングで最大max.poll.recordsを取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリがDefaultKafkaConsumerFactoryの場合はmax.poll.recordsが 1 に強制変換され、それ以外の場合はIllegalArgumentExceptionSE で拒否されます。重要: true の場合、max.poll.interval.ms内で受信したレコード数を消費するのに十分な速度でAbstractMessageSource.receive()を呼び出す必要があります。false の場合、max.poll.interval.ms内でAbstractMessageSource.receive()を呼び出す必要があります。pause()は、前回のポーリングのレコードが消費されるまで有効になりません。- パラメーター:
consumerFactory- コンシューマーファクトリ。consumerProperties- コンシューマーの特性。allowMultiFetch-max.poll.records > 1を許可する場合は true。
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)指定されたパラメーターを使用してインスタンスを作成します。ポーリングごとに複数のレコードをフェッチすることは無効になります。- パラメーター:
consumerFactory- コンシューマーファクトリ。consumerProperties- コンシューマーの特性。ackCallbackFactory- ack コールバックファクトリ。- 関連事項:
KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)指定されたパラメーターを使用してインスタンスを構築します。各ポーリングで最大max.poll.recordsを取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリがDefaultKafkaConsumerFactoryの場合はmax.poll.recordsが 1 に強制変換され、それ以外の場合はIllegalArgumentExceptionSE で拒否されます。重要: true の場合、max.poll.interval.ms内で受信したレコード数を消費するのに十分な速度でAbstractMessageSource.receive()を呼び出す必要があります。false の場合、max.poll.interval.ms内でAbstractMessageSource.receive()を呼び出す必要があります。pause()は、前回のポーリングのレコードが消費されるまで有効になりません。- パラメーター:
consumerFactory- コンシューマーファクトリ。consumerProperties- コンシューマーの特性。ackCallbackFactory- ack コールバックファクトリ。allowMultiFetch-max.poll.records > 1を許可する場合は true。
メソッドの詳細
getAssignedPartitions
現在割り当てられているパーティションを返します。- 戻り値:
- パーティション。
onInit
protected void onInit()- オーバーライド:
- クラス
AbstractExpressionEvaluatorのonInit
getConsumerProperties
public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()構成されたコンシューマープロパティへの参照を取得します。ソースを開始する前に、プロパティをさらにカスタマイズできます。- 戻り値:
- プロパティ。
getGroupId
getClientId
getPollTimeout
protected long getPollTimeout()getMessageConverter
protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()setMessageConverter
public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)デフォルトのMessagingMessageConverterを置き換えるようにメッセージコンバーターを設定します。- パラメーター:
messageConverter- コンバーター。
getPayloadType
setPayloadType
ペイロード型を設定します。型対応のメッセージコンバーターが提供されている場合にのみ適用されます。- パラメーター:
payloadType- 変換する型。
getRebalanceListener
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()getComponentType
- 次で指定:
- インターフェース
NamedComponentのgetComponentType
isRawMessageHeader
protected boolean isRawMessageHeader()setRawMessageHeader
public void setRawMessageHeader(boolean rawMessageHeader)true に設定すると、生のConsumerRecordがキーKafkaHeaders.RAW_DATAおよびIntegrationMessageHeaderAccessor.SOURCE_DATAのヘッダーとして含まれます。呼び出し元がレコードにアクセスしてエラーを処理できるようにします。- パラメーター:
rawMessageHeader- ヘッダーを含める場合は true。
getCommitTimeout
setCloseTimeout
クローズタイムアウトを設定します。デフォルトは 30 秒です。- パラメーター:
closeTimeout- クローズタイムアウト。
isRunning
public boolean isRunning()- 次で指定:
- インターフェース
LifecycleのisRunning - 次で指定:
- インターフェース
ManageableLifecycleのisRunning
start
public void start()- 次で指定:
- インターフェース
Lifecycleのstart - 次で指定:
- インターフェース
ManageableLifecycleのstart
stop
public void stop()- 次で指定:
- インターフェース
Lifecycleのstop - 次で指定:
- インターフェース
ManageableLifecycleのstop
pause
public void pause()インターフェースからコピーされた説明:Pausableエンドポイントを一時停止します。resume
public void resume()インターフェースからコピーされた説明:Pausable一時停止した場合は、エンドポイントを再開します。isPaused
public boolean isPaused()インターフェースからコピーされた説明:Pausableエンドポイントが一時停止しているかどうかを確認します。doReceive
クラスからコピーされた説明:AbstractMessageSourceサブクラスはこのメソッドを実装する必要があります。通常、戻り値は T 型のpayloadですが、戻り値はペイロードが T 型のMessageインスタンスでもかまいません。追加のヘッダーの作成に使用されるAbstractIntegrationMessageBuilderにすることもできます。- 次で指定:
- クラス
AbstractMessageSource<ObjectSE>のdoReceive - 戻り値:
- 返された値。
createConsumer
protected void createConsumer()destroy
public void destroy()- 次で指定:
- インターフェース
DisposableBeanのdestroy - 次で指定:
- インターフェース
IntegrationManagementのdestroy - オーバーライド:
- クラス
AbstractMessageSource<ObjectSE>のdestroy