コンシューマーリバランスの実施
Kafka クライアントは、強制的な再バランス [Apache] (英語) をトリガーするオプションをサポートするようになりました。バージョン 3.1.2
以降、Spring for Apache Kafka には、メッセージリスナーコンテナーを介して Kafka コンシューマー上でこの API を呼び出すオプションが提供されます。この API を呼び出すと、強制的なリバランスをトリガーするよう Kafka コンシューマーに警告するだけです。実際のリバランスは、次の poll()
操作の一部としてのみ発生します。すでにリバランスが進行中の場合、強制的なリバランスの呼び出しは NO-OP です。呼び出し元は、現在のリバランスが完了するまで待ってから、別のリバランスを呼び出す必要があります。詳細については、enforceRebalance
の Javadoc を参照してください。
次のコードスニペットは、メッセージリスナーコンテナーを使用してリバランスを強制する本質を示しています。
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
return args -> {
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
System.out.println("Enforcing a rebalance");
Thread.sleep(5_000);
listenerContainer.enforceRebalance();
Thread.sleep(5_000);
};
}
上記のコードが示すように、アプリケーションは KafkaListenerEndpointRegistry
を使用してメッセージリスナーコンテナーにアクセスし、それに対して enforceRebalance
API を呼び出します。リスナーコンテナーで enforceRebalance
を呼び出すと、その呼び出しが基になる Kafka コンシューマーに委譲されます。Kafka コンシューマーは、次の poll()
操作の一部としてリバランスをトリガーします。