クラス KafkaProducerMessageHandler<K,V>
java.lang.ObjectSE
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.handler.AbstractReplyProducingMessageHandler
org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler<K,V>
- 型パラメーター:
K
- 鍵の型。V
- 値の型。
- 実装されたすべてのインターフェース:
org.reactivestreams.Subscriber<Message<?>>
、Aware
、BeanClassLoaderAware
、BeanFactoryAware
、BeanNameAware
、DisposableBean
、InitializingBean
、ApplicationContextAware
、Lifecycle
、Ordered
、ExpressionCapable
、Orderable
、MessageProducer
、HeaderPropagationAware
、IntegrationPattern
、NamedComponent
、IntegrationManagement
、ManageableLifecycle
、TrackableComponent
、MessageHandler
、reactor.core.CoreSubscriber<Message<?>>
public class KafkaProducerMessageHandler<K,V>
extends AbstractReplyProducingMessageHandler
implements ManageableLifecycle
Apache Kafka のメッセージハンドラー。
ReplyingKafkaTemplate
と一緒に提供される場合、送信ゲートウェイのハンドラーとして使用されます。シンプルな KafkaTemplate
と一緒に提供される場合、送信チャネルアダプターのハンドラーとして使用されます。 ハンドラーは、事前に作成された ProducerRecord
ペイロードの受信もサポートします。その場合、ほとんどの構成プロパティ(setTopicExpression(Expression)
など)は無視されます。ハンドラーがゲートウェイとして使用される場合、ProducerRecord
のヘッダーは、すでにそのようなヘッダーが含まれていない限り、KafkaHeaders.REPLY_TOPIC
を追加するように拡張されます。ハンドラーは追加のヘッダーをマップしません。このようなペイロードを提供することは、ヘッダーがすでにマップされていることを前提としています。
- 導入:
- 5.4
- 作成者:
- Soby Chacko, Artem Bilan, Gary Russell, Marius Bogoevici, Biju Kunjummen, Tom van den Berge
ネストされたクラスのサマリー
ネストされたクラスクラス org.springframework.integration.handler.AbstractReplyProducingMessageHandler から継承されたネストクラス / インターフェース
AbstractReplyProducingMessageHandler.RequestHandler
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたネストクラス / インターフェース
IntegrationManagement.ManagementOverrides
フィールドサマリー
クラス org.springframework.integration.handler.AbstractMessageProducingHandler から継承されたフィールド
messagingTemplate
クラス org.springframework.integration.context.IntegrationObjectSupport から継承されたフィールド
EXPRESSION_PARSER, logger
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたフィールド
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
インターフェース org.springframework.core.Ordered から継承されたフィールド
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
コンストラクターのサマリー
コンストラクターコンストラクター説明KafkaProducerMessageHandler
(org.springframework.kafka.core.KafkaTemplate<K, V> kafkaTemplate) メソッドのサマリー
修飾子と型メソッド説明protected void
doInit()
サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。protected MessageChannel
org.springframework.kafka.support.KafkaHeaderMapper
org.springframework.kafka.core.KafkaTemplate<?,
?> protected MessageChannel
protected MessageChannel
protected ObjectSE
handleRequestMessage
(Message<?> message) サブクラスはこのメソッドを実装して、リクエストメッセージを処理する必要があります。boolean
void
processSendResult
(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, CompletableFutureSE<org.springframework.kafka.support.SendResult<K, V>> future, MessageChannel metadataChannel) void
setAssignmentDuration
(DurationSE assignmentDuration) デフォルトの返信先トピック / パーティションを決定するために、ゲートウェイとして使用される場合に、パーティションの割り当てを待機する時間を設定します。void
setErrorMessageStrategy
(ErrorMessageStrategy errorMessageStrategy) 送信失敗後にエラーメッセージを送信するときに使用するエラーメッセージ戦略の実装を設定します。void
setFlushExpression
(Expression flushExpression) Boolean
SE に評価される SpEL 式を指定して、送信後にプロデューサーをフラッシュする必要があるかどうかを判別します。void
setFuturesChannel
(MessageChannel futuresChannel) フューチャーズチャンネルを設定します。void
setFuturesChannelName
(StringSE futuresChannelName) フューチャーズチャンネル名を設定します。void
setHeaderMapper
(org.springframework.kafka.support.KafkaHeaderMapper headerMapper) 使用するヘッダーマッパーを設定します。void
setMessageKeyExpression
(Expression messageKeyExpression) void
setPartitionIdExpression
(Expression partitionIdExpression) void
setProducerRecordCreator
(KafkaProducerMessageHandler.ProducerRecordCreator<K, V> producerRecordCreator) KafkaProducerMessageHandler.ProducerRecordCreator
を設定してProducerRecord
を作成します。void
setReplyMessageConverter
(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) ゲートウェイ応答用のメッセージコンバーターを設定します。void
setReplyPayloadType
(TypeSE payloadType) 型認識メッセージコンバーター(StringJsonMessageConverter
など)を使用する場合は、コンバーターが作成するペイロード型を設定します。void
setSendFailureChannel
(MessageChannel sendFailureChannel) 障害チャネルを設定します。void
setSendFailureChannelName
(StringSE sendFailureChannelName) 障害チャネル名を設定します。void
setSendSuccessChannel
(MessageChannel sendSuccessChannel) 成功チャネルを設定します。void
setSendSuccessChannelName
(StringSE sendSuccessChannelName) 成功チャンネル名を設定します。final void
setSendTimeout
(long sendTimeout) このKafkaProducerMessageHandler
が操作結果の送信を待機する時間のタイムアウトをミリ秒単位で指定します。void
setSendTimeoutExpression
(Expression sendTimeoutExpression) SpEL 式を指定して、このKafkaProducerMessageHandler
が操作結果の送信を待機する時間のタイムアウトをミリ秒単位で評価します。void
setSync
(boolean sync) KafkaProducerMessageHandler
が送信操作の結果を待機する必要があるかどうかを示すboolean
。void
setTimeoutBuffer
(int timeoutBuffer) 構成済みのdelivery.timeout.ms
に追加されるバッファーをミリ秒単位で設定して、sync
が true の場合に将来の送信完了を待機する最小時間を決定します。void
setTimestampExpression
(Expression timestampExpression) SpEL 式を指定して、Kafka レコードに追加されるタイムスタンプを評価します。void
setTopicExpression
(Expression topicExpression) void
setUseTemplateConverter
(boolean useTemplateConverter) テンプレートのメッセージコンバーターを使用してproducerRecordCreator
の代わりにProducerRecord
を作成するには、true に設定します。void
start()
void
stop()
クラス org.springframework.integration.handler.AbstractReplyProducingMessageHandler から継承されたメソッド
doInvokeAdvisedRequestHandler, getBeanClassLoader, getIntegrationPatternType, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
クラス org.springframework.integration.handler.AbstractMessageProducingHandler から継承されたメソッド
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setupMessageProcessor, shouldCopyRequestHeaders, shouldSplitOutput, updateNotPropagatedHeaders
クラス org.springframework.integration.handler.AbstractMessageHandler から継承されたメソッド
handleMessage, onComplete, onError, onNext, onSubscribe, setObservationConvention
クラス org.springframework.integration.handler.MessageHandlerSupport から継承されたメソッド
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, isObserved, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
クラス org.springframework.integration.context.IntegrationObjectSupport から継承されたメソッド
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
クラス java.lang.ObjectSE から継承されたメソッド
clone, equalsSE, finalize, getClass, hashCode, notify, notifyAll, wait, waitSE, waitSE
インターフェース reactor.core.CoreSubscriber から継承されたメソッド
currentContext
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたメソッド
getThisAs
インターフェース org.springframework.integration.support.context.NamedComponent から継承されたメソッド
getBeanName, getComponentName
コンストラクターの詳細
KafkaProducerMessageHandler
メソッドの詳細
setTopicExpression
setMessageKeyExpression
setPartitionIdExpression
setTimestampExpression
SpEL 式を指定して、Kafka レコードに追加されるタイムスタンプを評価します。結果の値は、ミリ秒単位のエポック時間を表すLong
SE 型である必要があります。- パラメーター:
timestampExpression
- 送信操作の結果を待機するタイムスタンプのExpression
。
setFlushExpression
Boolean
SE に評価される SpEL 式を指定して、送信後にプロデューサーをフラッシュする必要があるかどうかを判別します。デフォルトでは、KafkaIntegrationHeaders.FLUSH
ヘッダーでBoolean
SE 値を検索します。存在しない場合は false。- パラメーター:
flushExpression
-Expression
setHeaderMapper
public void setHeaderMapper(org.springframework.kafka.support.KafkaHeaderMapper headerMapper) 使用するヘッダーマッパーを設定します。- パラメーター:
headerMapper
- マッパー ; null にして、ヘッダーマッピングを無効にすることができます。
getHeaderMapper
public org.springframework.kafka.support.KafkaHeaderMapper getHeaderMapper()getKafkaTemplate
public org.springframework.kafka.core.KafkaTemplate<?,?> getKafkaTemplate()setSync
public void setSync(boolean sync) KafkaProducerMessageHandler
が送信操作の結果を待機する必要があるかどうかを示すboolean
。デフォルトはfalse
です。sync
モードでは、ダウンストリーム送信操作の例外が再スローされます。- パラメーター:
sync
- 送信モード。デフォルトでは非同期です。
setSendTimeout
public final void setSendTimeout(long sendTimeout) このKafkaProducerMessageHandler
が操作結果の送信を待機する時間のタイムアウトをミリ秒単位で指定します。デフォルトは kafkadelivery.timeout.ms
プロパティ +5 秒です。タイムアウトが適用されます。成功または失敗チャネルに送信するときにも適用されます。- オーバーライド:
- クラス
AbstractMessageProducingHandler
のsetSendTimeout
- パラメーター:
sendTimeout
- 送信操作の結果を待機するためのタイムアウト。
setSendTimeoutExpression
SpEL 式を指定して、このKafkaProducerMessageHandler
が操作結果の送信を待機する時間のタイムアウトをミリ秒単位で評価します。デフォルトは kafkadelivery.timeout.ms
プロパティ +5 秒です。タイムアウトはsync
モードでのみ適用されます。この式の結果がその値よりも小さい場合は、より高い値が使用されます。- パラメーター:
sendTimeoutExpression
- 送信操作の結果を待機するためのタイムアウト用のExpression
。- 関連事項:
setSendFailureChannel
障害チャネルを設定します。送信が失敗した後、ErrorMessage
は、失敗したメッセージと原因を含むKafkaSendFailureException
のペイロードとともにこのチャネルに送信されます。- パラメーター:
sendFailureChannel
- 障害チャネル。
setSendFailureChannelName
障害チャネル名を設定します。送信が失敗した後、ErrorMessage
は、失敗したメッセージと原因を含むKafkaSendFailureException
のペイロードとともにこのチャネル名に送信されます。- パラメーター:
sendFailureChannelName
- 障害チャネル名。
setSendSuccessChannel
成功チャネルを設定します。- パラメーター:
sendSuccessChannel
- 成功チャネル。
setSendSuccessChannelName
成功チャンネル名を設定します。- パラメーター:
sendSuccessChannelName
- 成功チャネル名。
setFuturesChannel
フューチャーズチャンネルを設定します。- パラメーター:
futuresChannel
- フューチャーズチャンネル。
setFuturesChannelName
フューチャーズチャンネル名を設定します。- パラメーター:
futuresChannelName
- フューチャーズチャンネル名。
setErrorMessageStrategy
送信失敗後にエラーメッセージを送信するときに使用するエラーメッセージ戦略の実装を設定します。null にすることはできません。- パラメーター:
errorMessageStrategy
- 実装。
setReplyMessageConverter
public void setReplyMessageConverter(org.springframework.kafka.support.converter.RecordMessageConverter messageConverter) ゲートウェイ応答用のメッセージコンバーターを設定します。- パラメーター:
messageConverter
- コンバーター。- 関連事項:
setReplyPayloadType
型認識メッセージコンバーター(StringJsonMessageConverter
など)を使用する場合、コンバーターが作成するペイロード型を設定します。デフォルトはObject
SE です。- パラメーター:
payloadType
- 型。- 関連事項:
setProducerRecordCreator
public void setProducerRecordCreator(KafkaProducerMessageHandler.ProducerRecordCreator<K, V> producerRecordCreator) KafkaProducerMessageHandler.ProducerRecordCreator
を設定してProducerRecord
を作成します。useTemplateConverter
が true の場合は無視されます。- パラメーター:
producerRecordCreator
- クリエイター。- 関連事項:
setTimeoutBuffer
public void setTimeoutBuffer(int timeoutBuffer) 構成済みのdelivery.timeout.ms
に追加されるバッファーをミリ秒単位で設定して、sync
が true の場合に将来の送信完了を待機する最小時間を決定します。- パラメーター:
timeoutBuffer
- バッファ。- 関連事項:
setUseTemplateConverter
public void setUseTemplateConverter(boolean useTemplateConverter) テンプレートのメッセージコンバーターを使用してproducerRecordCreator
の代わりにProducerRecord
を作成するには、true に設定します。- パラメーター:
useTemplateConverter
- コンバーターを使用する場合は true。- 導入:
- 5.5.5
- 関連事項:
setAssignmentDuration
デフォルトの返信先トピック / パーティションを決定するために、ゲートウェイとして使用される場合に、パーティションの割り当てを待機する時間を設定します。- パラメーター:
assignmentDuration
- 設定する割り当て期間。- 導入:
- 6.0
getComponentType
クラスからコピーされた説明:IntegrationObjectSupport
サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。- 次で指定:
- インターフェース
NamedComponent
のgetComponentType
- オーバーライド:
- クラス
MessageHandlerSupport
のgetComponentType
getSendFailureChannel
getSendSuccessChannel
getFuturesChannel
doInit
protected void doInit()- オーバーライド:
- クラス
AbstractReplyProducingMessageHandler
のdoInit
start
public void start()- 次で指定:
- インターフェース
Lifecycle
のstart
- 次で指定:
- インターフェース
ManageableLifecycle
のstart
stop
public void stop()- 次で指定:
- インターフェース
Lifecycle
のstop
- 次で指定:
- インターフェース
ManageableLifecycle
のstop
isRunning
public boolean isRunning()- 次で指定:
- インターフェース
Lifecycle
のisRunning
- 次で指定:
- インターフェース
ManageableLifecycle
のisRunning
handleRequestMessage
クラスからコピーされた説明:AbstractReplyProducingMessageHandler
サブクラスはこのメソッドを実装して、リクエストメッセージを処理する必要があります。戻り値は、メッセージ、MessageBuilder、任意のプレーンオブジェクトです。基本クラスは、これらの開始点のいずれかからの返信メッセージの最終的な作成を処理します。戻り値が null の場合、メッセージフローはここで終了します。- 次で指定:
- クラス
AbstractReplyProducingMessageHandler
のhandleRequestMessage
- パラメーター:
message
- リクエストメッセージ。- 戻り値:
- メッセージを処理した結果、または
null
processSendResult
public void processSendResult(Message<?> message, org.apache.kafka.clients.producer.ProducerRecord<K, V> producerRecord, CompletableFutureSE<org.springframework.kafka.support.SendResult<K, throws InterruptedExceptionSE, ExecutionExceptionSEV>> future, MessageChannel metadataChannel)