リスナーのリバランス
ContainerProperties
には consumerRebalanceListener
と呼ばれるプロパティがあり、Kafka クライアントの ConsumerRebalanceListener
インターフェースの実装を取ります。このプロパティが指定されていない場合、コンテナーは、INFO
レベルでリバランスイベントをログに記録するロギングリスナーを構成します。フレームワークは、サブインターフェース ConsumerAwareRebalanceListener
も追加します。次のリストは、ConsumerAwareRebalanceListener
インターフェースの定義を示しています。
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
パーティションが取り消されると、2 つのコールバックがあることに注意してください。最初のものはすぐに呼び出されます。2 つ目は、保留中のオフセットがコミットされた後に呼び出されます。これは、次の例に示すように、外部リポジトリでオフセットを維持する場合に役立ちます。
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
バージョン 2.4 以降、新しいメソッド onPartitionsLost() が追加されました ( ConsumerRebalanceLister の同じ名前のメソッドと同様)。ConsumerRebalanceLister のデフォルト実装は、単純に onPartitionsRevoked を呼び出します。ConsumerAwareRebalanceListener のデフォルト実装は何も行いません。リスナーコンテナーに (いずれかの型の) カスタムリスナーを指定する場合、実装で onPartitionsLost から onPartitionsRevoked を呼び出さないことが重要です。ConsumerRebalanceListener を実装する場合は、デフォルトのメソッドをオーバーライドする必要があります。これは、リスナーコンテナーが実装上のメソッドを呼び出した後、onPartitionsLost の実装から独自の onPartitionsRevoked を呼び出すためです。デリゲートをデフォルトの動作に実装すると、Consumer がコンテナーのリスナーでそのメソッドを呼び出すたびに、onPartitionsRevoked が 2 回呼び出されます。 |