@SendTo を使用したリスナー結果の転送

バージョン 2.0 以降、@KafkaListener にも @SendTo アノテーションを付け、メソッド呼び出しが結果を返す場合、結果は @SendTo で指定されたトピックに転送されます。

@SendTo 値には、いくつかの形式があります。

  • @SendTo("someTopic") はリテラルトピックにルーティングします。

  • @SendTo("#{someExpression}") は、アプリケーションコンテキストの初期化中に式を 1 回評価することによって決定されたトピックにルーティングします。

  • @SendTo("!{someExpression}") は、実行時に式を評価することによって決定されたトピックにルーティングします。評価用の #root オブジェクトには、次の 3 つのプロパティがあります。

    • request: 受信 ConsumerRecord (またはバッチリスナーの ConsumerRecords オブジェクト)。

    • sourcerequest から変換された org.springframework.messaging.Message<?>

    • result: メソッドは結果を返します。

  • @SendTo (プロパティなし): これは !{source.headers['kafka_replyTopic']} として扱われます(バージョン 2.1.3 以降)。

バージョン 2.1.11 および 2.2.1 以降、プロパティプレースホルダーは @SendTo 値内で解決されます。

式の評価の結果は、トピック名を表す String である必要があります。次の例は、@SendTo を使用するさまざまな方法を示しています。

@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
    ...
}

@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
    ...
}

@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
    ...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {

    @KafkaHandler
    public String foo(String in) {
        ...
    }

    @KafkaHandler
    @SendTo("!{'annotated25reply2'}")
    public String bar(@Payload(required = false) KafkaNull nul,
            @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}
@SendTo をサポートするには、リスナーコンテナーファクトリに、応答の送信に使用される KafkaTemplate (replyTemplate プロパティ内) が提供される必要があります。これは、クライアント側でリクエスト / 応答処理に使用される ReplyingKafkaTemplate ではなく、KafkaTemplate である必要があります。Spring Boot を使用すると、提供時にテンプレートが自動的に構成されます。独自のファクトリを構成する場合は、以下の例に示すように設定する必要があります。

バージョン 2.2 以降、リスナーコンテナーファクトリに ReplyHeadersConfigurer を追加できます。これを参照して、応答メッセージに設定するヘッダーを決定します。次の例は、ReplyHeadersConfigurer を追加する方法を示しています。

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
    return factory;
}

必要に応じて、ヘッダーを追加することもできます。次の例は、その方法を示しています。

@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(cf());
    factory.setReplyTemplate(template());
    factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {

      @Override
      public boolean shouldCopy(String headerName, Object headerValue) {
        return false;
      }

      @Override
      public Map<String, Object> additionalHeaders() {
        return Collections.singletonMap("qux", "fiz");
      }

    });
    return factory;
}

@SendTo を使用する場合、送信を実行するには、replyTemplate プロパティで KafkaTemplate を使用して ConcurrentKafkaListenerContainerFactory を構成する必要があります。Spring Boot は、自動構成されたテンプレート (または単一のインスタンスが存在する場合は任意) に自動的に接続します。

リクエスト / 応答のセマンティクスを使用しない限り、単純な send(topic, value) メソッドのみが使用されるため、パーティションまたはキーを生成するサブクラスを作成することをお勧めします。次の例は、その方法を示しています。
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory()) {

        @Override
        public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
            return super.send(topic, partitionForData(data), keyForData(data), data);
        }

        ...

    };
}

リスナーメソッドが Message<?> または Collection<Message<?>> を返す場合、リスナーメソッドは応答のメッセージヘッダーを設定する責任があります。例: ReplyingKafkaTemplate からのリクエストを処理する場合、次のようにすることができます。

@KafkaListener(id = "messageReturned", topics = "someTopic")
public Message<?> listen(String in, @Header(KafkaHeaders.REPLY_TOPIC) byte[] replyTo,
        @Header(KafkaHeaders.CORRELATION_ID) byte[] correlation) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .setHeader("someOtherHeader", "someValue")
            .build();
}

リクエスト / 応答セマンティクスを使用する場合、送信者はターゲットパーティションをリクエストできます。

結果が返されない場合でも、@KafkaListener メソッドに @SendTo でアノテーションを付けることができます。これは、失敗したメッセージ配信に関する情報を特定のトピックに転送できる errorHandler の構成を可能にするためです。次の例は、その方法を示しています。

@KafkaListener(id = "voidListenerWithReplyingErrorHandler", topics = "someTopic",
        errorHandler = "voidSendToErrorHandler")
@SendTo("failures")
public void voidListenerWithReplyingErrorHandler(String in) {
    throw new RuntimeException("fail");
}

@Bean
public KafkaListenerErrorHandler voidSendToErrorHandler() {
    return (m, e) -> {
        return ... // some information about the failure and input data
    };
}

詳細については、例外の処理を参照してください。

リスナーメソッドが Iterable を返す場合、デフォルトでは、値が送信されるときに各要素のレコードが返されます。バージョン 2.3.5 以降、@KafkaListener の splitIterables プロパティを false に設定すると、結果全体が単一の ProducerRecord の値として送信されます。これには、応答テンプレートのプロデューサー構成に適切なシリアライザーが必要です。ただし、応答が Iterable<Message<?>> の場合、プロパティは無視され、各メッセージは個別に送信されます。