リスナーコンテナーの一時停止と再開
バージョン 2.1.3 は、pause()
および resume()
メソッドをリスナーコンテナーに追加しました。以前は、ConsumerAwareMessageListener
内でコンシューマーを一時停止し、Consumer
オブジェクトへのアクセスを提供する ListenerContainerIdleEvent
をリッスンすることで再開できました。イベントリスナーを使用してアイドル状態のコンテナー内のコンシューマーを一時停止することはできますが、場合によっては、イベントリスナーがコンシューマースレッドで呼び出される保証がないため、これはスレッドセーフではありませんでした。コンシューマーを安全に一時停止および再開するには、リスナーコンテナーで pause
および resume
メソッドを使用する必要があります。pause()
は、次の poll()
の直前に有効になります。resume()
は、現在の poll()
が戻った直後に有効になります。コンテナーが一時停止されると、引き続きコンシューマーの poll()
に進み、グループ管理が使用されている場合のリバランスを回避しますが、レコードは取得しません。詳細については、Kafka のドキュメントを参照してください。
バージョン 2.1.5 以降、isPauseRequested()
を呼び出して、pause()
が呼び出されたかどうかを確認できます。ただし、コンシューマーは実際にはまだ一時停止していない可能性があります。すべての Consumer
インスタンスが実際に一時停止している場合、isConsumerPaused()
は true を返します。
さらに (2.1.5 以降)、ConsumerPausedEvent
および ConsumerResumedEvent
インスタンスはコンテナーとともに source
プロパティとして公開され、TopicPartition
インスタンスは partitions
プロパティに含まれます。
バージョン 2.9 以降、新しいコンテナープロパティ pauseImmediate
を true に設定すると、現在のレコードが処理された後に一時停止が有効になります。デフォルトでは、一時停止は前回のポーリングのすべてのレコードが処理されたときに有効になります。pauseImmediate を参照してください。
次の単純な Spring Boot アプリケーションは、コンテナーレジストリを使用して @KafkaListener
メソッドのコンテナーへの参照を取得し、そのコンシューマーを一時停止または再開し、対応するイベントを受信することを示しています。
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
次のリストは、前の例の結果を示しています。
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2