Kafka Streams バインダーでの結合の視覚化と制御

バージョン 3.1.2 以降、Kafka Streams バインダーはバインディングの視覚化と制御をサポートします。サポートされているライフサイクルフェーズは、STOPPED と STARTED の 2 つだけです。ライフサイクルフェーズ PAUSED および RESUMED は、Kafka Streams バインダーでは使用できません。

バインディングの視覚化と制御をアクティブにするには、アプリケーションに次の 2 つの依存関係を含める必要があります。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-web</artifactId>
</dependency>

webflux を使用したい場合は、標準の Web 依存関係の代わりに spring-boot-starter-webflux を含めることができます。

さらに、次のプロパティも設定する必要があります。

management.endpoints.web.exposure.include=bindings

この機能をさらに説明するために、次のアプリケーションをガイドとして使用してみましょう。

@SpringBootApplication
public class KafkaStreamsApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaStreamsApplication.class, args);
	}

	@Bean
	public Consumer<KStream<String, String>> consumer() {
		return s -> s.foreach((key, value) -> System.out.println(value));
	}

	@Bean
	public Function<KStream<String, String>, KStream<String, String>> function() {
		return ks -> ks;
	}

}

ご覧のとおり、アプリケーションには 2 つの Kafka ストリーム関数があります。1 つはコンシューマー関数で、もう 1 つは関数です。コンシューマーバインディングは、デフォルトで consumer-in-0 という名前が付けられています。同様に、関数の場合、入力バインディングは function-in-0 であり、出力バインディングは function-out-0 です。

アプリケーションが開始されると、次のバインディングエンドポイントを使用してバインディングの詳細を見つけることができます。

 curl http://localhost:8080/actuator/bindings | jq .
[
  {
    "bindingName": "consumer-in-0",
    "name": "consumer-in-0",
    "group": "consumer-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-in-0",
    "name": "function-in-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": true,
    "extendedInfo": {}
  },
  {
    "bindingName": "function-out-0",
    "name": "function-out-0",
    "group": "function-applicationId",
    "pausable": false,
    "state": "running",
    "paused": false,
    "input": false,
    "extendedInfo": {}
  }
]

3 つのバインディングすべての詳細については、上記を参照してください。

ここで、consumer-in-0 バインディングを停止しましょう。

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0

こでは、このバインディングを介してレコードは受信されません。

バインディングを再開します。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/consumer-in-0

1 つの関数に複数のバインディングが存在する場合、それらのバインディングのいずれかでこれらの操作を呼び出すことが機能します。これは、単一の関数のすべてのバインディングが同じ StreamsBuilderFactoryBean によってサポートされているためです。上記の機能では、function-in-0 または function-out-0 のいずれかが機能します。