@KafkaListener
アノテーション
@KafkaListener
アノテーションは、Bean メソッドをリスナーコンテナーのリスナーとして指定するために使用されます。Bean は、メソッドパラメーターに一致するように必要に応じてデータを変換するコンバーターなど、さまざまな機能で構成された MessagingMessageListenerAdapter
にラップされています。
#{…}
またはプロパティプレースホルダー(${…}
)を使用して、SpEL でアノテーションのほとんどの属性を構成できます。詳細については、Javadoc を参照してください。
レコードリスナー
@KafkaListener
アノテーションは、単純な POJO リスナーにメカニズムを提供します。次の例は、その使用方法を示しています。
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
このメカニズムでは、@Configuration
クラスの 1 つに @EnableKafka
アノテーションが必要であり、リスナーコンテナーファクトリが必要です。これは、基盤となる ConcurrentMessageListenerContainer
を構成するために使用されます。デフォルトでは、kafkaListenerContainerFactory
という名前の Bean が予期されています。次の例は、ConcurrentMessageListenerContainer
の使用方法を示しています。
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
return props;
}
}
コンテナーのプロパティを設定するには、ファクトリで getContainerProperties()
メソッドを使用する必要があることに注意してください。これは、コンテナーに注入された実際のプロパティのテンプレートとして使用されます。
バージョン 2.1.1 以降、アノテーションによって作成されたコンシューマーの client.id
プロパティを設定できるようになりました。clientIdPrefix
の接尾辞は -n
です。ここで、n
は、並行性を使用する場合のコンテナー番号を表す整数です。
バージョン 2.2 以降、アノテーション自体のプロパティを使用して、コンテナーファクトリの concurrency
および autoStartup
プロパティをオーバーライドできるようになりました。プロパティには、単純な値、プロパティプレースホルダー、SpEL 式を使用できます。次の例は、その方法を示しています。
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
明示的なパーティション割り当て
明示的なトピックとパーティション(およびオプションでそれらの初期オフセット)を使用して POJO リスナーを構成することもできます。次の例は、その方法を示しています。
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
partitions
または partitionOffsets
属性で各パーティションを指定できますが、両方を指定することはできません。
ほとんどのアノテーションプロパティと同様に、SpEL 式を使用できます。パーティションの大規模なリストを生成する方法の例については、すべてのパーティションを手動で割り当てるを参照してください。
バージョン 2.5.5 以降、割り当てられたすべてのパーティションに初期オフセットを適用できます。
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
*
ワイルドカードは、partitions
属性のすべてのパーティションを表します。各 @TopicPartition
には、ワイルドカードを使用した @PartitionOffset
が 1 つだけ存在する必要があります。
さらに、リスナーが ConsumerSeekAware
を実装すると、手動割り当てを使用している場合でも、onPartitionsAssigned
が呼び出されるようになりました。これにより、たとえば、その時点で任意のシーク操作が可能になります。
バージョン 2.6.4 以降では、コンマで区切られたパーティションのリスト、またはパーティション範囲を指定できます。
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
範囲は包括的です。上記の例では、パーティション 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15
が割り当てられます。
初期オフセットを指定するときにも同じ手法を使用できます。
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初期オフセットは、6 つのパーティションすべてに適用されます。
3.2 以降、@PartitionOffset
は SeekPosition.END
、SeekPosition.BEGINNING
、SeekPosition.TIMESTAMP
、seekPosition
をサポートし、SeekPosition
列挙名と一致します:
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
seekPosition が END
または BEGINNING
を設定すると、initialOffset
と relativeToCurrent
は無視されます。seekPosition が TIMESTAMP
を設定すると、initialOffset
はタイムスタンプを意味します。
手動による確認
手動 AckMode
を使用する場合、リスナーに Acknowledgment
を提供することもできます。手動 AckMode
をアクティブにするには、ContainerProperties
の ack-mode を適切な手動モードに設定する必要があります。次の例では、別のコンテナーファクトリの使用方法も示しています。このカスタムコンテナーファクトリは、getContainerProperties()
を呼び出して AckMode
を手動型に設定し、次に setAckMode
を呼び出す必要があります。そうしないと、Acknowledgment
オブジェクトは null になります。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
コンシューマーレコードのメタデータ
最後に、レコードに関するメタデータはメッセージヘッダーから入手できます。次のヘッダー名を使用して、メッセージのヘッダーを取得できます。
KafkaHeaders.OFFSET
KafkaHeaders.RECEIVED_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE
バージョン 2.5 以降、受信レコードに null
キーがある場合、RECEIVED_KEY
は存在しません。以前は、ヘッダーに null
値が入力されていました。この変更は、null
値のヘッダーが存在しない spring-messaging
規則とフレームワークを一致させるためです。
次の例は、ヘッダーの使用方法を示しています。
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
パラメーターアノテーション (@Payload 、@Header ) は、リスナーメソッドの具体的な実装で指定する必要があります。インターフェースで定義されている場合は検出されません。 |
バージョン 2.5 以降では、個別のヘッダーを使用する代わりに、ConsumerRecordMetadata
パラメーターでレコードメタデータを受け取ることができます。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
これには、キーと値を除く ConsumerRecord
からのすべてのデータが含まれます。
バッチリスナー
バージョン 1.1 以降では、コンシューマーポーリングから受信したコンシューマーレコードのバッチ全体を受信するように @KafkaListener
メソッドを構成できます。
ノンブロッキング再試行はバッチリスナーではサポートされていません。 |
バッチリスナーを作成するようにリスナーコンテナーファクトリを構成するには、batchListener
プロパティを設定します。次の例は、その方法を示しています。
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
バージョン 2.8 以降では、@KafkaListener アノテーションの batch プロパティを使用して、ファクトリの batchListener プロパティをオーバーライドできます。これとコンテナーエラーハンドラーの変更により、同じファクトリをレコードリスナーとバッチリスナーの両方で使用できるようになります。 |
バージョン 2.9.6 以降、コンテナーファクトリには、recordMessageConverter および batchMessageConverter プロパティ用の個別の setter があります。以前は、レコードリスナーとバッチリスナーの両方に適用されるプロパティ messageConverter が 1 つしかありませんでした。 |
次の例は、ペイロードのリストを受信する方法を示しています。
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
トピック、パーティション、オフセットなどは、ペイロードと並列のヘッダーで使用できます。次の例は、ヘッダーの使用方法を示しています。
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
または、各メッセージに各オフセットとその他の詳細が含まれる Message<?>
オブジェクトの List
を受け取ることもできますが、メソッドで定義されている唯一のパラメーター(手動コミットを使用する場合のオプションの Acknowledgment
や Consumer<?, ?>
パラメーターを除く)である必要があります。次の例は、その方法を示しています。
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
この場合、ペイロードに対して変換は実行されません。
BatchMessagingMessageConverter
が RecordMessageConverter
で構成されている場合は、ジェネリクス型を Message
パラメーターに追加して、ペイロードを変換することもできます。詳細については、バッチリスナーを使用したペイロード変換を参照してください。
ConsumerRecord<?, ?>
オブジェクトのリストを受け取ることもできますが、メソッドで定義されている唯一のパラメーター(手動コミットおよび Consumer<?, ?>
パラメーターを使用する場合のオプションの Acknowledgment
を除く)である必要があります。次の例は、その方法を示しています。
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
バージョン 2.2 以降、リスナーは poll()
メソッドによって返される完全な ConsumerRecords<?, ?>
オブジェクトを受信できるため、リスナーは partitions()
(リスト内の TopicPartition
インスタンスを返す)や records(TopicPartition)
(選択レコードを取得する)などの追加のメソッドにアクセスできます。繰り返しますが、これはメソッドの唯一のパラメーター(手動コミットまたは Consumer<?, ?>
パラメーターを使用する場合のオプションの Acknowledgment
を除く)である必要があります。次の例は、その方法を示しています。
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
コンテナーファクトリに RecordFilterStrategy が構成されている場合、ConsumerRecords<?, ?> リスナーでは無視され、WARN ログメッセージが発行されます。<List<?>> 形式のリスナーが使用されている場合、レコードはバッチリスナーでのみフィルタリングできます。デフォルトでは、レコードは一度に 1 つずつフィルタリングされます。バージョン 2.8 以降では、filterBatch をオーバーライドして、1 回の呼び出しでバッチ全体をフィルタリングできます。 |
アノテーションプロパティ
バージョン 2.0 以降、id
プロパティ(存在する場合)が Kafka コンシューマー group.id
プロパティとして使用され、コンシューマーファクトリで構成されたプロパティが存在する場合はオーバーライドされます。groupId
を明示的に設定するか、idIsGroup
を false に設定して、コンシューマーファクトリ group.id
を使用する以前の動作を復元することもできます。
次の例に示すように、ほとんどのアノテーションプロパティ内でプロパティプレースホルダーまたは SpEL 式を使用できます。
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
バージョン 2.1.2 以降、SpEL 式は特別なトークン __listener
をサポートします。これは、このアノテーションが存在する現在の Bean インスタンスを表す疑似 Bean 名です。
次の例を考えてみましょう。
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
前の例の Bean が与えられた場合、次を使用できます。
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
万が一、__listener
という実際の Bean がある場合は、beanRef
属性を使用して式トークンを変更できます。次の例は、その方法を示しています。
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
バージョン 2.2.4 以降、Kafka コンシューマープロパティをアノテーションに直接指定できます。これらは、コンシューマーファクトリで構成された同じ名前のプロパティをオーバーライドします。この方法で group.id
および client.id
プロパティを指定することはできません。それらは無視されます。それらには groupId
および clientIdPrefix
アノテーションプロパティを使用します。
次の例に示すように、プロパティは、通常の Java Properties
ファイル形式 ( foo:bar
、foo=bar
、または foo bar
) の個別の文字列として指定されます。
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下は、RoutingKafkaTemplate
を使用するの例に対応するリスナーの例です。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}