モニター

リスナーのパフォーマンスの監視

バージョン 2.3 以降では、クラスパス上で Micrometer が検出され、アプリケーションコンテキストに単一の MeterRegistry が存在する場合、リスナーコンテナーはリスナーの Micrometer Timer を自動的に作成および更新します。ContainerProperty の micrometerEnabled ~ false を設定することでタイマーを無効にできます。

2 つのタイマーが維持されます。1 つはリスナーの呼び出しが成功した場合、もう 1 つは失敗した場合です。

タイマーには spring.kafka.listener という名前が付けられ、次のタグが付いています。

  • name : (コンテナー Bean 名)

  • result : success または failure

  • exception : none または ListenerExecutionFailedException

ContainerProperties の micrometerTags プロパティを使用して追加のタグを追加できます。

バージョン 2.9.8、3.0.6 以降では、ContainerProperties の micrometerTagsProvider に関数を提供できます。この関数は ConsumerRecord<?, ?> を受け取り、そのレコードに基づいて micrometerTags 内の静的タグとマージできるタグを返します。

並行コンテナーを使用すると、スレッドごとにタイマーが作成され、name タグに -n という接尾辞が付けられます。ここで、n は 0 から concurrency-1 です。

KafkaTemplate パフォーマンスの監視

バージョン 2.5 以降では、クラスパスで Micrometer が検出され、アプリケーションコンテキストに単一の MeterRegistry が存在する場合、テンプレートは送信操作用に Micrometer Timer を自動的に作成および更新します。テンプレートの micrometerEnabled プロパティを false に設定することで、タイマーを無効にすることができます。

2 つのタイマーが維持されます。1 つはリスナーの呼び出しが成功した場合、もう 1 つは失敗した場合です。

タイマーには spring.kafka.template という名前が付けられ、次のタグが付いています。

  • name : (テンプレート Bean 名)

  • result : success または failure

  • exception : none または失敗時の例外クラス名

テンプレートの micrometerTags プロパティを使用して、タグを追加できます。

バージョン 2.9.8、3.0.6 以降では、KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) プロパティを提供できます。この関数は ProducerRecord<?, ?> を受け取り、そのレコードに基づいて micrometerTags 内の静的タグとマージできるタグを返します。

Micrometer ネイティブメトリクス

バージョン 2.5 以降、フレームワークは、プロデューサーとコンシューマーが作成およびクローズされるたびに Micrometer KafkaClientMetrics インスタンスを管理するファクトリリスナーを提供します。

この機能を有効にするには、リスナーをプロデューサーおよびコンシューマーファクトリに追加するだけです。

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

リスナーに渡されたコンシューマー / プロデューサー id は、タグ名 spring.id でメーターのタグに追加されます。

Kafka メトリクスの 1 つを取得する例
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

StreamsBuilderFactoryBean にも同様のリスナーが提供されています - KafkaStreams Micrometer サポートを参照してください。

Micrometer Observation

バージョン 3.0 以降、KafkaTemplate およびリスナーコンテナーの監視に Micrometer を使用することがサポートされるようになりました。

KafkaTemplate と ContainerProperties の observationEnabled を true に設定して、観測を有効にします。これにより、タイマーが監視ごとに管理されるようになるため、Micrometer タイマーが無効になります。

Micrometer 観察はバッチリスナーをサポートしていません。これにより Micrometer タイマーが有効になります

詳細については、Micrometer トレース (英語) を参照してください。

タイマー / トレースにタグを追加するには、カスタム KafkaTemplateObservationConvention または KafkaListenerObservationConvention をテンプレートまたはリスナーコンテナーにそれぞれ構成します。

デフォルトの実装では、テンプレートの監視用に bean.name タグが追加され、コンテナー用に listener.id タグが追加されます。

DefaultKafkaTemplateObservationConvention または DefaultKafkaListenerObservationConvention をサブクラス化するか、まったく新しい実装を提供できます。

記録されるデフォルトの観測値の詳細については、"Micrometer 観測資料" を参照してください。

バージョン 3.0.6 以降では、コンシューマーまたはプロデューサーレコードの情報に基づいて、タイマーとトレースに動的タグを追加できます。これを行うには、カスタム KafkaListenerObservationConvention および / または KafkaTemplateObservationConvention をそれぞれリスナーコンテナープロパティまたは KafkaTemplate に追加します。両方の観測コンテキストの record プロパティには、それぞれ ConsumerRecord または ProducerRecord が含まれます。

送信側および受信側のコンテキスト remoteServiceName プロパティは、Kafka clusterId プロパティに設定されています。これは、KafkaAdmin によって取得されます。何らかの理由 (管理者権限がないなど) でクラスター ID を取得できない場合は、バージョン 3.1 以降、KafkaAdmin に手動で clusterId を設定し、KafkaTemplate およびリスナーコンテナーに挿入できます。null (デフォルト) の場合、管理者は describeCluster 管理操作を呼び出してブローカーから取得します。