クラスの @KafkaListener
クラスレベルで @KafkaListener
を使用する場合は、メソッドレベルで @KafkaHandler
を指定する必要があります。メッセージが配信されると、変換されたメッセージペイロード型を使用して、呼び出すメソッドが決定されます。次の例は、その方法を示しています。
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
バージョン 2.1.3 以降、他のメソッドに一致するものがない場合に呼び出されるデフォルトのメソッドとして @KafkaHandler
メソッドを指定できます。せいぜい 1 つのメソッドをそのように指定することができます。@KafkaHandler
メソッドを使用する場合、ペイロードはすでにドメインオブジェクトに変換されている必要があります(一致を実行できるようにするため)。カスタムデシリアライザー、JsonDeserializer
、TypePrecedence
を TYPE_ID
に設定した JsonMessageConverter
を使用します。詳細については、直列化、逆直列化、メッセージ変換を参照してください。
Spring がメソッド引数を解決する方法にいくつかの制限があるため、デフォルトの @KafkaHandler は個別のヘッダーを受信できません。コンシューマーレコードのメタデータに従って、ConsumerRecordMetadata を使用する必要があります。 |
例:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
オブジェクトが String
の場合、これは機能しません。topic
パラメーターも object
への参照を取得します。
デフォルトの方法でレコードに関するメタデータが必要な場合は、次を使用します。
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}