タイムスタンプ抽出

Kafka Streams を使用すると、タイムスタンプのさまざまな概念に基づいてコンシューマーレコードの処理を制御できます。デフォルトでは、Kafka Streams はコンシューマーレコードに埋め込まれたタイムスタンプメタデータを抽出します。入力バインディングごとに異なる TimestampExtractor 実装を提供することにより、このデフォルトの動作を変更できます。これを行う方法の詳細を次に示します。

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
    return orderStream ->
            customers ->
                products -> orderStream;
}

@Bean
public TimestampExtractor timestampExtractor() {
    return new WallclockTimestampExtractor();
}

次に、コンシューマーバインディングごとに上記の TimestampExtractor Bean 名を設定します。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"

カスタムタイムスタンプエクストラクターを設定するために入力コンシューマーバインディングをスキップすると、そのコンシューマーはデフォルト設定を使用します。