特定のオフセットを求めて
シークするには、リスナーは次のメソッドを持つ ConsumerSeekAware
を実装する必要があります。
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
registerSeekCallback
は、コンテナーの起動時およびパーティションが割り当てられるたびに呼び出されます。このコールバックは、初期化後の任意の時点でシークするときに使用する必要があります。コールバックへの参照を保存する必要があります。複数のコンテナー (または ConcurrentMessageListenerContainer
) で同じリスナーを使用する場合は、コールバックを ThreadLocal
またはリスナー Thread
によってキー設定されたその他の構造に保存する必要があります。
グループ管理の場合、パーティションの割り当て時に onPartitionsAssigned
が呼び出されます。たとえば、コールバックを呼び出すことにより、パーティションの初期オフセットを設定するために、このメソッドを使用できます。このメソッドを使用して、このスレッドのコールバックを割り当てられたパーティションに関連付けることもできます (以下の例を参照)。registerSeekCallback
に渡されるものではなく、コールバック引数を使用する必要があります。バージョン 2.5.5 以降では、手動パーティション割り当てを使用している場合でも、このメソッドが呼び出されます。
onPartitionsRevoked
は、コンテナーが停止したとき、または Kafka が割り当てを取り消したときに呼び出されます。このスレッドのコールバックを破棄し、取り消されたパーティションへの関連付けをすべて削除する必要があります。
コールバックには次のメソッドがあります。
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
String getGroupId();
seek
メソッドの 2 つの異なるバリアントは、任意のオフセットをシークする方法を提供します。オフセットを計算するために引数として Function
を受け取るメソッドは、フレームワークのバージョン 3.2 で追加されました。この関数は、現在のオフセット (コンシューマーによって返される現在の位置、次にフェッチされるオフセット) へのアクセスを提供します。ユーザーは、関数定義の一部として、コンシューマーの現在のオフセットに基づいて、シークするオフセットを決定できます。
seekRelative
は、相対シークを実行するためにバージョン 2.3 で追加されました。
offset
負およびtoCurrent
false
- パーティションの最後に対してシークします。offset
ポジティブおよびtoCurrent
false
- パーティションの先頭からの相対シーク。offset
負およびtoCurrent
true
- 現在の位置に対してシーク (巻き戻し)。offset
正およびtoCurrent
true
- 現在の位置を基準にシークします (早送り)。
seekToTimestamp
メソッドもバージョン 2.3 で追加されました。
onIdleContainer または onPartitionsAssigned メソッドで複数のパーティションに対して同じタイムスタンプをシークする場合は、コンシューマーの offsetsForTimes メソッドへの 1 回の呼び出しでタイムスタンプのオフセットを見つける方が効率的であるため、2 番目の方法が推奨されます。他の場所から呼び出されると、コンテナーはすべてのタイムスタンプシークリクエストを収集し、offsetsForTimes を 1 回呼び出します。 |
アイドル状態のコンテナーが検出されたときに、onIdleContainer()
からシーク操作を実行することもできます。アイドル状態のコンテナーの検出を有効にする方法については、アイドル状態のコンシューマーと無反応なコンシューマーの検出を参照してください。
コレクションを受け入れる seekToBeginning メソッドは、たとえば、コンパクトなトピックを処理していて、アプリケーションを起動するたびに最初にシークしたい場合に便利です。 |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
実行時に任意にシークするには、適切なスレッドの registerSeekCallback
からのコールバック参照を使用します。
これは、コールバックの使用方法を示す簡単な Spring Boot アプリケーションです。トピックに 10 件のレコードを送信します。コンソールで <Enter>
を押すと、すべてのパーティションが最初にシークします。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
物事を単純にするために、バージョン 2.3 は AbstractConsumerSeekAware
クラスを追加しました。これは、トピック / パーティションに使用されるコールバックを追跡します。次の例は、コンテナーがアイドル状態になるたびに、各パーティションで最後に処理されたレコードを探す方法を示しています。また、任意の外部呼び出しでパーティションを 1 レコードずつ巻き戻すことを可能にするメソッドもあります。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getTopicsAndCallbacks()
.forEach((tp, callbacks) ->
callbacks.forEach(callback -> callback.seekRelative(tp.topic(), tp.partition(), -1, true))
);
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbacksFor(new TopicPartition(topic, partition))
.forEach(callback -> callback.seekRelative(topic, partition, -1, true));
}
}
バージョン 2.6 は、抽象クラスに便利なメソッドを追加しました。
seekToBeginning()
- 割り当てられたすべてのパーティションを先頭までシークします。seekToEnd()
- 割り当てられたすべてのパーティションを最後までシークします。seekToTimestamp(long timestamp)
- 割り当てられたすべてのパーティションを、そのタイムスタンプで表されるオフセットにシークします。
例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listen(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}
バージョン 3.3 以降、ConsumerSeekAware.ConsumerSeekCallback
インターフェースに新しいメソッド getGroupId()
が導入されました。このメソッドは、特定のシークコールバックに関連付けられたコンシューマーグループを識別する必要がある場合に特に便利です。
AbstractConsumerSeekAware を継承するクラスを使用する場合、1 つのリスナーで実行されるシーク操作は、同じクラスのすべてのリスナーに影響を与える可能性があります。これは、必ずしも望ましい動作とは限りません。これに対処するには、コールバックによって提供される getGroupId() メソッドを使用できます。これにより、関心のあるコンシューマーグループのみを対象に、シーク操作を選択的に実行できます。 |