@KafkaListener を順番に開始する

一般的な使用例は、別のリスナーがトピック内のすべてのレコードを消費した後にリスナーを開始することです。例: 他のトピックからのレコードを処理する前に、1 つ以上の圧縮されたトピックのコンテンツをメモリにロードしたい場合があります。バージョン 2.7.3 から、新しいコンポーネント ContainerGroupSequencer が導入されました。@KafkaListener の containerGroup プロパティを使用してコンテナーをグループ化し、現在のグループ内のすべてのコンテナーがアイドル状態になったときに次のグループのコンテナーを起動します。

それは例で最もよく説明されています。

@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}

@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}

@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}

@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}

@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
    return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}

ここでは、g1 と g2 の 2 つのグループに 4 つのリスナーがあります。

アプリケーションコンテキストの初期化中に、シーケンサーは、提供されたグループ内のすべてのコンテナーの autoStartup プロパティを false に設定します。また、コンテナー (まだセットが存在していない) の idleEventInterval を指定された値 (この場合は 5000ms) に設定します。次に、アプリケーションコンテキストによってシーケンサーが開始されると、最初のグループのコンテナーが開始されます。ListenerContainerIdleEvent が受信されると、各コンテナー内の個々の子コンテナーが停止します。ConcurrentMessageListenerContainer 内のすべての子コンテナーが停止すると、親コンテナーも停止します。グループ内のすべてのコンテナーが停止されると、次のグループのコンテナーが起動されます。グループまたはグループ内のコンテナーの数に制限はありません。

デフォルトでは、最終グループ(上記の g2)のコンテナーは、アイドル状態になっても停止しません。この動作を変更するには、シーケンサーで stopLastGroupWhenIdle を true に設定します。

余談ですが、以前は各グループのコンテナーが containerGroup という Bean 名を持つ型 Collection<MessageListenerContainer> の Bean に追加されていました。これらのコレクションは、グループ名である Bean 名に接尾辞 .group を付けた型 ContainerGroup の Bean が推奨されるようになりました。上の例では、g1.group と g2.group という 2 つの Bean が存在します。Collection Bean は将来のリリースで削除される予定です。