K - 鍵の型。V - 値の型。public class KafkaMessageDrivenChannelAdapter<K,V> extends MessageProducerSupport implements OrderlyShutdownCapable, Pausable
| 修飾子と型 | クラスと説明 |
|---|---|
static class | KafkaMessageDrivenChannelAdapter.ListenerMode コンテナー、レコード、バッチのリスナーモード。 |
lifecycleCondition, lifecycleLockEXPRESSION_PARSER, loggerDEFAULT_PHASE| コンストラクターと説明 |
|---|
KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer) モード KafkaMessageDrivenChannelAdapter.ListenerMode.record でインスタンスを作成します。 |
KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode) 提供されたモードでインスタンスを構築します。 |
| 修飾子と型 | メソッドと説明 |
|---|---|
int | afterShutdown() スケジューラー、エグゼキューターなどの通常のシャットダウンの後、シャットダウン遅延が経過した後、残りのアクティブなスケジューラー / エグゼキュータースレッドの強制シャットダウンの前に呼び出されます。オプションで、まだ処理中のアクティブなメッセージの数を返すことができます。 |
int | beforeShutdown() シャットダウンが始まる前に呼び出されます。 |
protected void | doStart() デフォルトでは何もしません。 |
protected void | doStop() デフォルトでは何もしません。 |
StringSE | getComponentType() サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。 |
protected AttributeAccessor | getErrorMessageAttributes(Message<?> message)errorMessageStrategy でエラーメッセージを作成するときに使用する AttributeAccessor を設定します。 |
boolean | isPaused() エンドポイントが一時停止しているかどうかを確認します。 |
protected void | onInit() サブクラスは、初期化ロジック用にこれを実装できます。 |
void | pause() エンドポイントを一時停止します。 |
void | resume() 一時停止した場合は、エンドポイントを再開します。 |
void | setAckDiscarded(boolean ackDiscarded)FilteringMessageListenerAdapter が破棄されたレコードを確認する必要があるかどうかを示す boolean フラグ。 |
void | setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter) メッセージコンバーターを設定して、バッチベースのコンシューマーで使用します。 |
void | setBindSourceRecord(boolean bindSourceRecord)true に設定すると、 IntegrationMessageHeaderAccessor.SOURCE_DATA という名前のヘッダーにソースコンシューマーレコードがバインドされます。 |
void | setFilterInRetry(boolean filterInRetry)RetryingMessageListenerAdapter と FilteringMessageListenerAdapter の両方が存在する場合に、RetryingMessageListenerAdapter と FilteringMessageListenerAdapter が互いにラップされる順序を指定する boolean フラグ。 |
void | setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter) メッセージコンバーターを設定します。モードに応じて、 RecordMessageConverter または BatchMessageConverter でなければなりません。 |
void | setOnPartitionsAssignedSeekCallback(java.util.function.BiConsumerSE<MapSE<org.apache.kafka.common.TopicPartition,LongSE>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)KafkaMessageListenerContainer からの ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) 呼び出し中にシーク管理用の BiConsumerSE を指定します。 |
void | setPayloadType(ClassSE<?> payloadType) 型認識メッセージコンバーター( StringJsonMessageConverter など)を使用する場合は、コンバーターが作成するペイロード型を設定します。 |
void | setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K, V> recordFilterStrategy)KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener を FilteringMessageListenerAdapter にラップする RecordFilterStrategy を指定します。 |
void | setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) レコードコンバーターを使用するようにメッセージコンバーターを設定します。 |
void | setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends ObjectSE> recoveryCallback) 再試行操作用の RecoveryCallback インスタンス。null の場合、再試行が終了した後、例外がコンテナーにスローされます(エラーチャネルが設定されていない場合)。 |
void | setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener を RetryingMessageListenerAdapter にラップする RetryTemplate インスタンスを指定します。 |
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisherdestroy, doStop, getPhase, getRole, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stopafterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringcloneSE, equalsSE, finalizeSE, getClassSE, hashCodeSE, notifySE, notifyAllSE, waitSE, waitSE, waitSEisRunning, start, stopgetBeanName, getComponentNamepublic KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer)
KafkaMessageDrivenChannelAdapter.ListenerMode.record でインスタンスを作成します。messageListenerContainer - コンテナー。public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K,V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode)
messageListenerContainer - コンテナー。mode - モード。public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter)
RecordMessageConverter または BatchMessageConverter でなければなりません。messageConverter - コンバーター。public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter)
messageConverter - コンバーター。public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter)
messageConverter - コンバーター。public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K,V> recordFilterStrategy)
KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener を FilteringMessageListenerAdapter にラップする RecordFilterStrategy を指定します。recordFilterStrategy - 使用する RecordFilterStrategy。public void setAckDiscarded(boolean ackDiscarded)
FilteringMessageListenerAdapter が破棄されたレコードを確認する必要があるかどうかを示す boolean フラグ。setRecordFilterStrategy(RecordFilterStrategy) が指定されていない場合は意味がありません。ackDiscarded - 破棄されたメッセージに確認応答(コミットオフセット)する場合は true。public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate)
KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener を RetryingMessageListenerAdapter にラップする RetryTemplate インスタンスを指定します。retryTemplate - 使用する RetryTemplate。public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<? extends ObjectSE> recoveryCallback)
RecoveryCallback インスタンス。null の場合、再試行が終了した後、例外がコンテナーにスローされます(エラーチャネルが設定されていない場合)。setRetryTemplate(RetryTemplate) が指定されていない場合は意味がありません。recoveryCallback - リカバリコールバック。public void setFilterInRetry(boolean filterInRetry)
RetryingMessageListenerAdapter と FilteringMessageListenerAdapter の両方が存在する場合に、RetryingMessageListenerAdapter と FilteringMessageListenerAdapter が互いにラップされる順序を指定する boolean フラグ。RetryTemplate または RecordFilterStrategy のいずれか 1 つだけが存在する場合は意味がありません。filterInRetry - RetryingMessageListenerAdapter および FilteringMessageListenerAdapter 折り返しの順序。デフォルトは false です。public void setPayloadType(ClassSE<?> payloadType)
StringJsonMessageConverter など)を使用する場合、コンバーターが作成するペイロード型を設定します。デフォルトは ObjectSE です。payloadType - 型。public void setOnPartitionsAssignedSeekCallback(java.util.function.BiConsumerSE<MapSE<org.apache.kafka.common.TopicPartition,LongSE>,org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback)
KafkaMessageListenerContainer からの ConsumerSeekAware.ConsumerSeekCallback#onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback) 呼び出し中のシーク管理用に BiConsumerSE を指定します。これは、内部 MessagingMessageListenerAdapter 実装から呼び出されます。onPartitionsAssignedCallback - 使用する BiConsumerSEConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)public void setBindSourceRecord(boolean bindSourceRecord)
IntegrationMessageHeaderAccessor.SOURCE_DATA という名前のヘッダーにソースコンシューマーレコードがバインドされます。バッチリスナーには適用されません。bindSourceRecord - バインドする場合は true。public StringSE getComponentType()
IntegrationObjectSupportNamedComponent の getComponentType IntegrationObjectSupport の getComponentType protected void onInit()
IntegrationObjectSupportMessageProducerSupport の onInit protected void doStart()
MessageProducerSupportMessageProducerSupport の doStart protected void doStop()
MessageProducerSupportMessageProducerSupport の doStop public void pause()
Pausablepublic void resume()
Pausablepublic boolean isPaused()
Pausablepublic int beforeShutdown()
OrderlyShutdownCapableOrderlyShutdownCapable の beforeShutdown public int afterShutdown()
OrderlyShutdownCapableOrderlyShutdownCapable の afterShutdown protected AttributeAccessor getErrorMessageAttributes(Message<?> message)
MessageProducerSupporterrorMessageStrategy でエラーメッセージを作成するときに使用する AttributeAccessor を設定します。MessageProducerSupport の getErrorMessageAttributes message - メッセージ。