Kafka Streams プロセッサーを選択的に手動で開始する

上記のアプローチでは、StreamsBuilderFactoryManager を介してアプリケーション内のすべての Kafka ストリームプロセッサーに無条件に自動開始 false が適用されますが、個別に選択された Kafka ストリームプロセッサーのみが自動開始されないことが望ましい場合がよくあります。たとえば、アプリケーションに 3 つの異なる関数(プロセッサー)があり、そのうちの 1 つについて、アプリケーションの起動の一部として起動したくないとします。このような状況の例を次に示します。

@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {

}

@Bean
public Consumer<KStream<?, ?>> process2() {

}

@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {

}

上記のこのシナリオでは、spring.kafka.streams.auto-startup を false に設定すると、アプリケーションの起動中にどのプロセッサーも自動起動しなくなります。その場合、基礎となる StreamsBuilderFactoryManager で start() を呼び出すことにより、上記のようにプログラムで開始する必要があります。ただし、1 つのプロセッサーのみを選択的に無効にするユースケースがある場合は、そのプロセッサーの個々のバインディングに auto-startup を設定する必要があります。process3 関数を自動起動させたくないと仮定しましょう。これは、process3-in-0 と process3-in-1 の 2 つの入力バインディングを持つ BiFunction です。このプロセッサーの自動開始を回避するために、これらの入力バインディングのいずれかを選択して、それらに auto-startup を設定することができます。どのバインディングを選択するかは問題ではありません。必要に応じて、両方で auto-startup を false に設定できますが、1 つで十分です。これらは同じファクトリ Bean を共有しているため、両方のバインディングで autoStartup を false に設定する必要はありませんが、わかりやすくするために、そうすることはおそらく理にかなっています。

このプロセッサーの自動起動を無効にするために使用できる Spring Cloud Stream プロパティは次のとおりです。

spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false

または

spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false

次に、以下に示すように、REST エンドポイントまたは BindingsEndpoint API を使用して、プロセッサーを手動で起動できます。このためには、Spring Boot アクチュエーターがクラスパスに依存していることを確認する必要があります。

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0

または

@Autowired
BindingsEndpoint endpoint;

@Bean
public ApplicationRunner runner() {
    return args -> {
        endpoint.changeState("process3-in-0", State.STARTED);
    };
}

このメカニズムの詳細については、リファレンスドキュメントのこのセクションを参照してください。

このセクションに従って auto-startup を無効にしてバインディングを制御する場合、これはコンシューマーバインディングでのみ使用可能であることに注意してください。つまり、プロデューサーバーインディング process3-out-0 を使用する場合、このプロデューサーバーインディングはコンシューマーバインディングと同じ StreamsBuilderFactoryBean を使用しますが、プロセッサーの自動起動を無効にすることに関しては何の効果もありません。