クラス ReplyingKafkaTemplate<K,V,R>
java.lang.ObjectSE
org.springframework.kafka.core.KafkaTemplate<K,V>
org.springframework.kafka.requestreply.ReplyingKafkaTemplate<K,V,R>
- 型パラメーター:
K
- 鍵の型。V
- 送信データ型。R
- 応答データ型。
- 実装されたすべてのインターフェース:
EventListenerSE
、Aware
、BeanNameAware
、DisposableBean
、InitializingBean
、SmartInitializingSingleton
、ApplicationContextAware
、ApplicationListener<ContextStoppedEvent>
、Lifecycle
、Phased
、SmartLifecycle
、KafkaOperations<K,
、V> BatchMessageListener<K,
、R> ConsumerSeekAware
、GenericMessageListener<ListSE<org.apache.kafka.clients.consumer.ConsumerRecord<K,
、R>>> ReplyingKafkaOperations<K,
V, R>
- 既知の直属サブクラス
AggregatingReplyingKafkaTemplate
public class ReplyingKafkaTemplate<K,V,R>
extends KafkaTemplate<K,V>
implements BatchMessageListener<K,R>, InitializingBean, SmartLifecycle, DisposableBean, ReplyingKafkaOperations<K,V,R>, ConsumerSeekAware
リクエスト / 応答セマンティクスを実装する KafkaTemplate。
- 導入:
- 2.1.3
- 作成者:
- Gary Russell, Artem Bilan, Borahm Lee
ネストされたクラスのサマリー
インターフェース org.springframework.kafka.listener.ConsumerSeekAware から継承されたネストクラス / インターフェース
ConsumerSeekAware.ConsumerSeekCallback
インターフェース org.springframework.kafka.core.KafkaOperations から継承されたネストクラス / インターフェース
KafkaOperations.OperationsCallback<K,
V, T>, KafkaOperations.ProducerCallback<K, V, T> フィールドサマリー
クラス org.springframework.kafka.core.KafkaTemplate から継承されたフィールド
logger
インターフェース org.springframework.kafka.core.KafkaOperations から継承されたフィールド
DEFAULT_POLL_TIMEOUT
インターフェース org.springframework.context.SmartLifecycle から継承されたフィールド
DEFAULT_PHASE
コンストラクターのサマリー
コンストラクターコンストラクター説明ReplyingKafkaTemplate
(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer) ReplyingKafkaTemplate
(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush) メソッドのサマリー
修飾子と型メソッド説明void
static DeserializationException
checkDeserialization
(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, LogAccessor logger) キーまたは値のいずれかが逆直列化に失敗した場合はDeserializationException
を返します。それ以外の場合は null。protected ExceptionSE
checkForErrors
(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record) 返信にエラーがないか確認してください。void
destroy()
CollectionSE<org.apache.kafka.common.TopicPartition>
応答するリスナーコンテナーに割り当てられたトピック / パーティションを返します。protected StringSE
相関ヘッダー名を返します。protected DurationSE
sendAndReceive(ProducerRecord, Duration)
呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを返します。int
getPhase()
protected boolean
handleTimeout
(ObjectSE correlationId, RequestReplyFuture<K, V, R> future) リクエストがタイムアウトしたことをサブクラスに通知して、状態をクリーンアップし、オプションでフューチャーを完了することができるようにするために使用されます。boolean
protected boolean
protected boolean
この相関 ID がまだアクティブな場合は true を返します。boolean
protected void
logLateArrival
(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record, ObjectSE correlationId) void
手動パーティション割り当てを使用する場合、最初のポーリングが完了したときに呼び出されます。auto.offset.reset=latest
を使用していて、初期位置が確立されるまで待つ必要がある場合に便利です。void
kafka からのデータで呼び出されます。sendAndReceive
(org.apache.kafka.clients.producer.ProducerRecord<K, V> record) リクエストを送信し、デフォルトのタイムアウトで応答を受信します。sendAndReceive
(org.apache.kafka.clients.producer.ProducerRecord<K, V> record, DurationSE replyTimeout) リクエストを送信し、返信を受け取ります。sendAndReceive
(Message<?> message) リクエストメッセージを送信し、デフォルトのタイムアウトで応答メッセージを受信します。sendAndReceive
(Message<?> message, DurationSE replyTimeout) リクエストメッセージを送信し、応答メッセージを受信します。<P> RequestReplyTypedMessageFuture<K,
V, P> sendAndReceive
(Message<?> message, DurationSE replyTimeout, ParameterizedTypeReference<P> returnType) リクエストメッセージを送信し、応答メッセージを受信します。<P> RequestReplyTypedMessageFuture<K,
V, P> sendAndReceive
(Message<?> message, ParameterizedTypeReference<P> returnType) リクエストメッセージを送信し、応答メッセージを受信します。void
setAutoStartup
(boolean autoStartup) void
setBinaryCorrelation
(boolean binaryCorrelation) バイナリ表現ではなく、correlationId として相関の文字列表現を使用するには、false に設定します。void
setCorrelationHeaderName
(StringSE correlationHeaderName) 相関 ID のカスタムヘッダー名を設定します。void
setCorrelationIdStrategy
(FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<K, V>, CorrelationKey> correlationStrategy) リクエストレコードごとに一意の相関キーを確立するために呼び出される関数を設定します。void
setDefaultReplyTimeout
(DurationSE defaultReplyTimeout) sendAndReceive(ProducerRecord, Duration)
呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを設定します。void
setPhase
(int phase) void
setReplyErrorChecker
(FunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, ExceptionSE> replyErrorChecker) サーバーから返されたエラーの応答を調べる関数を設定します。void
setReplyPartitionHeaderName
(StringSE replyPartitionHeaderName) 応答パーティションのカスタムヘッダー名を設定します。void
setReplyTopicHeaderName
(StringSE replyTopicHeaderName) 返信トピックのカスタムヘッダー名を設定します。void
setSharedReplyTopic
(boolean sharedReplyTopic) 複数のテンプレートが返信に同じトピックを使用している場合は、true に設定します。void
setTaskScheduler
(TaskScheduler scheduler) void
start()
void
stop()
void
stop
(RunnableSE callback) boolean
waitForAssignment
(DurationSE duration) パーティションが割り当てられるまで待ちます。例:クラス org.springframework.kafka.core.KafkaTemplate から継承されたメソッド
afterSingletonsInstantiated, closeProducer, doSend, execute, executeInTransaction, flush, getDefaultTopic, getKafkaAdmin, getMessageConverter, getMicrometerTagsProvider, getProducerFactory, getProducerFactory, getTheProducer, getTransactionIdPrefix, inTransaction, isAllowNonTransactional, isTransactional, metrics, onApplicationEvent, partitionsFor, receive, receive, send, send, send, send, send, send, sendDefault, sendDefault, sendDefault, sendDefault, sendOffsetsToTransaction, setAllowNonTransactional, setApplicationContext, setBeanName, setCloseTimeout, setConsumerFactory, setDefaultTopic, setKafkaAdmin, setMessageConverter, setMessagingConverter, setMicrometerEnabled, setMicrometerTags, setMicrometerTagsProvider, setObservationConvention, setObservationEnabled, setProducerInterceptor, setProducerListener, setTransactionIdPrefix
クラス java.lang.ObjectSE から継承されたメソッド
clone, equalsSE, finalize, getClass, hashCode, notify, notifyAll, toString, wait, waitSE, waitSE
インターフェース org.springframework.context.ApplicationListener から継承されたメソッド
supportsAsyncExecution
インターフェース org.springframework.kafka.listener.BatchMessageListener から継承されたメソッド
onMessage, wantsPollResult
インターフェース org.springframework.kafka.listener.ConsumerSeekAware から継承されたメソッド
onIdleContainer, onPartitionsAssigned, onPartitionsRevoked, registerSeekCallback, unregisterSeekCallback
インターフェース org.springframework.kafka.listener.GenericMessageListener から継承されたメソッド
onMessage, onMessage, onMessage
インターフェース org.springframework.kafka.core.KafkaOperations から継承されたメソッド
receive, receive
コンストラクターの詳細
ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer) ReplyingKafkaTemplate
public ReplyingKafkaTemplate(ProducerFactory<K, V> producerFactory, GenericMessageListenerContainer<K, R> replyContainer, boolean autoFlush)
メソッドの詳細
setTaskScheduler
getDefaultReplyTimeout
sendAndReceive(ProducerRecord, Duration)
呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを返します。- 戻り値:
- タイムアウト。
- 導入:
- 2.3
setDefaultReplyTimeout
sendAndReceive(ProducerRecord, Duration)
呼び出しで replyTimeout が提供されない場合に使用される応答タイムアウトを設定します。- パラメーター:
defaultReplyTimeout
- タイムアウト。- 導入:
- 2.3
isRunning
public boolean isRunning()getPhase
public int getPhase()- 次で指定:
- インターフェース
Phased
のgetPhase
- 次で指定:
- インターフェース
SmartLifecycle
のgetPhase
setPhase
public void setPhase(int phase) isAutoStartup
public boolean isAutoStartup()- 次で指定:
- インターフェース
SmartLifecycle
のisAutoStartup
setAutoStartup
public void setAutoStartup(boolean autoStartup) getAssignedReplyTopicPartitions
応答するリスナーコンテナーに割り当てられたトピック / パーティションを返します。- 戻り値:
- トピック / パーティション。
setCorrelationIdStrategy
public void setCorrelationIdStrategy(FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<K, V>, CorrelationKey> correlationStrategy) リクエストレコードごとに一意の相関キーを確立するために呼び出される関数を設定します。- パラメーター:
correlationStrategy
- 関数。- 導入:
- 2.3
setCorrelationHeaderName
相関 ID のカスタムヘッダー名を設定します。デフォルトKafkaHeaders.CORRELATION_ID
。- パラメーター:
correlationHeaderName
- ヘッダー名。- 導入:
- 2.3
getCorrelationHeaderName
相関ヘッダー名を返します。- 戻り値:
- ヘッダー名。
- 導入:
- 2.8.8
setReplyTopicHeaderName
返信トピックのカスタムヘッダー名を設定します。デフォルトKafkaHeaders.REPLY_TOPIC
。- パラメーター:
replyTopicHeaderName
- ヘッダー名。- 導入:
- 2.3
setReplyPartitionHeaderName
応答パーティションのカスタムヘッダー名を設定します。デフォルトKafkaHeaders.REPLY_PARTITION
。- パラメーター:
replyPartitionHeaderName
- 応答パーティションのヘッダー名。- 導入:
- 2.3
setReplyErrorChecker
public void setReplyErrorChecker(FunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?, ?>, ExceptionSE> replyErrorChecker) サーバーから返されたエラーの応答を調べる関数を設定します。- パラメーター:
replyErrorChecker
- エラーチェッカー機能。- 導入:
- 2.6.7
setBinaryCorrelation
public void setBinaryCorrelation(boolean binaryCorrelation) バイナリ表現ではなく、correlationId として相関の文字列表現を使用するには、false に設定します。デフォルトは真です。- パラメーター:
binaryCorrelation
- 文字列の場合は false。- 導入:
- 3.0
isBinaryCorrelation
protected boolean isBinaryCorrelation()afterPropertiesSet
public void afterPropertiesSet()- 次で指定:
- インターフェース
InitializingBean
のafterPropertiesSet
start
public void start()stop
public void stop()stop
- 次で指定:
- インターフェース
SmartLifecycle
のstop
onFirstPoll
public void onFirstPoll()インターフェースからコピーされた説明:ConsumerSeekAware
手動パーティション割り当てを使用する場合、最初のポーリングが完了したときに呼び出されます。auto.offset.reset=latest
を使用していて、初期位置が確立されるまで待つ必要がある場合に便利です。- 次で指定:
- インターフェース
ConsumerSeekAware
のonFirstPoll
waitForAssignment
インターフェースからコピーされた説明:ReplyingKafkaOperations
パーティションが割り当てられるまで待ちます。例:auto.offset.reset=latest
の場合。手動割り当てを使用する場合、期間はコンテナーのpollTimeout
プロパティよりも長くする必要があります。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,
のV, R> waitForAssignment
- パラメーター:
duration
- どのぐらい待つのか。- 戻り値:
- パーティションが割り当てられている場合は true。
- 例外:
InterruptedExceptionSE
- 待機中にスレッドが中断された場合。
sendAndReceive
インターフェースからコピーされた説明:ReplyingKafkaOperations
リクエストメッセージを送信し、デフォルトのタイムアウトで応答メッセージを受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,
のV, R> sendAndReceive
- パラメーター:
message
- 送信するメッセージ。- 戻り値:
- RequestReplyMessageFuture。
sendAndReceive
インターフェースからコピーされた説明:ReplyingKafkaOperations
リクエストメッセージを送信し、応答メッセージを受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,
のV, R> sendAndReceive
- パラメーター:
message
- 送信するメッセージ。replyTimeout
- 応答タイムアウト。null の場合、デフォルトが使用されます。- 戻り値:
- RequestReplyMessageFuture。
sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V, sendAndReceiveP> (Message<?> message, @Nullable ParameterizedTypeReference<P> returnType) インターフェースからコピーされた説明:ReplyingKafkaOperations
リクエストメッセージを送信し、応答メッセージを受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,
のV, R> sendAndReceive
- 型パラメーター:
P
- 応答ペイロード型。- パラメーター:
message
- 送信するメッセージ。returnType
- 応答ペイロード型のメッセージコンバーターへのヒント。- 戻り値:
- RequestReplyMessageFuture。
sendAndReceive
public <P> RequestReplyTypedMessageFuture<K,V, sendAndReceiveP> (Message<?> message, @Nullable DurationSE replyTimeout, @Nullable ParameterizedTypeReference<P> returnType) インターフェースからコピーされた説明:ReplyingKafkaOperations
リクエストメッセージを送信し、応答メッセージを受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,
のV, R> sendAndReceive
- 型パラメーター:
P
- 応答ペイロード型。- パラメーター:
message
- 送信するメッセージ。replyTimeout
- 応答タイムアウト。null の場合、デフォルトが使用されます。returnType
- 応答ペイロード型のメッセージコンバーターへのヒント。- 戻り値:
- RequestReplyMessageFuture。
sendAndReceive
public RequestReplyFuture<K,V, sendAndReceiveR> (org.apache.kafka.clients.producer.ProducerRecord<K, V> record) インターフェースからコピーされた説明:ReplyingKafkaOperations
リクエストを送信し、デフォルトのタイムアウトで応答を受信します。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,
のV, R> sendAndReceive
- パラメーター:
record
- 送信するレコード。- 戻り値:
- RequestReplyFuture。
sendAndReceive
public RequestReplyFuture<K,V, sendAndReceiveR> (org.apache.kafka.clients.producer.ProducerRecord<K, V> record, @Nullable DurationSE replyTimeout) インターフェースからコピーされた説明:ReplyingKafkaOperations
リクエストを送信し、返信を受け取ります。- 次で指定:
- インターフェース
ReplyingKafkaOperations<K,
のV, R> sendAndReceive
- パラメーター:
record
- 送信するレコード。replyTimeout
- 応答タイムアウト。null の場合、デフォルトが使用されます。- 戻り値:
- RequestReplyFuture。
handleTimeout
リクエストがタイムアウトしたことをサブクラスに通知して、状態をクリーンアップし、オプションでフューチャーを完了することができるようにするために使用されます。- パラメーター:
correlationId
- 相関 ID。future
- 未来。- 戻り値:
- true は、将来が完了したことを示します。
- 導入:
- 2.3
isPending
この相関 ID がまだアクティブな場合は true を返します。- パラメーター:
correlationId
- 相関 ID。- 戻り値:
- 保留中の場合は true。
- 導入:
- 2.3
destroy
public void destroy()- 次で指定:
- インターフェース
DisposableBean
のdestroy
- オーバーライド:
- クラス
KafkaTemplate<K,
のV> destroy
onMessage
インターフェースからコピーされた説明:GenericMessageListener
kafka からのデータで呼び出されます。- 次で指定:
- インターフェース
GenericMessageListener<K>
のonMessage
- パラメーター:
data
- 処理されるデータ。
checkForErrors
@Nullable protected ExceptionSE checkForErrors(org.apache.kafka.clients.consumer.ConsumerRecord<K, R> record) 返信にエラーがないか確認してください。デフォルトの実装はDeserializationException
をチェックし、replyErrorChecker
関数を呼び出します。- パラメーター:
record
- レコード。- 戻り値:
- 例外、または存在しない場合は null。
- 導入:
- 2.6.7
checkDeserialization
@Nullable public static DeserializationException checkDeserialization(org.apache.kafka.clients.consumer.ConsumerRecord<?, ?> record, LogAccessor logger) キーまたは値のいずれかが逆直列化に失敗した場合は、DeserializationException
を返します。それ以外の場合は null。それがキーであるか値であるかを判別する必要がある場合は、代わりにSerializationUtils.KEY_DESERIALIZER_EXCEPTION_HEADER
およびSerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER
を使用してSerializationUtils.getExceptionFromHeader(ConsumerRecord, String, LogAccessor)
を呼び出します。- パラメーター:
record
- レコード。logger
-LogAccessor
。- 戻り値:
DeserializationException
またはnull
。- 導入:
- 2.2.15
logLateArrival