クラス ReplyingKafkaTemplate<K,V,R>
java.lang.ObjectSE
org.springframework.kafka.core.KafkaTemplate<K,V>
org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,R>
- 型パラメーター:
K- 鍵の型。V- 送信データ型。R- 応答データ型。
- 実装されているすべてのインターフェース:
EventListenerSE、Aware、BeanNameAware、DisposableBean、InitializingBean、SmartInitializingSingleton、ApplicationContextAware、ApplicationListener<ContextStoppedEvent>、Lifecycle、Phased、SmartLifecycle、KafkaOperations<K,、V> BatchMessageListener<K,、R> ConsumerSeekAware、GenericMessageListener<ListSE<org.apache.kafka.clients.consumer.ConsumerRecord<K,、R>>> ReplyingKafkaOperations<K,V, R>
- 既知の直属サブクラス
AggregatingReplyingKafkaTemplate
public class ReplyingKafkaTemplate<K,V,R>
extends KafkaTemplate<K,V>
implements BatchMessageListener<K,R>, InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K,V,R>, ConsumerSeekAware
リクエスト / 応答セマンティクスを実装する KafkaTemplate。
- 導入:
- 2.1.3
- 作成者:
- Gary Russell, Artem Bilan, Borahm Lee, Francois Rosiere, Mikhail Polivakha
ネストされたクラスの要約
インターフェース org.springframework.kafka.listener.ConsumerSeekAware から継承されたネストクラス / インターフェース
ConsumerSeekAware.ConsumerSeekCallbackインターフェース org.springframework.kafka.core.KafkaOperations から継承されたネストクラス / インターフェース
KafkaOperations.OperationsCallback<K,V, T>, KafkaOperations.ProducerCallback<K, V, T> フィールドのサマリー
クラス org.springframework.kafka.core.KafkaTemplate から継承されたフィールド
loggerインターフェース org.springframework.kafka.core.KafkaOperations から継承されたフィールド
DEFAULT_POLL_TIMEOUTインターフェース org.springframework.context.SmartLifecycle から継承されたフィールド
DEFAULT_PHASEコンストラクターの概要
コンストラクターコンストラクター説明ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer) ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush) メソッドのサマリー
修飾子と型メソッド説明voidstatic @Nullable DeserializationExceptioncheckDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, LogAccessor logger) キーまたは値のいずれかが逆直列化に失敗した場合はDeserializationExceptionを返します。それ以外の場合は null。protected @Nullable ExceptionSEcheckForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record) 返信にエラーがないか確認してください。voiddestroy()@Nullable CollectionSE<org.apache.kafka.common.TopicPartition> 応答するリスナーコンテナーに割り当てられたトピック / パーティションを返します。protected StringSE相関ヘッダー名を返します。protected DurationSEsendAndReceive(ProducerRecord, Duration)呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを返します。intgetPhase()protected booleanhandleTimeout(ObjectSE correlationId, RequestReplyFuture<K, V, R> future) リクエストがタイムアウトしたことをサブクラスに通知して、状態をクリーンアップし、オプションでフューチャーを完了することができるようにするために使用されます。booleanprotected booleanprotected booleanこの相関 ID がまだアクティブな場合は true を返します。booleanprotected voidlogLateArrival(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record, ObjectSE correlationId) void手動パーティション割り当てを使用する場合、最初のポーリングが完了したときに呼び出されます。auto.offset.reset=latestを使用していて、初期位置が確立されるまで待つ必要がある場合に便利です。voidkafka からのデータで呼び出されます。sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K, V> record) リクエストを送信し、デフォルトのタイムアウトで応答を受信します。sendAndReceive(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, @Nullable DurationSE replyTimeout) リクエストを送信し、返信を受け取ります。sendAndReceive(Message<?> message) リクエストメッセージを送信し、デフォルトのタイムアウトで応答メッセージを受信します。sendAndReceive(Message<?> message, @Nullable DurationSE replyTimeout) リクエストメッセージを送信し、応答メッセージを受信します。<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, @Nullable DurationSE replyTimeout, @Nullable ParameterizedTypeReference<P> returnType) リクエストメッセージを送信し、応答メッセージを受信します。<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message, @Nullable ParameterizedTypeReference<P> returnType) リクエストメッセージを送信し、応答メッセージを受信します。voidsetAutoStartup(boolean autoStartup) voidsetBinaryCorrelation(boolean binaryCorrelation) バイナリ表現ではなく、相関関係の文字列表現を correlationId として使用するには、false に設定します。voidsetCorrelationHeaderName(StringSE correlationHeaderName) 相関 ID のカスタムヘッダー名を設定します。voidsetCorrelationIdStrategy(FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<K, V>, CorrelationKey> correlationStrategy) リクエストレコードごとに一意の相関キーを確立するために呼び出される関数を設定します。voidsetDefaultReplyTimeout(DurationSE defaultReplyTimeout) sendAndReceive(ProducerRecord, Duration)呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを設定します。voidsetPhase(int phase) voidsetReplyErrorChecker(FunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, @Nullable ExceptionSE> replyErrorChecker) サーバーから返されたエラーの応答を調べる関数を設定します。voidsetReplyPartitionHeaderName(StringSE replyPartitionHeaderName) 応答パーティションのカスタムヘッダー名を設定します。voidsetReplyTopicHeaderName(StringSE replyTopicHeaderName) 返信トピックのカスタムヘッダー名を設定します。voidsetSharedReplyTopic(boolean sharedReplyTopic) 複数のテンプレートが返信に同じトピックを使用している場合は、true に設定します。voidsetTaskScheduler(TaskScheduler scheduler) voidstart()voidstop()voidstop(RunnableSE callback) booleanwaitForAssignment(DurationSE duration) パーティションが割り当てられるまで待ちます。例:クラス org.springframework.kafka.core.KafkaTemplate から継承されたメソッド
afterSingletonsInstantiated, closeProducer, clusterId, doSend, execute, executeInTransaction, flush, getDefaultTopic, getKafkaAdmin, getMessageConverter, getMicrometerTagsProvider, getObservationRegistry, getProducerFactory, getProducerFactory, getTheProducer, getTransactionIdPrefix, inTransaction, isAllowNonTransactional, isTransactional, metrics, onApplicationEvent, partitionsFor, receive, receive, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, setAllowNonTransactional, setApplicationContext, setBeanName, setCloseTimeout, setConsumerFactory, setDefaultTopic, setKafkaAdmin, setMessageConverter, setMessagingConverter, setMicrometerEnabled, setMicrometerTags, setMicrometerTagsProvider, setObservationConvention, setObservationEnabled, setObservationRegistry, setProducerInterceptor, setProducerListener, setTransactionIdPrefixクラス java.lang.ObjectSE から継承されたメソッド
clone, equalsSE, finalize, getClass, hashCode, notify, notifyAll, toString, wait, waitSE, waitSEインターフェース org.springframework.context.ApplicationListener から継承されたメソッド
supportsAsyncExecutionインターフェース org.springframework.kafka.listener.BatchMessageListener から継承されたメソッド
onMessage, wantsPollResultインターフェース org.springframework.kafka.listener.ConsumerSeekAware から継承されたメソッド
onIdleContainer, onPartitionsAssigned, onPartitionsRevoked, registerSeekCallback, unregisterSeekCallbackインターフェース org.springframework.kafka.listener.GenericMessageListener から継承されたメソッド
onMessage, onMessage, onMessageインターフェース org.springframework.kafka.core.KafkaOperations から継承されたメソッド
receive, receiveインターフェース org.springframework.context.SmartLifecycle から継承されたメソッド
isPauseable
コンストラクターの詳細
ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer) ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush)
メソッドの詳細
setTaskScheduler
getDefaultReplyTimeout
sendAndReceive(ProducerRecord, Duration)呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを返します。- 戻り値:
- タイムアウト。
- 導入:
- 2.3
setDefaultReplyTimeout
sendAndReceive(ProducerRecord, Duration)呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを設定します。- パラメーター:
defaultReplyTimeout- タイムアウト。- 導入:
- 2.3
isRunning
getPhase
public int getPhase()- 次で指定:
- インターフェース
PhasedのgetPhase - 次で指定:
- インターフェース
SmartLifecycleのgetPhase
setPhase
public void setPhase(int phase) isAutoStartup
public boolean isAutoStartup()- 次で指定:
- インターフェース
SmartLifecycleのisAutoStartup
setAutoStartup
public void setAutoStartup(boolean autoStartup) getAssignedReplyTopicPartitions
public @Nullable CollectionSE<org.apache.kafka.common.TopicPartition> getAssignedReplyTopicPartitions()応答するリスナーコンテナーに割り当てられたトピック / パーティションを返します。- 戻り値:
- トピック / パーティション。
setCorrelationIdStrategy
public void setCorrelationIdStrategy(FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<K, V>, CorrelationKey> correlationStrategy) リクエストレコードごとに一意の相関キーを確立するために呼び出される関数を設定します。- パラメーター:
correlationStrategy- 関数。- 導入:
- 2.3
setCorrelationHeaderName
相関 ID のカスタムヘッダー名を設定します。デフォルトKafkaHeaders.CORRELATION_ID。- パラメーター:
correlationHeaderName- ヘッダー名。- 導入:
- 2.3
getCorrelationHeaderName
setReplyTopicHeaderName
返信トピックのカスタムヘッダー名を設定します。デフォルトKafkaHeaders.REPLY_TOPIC。- パラメーター:
replyTopicHeaderName- ヘッダー名。- 導入:
- 2.3
setReplyPartitionHeaderName
応答パーティションのカスタムヘッダー名を設定します。デフォルトKafkaHeaders.REPLY_PARTITION。- パラメーター:
replyPartitionHeaderName- 応答パーティションのヘッダー名。- 導入:
- 2.3
setReplyErrorChecker
public void setReplyErrorChecker(FunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, @Nullable ExceptionSE> replyErrorChecker) サーバーから返されたエラーの応答を調べる関数を設定します。- パラメーター:
replyErrorChecker- エラーチェッカー機能。- 導入:
- 2.6.7
setBinaryCorrelation
public void setBinaryCorrelation(boolean binaryCorrelation) バイナリ表現ではなく、相関関係の文字列表現を correlationId として使用するには、false に設定します。デフォルトは true です。- パラメーター:
binaryCorrelation- 文字列の場合は false。- 導入:
- 3.0
isBinaryCorrelation
protected boolean isBinaryCorrelation()afterPropertiesSet
public void afterPropertiesSet()- 次で指定:
- インターフェース
InitializingBeanのafterPropertiesSet
start
stop
stop
- 次で指定:
- インターフェース
SmartLifecycleのstop
onFirstPoll
public void onFirstPoll()インターフェースからコピーされた説明:ConsumerSeekAware手動パーティション割り当てを使用する場合、最初のポーリングが完了したときに呼び出されます。auto.offset.reset=latestを使用していて、初期位置が確立されるまで待つ必要がある場合に便利です。- 次で指定:
- インターフェース
ConsumerSeekAwareのonFirstPoll
waitForAssignment
インターフェースからコピーされた説明:ReplyingKafkaOperationsパーティションが割り当てられるまで待ちます。例:auto.offset.reset=latestの場合。手動割り当てを使用する場合、期間はコンテナーのpollTimeoutプロパティよりも長くする必要があります。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,のV, R> waitForAssignment - パラメーター:
duration- どのぐらい待つのか。- 戻り値:
- パーティションが割り当てられている場合は true。
- 例外:
InterruptedExceptionSE- 待機中にスレッドが中断された場合。
sendAndReceive
インターフェースからコピーされた説明:ReplyingKafkaOperationsリクエストメッセージを送信し、デフォルトのタイムアウトで応答メッセージを受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,のV, R> sendAndReceive - パラメーター:
message- 送信するメッセージ。- 戻り値:
- RequestReplyMessageFuture。
sendAndReceive
public RequestReplyMessageFuture<K,V> sendAndReceive(Message<?> message, @Nullable DurationSE replyTimeout) インターフェースからコピーされた説明:ReplyingKafkaOperationsリクエストメッセージを送信し、応答メッセージを受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,のV, R> sendAndReceive - パラメーター:
message- 送信するメッセージ。replyTimeout- 応答タイムアウト。null の場合、デフォルトが使用されます。- 戻り値:
- RequestReplyMessageFuture。
sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V, sendAndReceiveP> (Message<?> message, @Nullable ParameterizedTypeReference<P> returnType) インターフェースからコピーされた説明:ReplyingKafkaOperationsリクエストメッセージを送信し、応答メッセージを受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,のV, R> sendAndReceive - 型パラメーター:
P- 応答ペイロード型。- パラメーター:
message- 送信するメッセージ。returnType- 応答ペイロード型のメッセージコンバーターへのヒント。- 戻り値:
- RequestReplyMessageFuture。
sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V, sendAndReceiveP> (Message<?> message, @Nullable DurationSE replyTimeout, @Nullable ParameterizedTypeReference<P> returnType) インターフェースからコピーされた説明:ReplyingKafkaOperationsリクエストメッセージを送信し、応答メッセージを受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,のV, R> sendAndReceive - 型パラメーター:
P- 応答ペイロード型。- パラメーター:
message- 送信するメッセージ。replyTimeout- 応答タイムアウト。null の場合、デフォルトが使用されます。returnType- 応答ペイロード型のメッセージコンバーターへのヒント。- 戻り値:
- RequestReplyMessageFuture。
sendAndReceive
public RequestReplyFuture<K,V, sendAndReceiveR> (org.apache.kafka.clients.producer.ProducerRecord<K, V> record) インターフェースからコピーされた説明:ReplyingKafkaOperationsリクエストを送信し、デフォルトのタイムアウトで応答を受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,のV, R> sendAndReceive - パラメーター:
record- 送信するレコード。- 戻り値:
- RequestReplyFuture。
sendAndReceive
public RequestReplyFuture<K,V, sendAndReceiveR> (org.apache.kafka.clients.producer.ProducerRecord<K, V> record, @Nullable DurationSE replyTimeout) インターフェースからコピーされた説明:ReplyingKafkaOperationsリクエストを送信し、返信を受け取ります。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,のV, R> sendAndReceive - パラメーター:
record- 送信するレコード。replyTimeout- 応答タイムアウト。null の場合、デフォルトが使用されます。- 戻り値:
- RequestReplyFuture。
handleTimeout
リクエストがタイムアウトしたことをサブクラスに通知して、状態をクリーンアップし、オプションでフューチャーを完了することができるようにするために使用されます。- パラメーター:
correlationId- 相関 ID。future- 未来。- 戻り値:
- true は、将来が完了したことを示します。
- 導入:
- 2.3
isPending
この相関 ID がまだアクティブな場合は true を返します。- パラメーター:
correlationId- 相関 ID。- 戻り値:
- 保留中の場合は true。
- 導入:
- 2.3
destroy
public void destroy()- 次で指定:
- インターフェース
DisposableBeanのdestroy - オーバーライド:
- クラス
KafkaTemplate<K,のV> destroy
onMessage
インターフェースからコピーされた説明:GenericMessageListenerkafka からのデータで呼び出されます。- 次で指定:
- インターフェース
GenericMessageListener<K>のonMessage - パラメーター:
data- 処理されるデータ。
checkForErrors
protected @Nullable ExceptionSE checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record) 返信にエラーがないか確認してください。デフォルトの実装はDeserializationExceptionをチェックし、replyErrorChecker関数を呼び出します。- パラメーター:
record- レコード。- 戻り値:
- 例外、または存在しない場合は null。
- 導入:
- 2.6.7
checkDeserialization
public static @Nullable DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, LogAccessor logger) キーまたは値のいずれかが逆直列化に失敗した場合は、DeserializationExceptionを返します。それ以外の場合は null。それがキーであるか値であるかを判別する必要がある場合は、代わりにSerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADERおよびSerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADERを使用してSerializationUtils.getExceptionFromHeader(ConsumerRecord, String, LogAccessor)を呼び出します。- パラメーター:
record- レコード。logger-LogAccessor。- 戻り値:
DeserializationExceptionまたはnull。- 導入:
- 2.2.15
logLateArrival