クラス KafkaMessageSource<K,V>
java.lang.ObjectSE
org.springframework.integration.util.AbstractExpressionEvaluator
org.springframework.integration.endpoint.AbstractMessageSource<ObjectSE>
org.springframework.integration.kafka.inbound.KafkaMessageSource<K,V>
- 型パラメーター:
K
- 鍵の型。V
- 値の型。
- 実装されたすべてのインターフェース:
Aware
、BeanClassLoaderAware
、BeanFactoryAware
、BeanNameAware
、DisposableBean
、InitializingBean
、Lifecycle
、MessageSource<ObjectSE>
、Pausable
、IntegrationPattern
、NamedComponent
、IntegrationInboundManagement
、IntegrationManagement
、ManageableLifecycle
public class KafkaMessageSource<K,V>
extends AbstractMessageSource<ObjectSE>
implements Pausable, BeanClassLoaderAware
Apache Kafka のポーリングされたメッセージソース。一度に 1 つのスレッドだけがデータをポーリング (またはメッセージを確認) できます。
NOTE: アプリケーションがメッセージの順序が正しくないことを確認した場合、オフセットの前のすべてのメッセージが確認応答されるまで、確認応答は延期されます。複数のレコードが取得され、以前のオフセットが再キューイングされた場合、後続のオフセットからのレコードは、正常に処理された場合でも再配信されます。アプリケーションはべき等性を実装する必要があります。
バージョン 3.1.2 以降、このソースは Pausable
を実装しており、Consumer
を一時停止および再開できます。コンシューマーが一時停止している間、リバランスを防ぐために、max.poll.interval.ms
内で AbstractMessageSource.receive()
を呼び出し続ける必要があります。
- 導入:
- 5.4
- 作成者:
- Gary Russell, Mark Norkin, Artem Bilan, Anshul Mehra, Christian Tzolov, Ngoc Nhan
ネストされたクラスのサマリー
ネストされたクラス修飾子と型クラス説明static class
Kafka の場合は AcknowledgmentCallback。static final record
KafkaAckInfo の場合は AcknowledgmentCallbackFactory。static interface
KafkaAckCallback を構築するための情報。class
KafkaAckCallback を構築するための情報。インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたネストクラス / インターフェース
IntegrationManagement.ManagementOverrides
フィールドのサマリー
フィールドクラス org.springframework.integration.util.AbstractExpressionEvaluator から継承されたフィールド
EXPRESSION_PARSER, logger
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたフィールド
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
コンストラクターの概要
コンストラクターコンストラクター説明KafkaMessageSource
(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties) 指定されたパラメーターを使用してインスタンスを作成します。KafkaMessageSource
(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch) 指定されたパラメーターを使用してインスタンスを作成します。KafkaMessageSource
(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory) 指定されたパラメーターを使用してインスタンスを作成します。KafkaMessageSource
(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) 指定されたパラメーターを使用してインスタンスを作成します。メソッドのサマリー
修飾子と型メソッド説明protected void
void
destroy()
protected ObjectSE
サブクラスはこのメソッドを実装する必要があります。CollectionSE<org.apache.kafka.common.TopicPartition>
現在割り当てられているパーティションを返します。protected StringSE
protected DurationSE
org.springframework.kafka.listener.ConsumerProperties
構成されたコンシューマープロパティへの参照を取得します。ソースを開始する前に、プロパティをさらにカスタマイズできます。protected StringSE
protected org.springframework.kafka.support.converter.RecordMessageConverter
protected ClassSE<?>
protected long
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener
boolean
isPaused()
エンドポイントが一時停止しているかどうかを確認します。protected boolean
boolean
protected void
onInit()
void
pause()
エンドポイントを一時停止します。void
resume()
一時停止した場合は、エンドポイントを再開します。void
setBeanClassLoader
(ClassLoaderSE classLoader) void
setCloseTimeout
(DurationSE closeTimeout) クローズタイムアウトを設定します。デフォルトは 30 秒です。void
setMessageConverter
(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) デフォルトのMessagingMessageConverter
を置き換えるようにメッセージコンバーターを設定します。void
setPayloadType
(ClassSE<?> payloadType) ペイロード型を設定します。void
setRawMessageHeader
(boolean rawMessageHeader) true に設定すると、生のConsumerRecord
がキーKafkaHeaders.RAW_DATA
およびIntegrationMessageHeaderAccessor.SOURCE_DATA
のヘッダーとして含まれます。void
start()
void
stop()
クラス org.springframework.integration.endpoint.AbstractMessageSource から継承されたメソッド
buildMessage, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedType
クラス org.springframework.integration.util.AbstractExpressionEvaluator から継承されたメソッド
afterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionService, setSimpleEvaluationContext
クラス java.lang.ObjectSE から継承されたメソッド
clone, equalsSE, finalize, getClass, hashCode, notify, notifyAll, toString, wait, waitSE, waitSE
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたメソッド
getThisAs, isObserved, registerObservationRegistry
インターフェース org.springframework.integration.core.MessageSource から継承されたメソッド
getIntegrationPatternType
フィールドの詳細
REMAINING_RECORDS
前回のポーリングから残っているレコードの数。- 導入:
- 3.2
- 関連事項:
newAssignment
public volatile boolean newAssignment
コンストラクターの詳細
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties) 指定されたパラメーターを使用してインスタンスを作成します。ポーリングごとに複数のレコードをフェッチすることは無効になります。- パラメーター:
consumerFactory
- コンシューマーファクトリ。consumerProperties
- コンシューマーの特性。- 関連事項:
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch) 指定されたパラメーターを使用してインスタンスを構築します。各ポーリングで最大max.poll.records
を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリがDefaultKafkaConsumerFactory
の場合はmax.poll.records
が 1 に強制変換され、それ以外の場合はIllegalArgumentException
SE で拒否されます。重要: true の場合、max.poll.interval.ms
内で受信したレコード数を消費するのに十分な速度でAbstractMessageSource.receive()
を呼び出す必要があります。false の場合、max.poll.interval.ms
内でAbstractMessageSource.receive()
を呼び出す必要があります。pause()
は、前回のポーリングのレコードが消費されるまで有効になりません。- パラメーター:
consumerFactory
- コンシューマーファクトリ。consumerProperties
- コンシューマーの特性。allowMultiFetch
-max.poll.records > 1
を許可する場合は true。
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory) 指定されたパラメーターを使用してインスタンスを作成します。ポーリングごとに複数のレコードをフェッチすることは無効になります。- パラメーター:
consumerFactory
- コンシューマーファクトリ。consumerProperties
- コンシューマーの特性。ackCallbackFactory
- ack コールバックファクトリ。- 関連事項:
KafkaMessageSource
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) 指定されたパラメーターを使用してインスタンスを構築します。各ポーリングで最大max.poll.records
を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリがDefaultKafkaConsumerFactory
の場合はmax.poll.records
が 1 に強制変換され、それ以外の場合はIllegalArgumentException
SE で拒否されます。重要: true の場合、max.poll.interval.ms
内で受信したレコード数を消費するのに十分な速度でAbstractMessageSource.receive()
を呼び出す必要があります。false の場合、max.poll.interval.ms
内でAbstractMessageSource.receive()
を呼び出す必要があります。pause()
は、前回のポーリングのレコードが消費されるまで有効になりません。- パラメーター:
consumerFactory
- コンシューマーファクトリ。consumerProperties
- コンシューマーの特性。ackCallbackFactory
- ack コールバックファクトリ。allowMultiFetch
-max.poll.records > 1
を許可する場合は true。
メソッドの詳細
getAssignedPartitions
現在割り当てられているパーティションを返します。- 戻り値:
- パーティション。
setBeanClassLoader
- 次で指定:
- インターフェース
BeanClassLoaderAware
のsetBeanClassLoader
onInit
protected void onInit()- オーバーライド:
- クラス
AbstractExpressionEvaluator
のonInit
getConsumerProperties
public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()構成されたコンシューマープロパティへの参照を取得します。ソースを開始する前に、プロパティをさらにカスタマイズできます。- 戻り値:
- プロパティ。
getGroupId
getClientId
getPollTimeout
protected long getPollTimeout()getMessageConverter
protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()setMessageConverter
public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) デフォルトのMessagingMessageConverter
を置き換えるようにメッセージコンバーターを設定します。- パラメーター:
messageConverter
- コンバーター。
getPayloadType
setPayloadType
ペイロード型を設定します。型対応のメッセージコンバーターが提供されている場合にのみ適用されます。- パラメーター:
payloadType
- 変換する型。
getRebalanceListener
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()getComponentType
- 次で指定:
- インターフェース
NamedComponent
のgetComponentType
isRawMessageHeader
protected boolean isRawMessageHeader()setRawMessageHeader
public void setRawMessageHeader(boolean rawMessageHeader) true に設定すると、生のConsumerRecord
がキーKafkaHeaders.RAW_DATA
およびIntegrationMessageHeaderAccessor.SOURCE_DATA
のヘッダーとして含まれます。呼び出し元がレコードにアクセスしてエラーを処理できるようにします。- パラメーター:
rawMessageHeader
- ヘッダーを含める場合は true。
getCommitTimeout
setCloseTimeout
クローズタイムアウトを設定します。デフォルトは 30 秒です。- パラメーター:
closeTimeout
- クローズタイムアウト。
isRunning
public boolean isRunning()- 次で指定:
- インターフェース
Lifecycle
のisRunning
- 次で指定:
- インターフェース
ManageableLifecycle
のisRunning
start
public void start()- 次で指定:
- インターフェース
Lifecycle
のstart
- 次で指定:
- インターフェース
ManageableLifecycle
のstart
stop
public void stop()- 次で指定:
- インターフェース
Lifecycle
のstop
- 次で指定:
- インターフェース
ManageableLifecycle
のstop
pause
public void pause()インターフェースからコピーされた説明:Pausable
エンドポイントを一時停止します。resume
public void resume()インターフェースからコピーされた説明:Pausable
一時停止した場合は、エンドポイントを再開します。isPaused
public boolean isPaused()インターフェースからコピーされた説明:Pausable
エンドポイントが一時停止しているかどうかを確認します。doReceive
クラスからコピーされた説明:AbstractMessageSource
サブクラスはこのメソッドを実装する必要があります。通常、返される値は型 T のpayload
になりますが、返される値はペイロードが型 T であるMessage
インスタンスである場合もあります。また、追加のヘッダーの設定に使用されるAbstractIntegrationMessageBuilder
である場合もあります。- 次で指定:
- クラス
AbstractMessageSource<ObjectSE>
のdoReceive
- 戻り値:
- 返された値。
createConsumer
protected void createConsumer()destroy
public void destroy()- 次で指定:
- インターフェース
DisposableBean
のdestroy
- 次で指定:
- インターフェース
IntegrationManagement
のdestroy
- オーバーライド:
- クラス
AbstractMessageSource<ObjectSE>
のdestroy