モニター
リスナーのパフォーマンスの監視
バージョン 2.3 以降では、クラスパス上で Micrometer が検出され、アプリケーションコンテキストに単一の MeterRegistry が存在する場合、リスナーコンテナーはリスナーの Micrometer Timer を自動的に作成および更新します。ContainerProperty の micrometerEnabled ~ false を設定することでタイマーを無効にできます。
2 つのタイマーが維持されます。1 つはリスナーの呼び出しが成功した場合、もう 1 つは失敗した場合です。
タイマーには spring.kafka.listener という名前が付けられ、次のタグが付いています。
name: (コンテナー Bean 名)result:successまたはfailureexception:noneまたはListenerExecutionFailedException
ContainerProperties の micrometerTags プロパティを使用して追加のタグを追加できます。
バージョン 2.9.8、3.0.6 以降では、ContainerProperties の micrometerTagsProvider に関数を提供できます。この関数は ConsumerRecord<?, ?> を受け取り、そのレコードに基づいて micrometerTags 内の静的タグとマージできるタグを返します。
並行コンテナーを使用すると、スレッドごとにタイマーが作成され、name タグに -n という接尾辞が付けられます。ここで、n は 0 から concurrency-1 です。 |
KafkaTemplate パフォーマンスの監視
バージョン 2.5 以降では、クラスパスで Micrometer が検出され、アプリケーションコンテキストに単一の MeterRegistry が存在する場合、テンプレートは送信操作用に Micrometer Timer を自動的に作成および更新します。テンプレートの micrometerEnabled プロパティを false に設定することで、タイマーを無効にすることができます。
2 つのタイマーが維持されます。1 つはリスナーの呼び出しが成功した場合、もう 1 つは失敗した場合です。
タイマーには spring.kafka.template という名前が付けられ、次のタグが付いています。
name: (テンプレート Bean 名)result:successまたはfailureexception:noneまたは失敗時の例外クラス名
テンプレートの micrometerTags プロパティを使用して、タグを追加できます。
バージョン 2.9.8、3.0.6 以降では、KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) プロパティを提供できます。この関数は ProducerRecord<?, ?> を受け取り、そのレコードに基づいて micrometerTags 内の静的タグとマージできるタグを返します。
Micrometer ネイティブメトリクス
バージョン 2.5 以降、フレームワークは、プロデューサーとコンシューマーが作成およびクローズされるたびに Micrometer KafkaClientMetrics インスタンスを管理するファクトリリスナーを提供します。
この機能を有効にするには、リスナーをプロデューサーおよびコンシューマーファクトリに追加するだけです。
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
} リスナーに渡されたコンシューマー / プロデューサー id は、タグ名 spring.id でメーターのタグに追加されます。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();StreamsBuilderFactoryBean にも同様のリスナーが提供されています - KafkaStreams Micrometer サポートを参照してください。
バージョン 3.3 以降、提供された Kafka クライアントの MeterRegistry への io.micrometer.core.instrument.binder.kafka.KafkaMetrics バインディングを管理するために、KafkaMetricsSupport 抽象クラスが導入されました。このクラスは、前述の MicrometerConsumerListener、MicrometerProducerListener、KafkaStreamsMicrometerListener のスーパーです。ただし、Kafka クライアントのあらゆるユースケースに使用できます。クラスを継承し、その bindClient() および unbindClient() API を呼び出して、Kafka クライアントメトリクスを Micrometer コレクターに接続する必要があります。
Micrometer Observation
バージョン 3.0 以降、KafkaTemplate およびリスナーコンテナーの監視に Micrometer を使用することがサポートされるようになりました。
KafkaTemplate と ContainerProperties の observationEnabled を true に設定して、観測を有効にします。これにより、タイマーが監視ごとに管理されるようになるため、Micrometer タイマーが無効になります。
| Micrometer 観測はバッチリスナーをサポートしていません。これにより Micrometer タイマーが有効になります |
詳細については、Micrometer トレース (英語) を参照してください。
タイマー / トレースにタグを追加するには、カスタム KafkaTemplateObservationConvention または KafkaListenerObservationConvention をテンプレートまたはリスナーコンテナーにそれぞれ構成します。
デフォルトの実装では、テンプレートの監視用に bean.name タグが追加され、コンテナー用に listener.id タグが追加されます。
DefaultKafkaTemplateObservationConvention または DefaultKafkaListenerObservationConvention をサブクラス化するか、まったく新しい実装を提供できます。
記録されるデフォルトの観測値の詳細については、"Micrometer 観測資料" を参照してください。
バージョン 3.0.6 以降では、コンシューマーまたはプロデューサーレコードの情報に基づいて、タイマーとトレースに動的タグを追加できます。これを行うには、カスタム KafkaListenerObservationConvention および / または KafkaTemplateObservationConvention をそれぞれリスナーコンテナープロパティまたは KafkaTemplate に追加します。両方の観測コンテキストの record プロパティには、それぞれ ConsumerRecord または ProducerRecord が含まれます。
送信側および受信側のコンテキスト remoteServiceName プロパティは、Kafka clusterId プロパティに設定されています。これは、KafkaAdmin によって取得されます。何らかの理由 (管理者権限がないなど) でクラスター ID を取得できない場合は、バージョン 3.1 以降、KafkaAdmin に手動で clusterId を設定し、KafkaTemplate およびリスナーコンテナーに挿入できます。null (デフォルト) の場合、管理者は describeCluster 管理操作を呼び出してブローカーから取得します。
バッチリスナーの観測
バッチリスナーを使用する場合、デフォルトでは、ObservationRegistry が存在する場合でも、監視は作成されません。これは、監視のスコープがスレッドに関連付けられており、バッチリスナーでは監視とレコードの間に 1 対 1 のマッピングが存在しないためです。
バッチリスナーでレコードごとの監視を有効にするには、コンテナーファクトリプロパティ recordObservationsInBatch を true に設定します。
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setRecordObservationsInBatch(true);
return factory;
} このプロパティが true の場合、バッチ内の各レコードに対して監視が作成されますが、その監視はリスナーメソッドには伝播されません。アプリケーションは監視コンテキストを使用して、バッチ内の各レコードの処理を追跡できます。これにより、バッチコンテキスト内であっても、各レコードの処理状況を可視化できます。