クラスの @KafkaListener
クラスレベルで @KafkaListener を使用する場合は、メソッドレベルで @KafkaHandler を指定する必要があります。このクラスまたはそのサブクラスのメソッドに @KafkaHandler が指定されていない場合、フレームワークはそのような設定を拒否します。メソッドの目的を明確かつ簡潔にするには、@KafkaHandler アノテーションが必要です。そうでない場合、追加の制約なしにこのメソッドまたは他のメソッドについて判断することが困難になります。
メッセージが配信されると、変換されたメッセージペイロード型に基づいて、呼び出すメソッドが決定されます。次の例は、その方法を示しています。
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
} バージョン 2.1.3 以降、他のメソッドに一致するものがない場合に呼び出されるデフォルトのメソッドとして @KafkaHandler メソッドを指定できます。せいぜい 1 つのメソッドをそのように指定することができます。@KafkaHandler メソッドを使用する場合、ペイロードはすでにドメインオブジェクトに変換されている必要があります(一致を実行できるようにするため)。カスタムデシリアライザー、JacksonJsonDeserializer、TypePrecedence を TYPE_ID に設定した JacksonJsonMessageConverter を使用します。詳細については、直列化、逆直列化、メッセージ変換を参照してください。
Spring がメソッド引数を解決する方法にいくつかの制限があるため、デフォルトの @KafkaHandler は個別のヘッダーを受信できません。コンシューマーレコードのメタデータに従って、ConsumerRecordMetadata を使用する必要があります。 |
例:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
} オブジェクトが String の場合、これは機能しません。topic パラメーターも object への参照を取得します。
デフォルトの方法でレコードに関するメタデータが必要な場合は、次を使用します。
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
} また、これもうまく動作しません。topic は payload に解決されます。
@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// payload.equals(topic) is True.
...
}デフォルトメソッドで個別のカスタムヘッダーが必要なユースケースがある場合は、次のようにします。
@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
Object myValue = headers.get("MyCustomHeader");
...
}