特定のオフセットを求めて

シークするには、リスナーは次のメソッドを持つ ConsumerSeekAware を実装する必要があります。

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions)

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

registerSeekCallback は、コンテナーの起動時およびパーティションが割り当てられるたびに呼び出されます。このコールバックは、初期化後の任意の時点でシークするときに使用する必要があります。コールバックへの参照を保存する必要があります。複数のコンテナー (または ConcurrentMessageListenerContainer) で同じリスナーを使用する場合は、コールバックを ThreadLocal またはリスナー Thread によってキー設定されたその他の構造に保存する必要があります。

グループ管理の場合、パーティションの割り当て時に onPartitionsAssigned が呼び出されます。たとえば、コールバックを呼び出すことにより、パーティションの初期オフセットを設定するために、このメソッドを使用できます。このメソッドを使用して、このスレッドのコールバックを割り当てられたパーティションに関連付けることもできます (以下の例を参照)。registerSeekCallback に渡されるものではなく、コールバック引数を使用する必要があります。バージョン 2.5.5 以降では、手動パーティション割り当てを使用している場合でも、このメソッドが呼び出されます。

onPartitionsRevoked は、コンテナーが停止したとき、または Kafka が割り当てを取り消したときに呼び出されます。このスレッドのコールバックを破棄し、取り消されたパーティションへの関連付けをすべて削除する必要があります。

コールバックには次のメソッドがあります。

void seek(String topic, int partition, long offset);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

seekRelative は、相対シークを実行するためにバージョン 2.3 で追加されました。

  • offset 負および toCurrentfalse - パーティションの最後に対してシークします。

  • offset ポジティブおよび toCurrentfalse - パーティションの先頭からの相対シーク。

  • offset 負および toCurrenttrue - 現在の位置に対してシーク (巻き戻し)。

  • offset 正および toCurrenttrue - 現在の位置を基準にシークします (早送り)。

seekToTimestamp メソッドもバージョン 2.3 で追加されました。

onIdleContainer または onPartitionsAssigned メソッドで複数のパーティションに対して同じタイムスタンプをシークする場合は、コンシューマーの offsetsForTimes メソッドへの 1 回の呼び出しでタイムスタンプのオフセットを見つける方が効率的であるため、2 番目の方法が推奨されます。他の場所から呼び出されると、コンテナーはすべてのタイムスタンプシークリクエストを収集し、offsetsForTimes を 1 回呼び出します。

アイドル状態のコンテナーが検出されたときに、onIdleContainer() からシーク操作を実行することもできます。アイドル状態のコンテナーの検出を有効にする方法については、アイドル状態のコンシューマーと無反応なコンシューマーの検出を参照してください。

コレクションを受け入れる seekToBeginning メソッドは、たとえば、コンパクトなトピックを処理していて、アプリケーションを起動するたびに最初にシークしたい場合に便利です。
public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

実行時に任意にシークするには、適切なスレッドの registerSeekCallback からのコールバック参照を使用します。

これは、コールバックの使用方法を示す簡単な Spring Boot アプリケーションです。トピックに 10 件のレコードを送信します。コンソールで <Enter> を押すと、すべてのパーティションが最初にシークします。

@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

物事を単純にするために、バージョン 2.3 は AbstractConsumerSeekAware クラスを追加しました。これは、トピック / パーティションに使用されるコールバックを追跡します。次の例は、コンテナーがアイドル状態になるたびに、各パーティションで最後に処理されたレコードを探す方法を示しています。また、任意の外部呼び出しでパーティションを 1 レコードずつ巻き戻すことを可能にするメソッドもあります。

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getSeekCallbacks()
            .forEach((tp, callback) ->
                callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbackFor(new TopicPartition(topic, partition))
            .seekRelative(topic, partition, -1, true);
    }

}

バージョン 2.6 は、抽象クラスに便利なメソッドを追加しました。

  • seekToBeginning() - 割り当てられたすべてのパーティションを先頭までシークします。

  • seekToEnd() - 割り当てられたすべてのパーティションを最後までシークします。

  • seekToTimestamp(long timestamp) - 割り当てられたすべてのパーティションを、そのタイムスタンプで表されるオフセットにシークします。

例:

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listn(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}