@KafkaListener
を順番に開始する
一般的な使用例は、別のリスナーがトピック内のすべてのレコードを消費した後にリスナーを開始することです。例: 他のトピックからのレコードを処理する前に、1 つ以上の圧縮されたトピックのコンテンツをメモリにロードしたい場合があります。バージョン 2.7.3 から、新しいコンポーネント ContainerGroupSequencer
が導入されました。@KafkaListener
の containerGroup
プロパティを使用してコンテナーをグループ化し、現在のグループ内のすべてのコンテナーがアイドル状態になったときに次のグループのコンテナーを起動します。
それは例で最もよく説明されています。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
ここでは、g1
と g2
の 2 つのグループに 4 つのリスナーがあります。
アプリケーションコンテキストの初期化中に、シーケンサーは、提供されたグループ内のすべてのコンテナーの autoStartup
プロパティを false
に設定します。また、コンテナー (まだセットが存在していない) の idleEventInterval
を指定された値 (この場合は 5000ms) に設定します。次に、アプリケーションコンテキストによってシーケンサーが開始されると、最初のグループのコンテナーが開始されます。ListenerContainerIdleEvent
が受信されると、各コンテナー内の個々の子コンテナーが停止します。ConcurrentMessageListenerContainer
内のすべての子コンテナーが停止すると、親コンテナーも停止します。グループ内のすべてのコンテナーが停止されると、次のグループのコンテナーが起動されます。グループまたはグループ内のコンテナーの数に制限はありません。
デフォルトでは、最終グループ(上記の g2
)のコンテナーは、アイドル状態になっても停止しません。この動作を変更するには、シーケンサーで stopLastGroupWhenIdle
を true
に設定します。
余談ですが、以前は各グループのコンテナーが containerGroup
という Bean 名を持つ型 Collection<MessageListenerContainer>
の Bean に追加されていました。これらのコレクションは、グループ名である Bean 名に接尾辞 .group
を付けた型 ContainerGroup
の Bean が推奨されるようになりました。上の例では、g1.group
と g2.group
という 2 つの Bean が存在します。Collection
Bean は将来のリリースで削除される予定です。