コンシューマーリバランスの実施

Kafka クライアントは、強制的な再バランス [Apache] (英語) をトリガーするオプションをサポートするようになりました。バージョン 3.1.2 以降、Spring for Apache Kafka には、メッセージリスナーコンテナーを介して Kafka コンシューマー上でこの API を呼び出すオプションが提供されます。この API を呼び出すと、強制的なリバランスをトリガーするよう Kafka コンシューマーに警告するだけです。実際のリバランスは、次の poll() 操作の一部としてのみ発生します。すでにリバランスが進行中の場合、強制的なリバランスの呼び出しは NO-OP です。呼び出し元は、現在のリバランスが完了するまで待ってから、別のリバランスを呼び出す必要があります。詳細については、enforceRebalance の Javadoc を参照してください。

次のコードスニペットは、メッセージリスナーコンテナーを使用してリバランスを強制する本質を示しています。

@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
    System.out.println("From KafkaListener: " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
    return args -> {
        final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
        System.out.println("Enforcing a rebalance");
        Thread.sleep(5_000);
        listenerContainer.enforceRebalance();
        Thread.sleep(5_000);
    };
}

上記のコードが示すように、アプリケーションは KafkaListenerEndpointRegistry を使用してメッセージリスナーコンテナーにアクセスし、それに対して enforceRebalance API を呼び出します。リスナーコンテナーで enforceRebalance を呼び出すと、その呼び出しが基になる Kafka コンシューマーに委譲されます。Kafka コンシューマーは、次の poll() 操作の一部としてリバランスをトリガーします。