メッセージ送信
このセクションでは、メッセージの送信方法について説明します。
KafkaTemplate
を使用する
このセクションでは、KafkaTemplate
を使用してメッセージを送信する方法について説明します。
概要
KafkaTemplate
はプロデューサーをラップし、Kafka トピックにデータを送信するための便利なメソッドを提供します。次のリストは、KafkaTemplate
の関連するメソッドを示しています。
CompletableFuture<SendResult<K, V>> sendDefault(V data);
CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, V data);
CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
CompletableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
<T> T executeInTransaction(OperationsCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
interface OperationsCallback<K, V, T> {
T doInOperations(KafkaOperations<K, V> operations);
}
詳細については、Javadoc を参照してください。
sendDefault
API では、デフォルトのトピックがテンプレートに提供されている必要があります。
API は timestamp
をパラメーターとして受け取り、このタイムスタンプをレコードに保存します。ユーザー指定のタイムスタンプの保存方法は、Kafka トピックで構成されているタイムスタンプの種類によって異なります。トピックが CREATE_TIME
を使用するように構成されている場合、ユーザー指定のタイムスタンプが記録されます (指定されていない場合は生成されます)。トピックが LOG_APPEND_TIME
を使用するように構成されている場合、ユーザー指定のタイムスタンプは無視され、ブローカーはローカルブローカー時間を追加します。
metrics
メソッドと partitionsFor
メソッドは、基盤となる Producer
[Apache] (英語) の同じメソッドに委譲します。execute
メソッドは、基盤となる Producer
[Apache] (英語) への直接アクセスを提供します。
テンプレートを使用するには、プロデューサーファクトリを構成し、テンプレートのコンストラクターで提供します。次の例は、その方法を示しています。
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
バージョン 2.5 以降、ファクトリの ProducerConfig
プロパティをオーバーライドして、同じファクトリから異なるプロデューサー構成でテンプレートを作成できるようになりました。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
型 ProducerFactory<?, ?>
の Bean(Spring Boot によって自動構成されたものなど)は、さまざまなナロージェネリクス型で参照できることに注意してください。
標準の <bean/>
定義を使用してテンプレートを構成することもできます。
次に、テンプレートを使用するために、そのメソッドの 1 つを呼び出すことができます。
Message<?>
パラメーターを指定してメソッドを使用すると、トピック、パーティション、キー、タイムスタンプの情報が、次の項目を含むメッセージヘッダーで提供されます。
KafkaHeaders.TOPIC
KafkaHeaders.PARTITION
KafkaHeaders.KEY
KafkaHeaders.TIMESTAMP
メッセージペイロードはデータです。
オプションで、Future
が完了するのを待つ代わりに、送信の結果(成功または失敗)を含む非同期コールバックを取得するように ProducerListener
を使用して KafkaTemplate
を構成できます。次のリストは、ProducerListener
インターフェースの定義を示しています。
public interface ProducerListener<K, V> {
default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
}
default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
}
}
デフォルトでは、テンプレートは LoggingProducerListener
で構成されています。LoggingProducerListener
はエラーをログに記録し、送信が成功しても何もしません。
便宜上、メソッドの 1 つだけを実装する場合に備えて、デフォルトのメソッド実装が提供されています。
send メソッドが CompletableFuture<SendResult>
を返すことに注意してください。コールバックをリスナーに登録して、送信の結果を非同期で受信できます。次の例は、その方法を示しています。
CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
...
});
SendResult
には、ProducerRecord
と RecordMetadata
の 2 つのプロパティがあります。これらのオブジェクトについては、Kafka API のドキュメントを参照してください。
Throwable
は KafkaProducerException
にキャストできます。その producerRecord
プロパティには、失敗したレコードが含まれています。
結果を待つために送信スレッドをブロックしたい場合は、future の get()
メソッドを呼び出すことができます。タイムアウトのあるメソッドを使用することをお勧めします。linger.ms
を設定した場合は、待機する前に flush()
を呼び出すか、便宜上、テンプレートに autoFlush
パラメーターを持つコンストラクターがあり、送信ごとにテンプレートが flush()
になるようにすることができます。フラッシングが必要になるのは、linger.ms
プロデューサープロパティを設定していて、部分的なバッチをすぐに送信したい場合だけです。
サンプル
このセクションでは、Kafka にメッセージを送信する例を示します。
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
CompletableFuture<SendResult<String, String>> future = template.send(record);
future.whenComplete((result, ex) -> {
if (ex == null) {
handleSuccess(data);
}
else {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
ExecutionException
の原因は、producerRecord
プロパティを持つ KafkaProducerException
であることに注意してください。
RoutingKafkaTemplate
を使用する
バージョン 2.5 以降では、RoutingKafkaTemplate
を使用して、宛先 topic
名に基づいて実行時にプロデューサーを選択できます。
ルーティングテンプレートは、トランザクション、execute 、flush 、metrics 操作についてトピックが不明であるため、これらの操作をサポートしていません。 |
テンプレートには、java.util.regex.Pattern
から ProducerFactory<Object, Object>
インスタンスへのマップが必要です。このマップは順番にトラバースされるため、順序付けする必要があります(LinkedHashMap
など)。最初に、より具体的なパターンを追加する必要があります。
次の単純な Spring Boot アプリケーションは、同じテンプレートを使用して、それぞれが異なる値のシリアライザーを使用して、異なるトピックに送信する方法の例を示しています。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
この例に対応する @KafkaListener
は、アノテーションプロパティに示されています。
同様の結果を達成するための別の手法については、同じトピックに異なる型を送信する追加機能を使用して、シリアライザーとデシリアライザーの委譲を参照してください。
DefaultKafkaProducerFactory
を使用する
KafkaTemplate
を使用するに見られるように、ProducerFactory
はプロデューサーを作成するために使用されます。
トランザクションを使用しない場合、KafkaProducer
JavaDocs で推奨されているように、デフォルトで DefaultKafkaProducerFactory
はすべてのクライアントによって使用されるシングルトンプロデューサーを作成します。ただし、テンプレートで flush()
を呼び出すと、同じプロデューサーを使用する他のスレッドに遅延が発生する可能性があります。バージョン 2.3 以降、DefaultKafkaProducerFactory
には新しいプロパティ producerPerThread
があります。true
に設定すると、この課題を回避するために、ファクトリはスレッドごとに個別のプロデューサーを作成 (およびキャッシュ) します。
producerPerThread が true ある場合はプロデューサーが不要になった際に、ユーザーコードは、提供時に closeThreadBoundProducer() を呼び出す必要があります。これにより、プロデューサーが物理的に閉じられ、ThreadLocal から削除されます。reset() または destroy() を呼び出しても、これらのプロデューサーはクリーンアップされません。 |
DefaultKafkaProducerFactory
を作成する場合、プロパティのマップのみを取り込むコンストラクター(KafkaTemplate
の使用の例を参照)を呼び出すことにより、構成からキーや値の Serializer
クラスを取得できます。または、Serializer
インスタンスを DefaultKafkaProducerFactory
コンストラクターに渡すこともできます(すべての Producer
が同じインスタンスを共有する場合)。または、Producer
ごとに個別の Serializer
インスタンスを取得するために使用される Supplier<Serializer>
(バージョン 2.3 以降)を提供することもできます。
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
バージョン 2.5.10 以降では、ファクトリの作成後にプロデューサープロパティを更新できるようになりました。これは、たとえば、資格情報の変更後に SSL キー / 信頼ストアの場所を更新する必要がある場合に役立ちます。変更は既存のプロデューサーインスタンスには影響しません。reset()
を呼び出して既存のプロデューサーを閉じ、新しいプロパティを使用して新しいプロデューサーを作成します。
トランザクションプロデューサーファクトリを非トランザクションに変更することはできません。また、その逆も同様です。 |
2 つの新しいメソッドが提供されるようになりました。
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
バージョン 2.8 以降、シリアライザーをオブジェクトとして(コンストラクター内または setter 経由で)提供する場合、ファクトリは configure()
メソッドを呼び出して、構成プロパティを使用して構成します。
ReplyingKafkaTemplate
を使用する
バージョン 2.1.3 は、リクエスト / 応答セマンティクスを提供するために KafkaTemplate
のサブクラスを導入しました。このクラスの名前は ReplyingKafkaTemplate
で、2 つの追加メソッドがあります。以下にメソッドシグネチャーを示します。
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(Message<?>
を使用したリクエスト / 応答も参照してください)。
結果は CompletableFuture
であり、結果(またはタイムアウトの場合は例外)が非同期で入力されます。結果には、KafkaTemplate.send()
を呼び出した結果である sendFuture
プロパティも含まれます。この future を使用して、送信操作の結果を判別できます。
最初の方法が使用される場合、または replyTimeout
引数が null
の場合、テンプレートの defaultReplyTimeout
プロパティが使用されます(デフォルトでは 5 秒)。
バージョン 2.8.8 以降、テンプレートには新しいメソッド waitForAssignment
があります。これは、応答コンテナーが auto.offset.reset=latest
で構成されている場合に役立ち、コンテナーが初期化される前にリクエストと応答が送信されないようにします。
手動のパーティション割り当て(グループ管理なし)を使用する場合、最初のポーリングが完了するまで通知が送信されないため、待機時間はコンテナーの pollTimeout プロパティよりも長くする必要があります。 |
次の Spring Boot アプリケーションは、この機能の使用方法の例を示しています。
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
Boot の自動構成されたコンテナーファクトリを使用して応答コンテナーを作成できることに注意してください。
重要なデシリアライザーが応答に使用されている場合は、構成済みのデシリアライザーに委譲する ErrorHandlingDeserializer
の使用を検討してください。このように構成すると、RequestReplyFuture
は例外的に完了し、DeserializationException
を cause
プロパティに含めることで ExecutionException
をキャッチできます。
バージョン 2.6.7 以降、DeserializationException
の検出に加えて、テンプレートは、提供されている場合、replyErrorChecker
関数を呼び出します。例外が返された場合、将来は例外的に完了します。
次に例を示します。
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause() instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
テンプレートはヘッダー(デフォルトでは KafkaHeaders.CORRELATION_ID
という名前)を設定します。これはサーバー側でエコーバックする必要があります。
この場合、次の @KafkaListener
アプリケーションが応答します。
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
@KafkaListener
インフラストラクチャは相関 ID をエコーし、応答トピックを決定します。
返信の送信の詳細については、@SendTo
を使用したリスナー結果の転送を参照してください。テンプレートは、デフォルトのヘッダー KafKaHeaders.REPLY_TOPIC
を使用して、返信先のトピックを示します。
バージョン 2.2 以降、テンプレートは、構成された応答コンテナーから応答トピックまたはパーティションを検出しようとします。コンテナーが単一のトピックまたは単一の TopicPartitionOffset
をリッスンするように構成されている場合、コンテナーは応答ヘッダーを設定するために使用されます。コンテナーが別の方法で構成されている場合、ユーザーは応答ヘッダーを設定する必要があります。この場合、INFO
ログメッセージが初期化中に書き込まれます。次の例では、KafkaHeaders.REPLY_TOPIC
を使用しています。
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
単一の応答 TopicPartitionOffset
で構成する場合、各インスタンスが異なるパーティションでリッスンする限り、複数のテンプレートに同じ応答トピックを使用できます。単一の応答トピックで構成する場合、各インスタンスは異なる group.id
を使用する必要があります。この場合、すべてのインスタンスが各応答を受信しますが、リクエストを送信したインスタンスのみが相関 ID を検出します。これは自動スケーリングに役立つ場合がありますが、追加のネットワークトラフィックのオーバーヘッドと、不要な応答を破棄するためのわずかなコストが伴います。この設定を使用する場合は、テンプレートの sharedReplyTopic
を true
に設定することをお勧めします。これにより、デフォルトの ERROR ではなく DEBUG への予期しない応答のログレベルが低下します。
以下は、同じ共有返信トピックを使用するように返信コンテナーを構成する例です。
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
複数のクライアントインスタンスがあり、前の段落で説明したように構成しない場合、各インスタンスには専用の応答トピックが必要です。別の方法は、KafkaHeaders.REPLY_PARTITION を設定し、インスタンスごとに専用のパーティションを使用することです。Header には、4 バイトの int(ビッグエンディアン)が含まれています。サーバーはこのヘッダーを使用して、応答を正しいパーティションにルーティングする必要があります(@KafkaListener がこれを行います)。ただし、この場合、応答コンテナーは Kafka のグループ管理機能を使用してはならず、固定パーティションでリッスンするように構成する必要があります(ContainerProperties コンストラクターで TopicPartitionOffset を使用することにより)。 |
DefaultKafkaHeaderMapper では、Jackson がクラスパス上にある必要があります(@KafkaListener の場合)。使用できない場合、メッセージコンバーターにはヘッダーマッパーがないため、前に示したように、SimpleKafkaHeaderMapper を使用して MessagingMessageConverter を構成する必要があります。 |
デフォルトでは、3 つのヘッダーが使用されます。
KafkaHeaders.CORRELATION_ID
- 応答をリクエストに関連付けるために使用されますKafkaHeaders.REPLY_TOPIC
- サーバーに返信先を指示するために使用されますKafkaHeaders.REPLY_PARTITION
- (オプション)どのパーティションに応答するかをサーバーに指示するために使用されます
これらのヘッダー名は、@KafkaListener
インフラストラクチャーが応答をルーティングするために使用します。
バージョン 2.3 以降、ヘッダー名をカスタマイズできます。テンプレートには 3 つのプロパティ correlationHeaderName
、replyTopicHeaderName
、replyPartitionHeaderName
があります。これは、サーバーが Spring アプリケーションではない(または @KafkaListener
を使用していない)場合に役立ちます。
逆に、リクエスト元のアプリケーションが Spring アプリケーションではなく、バージョン 3.0 以降の別のヘッダーに相関情報を配置する場合、リスナーコンテナーファクトリでカスタム correlationHeaderName を構成すると、そのヘッダーがエコーバックされます。以前は、リスナーはカスタム相関ヘッダーをエコーする必要がありました。 |
Message<?>
を使用したリクエスト / 応答
バージョン 2.7 は、spring-messaging
の Message<?>
抽象化を送受信するためのメソッドを ReplyingKafkaTemplate
に追加しました。
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
これらはテンプレートのデフォルトの replyTimeout
を使用します。また、メソッド呼び出しでタイムアウトが発生する可能性のあるオーバーロードされたバージョンもあります。
コンシューマーの Deserializer
またはテンプレートの MessageConverter
が、構成を介して、または応答メッセージにメタデータを入力することにより、追加情報なしでペイロードを変換できる場合は、最初の方法を使用します。
メッセージコンバーターを支援するために、戻り値の型の型情報を提供する必要がある場合は、2 番目の方法を使用してください。これにより、サーバー側が Spring アプリケーションでない場合など、応答に型メタデータがない場合でも、同じテンプレートが異なる型を受け取ることができます。以下は後者の例です。
Java
Kotlin
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
Java
Kotlin
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
返信型 Message<?>
@KafkaListener
が 2.5 より前のバージョンの Message<?>
を返す場合、応答トピックと相関 ID ヘッダーを設定する必要がありました。この例では、リクエストの返信トピックヘッダーを使用します。
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
これは、応答レコードにキーを設定する方法も示しています。
バージョン 2.5 以降、フレームワークはこれらのヘッダーが欠落しているかどうかを検出し、トピック(@SendTo
値から決定されたトピックまたは受信 KafkaHeaders.REPLY_TOPIC
ヘッダー(存在する場合))を入力します。また、存在する場合は、受信 KafkaHeaders.CORRELATION_ID
および KafkaHeaders.REPLY_PARTITION
をエコーします。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.KEY, 42)
.build();
}
複数の返信を集約する
ReplyingKafkaTemplate
を使用するのテンプレートは、厳密には単一のリクエスト / 応答シナリオ用です。1 つのメッセージの複数の受信者が応答を返す場合は、AggregatingReplyingKafkaTemplate
を使用できます。これは、Scatter-Gather Enterprise Integration パターン (英語) のクライアント側の実装です。
ReplyingKafkaTemplate
と同様に、AggregatingReplyingKafkaTemplate
コンストラクターは、プロデューサーファクトリとリスナーコンテナーを使用して応答を受け取ります。応答を受信するたびに参照される 3 番目のパラメーター BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy
があります。述語が true
を返す場合、ConsumerRecord
のコレクションは、sendAndReceive
メソッドによって返される Future
を完了するために使用されます。
追加のプロパティ returnPartialOnTimeout
があります(デフォルトは false)。これが true
に設定されている場合、KafkaReplyTimeoutException
で future を完了する代わりに、部分的な結果が正常に future を完了します(少なくとも 1 つの応答レコードが受信されている場合)。
バージョン 2.3.5 以降、述語はタイムアウト後にも呼び出されます(returnPartialOnTimeout
が true
の場合)。最初の引数は、現在のレコードのリストです。この呼び出しがタイムアウトによるものである場合、2 番目は true
です。述語はレコードのリストを変更できます。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
戻り値の型は、ConsumerRecord
のコレクションである値を持つ ConsumerRecord
であることに注意してください。「外部」 ConsumerRecord
は「実際の」レコードではなく、リクエストに対して受信した実際の応答レコードのホルダーとして、テンプレートによって合成されます。通常のリリースが発生すると(リリース戦略が true を返す)、トピックは aggregatedResults
に設定されます。returnPartialOnTimeout
が true で、タイムアウトが発生した場合(および、少なくとも 1 つの応答レコードが受信された場合)、トピックは partialResultsAfterTimeout
に設定されます。テンプレートは、これらの「トピック」名に一定の静的変数を提供します。
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
Collection
の実際の ConsumerRecord
には、応答を受信する実際のトピックが含まれています。
応答のリスナーコンテナーは AckMode.MANUAL または AckMode.MANUAL_IMMEDIATE で構成する必要があります。コンシューマープロパティ enable.auto.commit は false (バージョン 2.3 以降のデフォルト) である必要があります。メッセージが失われる可能性を回避するために、テンプレートは、未処理のリクエストがゼロの場合、つまり、最後の未処理リクエストがリリース戦略によってリリースされた場合にのみオフセットをコミットします。リバランス後は、重複した返信配信が発生する機能があります。これらは、実行中のリクエストでは無視されます。すでにリリースされた返信に対する重複した返信を受信すると、エラーログメッセージが表示される場合があります。 |
この集約テンプレートで ErrorHandlingDeserializer を使用する場合、フレームワークは DeserializationException を自動的に検出しません。代わりに、レコード (null 値を持つ) がヘッダーに逆直列化例外とともにそのまま返されます。アプリケーションは、ユーティリティメソッド ReplyingKafkaTemplate.checkDeserialization() メソッドを呼び出して、逆直列化例外が発生したかどうかを確認することをお勧めします。詳細については、JavaDocs を参照してください。replyErrorChecker も、この集約テンプレートでは呼び出されません。応答の各要素に対してチェックを実行する必要があります。 |