リアクティブ Kafka バインダーにおける可観測性

このセクションでは、リアクティブ Kafka バインダーでマイクロメータベースの観測可能性を有効にする方法について説明します。

プロデューサーバーインディング

プロデューサーバーインディングには、監視可能性のサポートが組み込まれています。これを有効にするには、次のプロパティを設定します。

spring.cloud.stream.kafka.binder.enable-observation

このプロパティを true に設定すると、レコードの公開を監視できます。StreamBridge を使用した公開レコードと通常の Supplier<?> Bean の両方を監視できます。

コンシューマー拘束

コンシューマー側で可観測性を有効にするのは、プロデューサー側よりも複雑です。コンシューマーバインディングの開始点は 2 つあります。

  1. プロデューサーバーインディングを介してデータが公開されるトピック

  2. Spring Cloud Stream 外でデータが生成されるトピック

最初のケースでは、アプリケーションは理想的には、監視ヘッダーをコンシューマーの受信側に伝達することを望みます。2 番目のケースでは、上流の監視が開始されていない場合は、新しい監視が開始されます。

サンプル: 可観測性を備えた機能

@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {

	return s -> s.flatMap(record -> {
		Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
		null,
		KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
		() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
		observationRegistry
		);

		return Mono.deferContextual(contextView -> Mono.just(record)
			.map(rec -> new String(rec.value()).toLowerCase())
			.map(rec -> MessageBuilder.withPayload(rec)
				.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
				.build()))
			.doOnTerminate(receiverObservation::stop)
			.doOnError(receiverObservation::error)
			.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
    });
}

この例では:

  1. レコードが受信されると、観測が作成されます。

  2. 上流観測がある場合、KafkaRecordReceiverContext の一部になります。

  3. Mono はコンテキストを遅延させて作成されます。

  4. map 操作が呼び出されると、コンテキストは正しい観測値にアクセスできるようになります。

  5. flatMap 操作の結果は、Flux<Message<?>> としてバインディングに送り返されます。

  6. 送信レコードには、入力バインディングと同じ監視ヘッダーが含まれます。

サンプル: 可観測性を備えたコンシューマー

@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
	return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
			null,
			KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
			() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
			observationRegistry).observe(() -> System.out.println(record)))
		.subscribe();
}

この場合:

  1. 出力バインディングがないため、Flux では flatMap ではなく doOnNext が使用されます。

  2. observe を直接呼び出すと、観測が開始され、終了すると適切にシャットダウンされます。