クラス KafkaMessageDrivenChannelAdapter<K,V>
java.lang.ObjectSE
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.endpoint.AbstractEndpoint
org.springframework.integration.endpoint.MessageProducerSupport
org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter<K,V>
- 型パラメーター:
K
- 鍵の型。V
- 値の型。
- 実装されたすべてのインターフェース:
Aware
、BeanFactoryAware
、BeanNameAware
、DisposableBean
、InitializingBean
、SmartInitializingSingleton
、ApplicationContextAware
、Lifecycle
、Phased
、SmartLifecycle
、ExpressionCapable
、OrderlyShutdownCapable
、MessageProducer
、Pausable
、IntegrationPattern
、KafkaInboundEndpoint
、NamedComponent
、IntegrationInboundManagement
、IntegrationManagement
、ManageableLifecycle
、ManageableSmartLifecycle
、TrackableComponent
public class KafkaMessageDrivenChannelAdapter<K,V>
extends MessageProducerSupport
implements KafkaInboundEndpoint, OrderlyShutdownCapable, Pausable
メッセージ駆動型チャネルアダプター。
- 導入:
- 5.4
- 作成者:
- Marius Bogoevici, Gary Russell, Artem Bilan, Urs Keller
ネストされたクラスのサマリー
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたネストクラス / インターフェース
IntegrationManagement.ManagementOverrides
フィールドサマリー
クラス org.springframework.integration.endpoint.AbstractEndpoint から継承されたフィールド
lifecycleCondition, lifecycleLock
クラス org.springframework.integration.context.IntegrationObjectSupport から継承されたフィールド
EXPRESSION_PARSER, logger
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたフィールド
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
インターフェース org.springframework.integration.kafka.inbound.KafkaInboundEndpoint から継承されたフィールド
ATTRIBUTES_HOLDER, CONTEXT_ACKNOWLEDGMENT, CONTEXT_CONSUMER, CONTEXT_RECORD
インターフェース org.springframework.context.SmartLifecycle から継承されたフィールド
DEFAULT_PHASE
コンストラクターのサマリー
コンストラクター説明KafkaMessageDrivenChannelAdapter
(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer) モードKafkaMessageDrivenChannelAdapter.ListenerMode.record
でインスタンスを作成します。KafkaMessageDrivenChannelAdapter
(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode) 提供されたモードでインスタンスを構築します。メソッドのサマリー
修飾子と型メソッド説明int
スケジューラー、エグゼキューターなどの通常のシャットダウンの後、シャットダウン遅延が経過した後、残りのアクティブなスケジューラー / エグゼキュータースレッドの強制シャットダウンの前に呼び出されます。オプションで、まだ処理中のアクティブなメッセージの数を返すことができます。int
シャットダウンが始まる前に呼び出されます。protected void
doStart()
デフォルトでは何もしません。protected void
doStop()
デフォルトでは何もしません。サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。protected AttributeAccessor
getErrorMessageAttributes
(Message<?> message) errorMessageStrategy
でエラーメッセージを作成するときに使用するAttributeAccessor
を設定します。boolean
isPaused()
エンドポイントが一時停止しているかどうかを確認します。protected void
onInit()
サブクラスは、初期化ロジック用にこれを実装できます。void
pause()
エンドポイントを一時停止します。void
resume()
一時停止した場合は、エンドポイントを再開します。void
setAckDiscarded
(boolean ackDiscarded) FilteringMessageListenerAdapter
が破棄されたレコードを確認する必要があるかどうかを示すboolean
フラグ。void
setBatchMessageConverter
(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter) メッセージコンバーターを設定して、バッチベースのコンシューマーで使用します。void
setBindSourceRecord
(boolean bindSourceRecord) true に設定すると、IntegrationMessageHeaderAccessor.SOURCE_DATA
という名前のヘッダーにソースコンシューマーレコードがバインドされます。void
setFilterInRetry
(boolean filterInRetry) フィルター操作と再試行操作が実行される順序を指定するboolean
フラグ。void
setMessageConverter
(org.springframework.kafka.support.converter.MessageConverter messageConverter) メッセージコンバーターを設定します。モードに応じて、RecordMessageConverter
またはBatchMessageConverter
でなければなりません。void
setOnPartitionsAssignedSeekCallback
(BiConsumerSE<MapSE<org.apache.kafka.common.TopicPartition, LongSE>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) KafkaMessageListenerContainer
からのConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)
呼び出し中にシーク管理用のBiConsumer
SE を指定します。void
setPayloadType
(ClassSE<?> payloadType) StringJsonMessageConverter
などの型対応メッセージコンバーターを使用する場合は、コンバーターが作成するペイロード型を設定します。void
setRecordFilterStrategy
(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K, V> recordFilterStrategy) KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener
をFilteringMessageListenerAdapter
にラップするRecordFilterStrategy
を指定します。void
setRecordMessageConverter
(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) レコードコンバーターを使用するようにメッセージコンバーターを設定します。void
setRecoveryCallback
(org.springframework.retry.RecoveryCallback<?> recoveryCallback) 再試行操作用のRecoveryCallback
インスタンス。null の場合、再試行が終了した後、例外がコンテナーにスローされます(エラーチャネルが設定されていない場合)。void
setRetryTemplate
(org.springframework.retry.support.RetryTemplate retryTemplate) 配信の再試行に使用するRetryTemplate
インスタンスを指定します。クラス org.springframework.integration.endpoint.MessageProducerSupport から継承されたメソッド
afterSingletonsInstantiated, buildErrorMessage, getErrorChannel, getErrorMessageStrategy, getIntegrationPatternType, getMessagingTemplate, getOutputChannel, isObserved, registerObservationRegistry, sendErrorMessageIfNecessary, sendMessage, setErrorChannel, setErrorChannelName, setErrorMessageStrategy, setObservationConvention, setOutputChannel, setOutputChannelName, setSendTimeout, setShouldTrack, subscribeToPublisher
クラス org.springframework.integration.endpoint.AbstractEndpoint から継承されたメソッド
destroy, doStop, getPhase, getRole, isActive, isAutoStartup, isRunning, setAutoStartup, setPhase, setRole, start, stop, stop
クラス org.springframework.integration.context.IntegrationObjectSupport から継承されたメソッド
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
クラス java.lang.ObjectSE から継承されたメソッド
clone, equalsSE, finalize, getClass, hashCode, notify, notifyAll, wait, waitSE, waitSE
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたメソッド
destroy, getManagedName, getManagedType, getOverrides, getThisAs, isLoggingEnabled, registerMetricsCaptor, setLoggingEnabled, setManagedName, setManagedType
インターフェース org.springframework.integration.kafka.inbound.KafkaInboundEndpoint から継承されたメソッド
doWithRetry
インターフェース org.springframework.integration.support.management.ManageableLifecycle から継承されたメソッド
isRunning, start, stop
インターフェース org.springframework.integration.support.context.NamedComponent から継承されたメソッド
getBeanName, getComponentName
コンストラクターの詳細
KafkaMessageDrivenChannelAdapter
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer) モードKafkaMessageDrivenChannelAdapter.ListenerMode.record
でインスタンスを作成します。- パラメーター:
messageListenerContainer
- コンテナー。
KafkaMessageDrivenChannelAdapter
public KafkaMessageDrivenChannelAdapter(org.springframework.kafka.listener.AbstractMessageListenerContainer<K, V> messageListenerContainer, KafkaMessageDrivenChannelAdapter.ListenerMode mode) 提供されたモードでインスタンスを構築します。- パラメーター:
messageListenerContainer
- コンテナー。mode
- モード。
メソッドの詳細
setMessageConverter
public void setMessageConverter(org.springframework.kafka.support.converter.MessageConverter messageConverter) メッセージコンバーターを設定します。モードに応じて、RecordMessageConverter
またはBatchMessageConverter
でなければなりません。- パラメーター:
messageConverter
- コンバーター。
setRecordMessageConverter
public void setRecordMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) レコードコンバーターを使用するようにメッセージコンバーターを設定します。- パラメーター:
messageConverter
- コンバーター。
setBatchMessageConverter
public void setBatchMessageConverter(org.springframework.kafka.support.converter.BatchMessageConverter messageConverter) メッセージコンバーターを設定して、バッチベースのコンシューマーで使用します。- パラメーター:
messageConverter
- コンバーター。
setRecordFilterStrategy
public void setRecordFilterStrategy(org.springframework.kafka.listener.adapter.RecordFilterStrategy<K, V> recordFilterStrategy) KafkaMessageDrivenChannelAdapter.IntegrationRecordMessageListener
をFilteringMessageListenerAdapter
にラップするRecordFilterStrategy
を指定します。- パラメーター:
recordFilterStrategy
- 使用するRecordFilterStrategy
。
setAckDiscarded
public void setAckDiscarded(boolean ackDiscarded) FilteringMessageListenerAdapter
が破棄されたレコードを確認する必要があるかどうかを示すboolean
フラグ。setRecordFilterStrategy(RecordFilterStrategy)
が指定されていない場合は意味がありません。- パラメーター:
ackDiscarded
- 破棄されたメッセージに確認応答(コミットオフセット)する場合は true。
setRetryTemplate
public void setRetryTemplate(org.springframework.retry.support.RetryTemplate retryTemplate) 配信の再試行に使用するRetryTemplate
インスタンスを指定します。IMPORTANT: この形式の再試行はブロックされており、ポーリングされたすべてのレコードの合計再試行遅延が
max.poll.interval.ms
を超える可能性がある場合、再調整が発生する可能性があります。代わりに、KafkaErrorSendingMessageRecoverer
で構成されたリスナーコンテナーにDefaultErrorHandler
を追加することを検討してください。- パラメーター:
retryTemplate
- 使用するRetryTemplate
。
setRecoveryCallback
public void setRecoveryCallback(org.springframework.retry.RecoveryCallback<?> recoveryCallback) 再試行操作用のRecoveryCallback
インスタンス。null の場合、再試行が使い果たされた後にコンテナーに例外がスローされます (エラーチャネルが構成されていない場合)。setRetryTemplate(RetryTemplate)
が指定されている場合にのみ使用されます。エラーチャネルが指定されている場合、デフォルトはErrorMessageSendingRecoverer
です。再試行が使い果たされた後にコンテナーに例外をスローする場合は、null に設定します。- パラメーター:
recoveryCallback
- リカバリコールバック。
setFilterInRetry
public void setFilterInRetry(boolean filterInRetry) フィルター操作と再試行操作が実行される順序を指定するboolean
フラグ。RetryTemplate
またはRecordFilterStrategy
のいずれかのみが存在する場合、または存在しない場合は意味がありません。true の場合、フィルターは再試行のたびに呼び出されます。false の場合、フィルターはコンテナーからの配信ごとに 1 回だけ呼び出されます。- パラメーター:
filterInRetry
- 再試行ごとにフィルター処理する場合は true。デフォルトはfalse
です。
setPayloadType
StringJsonMessageConverter
などの型対応メッセージコンバーターを使用する場合は、コンバーターが作成するペイロード型を設定します。デフォルトはObject
SE です。- パラメーター:
payloadType
- 型。
setOnPartitionsAssignedSeekCallback
public void setOnPartitionsAssignedSeekCallback(BiConsumerSE<MapSE<org.apache.kafka.common.TopicPartition, LongSE>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback> onPartitionsAssignedCallback) KafkaMessageListenerContainer
からのConsumerSeekAware.onPartitionsAssigned(Map, ConsumerSeekAware.ConsumerSeekCallback)
呼び出し中のシーク管理用にBiConsumer
SE を指定します。これは、内部MessagingMessageListenerAdapter
実装から呼び出されます。- パラメーター:
onPartitionsAssignedCallback
- 使用するBiConsumer
SE- 関連事項:
ConsumerSeekAware.onPartitionsAssigned(java.util.Map<org.apache.kafka.common.TopicPartition, java.lang.Long>, org.springframework.kafka.listener.ConsumerSeekAware.ConsumerSeekCallback)
setBindSourceRecord
public void setBindSourceRecord(boolean bindSourceRecord) true に設定すると、IntegrationMessageHeaderAccessor.SOURCE_DATA
という名前のヘッダーにソースコンシューマーレコードがバインドされます。バッチリスナーには適用されません。- パラメーター:
bindSourceRecord
- バインドする場合は true。
getComponentType
クラスからコピーされた説明:IntegrationObjectSupport
サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。- 次で指定:
- インターフェース
NamedComponent
のgetComponentType
- オーバーライド:
- クラス
IntegrationObjectSupport
のgetComponentType
onInit
protected void onInit()クラスからコピーされた説明:IntegrationObjectSupport
サブクラスは、初期化ロジック用にこれを実装できます。- オーバーライド:
- クラス
MessageProducerSupport
のonInit
doStart
protected void doStart()クラスからコピーされた説明:MessageProducerSupport
デフォルトでは何もしません。サブクラスは、ライフサイクル管理の動作が必要な場合、これをオーバーライドできます。"lifecycleLock" によって保護されています。- オーバーライド:
- クラス
MessageProducerSupport
のdoStart
doStop
protected void doStop()クラスからコピーされた説明:MessageProducerSupport
デフォルトでは何もしません。サブクラスは、ライフサイクル管理の動作が必要な場合、これをオーバーライドできます。- オーバーライド:
- クラス
MessageProducerSupport
のdoStop
pause
public void pause()インターフェースからコピーされた説明:Pausable
エンドポイントを一時停止します。resume
public void resume()インターフェースからコピーされた説明:Pausable
一時停止した場合は、エンドポイントを再開します。isPaused
public boolean isPaused()インターフェースからコピーされた説明:Pausable
エンドポイントが一時停止しているかどうかを確認します。beforeShutdown
public int beforeShutdown()インターフェースからコピーされた説明:OrderlyShutdownCapable
シャットダウンが始まる前に呼び出されます。実装は、新しいメッセージの受け入れを停止する必要があります。オプションで、処理中のアクティブなメッセージの数を返すことができます。- 次で指定:
- インターフェース
OrderlyShutdownCapable
のbeforeShutdown
- 戻り値:
- 使用可能な場合、アクティブなメッセージの数。
afterShutdown
public int afterShutdown()インターフェースからコピーされた説明:OrderlyShutdownCapable
スケジューラー、エグゼキューターなどの通常のシャットダウンの後、シャットダウン遅延が経過した後、残りのアクティブなスケジューラー / エグゼキュータースレッドの強制シャットダウンの前に呼び出されます。オプションで、まだ処理中のアクティブなメッセージの数を返すことができます。- 次で指定:
- インターフェース
OrderlyShutdownCapable
のafterShutdown
- 戻り値:
- 使用可能な場合、アクティブなメッセージの数。
getErrorMessageAttributes
クラスからコピーされた説明:MessageProducerSupport
errorMessageStrategy
でエラーメッセージを作成するときに使用するAttributeAccessor
を設定します。- オーバーライド:
- クラス
MessageProducerSupport
のgetErrorMessageAttributes
- パラメーター:
message
- メッセージ。- 戻り値:
- 属性。