K
- 鍵の型。V
- 値の型。public class KafkaMessageDrivenChannelAdapter<K,V> extends MessageProducerSupport implements OrderlyShutdownCapable, Pausable
修飾子と型 | クラスと説明 |
---|---|
static class | KafkaMessageDrivenChannelAdapter.ListenerMode コンテナー、レコード、バッチのリスナーモード。 |
lifecycleCondition, lifecycleLock
EXPRESSION_PARSER, logger
DEFAULT_PHASE
コンストラクターと説明 |
---|
KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer) モード KafkaMessageDrivenChannelAdapter.ListenerMode.record でインスタンスを作成します。 |
KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode) 提供されたモードでインスタンスを構築します。 |
修飾子と型 | メソッドと説明 |
---|---|
int | afterShutdown() スケジューラー、エグゼキューターなどの通常のシャットダウンの後、シャットダウン遅延が経過した後、残りのアクティブなスケジューラー / エグゼキュータースレッドの強制シャットダウンの前に呼び出されます。オプションで、まだ処理中のアクティブなメッセージの数を返すことができます。 |
int | beforeShutdown() シャットダウンが始まる前に呼び出されます。 |
protected void | doStart() デフォルトでは何もしません。 |
protected void | doStop() デフォルトでは何もしません。 |
StringSE | getComponentType() サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。 |
protected AttributeAccessor | getErrorMessageAttributes(Message<?> message) errorMessageStrategy でエラーメッセージを作成するときに使用する AttributeAccessor を設定します。 |
boolean | isPaused() エンドポイントが一時停止しているかどうかを確認します。 |
protected void | onInit() サブクラスは、初期化ロジック用にこれを実装できます。 |
void | pause() エンドポイントを一時停止します。 |
void | resume() 一時停止した場合は、エンドポイントを再開します。 |
void | setAckDiscarded(boolean ackDiscarded) FilteringMessageListenerAdapter が破棄されたレコードを確認する必要があるかどうかを示す boolean フラグ。 |
void | setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter) メッセージコンバーターを設定して、バッチベースのコンシューマーで使用します。 |
void | setBindSourceRecord(boolean bindSourceRecord) true に設定すると、 IntegrationMessageHeaderAccessor.SOURCE_DATA という名前のヘッダーにソースコンシューマーレコードがバインドされます。 |
void | setFilterInRetry(boolean filterInRetry) RetryingMessageListenerAdapter と FilteringMessageListenerAdapter の両方が存在する場合に、RetryingMessageListenerAdapter と FilteringMessageListenerAdapter が互いにラップされる順序を指定する boolean フラグ。 |
void | setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter) メッセージコンバーターを設定します。モードに応じて、 RecordMessageConverter または BatchMessageConverter でなければなりません。 |
void | setOnPartitionsAssignedSeekCallback(java.util.function.BiConsumerSE<MapSE<org.apache.kafka.common.TopicPartition,LongSE>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) KafkaMessageListenerContainer からの ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) 呼び出し中にシーク管理用の BiConsumer SE を指定します。 |
void | setPayloadType(ClassSE<?> payloadType) 型認識メッセージコンバーター( StringJsonMessageConverter など)を使用する場合は、コンバーターが作成するペイロード型を設定します。 |
void | setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K, V> recordFilterStrategy) KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener を FilteringMessageListenerAdapter にラップする RecordFilterStrategy を指定します。 |
void | setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) レコードコンバーターを使用するようにメッセージコンバーターを設定します。 |
void | setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends ObjectSE> recoveryCallback) 再試行操作用の RecoveryCallback インスタンス。null の場合、再試行が終了した後、例外がコンテナーにスローされます(エラーチャネルが設定されていない場合)。 |
void | setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener を RetryingMessageListenerAdapter にラップする RetryTemplate インスタンスを指定します。 |
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
destroy, doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
cloneSE, equalsSE, finalizeSE, getClassSE, hashCodeSE, notifySE, notifyAllSE, waitSE, waitSE, waitSE
isRunning, start, stop
getBeanName, getComponentName
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)
KafkaMessageDrivenChannelAdapter.ListenerMode.record
でインスタンスを作成します。messageListenerContainer
- コンテナー。public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)
messageListenerContainer
- コンテナー。mode
- モード。public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)
RecordMessageConverter
または BatchMessageConverter
でなければなりません。messageConverter
- コンバーター。public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
messageConverter
- コンバーター。public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)
messageConverter
- コンバーター。public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)
KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener
を FilteringMessageListenerAdapter
にラップする RecordFilterStrategy
を指定します。recordFilterStrategy
- 使用する RecordFilterStrategy
。public void setAckDiscarded(boolean ackDiscarded)
FilteringMessageListenerAdapter
が破棄されたレコードを確認する必要があるかどうかを示す boolean
フラグ。setRecordFilterStrategy(RecordFilterStrategy)
が指定されていない場合は意味がありません。ackDiscarded
- 破棄されたメッセージに確認応答(コミットオフセット)する場合は true。public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener
を RetryingMessageListenerAdapter
にラップする RetryTemplate
インスタンスを指定します。retryTemplate
- 使用する RetryTemplate
。public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends ObjectSE> recoveryCallback)
RecoveryCallback
インスタンス。null の場合、再試行が終了した後、例外がコンテナーにスローされます(エラーチャネルが設定されていない場合)。setRetryTemplate(RetryTemplate)
が指定されていない場合は意味がありません。recoveryCallback
- リカバリコールバック。public void setFilterInRetry(boolean filterInRetry)
RetryingMessageListenerAdapter
と FilteringMessageListenerAdapter
の両方が存在する場合に、RetryingMessageListenerAdapter
と FilteringMessageListenerAdapter
が互いにラップされる順序を指定する boolean
フラグ。RetryTemplate
または RecordFilterStrategy
のいずれか 1 つだけが存在する場合は意味がありません。filterInRetry
- RetryingMessageListenerAdapter
および FilteringMessageListenerAdapter
折り返しの順序。デフォルトは false
です。public void setPayloadType(ClassSE<?> payloadType)
StringJsonMessageConverter
など)を使用する場合、コンバーターが作成するペイロード型を設定します。デフォルトは Object
SE です。payloadType
- 型。public void setOnPartitionsAssignedSeekCallback(java.util.function.BiConsumerSE<MapSE<org.apache.kafka.common.TopicPartition,LongSE>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
KafkaMessageListenerContainer
からの ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)
呼び出し中のシーク管理用に BiConsumer
SE を指定します。これは、内部 MessagingMessageListenerAdapter
実装から呼び出されます。onPartitionsAssignedCallback
- 使用する BiConsumer
SEConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
public void setBindSourceRecord(boolean bindSourceRecord)
IntegrationMessageHeaderAccessor.SOURCE_DATA
という名前のヘッダーにソースコンシューマーレコードがバインドされます。バッチリスナーには適用されません。bindSourceRecord
- バインドする場合は true。public StringSE getComponentType()
IntegrationObjectSupport
NamedComponent
の getComponentType
IntegrationObjectSupport
の getComponentType
protected void onInit()
IntegrationObjectSupport
MessageProducerSupport
の onInit
protected void doStart()
MessageProducerSupport
MessageProducerSupport
の doStart
protected void doStop()
MessageProducerSupport
MessageProducerSupport
の doStop
public void pause()
Pausable
public void resume()
Pausable
public boolean isPaused()
Pausable
public int beforeShutdown()
OrderlyShutdownCapable
OrderlyShutdownCapable
の beforeShutdown
public int afterShutdown()
OrderlyShutdownCapable
OrderlyShutdownCapable
の afterShutdown
protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
MessageProducerSupport
errorMessageStrategy
でエラーメッセージを作成するときに使用する AttributeAccessor
を設定します。MessageProducerSupport
の getErrorMessageAttributes
message
- メッセージ。