リスナーのリバランス

ContainerProperties には consumerRebalanceListener と呼ばれるプロパティがあり、Kafka クライアントの ConsumerRebalanceListener インターフェースの実装を取ります。このプロパティが指定されていない場合、コンテナーは、INFO レベルでリバランスイベントをログに記録するロギングリスナーを構成します。フレームワークは、サブインターフェース ConsumerAwareRebalanceListener も追加します。次のリストは、ConsumerAwareRebalanceListener インターフェースの定義を示しています。

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

パーティションが取り消されると、2 つのコールバックがあることに注意してください。最初のものはすぐに呼び出されます。2 つ目は、保留中のオフセットがコミットされた後に呼び出されます。これは、次の例に示すように、外部リポジトリでオフセットを維持する場合に役立ちます。

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
バージョン 2.4 以降、新しいメソッド onPartitionsLost() が追加されました ( ConsumerRebalanceLister の同じ名前のメソッドと同様)。ConsumerRebalanceLister のデフォルト実装は、単純に onPartitionsRevoked を呼び出します。ConsumerAwareRebalanceListener のデフォルト実装は何も行いません。リスナーコンテナーに (いずれかの型の) カスタムリスナーを指定する場合、実装で onPartitionsLost から onPartitionsRevoked を呼び出さないことが重要です。ConsumerRebalanceListener を実装する場合は、デフォルトのメソッドをオーバーライドする必要があります。これは、リスナーコンテナーが実装上のメソッドを呼び出した後、onPartitionsLost の実装から独自の onPartitionsRevoked を呼び出すためです。デリゲートをデフォルトの動作に実装すると、Consumer がコンテナーのリスナーでそのメソッドを呼び出すたびに、onPartitionsRevoked が 2 回呼び出されます。