モニター
リスナーのパフォーマンスの監視
バージョン 2.3 以降では、クラスパス上で Micrometer
が検出され、アプリケーションコンテキストに単一の MeterRegistry
が存在する場合、リスナーコンテナーはリスナーの Micrometer Timer
を自動的に作成および更新します。ContainerProperty
の micrometerEnabled
~ false
を設定することでタイマーを無効にできます。
2 つのタイマーが維持されます。1 つはリスナーの呼び出しが成功した場合、もう 1 つは失敗した場合です。
タイマーには spring.kafka.listener
という名前が付けられ、次のタグが付いています。
name
: (コンテナー Bean 名)result
:success
またはfailure
exception
:none
またはListenerExecutionFailedException
ContainerProperties
の micrometerTags
プロパティを使用して追加のタグを追加できます。
バージョン 2.9.8、3.0.6 以降では、ContainerProperties
の micrometerTagsProvider
に関数を提供できます。この関数は ConsumerRecord<?, ?>
を受け取り、そのレコードに基づいて micrometerTags
内の静的タグとマージできるタグを返します。
並行コンテナーを使用すると、スレッドごとにタイマーが作成され、name タグに -n という接尾辞が付けられます。ここで、n は 0 から concurrency-1 です。 |
KafkaTemplate パフォーマンスの監視
バージョン 2.5 以降では、クラスパスで Micrometer
が検出され、アプリケーションコンテキストに単一の MeterRegistry
が存在する場合、テンプレートは送信操作用に Micrometer Timer
を自動的に作成および更新します。テンプレートの micrometerEnabled
プロパティを false
に設定することで、タイマーを無効にすることができます。
2 つのタイマーが維持されます。1 つはリスナーの呼び出しが成功した場合、もう 1 つは失敗した場合です。
タイマーには spring.kafka.template
という名前が付けられ、次のタグが付いています。
name
: (テンプレート Bean 名)result
:success
またはfailure
exception
:none
または失敗時の例外クラス名
テンプレートの micrometerTags
プロパティを使用して、タグを追加できます。
バージョン 2.9.8、3.0.6 以降では、KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)
プロパティを提供できます。この関数は ProducerRecord<?, ?>
を受け取り、そのレコードに基づいて micrometerTags
内の静的タグとマージできるタグを返します。
Micrometer ネイティブメトリクス
バージョン 2.5 以降、フレームワークは、プロデューサーとコンシューマーが作成およびクローズされるたびに Micrometer KafkaClientMetrics
インスタンスを管理するファクトリリスナーを提供します。
この機能を有効にするには、リスナーをプロデューサーおよびコンシューマーファクトリに追加するだけです。
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
リスナーに渡されたコンシューマー / プロデューサー id
は、タグ名 spring.id
でメーターのタグに追加されます。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
StreamsBuilderFactoryBean
にも同様のリスナーが提供されています - KafkaStreams Micrometer サポートを参照してください。
バージョン 3.3 以降、提供された Kafka クライアントの MeterRegistry
への io.micrometer.core.instrument.binder.kafka.KafkaMetrics
バインディングを管理するために、KafkaMetricsSupport
抽象クラスが導入されました。このクラスは、前述の MicrometerConsumerListener
、MicrometerProducerListener
、KafkaStreamsMicrometerListener
のスーパーです。ただし、Kafka クライアントのあらゆるユースケースに使用できます。クラスを継承し、その bindClient()
および unbindClient()
API を呼び出して、Kafka クライアントメトリクスを Micrometer コレクターに接続する必要があります。
Micrometer Observation
バージョン 3.0 以降、KafkaTemplate
およびリスナーコンテナーの監視に Micrometer を使用することがサポートされるようになりました。
KafkaTemplate
と ContainerProperties
の observationEnabled
を true
に設定して、観測を有効にします。これにより、タイマーが監視ごとに管理されるようになるため、Micrometer タイマーが無効になります。
Micrometer 観察はバッチリスナーをサポートしていません。これにより Micrometer タイマーが有効になります |
詳細については、Micrometer トレース (英語) を参照してください。
タイマー / トレースにタグを追加するには、カスタム KafkaTemplateObservationConvention
または KafkaListenerObservationConvention
をテンプレートまたはリスナーコンテナーにそれぞれ構成します。
デフォルトの実装では、テンプレートの監視用に bean.name
タグが追加され、コンテナー用に listener.id
タグが追加されます。
DefaultKafkaTemplateObservationConvention
または DefaultKafkaListenerObservationConvention
をサブクラス化するか、まったく新しい実装を提供できます。
記録されるデフォルトの観測値の詳細については、"Micrometer 観測資料" を参照してください。
バージョン 3.0.6 以降では、コンシューマーまたはプロデューサーレコードの情報に基づいて、タイマーとトレースに動的タグを追加できます。これを行うには、カスタム KafkaListenerObservationConvention
および / または KafkaTemplateObservationConvention
をそれぞれリスナーコンテナープロパティまたは KafkaTemplate
に追加します。両方の観測コンテキストの record
プロパティには、それぞれ ConsumerRecord
または ProducerRecord
が含まれます。
送信側および受信側のコンテキスト remoteServiceName
プロパティは、Kafka clusterId
プロパティに設定されています。これは、KafkaAdmin
によって取得されます。何らかの理由 (管理者権限がないなど) でクラスター ID を取得できない場合は、バージョン 3.1 以降、KafkaAdmin
に手動で clusterId
を設定し、KafkaTemplate
およびリスナーコンテナーに挿入できます。null
(デフォルト) の場合、管理者は describeCluster
管理操作を呼び出してブローカーから取得します。