Spring Cloud Sleuth を使用したトレース

Spring Cloud SleuthSpring Cloud Stream Kafka Streams バインダーベースのアプリケーションのクラスパス上にある場合、そのコンシューマーとプロデューサーの両方にトレース情報が自動的に組み込まれます。ただし、アプリケーション固有の操作をトレースするには、ユーザーコードで明示的にインストルメント化する必要があります。これは、アプリケーションの Spring Cloud Sleuth から KafkaStreamsTracing Bean を注入し、この注入された Bean を介してさまざまな Kafka ストリーム操作を呼び出すことで実行できます。これはそれを使用するいくつかの例です。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
                LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
                return new KeyValue<>(value.getRegion(),
                        value.getClicks());
            }))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum, Materialized.as(CLICK_UPDATES))
            .toStream());
}

上記の例では、明示的なトレースインストルメンテーションを追加する場所が 2 つあります。まず、受信 KStream からのキー / 値情報をログに記録します。この情報がログに記録されると、関連するスパン ID とトレース ID もログに記録されるため、監視システムはそれらを追跡し、同じスパン ID と関連付けることができます。次に、map 操作を呼び出すときは、KStream クラスで直接呼び出すのではなく、transform 操作内にラップしてから、KafkaStreamsTracing から map を呼び出します。この場合も、ログに記録されるメッセージにはスパン ID とトレース ID が含まれます。

これは別の例で、さまざまな Kafka Streams ヘッダーにアクセスするために低レベルのトランスフォーマー API を使用しています。spring-cloud-sleuth がクラスパス上にある場合、すべてのトレースヘッダーにもこのようにアクセスできます。

@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
    return input -> input.transform(kafkaStreamsTracing.transformer(
            "transformer-1",
            () -> new Transformer<String, String, KeyValue<String, String>>() {
                ProcessorContext context;

                @Override
                public void init(ProcessorContext context) {
                    this.context = context;
                }

                @Override
                public KeyValue<String, String> transform(String key, String value) {
                    LOG.info("Headers: " + this.context.headers());
                    LOG.info("K/V:" + key + "/" + value);
                    // More transformations, business logic execution, etc. go here.
                    return KeyValue.pair(key, value);
                }

                @Override
                public void close() {
                }
            }));
}