メッセージ送信

このセクションでは、メッセージの送信方法について説明します。

KafkaTemplate を使用する

このセクションでは、KafkaTemplate を使用してメッセージを送信する方法について説明します。

概要

KafkaTemplate はプロデューサーをラップし、Kafka トピックにデータを送信するための便利なメソッドを提供します。次のリストは、KafkaTemplate の関連するメソッドを示しています。

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

詳細については、Javadoc を参照してください。

バージョン 3.0 では、以前は ListenableFuture を返していたメソッドが CompletableFuture を返すように変更されました。移行を容易にするために、2.9 バージョンはメソッド usingCompletableFuture() を追加しました。これは、CompletableFuture 戻り値の型で同じメソッドを提供しました。この方法は使用できなくなりました。

sendDefault API では、デフォルトのトピックがテンプレートに提供されている必要があります。

API は timestamp をパラメーターとして受け取り、このタイムスタンプをレコードに保存します。ユーザー指定のタイムスタンプがどのように保存されるかは、Kafka トピックで構成されたタイムスタンプの型によって異なります。トピックが CREATE_TIME を使用するように構成されている場合、ユーザー指定のタイムスタンプが記録されます (指定されていない場合は生成されます)。トピックが LOG_APPEND_TIME を使用するように構成されている場合、ユーザー指定のタイムスタンプは無視され、ブローカーはローカルブローカー時間を追加します。

metrics メソッドと partitionsFor メソッドは、基盤となる Producer [Apache] (英語) の同じメソッドに委譲します。execute メソッドは、基盤となる Producer [Apache] (英語) への直接アクセスを提供します。

テンプレートを使用するには、プロデューサーファクトリを構成し、テンプレートのコンストラクターで提供します。次の例は、その方法を示しています。

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

バージョン 2.5 以降、ファクトリの ProducerConfig プロパティをオーバーライドして、同じファクトリから異なるプロデューサー構成でテンプレートを作成できるようになりました。

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

型 ProducerFactory<?, ?> の Bean(Spring Boot によって自動構成されたものなど)は、さまざまなナロージェネリクス型で参照できることに注意してください。

標準の <bean/> 定義を使用してテンプレートを構成することもできます。

次に、テンプレートを使用するために、そのメソッドの 1 つを呼び出すことができます。

Message<?> パラメーターを指定してメソッドを使用すると、トピック、パーティション、キー、タイムスタンプの情報が、次の項目を含むメッセージヘッダーで提供されます。

  • KafkaHeaders.TOPIC

  • KafkaHeaders.PARTITION

  • KafkaHeaders.KEY

  • KafkaHeaders.TIMESTAMP

メッセージペイロードはデータです。

オプションで、Future が完了するのを待つ代わりに、送信の結果(成功または失敗)を含む非同期コールバックを取得するように ProducerListener を使用して KafkaTemplate を構成できます。次のリストは、ProducerListener インターフェースの定義を示しています。

public interface ProducerListener<K, V> {

    void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);

    void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
            Exception exception);

}

デフォルトでは、テンプレートは LoggingProducerListener で構成されています。LoggingProducerListener はエラーをログに記録し、送信が成功しても何もしません。

便宜上、メソッドの 1 つだけを実装する場合に備えて、デフォルトのメソッド実装が提供されています。

send メソッドが CompletableFuture<SendResult> を返すことに注意してください。コールバックをリスナーに登録して、送信の結果を非同期で受信できます。次の例は、その方法を示しています。

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

SendResult には、ProducerRecord と RecordMetadata の 2 つのプロパティがあります。これらのオブジェクトについては、Kafka API のドキュメントを参照してください。

Throwable は KafkaProducerException にキャストできます。その producerRecord プロパティには、失敗したレコードが含まれています。

結果を待つために送信スレッドをブロックしたい場合は、future の get() メソッドを呼び出すことができます。タイムアウトのあるメソッドを使用することをお勧めします。linger.ms を設定した場合は、待機する前に flush() を呼び出すか、便宜上、テンプレートに autoFlush パラメーターを持つコンストラクターがあり、送信ごとにテンプレートが flush() になるようにすることができます。フラッシングが必要になるのは、linger.ms プロデューサープロパティを設定していて、部分的なバッチをすぐに送信したい場合だけです。

サンプル

このセクションでは、Kafka にメッセージを送信する例を示します。

例 1: ノンブロッキング (非同期)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<Integer, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
ブロッキング (同期化)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

ExecutionException の原因は、producerRecord プロパティを持つ KafkaProducerException であることに注意してください。

RoutingKafkaTemplate を使用する

バージョン 2.5 以降では、RoutingKafkaTemplate を使用して、宛先 topic 名に基づいて実行時にプロデューサーを選択できます。

ルーティングテンプレートは、トランザクション、executeflushmetrics 操作についてトピックが不明であるため、これらの操作をサポートしていません。

テンプレートには、java.util.regex.Pattern から ProducerFactory<Object, Object> インスタンスへのマップが必要です。このマップは順番にトラバースされるため、順序付けする必要があります(LinkedHashMap など)。最初に、より具体的なパターンを追加する必要があります。

次の単純な Spring Boot アプリケーションは、同じテンプレートを使用して、それぞれが異なる値のシリアライザーを使用して、異なるトピックに送信する方法の例を示しています。

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

この例に対応する @KafkaListener は、アノテーションプロパティに示されています。

同様の結果を達成するための別の手法については、同じトピックに異なる型を送信する追加機能を使用して、シリアライザーとデシリアライザーの委譲を参照してください。

DefaultKafkaProducerFactory を使用する

KafkaTemplate を使用するに見られるように、ProducerFactory はプロデューサーを作成するために使用されます。

トランザクションを使用しない場合、KafkaProducer JavaDocs で推奨されているように、デフォルトで DefaultKafkaProducerFactory はすべてのクライアントによって使用されるシングルトンプロデューサーを作成します。ただし、テンプレートで flush() を呼び出すと、同じプロデューサーを使用する他のスレッドに遅延が発生する可能性があります。バージョン 2.3 以降、DefaultKafkaProducerFactory には新しいプロパティ producerPerThread があります。true に設定すると、この課題を回避するために、ファクトリはスレッドごとに個別のプロデューサーを作成 (およびキャッシュ) します。

producerPerThread が true ある場合はプロデューサーが不要になった際に、ユーザーコードは、提供時に closeThreadBoundProducer() を呼び出す必要があります。これにより、プロデューサーが物理的に閉じられ、ThreadLocal から削除されます。reset() または destroy() を呼び出しても、これらのプロデューサーはクリーンアップされません。

DefaultKafkaProducerFactory を作成する場合、プロパティのマップのみを取り込むコンストラクター(KafkaTemplate の使用の例を参照)を呼び出すことにより、構成からキーや値の Serializer クラスを取得できます。または、Serializer インスタンスを DefaultKafkaProducerFactory コンストラクターに渡すこともできます(すべての Producer が同じインスタンスを共有する場合)。または、Producer ごとに個別の Serializer インスタンスを取得するために使用される Supplier<Serializer>(バージョン 2.3 以降)を提供することもできます。

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

バージョン 2.5.10 以降、ファクトリの作成後にプロデューサープロパティを更新できるようになりました。これは、たとえば、資格情報の変更後に SSL キー / トラストストアの場所を更新する必要がある場合に役立つことがあります。変更は既存のプロデューサーインスタンスには影響しません。reset() を呼び出して既存のプロデューサーを閉じ、新しいプロパティを使用して新しいプロデューサーが作成されるようにします。注: トランザクションプロデューサーファクトリを非トランザクションに変更したり、その逆を行ったりすることはできません。

2 つの新しいメソッドが提供されるようになりました。

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

バージョン 2.8 以降、シリアライザーをオブジェクトとして(コンストラクター内または setter 経由で)提供する場合、ファクトリは configure() メソッドを呼び出して、構成プロパティを使用して構成します。

ReplyingKafkaTemplate を使用する

バージョン 2.1.3 は、リクエスト / 応答セマンティクスを提供するために KafkaTemplate のサブクラスを導入しました。このクラスの名前は ReplyingKafkaTemplate で、2 つの追加メソッドがあります。以下にメソッドシグネチャーを示します。

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

Message<?> を使用したリクエスト / 応答も参照してください)。

結果は CompletableFuture であり、結果(またはタイムアウトの場合は例外)が非同期で入力されます。結果には、KafkaTemplate.send() を呼び出した結果である sendFuture プロパティも含まれます。この future を使用して、送信操作の結果を判別できます。

バージョン 3.0 では、これらのメソッド (およびその sendFuture プロパティ) によって返される先物は、ListenableFuture ではなく CompletableFuture に変更されました。

最初の方法が使用される場合、または replyTimeout 引数が null の場合、テンプレートの defaultReplyTimeout プロパティが使用されます(デフォルトでは 5 秒)。

バージョン 2.8.8 以降、テンプレートには新しいメソッド waitForAssignment があります。これは、応答コンテナーが auto.offset.reset=latest で構成されている場合に役立ち、コンテナーが初期化される前にリクエストと応答が送信されないようにします。

手動のパーティション割り当て(グループ管理なし)を使用する場合、最初のポーリングが完了するまで通知が送信されないため、待機時間はコンテナーの pollTimeout プロパティよりも長くする必要があります。

次の Spring Boot アプリケーションは、この機能の使用方法の例を示しています。

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

Boot の自動構成されたコンテナーファクトリを使用して応答コンテナーを作成できることに注意してください。

重要なデシリアライザーが応答に使用されている場合は、構成済みのデシリアライザーに委譲する ErrorHandlingDeserializer の使用を検討してください。このように構成すると、RequestReplyFuture は例外的に完了し、DeserializationException を cause プロパティに含めることで ExecutionException をキャッチできます。

バージョン 2.6.7 以降、DeserializationException の検出に加えて、テンプレートは、提供されている場合、replyErrorChecker 関数を呼び出します。例外が返された場合、将来は例外的に完了します。

次に例を示します。

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

テンプレートはヘッダー(デフォルトでは KafkaHeaders.CORRELATION_ID という名前)を設定します。これはサーバー側でエコーバックする必要があります。

この場合、次の @KafkaListener アプリケーションが応答します。

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListener インフラストラクチャは相関 ID をエコーし、応答トピックを決定します。

返信の送信の詳細については、@SendTo を使用したリスナー結果の転送を参照してください。テンプレートは、デフォルトのヘッダー KafKaHeaders.REPLY_TOPIC を使用して、返信先のトピックを示します。

バージョン 2.2 以降、テンプレートは、構成された応答コンテナーから応答トピックまたはパーティションを検出しようとします。コンテナーが単一のトピックまたは単一の TopicPartitionOffset をリッスンするように構成されている場合、コンテナーは応答ヘッダーを設定するために使用されます。コンテナーが別の方法で構成されている場合、ユーザーは応答ヘッダーを設定する必要があります。この場合、INFO ログメッセージが初期化中に書き込まれます。次の例では、KafkaHeaders.REPLY_TOPIC を使用しています。

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

単一の応答 TopicPartitionOffset で構成する場合、各インスタンスが異なるパーティションでリッスンする限り、複数のテンプレートに同じ応答トピックを使用できます。単一の応答トピックで構成する場合、各インスタンスは異なる group.id を使用する必要があります。この場合、すべてのインスタンスが各応答を受信しますが、リクエストを送信したインスタンスのみが相関 ID を検出します。これは自動スケーリングに役立つ場合がありますが、追加のネットワークトラフィックのオーバーヘッドと、不要な応答を破棄するためのわずかなコストが伴います。この設定を使用する場合は、テンプレートの sharedReplyTopic を true に設定することをお勧めします。これにより、デフォルトの ERROR ではなく DEBUG への予期しない応答のログレベルが低下します。

以下は、同じ共有返信トピックを使用するように返信コンテナーを構成する例です。

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
複数のクライアントインスタンスがあり、前の段落で説明したように構成しない場合、各インスタンスには専用の応答トピックが必要です。別の方法は、KafkaHeaders.REPLY_PARTITION を設定し、インスタンスごとに専用のパーティションを使用することです。Header には、4 バイトの int(ビッグエンディアン)が含まれています。サーバーはこのヘッダーを使用して、応答を正しいパーティションにルーティングする必要があります(@KafkaListener がこれを行います)。ただし、この場合、応答コンテナーは Kafka のグループ管理機能を使用してはならず、固定パーティションでリッスンするように構成する必要があります(ContainerProperties コンストラクターで TopicPartitionOffset を使用することにより)。
DefaultKafkaHeaderMapper では、Jackson がクラスパス上にある必要があります(@KafkaListener の場合)。使用できない場合、メッセージコンバーターにはヘッダーマッパーがないため、前に示したように、SimpleKafkaHeaderMapper を使用して MessagingMessageConverter を構成する必要があります。

デフォルトでは、3 つのヘッダーが使用されます。

  • KafkaHeaders.CORRELATION_ID - 応答をリクエストに関連付けるために使用されます

  • KafkaHeaders.REPLY_TOPIC - サーバーに返信先を指示するために使用されます

  • KafkaHeaders.REPLY_PARTITION - (オプション)どのパーティションに応答するかをサーバーに指示するために使用されます

これらのヘッダー名は、@KafkaListener インフラストラクチャーが応答をルーティングするために使用します。

バージョン 2.3 以降、ヘッダー名をカスタマイズできます。テンプレートには 3 つのプロパティ correlationHeaderNamereplyTopicHeaderNamereplyPartitionHeaderName があります。これは、サーバーが Spring アプリケーションではない(または @KafkaListener を使用していない)場合に役立ちます。

逆に、リクエスト元のアプリケーションが Spring アプリケーションではなく、バージョン 3.0 以降の別のヘッダーに相関情報を配置する場合、リスナーコンテナーファクトリでカスタム correlationHeaderName を構成すると、そのヘッダーがエコーバックされます。以前は、リスナーはカスタム相関ヘッダーをエコーする必要がありました。

Message<?> を使用したリクエスト / 応答

バージョン 2.7 は、spring-messaging の Message<?> 抽象化を送受信するためのメソッドを ReplyingKafkaTemplate に追加しました。

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

これらはテンプレートのデフォルトの replyTimeout を使用します。また、メソッド呼び出しでタイムアウトが発生する可能性のあるオーバーロードされたバージョンもあります。

バージョン 3.0 では、これらのメソッド (およびその sendFuture プロパティ) によって返される先物は、ListenableFuture ではなく CompletableFuture に変更されました。

コンシューマーの Deserializer またはテンプレートの MessageConverter が、構成を介して、または応答メッセージにメタデータを入力することにより、追加情報なしでペイロードを変換できる場合は、最初の方法を使用します。

メッセージコンバーターを支援するために、戻り値の型の型情報を提供する必要がある場合は、2 番目の方法を使用してください。これにより、サーバー側が Spring アプリケーションでない場合など、応答に型メタデータがない場合でも、同じテンプレートが異なる型を受け取ることができます。以下は後者の例です。

テンプレート Bean
  • Java

  • Kotlin

@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
テンプレートの使用
  • Java

  • Kotlin

RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })

返信型 Message<?>

@KafkaListener が 2.5 より前のバージョンの Message<?> を返す場合、応答トピックと相関 ID ヘッダーを設定する必要がありました。この例では、リクエストの返信トピックヘッダーを使用します。

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

これは、応答レコードにキーを設定する方法も示しています。

バージョン 2.5 以降、フレームワークはこれらのヘッダーが欠落しているかどうかを検出し、トピック(@SendTo 値から決定されたトピックまたは受信 KafkaHeaders.REPLY_TOPIC ヘッダー(存在する場合))を入力します。また、存在する場合は、受信 KafkaHeaders.CORRELATION_ID および KafkaHeaders.REPLY_PARTITION をエコーします。

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

複数の返信を集約する

ReplyingKafkaTemplate を使用するのテンプレートは、厳密には単一のリクエスト / 応答シナリオ用です。1 つのメッセージの複数の受信者が応答を返す場合は、AggregatingReplyingKafkaTemplate を使用できます。これは、Scatter-Gather Enterprise Integration パターン (英語) のクライアント側の実装です。

ReplyingKafkaTemplate と同様に、AggregatingReplyingKafkaTemplate コンストラクターは、プロデューサーファクトリとリスナーコンテナーを使用して応答を受け取ります。応答を受信するたびに参照される 3 番目のパラメーター BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy があります。述語が true を返す場合、ConsumerRecord のコレクションは、sendAndReceive メソッドによって返される Future を完了するために使用されます。

追加のプロパティ returnPartialOnTimeout があります(デフォルトは false)。これが true に設定されている場合、KafkaReplyTimeoutException で future を完了する代わりに、部分的な結果が正常に future を完了します(少なくとも 1 つの応答レコードが受信されている場合)。

バージョン 2.3.5 以降、述語はタイムアウト後にも呼び出されます(returnPartialOnTimeout が true の場合)。最初の引数は、現在のレコードのリストです。この呼び出しがタイムアウトによるものである場合、2 番目は true です。述語はレコードのリストを変更できます。

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

戻り値の型は、ConsumerRecord のコレクションである値を持つ ConsumerRecord であることに注意してください。「外部」 ConsumerRecord は「実際の」レコードではなく、リクエストに対して受信した実際の応答レコードのホルダーとして、テンプレートによって合成されます。通常のリリースが発生すると(リリース戦略が true を返す)、トピックは aggregatedResults に設定されます。returnPartialOnTimeout が true で、タイムアウトが発生した場合(および、少なくとも 1 つの応答レコードが受信された場合)、トピックは partialResultsAfterTimeout に設定されます。テンプレートは、これらの「トピック」名に一定の静的変数を提供します。

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

Collection の実際の ConsumerRecord には、応答を受信する実際のトピックが含まれています。

応答のリスナーコンテナーは AckMode.MANUAL または AckMode.MANUAL_IMMEDIATE で構成する必要があります。コンシューマープロパティ enable.auto.commit は false (バージョン 2.3 以降のデフォルト) である必要があります。メッセージが失われる可能性を回避するために、テンプレートは、未処理のリクエストがゼロの場合、つまり、最後の未処理リクエストがリリース戦略によってリリースされた場合にのみオフセットをコミットします。リバランス後は、重複した返信配信が発生する機能があります。これらは、実行中のリクエストでは無視されます。すでにリリースされた返信に対する重複した返信を受信すると、エラーログメッセージが表示される場合があります。
この集約テンプレートで ErrorHandlingDeserializer を使用する場合、フレームワークは DeserializationException を自動的に検出しません。代わりに、レコード (null 値を持つ) がヘッダーに逆直列化例外とともにそのまま返されます。アプリケーションは、ユーティリティメソッド ReplyingKafkaTemplate.checkDeserialization() メソッドを呼び出して、逆直列化例外が発生したかどうかを確認することをお勧めします。詳細については、JavaDocs を参照してください。replyErrorChecker も、この集約テンプレートでは呼び出されません。応答の各要素に対してチェックを実行する必要があります。