K
- 鍵の型。V
- 値の型。public class KafkaMessageSource<K,V> extends AbstractMessageSource<ObjectSE> implements Pausable
NOTE: アプリケーションがメッセージの順序が正しくないことを確認した場合、オフセットの前のすべてのメッセージが確認応答されるまで、確認応答は延期されます。複数のレコードが取得され、以前のオフセットが再キューイングされた場合、後続のオフセットからのレコードは、正常に処理された場合でも再配信されます。アプリケーションはべき等性を実装する必要があります。
バージョン 3.1.2 以降、このソースは Pausable
を実装しており、Consumer
を一時停止および再開できます。コンシューマーが一時停止している間、リバランスを防ぐために、max.poll.interval.ms
内で AbstractMessageSource.receive()
を呼び出し続ける必要があります。
修飾子と型 | クラスと説明 |
---|---|
static class | KafkaMessageSource.KafkaAckCallback<K, V> Kafka の場合は AcknowledgmentCallback。 |
static class | KafkaMessageSource.KafkaAckCallbackFactory<K, V> KafkaAckInfo の場合は AcknowledgmentCallbackFactory。 |
static interface | KafkaMessageSource.KafkaAckInfo<K, V> KafkaAckCallback を構築するための情報。 |
class | KafkaMessageSource.KafkaAckInfoImpl KafkaAckCallback を構築するための情報。 |
IntegrationManagement.ManagementOverrides
修飾子と型 | フィールドと説明 |
---|---|
static StringSE | REMAINING_RECORDS 前回のポーリングから残っているレコードの数。 |
EXPRESSION_PARSER, logger
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
コンストラクターと説明 |
---|
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties) 指定されたパラメーターを使用してインスタンスを作成します。 |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch) 指定されたパラメーターを使用してインスタンスを作成します。 |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory) 指定されたパラメーターを使用してインスタンスを作成します。 |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) 指定されたパラメーターを使用してインスタンスを作成します。 |
修飾子と型 | メソッドと説明 |
---|---|
protected void | createConsumer() |
void | destroy() |
protected ObjectSE | doReceive() サブクラスはこのメソッドを実装する必要があります。 |
CollectionSE<org.apache.kafka.common.TopicPartition> | getAssignedPartitions() 現在割り当てられているパーティションを返します。 |
protected StringSE | getClientId() |
protected java.time.Duration | getCommitTimeout() |
StringSE | getComponentType() |
org.springframework.kafka.listener.ConsumerProperties | getConsumerProperties() 構成されたコンシューマープロパティへの参照を取得します。ソースを開始する前に、プロパティをさらにカスタマイズできます。 |
protected StringSE | getGroupId() |
protected org.springframework.kafka.support.converter.RecordMessageConverter | getMessageConverter() |
protected ClassSE<?> | getPayloadType() |
protected long | getPollTimeout() |
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener | getRebalanceListener() |
boolean | isPaused() エンドポイントが一時停止しているかどうかを確認します。 |
protected boolean | isRawMessageHeader() |
boolean | isRunning() |
protected void | onInit() |
void | pause() エンドポイントを一時停止します。 |
void | resume() 一時停止した場合は、エンドポイントを再開します。 |
void | setCloseTimeout(java.time.Duration closeTimeout) クローズタイムアウトを設定します。デフォルトは 30 秒です。 |
void | setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) デフォルトの MessagingMessageConverter を置き換えるようにメッセージコンバーターを設定します。 |
void | setPayloadType(ClassSE<?> payloadType) ペイロード型を設定します。 |
void | setRawMessageHeader(boolean rawMessageHeader) true に設定すると、生の ConsumerRecord がキー KafkaHeaders.RAW_DATA および IntegrationMessageHeaderAccessor.SOURCE_DATA のヘッダーとして含まれます。 |
void | start() |
void | stop() |
buildMessage, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedType
afterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionService
cloneSE, equalsSE, finalizeSE, getClassSE, hashCodeSE, notifySE, notifyAllSE, toStringSE, waitSE, waitSE, waitSE
getIntegrationPatternType
getThisAs
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)
consumerFactory
- コンシューマーファクトリ。consumerProperties
- コンシューマーの特性。KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)
max.poll.records
を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリが DefaultKafkaConsumerFactory
の場合は max.poll.records
が 1 に強制変換され、それ以外の場合は IllegalArgumentException
SE で拒否されます。重要: true の場合、max.poll.interval.ms
内で受信したレコード数を消費するのに十分な速度で AbstractMessageSource.receive()
を呼び出す必要があります。false の場合、max.poll.interval.ms
内で AbstractMessageSource.receive()
を呼び出す必要があります。pause()
は、前回のポーリングのレコードが消費されるまで有効になりません。consumerFactory
- コンシューマーファクトリ。consumerProperties
- コンシューマーの特性。allowMultiFetch
- max.poll.records > 1
を許可する場合は true。public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)
consumerFactory
- コンシューマーファクトリ。consumerProperties
- コンシューマーの特性。ackCallbackFactory
- ack コールバックファクトリ。KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)
public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)
max.poll.records
を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリが DefaultKafkaConsumerFactory
の場合は max.poll.records
が 1 に強制変換され、それ以外の場合は IllegalArgumentException
SE で拒否されます。重要: true の場合、max.poll.interval.ms
内で受信したレコード数を消費するのに十分な速度で AbstractMessageSource.receive()
を呼び出す必要があります。false の場合、max.poll.interval.ms
内で AbstractMessageSource.receive()
を呼び出す必要があります。pause()
は、前回のポーリングのレコードが消費されるまで有効になりません。consumerFactory
- コンシューマーファクトリ。consumerProperties
- コンシューマーの特性。ackCallbackFactory
- ack コールバックファクトリ。allowMultiFetch
- max.poll.records > 1
を許可する場合は true。public CollectionSE<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
protected void onInit()
AbstractExpressionEvaluator
の onInit
public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()
protected StringSE getGroupId()
protected StringSE getClientId()
protected long getPollTimeout()
protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
MessagingMessageConverter
を置き換えるようにメッセージコンバーターを設定します。messageConverter
- コンバーター。protected ClassSE<?> getPayloadType()
public void setPayloadType(ClassSE<?> payloadType)
payloadType
- 変換する型。protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
public StringSE getComponentType()
NamedComponent
の getComponentType
protected boolean isRawMessageHeader()
public void setRawMessageHeader(boolean rawMessageHeader)
ConsumerRecord
がキー KafkaHeaders.RAW_DATA
および IntegrationMessageHeaderAccessor.SOURCE_DATA
のヘッダーとして含まれます。呼び出し元がレコードにアクセスしてエラーを処理できるようにします。rawMessageHeader
- ヘッダーを含める場合は true。protected java.time.Duration getCommitTimeout()
public void setCloseTimeout(java.time.Duration closeTimeout)
closeTimeout
- クローズタイムアウト。public boolean isRunning()
Lifecycle
の isRunning
ManageableLifecycle
の isRunning
public void start()
Lifecycle
の start
ManageableLifecycle
の start
public void stop()
Lifecycle
の stop
ManageableLifecycle
の stop
public void pause()
Pausable
public void resume()
Pausable
public boolean isPaused()
Pausable
protected ObjectSE doReceive()
AbstractMessageSource
payload
ですが、戻り値はペイロードが T 型の Message
インスタンスでもかまいません。追加のヘッダーの作成に使用される AbstractIntegrationMessageBuilder
にすることもできます。AbstractMessageSource<ObjectSE>
の doReceive
protected void createConsumer()
public void destroy()
DisposableBean
の destroy
IntegrationManagement
の destroy
AbstractMessageSource<ObjectSE>
の destroy