リスナーのリバランス
ContainerProperties には consumerRebalanceListener と呼ばれるプロパティがあり、Kafka クライアントの ConsumerRebalanceListener インターフェースの実装を取ります。このプロパティが指定されていない場合、コンテナーは、INFO レベルでリバランスイベントをログに記録するロギングリスナーを構成します。フレームワークは、サブインターフェース ConsumerAwareRebalanceListener も追加します。次のリストは、ConsumerAwareRebalanceListener インターフェースの定義を示しています。
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}パーティションが取り消されると、2 つのコールバックがあることに注意してください。最初のものはすぐに呼び出されます。2 つ目は、保留中のオフセットがコミットされた後に呼び出されます。これは、次の例に示すように、外部リポジトリでオフセットを維持する場合に役立ちます。
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
}); バージョン 2.4 以降、新しいメソッド onPartitionsLost() が追加されました ( ConsumerRebalanceLister の同じ名前のメソッドと同様)。ConsumerRebalanceLister のデフォルト実装は、単純に onPartitionsRevoked を呼び出します。ConsumerAwareRebalanceListener のデフォルト実装は何も行いません。リスナーコンテナーに (いずれかの型の) カスタムリスナーを指定する場合、実装で onPartitionsLost から onPartitionsRevoked を呼び出さないことが重要です。ConsumerRebalanceListener を実装する場合は、デフォルトのメソッドをオーバーライドする必要があります。これは、リスナーコンテナーが実装上のメソッドを呼び出した後、onPartitionsLost の実装から独自の onPartitionsRevoked を呼び出すためです。デリゲートをデフォルトの動作に実装すると、Consumer がコンテナーのリスナーでそのメソッドを呼び出すたびに、onPartitionsRevoked が 2 回呼び出されます。 |
Kafka 4.0 コンシューマーリバランスプロトコル
Spring for Apache Kafka 4.0 は、サーバー主導の増分パーティション割り当てによりパフォーマンスを向上させる Apache Kafka 4.0 の新しいコンシューマーリバランスプロトコル [Apache] (英語) (KIP-848)をサポートしています。これにより、コンシューマーグループの再バランス調整によるダウンタイムが短縮されます。
新しいプロトコルを有効にするには、group.protocol プロパティを構成します。
spring.kafka.consumer.properties.group.protocol=consumer上記のプロパティは Spring Boot プロパティであることにご注意ください。Spring Boot を使用していない場合は、以下のように手動で設定することをお勧めします。
あるいは、プログラムで設定します。
Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props); 新しいプロトコルは ConsumerAwareRebalanceListener とシームレスに連携します。増分リバランスにより、onPartitionsAssigned は従来のプロトコルで一般的だった単一のコールバックとは異なり、より小さなパーティションセットで複数回呼び出される場合があります。
新しいプロトコルは、サーバー側のパーティション割り当てを使用し、spring.kafka.consumer.partition-assignment-strategy で設定されたクライアント側のカスタム割り当ては無視します。カスタム割り当てが検出された場合は、警告がログに記録されます。カスタム割り当てを使用するには、group.protocol=classic を設定してください(group.protocol に値を指定しない場合は、これがデフォルトになります)。