リアクティブ Kafka バインダーにおける可観測性
このセクションでは、リアクティブ Kafka バインダーでマイクロメータベースの観測可能性を有効にする方法について説明します。
プロデューサーバーインディング
プロデューサーバーインディングには、監視可能性のサポートが組み込まれています。これを有効にするには、次のプロパティを設定します。
spring.cloud.stream.kafka.binder.enable-observation このプロパティを true に設定すると、レコードの公開を監視できます。StreamBridge を使用した公開レコードと通常の Supplier<?> Bean の両方を監視できます。
コンシューマー拘束
コンシューマー側で可観測性を有効にするのは、プロデューサー側よりも複雑です。コンシューマーバインディングの開始点は 2 つあります。
プロデューサーバーインディングを介してデータが公開されるトピック
Spring Cloud Stream 外でデータが生成されるトピック
最初のケースでは、アプリケーションは理想的には、監視ヘッダーをコンシューマーの受信側に伝達することを望みます。2 番目のケースでは、上流の監視が開始されていない場合は、新しい監視が開始されます。
サンプル: 可観測性を備えた機能
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
return s -> s.flatMap(record -> {
Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
observationRegistry
);
return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
.build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}この例では:
レコードが受信されると、観測が作成されます。
上流観測がある場合、
KafkaRecordReceiverContextの一部になります。Monoはコンテキストを遅延させて作成されます。map操作が呼び出されると、コンテキストは正しい観測値にアクセスできるようになります。flatMap操作の結果は、Flux<Message<?>>としてバインディングに送り返されます。送信レコードには、入力バインディングと同じ監視ヘッダーが含まれます。
サンプル: 可観測性を備えたコンシューマー
@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
observationRegistry).observe(() -> System.out.println(record)))
.subscribe();
}この場合:
出力バインディングがないため、
FluxではflatMapではなくdoOnNextが使用されます。observeを直接呼び出すと、観測が開始され、終了すると適切にシャットダウンされます。