クラスの @KafkaListener 

クラスレベルで @KafkaListener を使用する場合は、メソッドレベルで @KafkaHandler を指定する必要があります。メッセージが配信されると、変換されたメッセージペイロード型を使用して、呼び出すメソッドが決定されます。次の例は、その方法を示しています。

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String foo) {
        ...
    }

    @KafkaHandler
    public void listen(Integer bar) {
        ...
    }

    @KafkaHandler(isDefault = true)
    public void listenDefault(Object object) {
        ...
    }

}

バージョン 2.1.3 以降、他のメソッドに一致するものがない場合に呼び出されるデフォルトのメソッドとして @KafkaHandler メソッドを指定できます。せいぜい 1 つのメソッドをそのように指定することができます。@KafkaHandler メソッドを使用する場合、ペイロードはすでにドメインオブジェクトに変換されている必要があります(一致を実行できるようにするため)。カスタムデシリアライザー、JsonDeserializerTypePrecedence を TYPE_ID に設定した JsonMessageConverter を使用します。詳細については、直列化、逆直列化、メッセージ変換を参照してください。

Spring がメソッド引数を解決する方法にいくつかの制限があるため、デフォルトの @KafkaHandler は個別のヘッダーを受信できません。コンシューマーレコードのメタデータに従って、ConsumerRecordMetadata を使用する必要があります。

例:

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    ...
}

オブジェクトが String の場合、これは機能しません。topic パラメーターも object への参照を取得します。

デフォルトの方法でレコードに関するメタデータが必要な場合は、次を使用します。

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
    String topic = meta.topic();
    ...
}