リスナーコンテナーの一時停止と再開

バージョン 2.1.3 は、pause() および resume() メソッドをリスナーコンテナーに追加しました。以前は、ConsumerAwareMessageListener 内でコンシューマーを一時停止し、Consumer オブジェクトへのアクセスを提供する ListenerContainerIdleEvent をリッスンすることで再開できました。イベントリスナーを使用してアイドル状態のコンテナー内のコンシューマーを一時停止することはできますが、場合によっては、イベントリスナーがコンシューマースレッドで呼び出される保証がないため、これはスレッドセーフではありませんでした。コンシューマーを安全に一時停止および再開するには、リスナーコンテナーで pause および resume メソッドを使用する必要があります。pause() は、次の poll() の直前に有効になります。resume() は、現在の poll() が戻った直後に有効になります。コンテナーが一時停止されると、引き続きコンシューマーの poll() に進み、グループ管理が使用されている場合のリバランスを回避しますが、レコードは取得しません。詳細については、Kafka のドキュメントを参照してください。

バージョン 2.1.5 以降、isPauseRequested() を呼び出して、pause() が呼び出されたかどうかを確認できます。ただし、コンシューマーは実際にはまだ一時停止していない可能性があります。すべての Consumer インスタンスが実際に一時停止している場合、isConsumerPaused() は true を返します。

さらに (2.1.5 以降)、ConsumerPausedEvent および ConsumerResumedEvent インスタンスはコンテナーとともに source プロパティとして公開され、TopicPartition インスタンスは partitions プロパティに含まれます。

バージョン 2.9 以降、新しいコンテナープロパティ pauseImmediate を true に設定すると、現在のレコードが処理された後に一時停止が有効になります。デフォルトでは、一時停止は前回のポーリングのすべてのレコードが処理されたときに有効になります。pauseImmediate を参照してください。

次の単純な Spring Boot アプリケーションは、コンテナーレジストリを使用して @KafkaListener メソッドのコンテナーへの参照を取得し、そのコンシューマーを一時停止または再開し、対応するイベントを受信することを示しています。

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

次のリストは、前の例の結果を示しています。

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2