K - 鍵の型。V - 値の型。public class KafkaMessageSource<K,V> extends AbstractMessageSource<ObjectSE> implements Pausable
NOTE: アプリケーションがメッセージの順序が正しくないことを確認した場合、オフセットの前のすべてのメッセージが確認応答されるまで、確認応答は延期されます。複数のレコードが取得され、以前のオフセットが再キューイングされた場合、後続のオフセットからのレコードは、正常に処理された場合でも再配信されます。アプリケーションはべき等性を実装する必要があります。
バージョン 3.1.2 以降、このソースは Pausable を実装しており、Consumer を一時停止および再開できます。コンシューマーが一時停止している間、リバランスを防ぐために、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出し続ける必要があります。
| 修飾子と型 | クラスと説明 |
|---|---|
static class | KafkaMessageSource.KafkaAckCallback<K, V>Kafka の場合は AcknowledgmentCallback。 |
static class | KafkaMessageSource.KafkaAckCallbackFactory<K, V>KafkaAckInfo の場合は AcknowledgmentCallbackFactory。 |
static interface | KafkaMessageSource.KafkaAckInfo<K, V>KafkaAckCallback を構築するための情報。 |
class | KafkaMessageSource.KafkaAckInfoImplKafkaAckCallback を構築するための情報。 |
IntegrationManagement.ManagementOverrides| 修飾子と型 | フィールドと説明 |
|---|---|
static StringSE | REMAINING_RECORDS 前回のポーリングから残っているレコードの数。 |
EXPRESSION_PARSER, loggerMETER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME| コンストラクターと説明 |
|---|
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties) 指定されたパラメーターを使用してインスタンスを作成します。 |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch) 指定されたパラメーターを使用してインスタンスを作成します。 |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory) 指定されたパラメーターを使用してインスタンスを作成します。 |
KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K, V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K, V> ackCallbackFactory, boolean allowMultiFetch) 指定されたパラメーターを使用してインスタンスを作成します。 |
| 修飾子と型 | メソッドと説明 |
|---|---|
protected void | createConsumer() |
void | destroy() |
protected ObjectSE | doReceive() サブクラスはこのメソッドを実装する必要があります。 |
CollectionSE<org.apache.kafka.common.TopicPartition> | getAssignedPartitions() 現在割り当てられているパーティションを返します。 |
protected StringSE | getClientId() |
protected java.time.Duration | getCommitTimeout() |
StringSE | getComponentType() |
org.springframework.kafka.listener.ConsumerProperties | getConsumerProperties() 構成されたコンシューマープロパティへの参照を取得します。ソースを開始する前に、プロパティをさらにカスタマイズできます。 |
protected StringSE | getGroupId() |
protected org.springframework.kafka.support.converter.RecordMessageConverter | getMessageConverter() |
protected ClassSE<?> | getPayloadType() |
protected long | getPollTimeout() |
protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener | getRebalanceListener() |
boolean | isPaused() エンドポイントが一時停止しているかどうかを確認します。 |
protected boolean | isRawMessageHeader() |
boolean | isRunning() |
protected void | onInit() |
void | pause() エンドポイントを一時停止します。 |
void | resume() 一時停止した場合は、エンドポイントを再開します。 |
void | setCloseTimeout(java.time.Duration closeTimeout) クローズタイムアウトを設定します。デフォルトは 30 秒です。 |
void | setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) デフォルトの MessagingMessageConverter を置き換えるようにメッセージコンバーターを設定します。 |
void | setPayloadType(ClassSE<?> payloadType) ペイロード型を設定します。 |
void | setRawMessageHeader(boolean rawMessageHeader)true に設定すると、生の ConsumerRecord がキー KafkaHeaders.RAW_DATA および IntegrationMessageHeaderAccessor.SOURCE_DATA のヘッダーとして含まれます。 |
void | start() |
void | stop() |
buildMessage, getBeanName, getComponentName, getManagedName, getManagedType, getOverrides, isLoggingEnabled, receive, registerMetricsCaptor, setBeanName, setHeaderExpressions, setLoggingEnabled, setManagedName, setManagedTypeafterPropertiesSet, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, evaluateExpression, getBeanFactory, getEvaluationContext, getEvaluationContext, getMessageBuilderFactory, setBeanFactory, setConversionServicecloneSE, equalsSE, finalizeSE, getClassSE, hashCodeSE, notifySE, notifyAllSE, toStringSE, waitSE, waitSE, waitSEgetIntegrationPatternTypegetThisAspublic KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties)
consumerFactory - コンシューマーファクトリ。consumerProperties - コンシューマーの特性。KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, boolean allowMultiFetch)
max.poll.records を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリが DefaultKafkaConsumerFactory の場合は max.poll.records が 1 に強制変換され、それ以外の場合は IllegalArgumentExceptionSE で拒否されます。重要: true の場合、max.poll.interval.ms 内で受信したレコード数を消費するのに十分な速度で AbstractMessageSource.receive() を呼び出す必要があります。false の場合、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出す必要があります。pause() は、前回のポーリングのレコードが消費されるまで有効になりません。consumerFactory - コンシューマーファクトリ。consumerProperties - コンシューマーの特性。allowMultiFetch - max.poll.records > 1 を許可する場合は true。public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory)
consumerFactory - コンシューマーファクトリ。consumerProperties - コンシューマーの特性。ackCallbackFactory - ack コールバックファクトリ。KafkaMessageSource(ConsumerFactory, ConsumerProperties, KafkaAckCallbackFactory, boolean)public KafkaMessageSource(org.springframework.kafka.core.ConsumerFactory<K,V> consumerFactory, org.springframework.kafka.listener.ConsumerProperties consumerProperties, KafkaMessageSource.KafkaAckCallbackFactory<K,V> ackCallbackFactory, boolean allowMultiFetch)
max.poll.records を取得できるようにするには、"allowMultiFetch" を true に設定します。false (デフォルト) の場合、コンシューマーファクトリが DefaultKafkaConsumerFactory の場合は max.poll.records が 1 に強制変換され、それ以外の場合は IllegalArgumentExceptionSE で拒否されます。重要: true の場合、max.poll.interval.ms 内で受信したレコード数を消費するのに十分な速度で AbstractMessageSource.receive() を呼び出す必要があります。false の場合、max.poll.interval.ms 内で AbstractMessageSource.receive() を呼び出す必要があります。pause() は、前回のポーリングのレコードが消費されるまで有効になりません。consumerFactory - コンシューマーファクトリ。consumerProperties - コンシューマーの特性。ackCallbackFactory - ack コールバックファクトリ。allowMultiFetch - max.poll.records > 1 を許可する場合は true。public CollectionSE<org.apache.kafka.common.TopicPartition> getAssignedPartitions()
protected void onInit()
AbstractExpressionEvaluator の onInit public org.springframework.kafka.listener.ConsumerProperties getConsumerProperties()
protected StringSE getGroupId()
protected StringSE getClientId()
protected long getPollTimeout()
protected org.springframework.kafka.support.converter.RecordMessageConverter getMessageConverter()
public void setMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
MessagingMessageConverter を置き換えるようにメッセージコンバーターを設定します。messageConverter - コンバーター。protected ClassSE<?> getPayloadType()
public void setPayloadType(ClassSE<?> payloadType)
payloadType - 変換する型。protected org.apache.kafka.clients.consumer.ConsumerRebalanceListener getRebalanceListener()
public StringSE getComponentType()
NamedComponent の getComponentType protected boolean isRawMessageHeader()
public void setRawMessageHeader(boolean rawMessageHeader)
ConsumerRecord がキー KafkaHeaders.RAW_DATA および IntegrationMessageHeaderAccessor.SOURCE_DATA のヘッダーとして含まれます。呼び出し元がレコードにアクセスしてエラーを処理できるようにします。rawMessageHeader - ヘッダーを含める場合は true。protected java.time.Duration getCommitTimeout()
public void setCloseTimeout(java.time.Duration closeTimeout)
closeTimeout - クローズタイムアウト。public boolean isRunning()
Lifecycle の isRunning ManageableLifecycle の isRunning public void start()
Lifecycle の start ManageableLifecycle の start public void stop()
Lifecycle の stop ManageableLifecycle の stop public void pause()
Pausablepublic void resume()
Pausablepublic boolean isPaused()
Pausableprotected ObjectSE doReceive()
AbstractMessageSourcepayload ですが、戻り値はペイロードが T 型の Message インスタンスでもかまいません。追加のヘッダーの作成に使用される AbstractIntegrationMessageBuilder にすることもできます。AbstractMessageSource<ObjectSE> の doReceive protected void createConsumer()
public void destroy()
DisposableBean の destroy IntegrationManagement の destroy AbstractMessageSource<ObjectSE> の destroy