Kafka Streams プロセッサーを選択的に手動で開始する
上記のアプローチでは、StreamsBuilderFactoryManager
を介してアプリケーション内のすべての Kafka ストリームプロセッサーに無条件に自動開始 false
が適用されますが、個別に選択された Kafka ストリームプロセッサーのみが自動開始されないことが望ましい場合がよくあります。たとえば、アプリケーションに 3 つの異なる関数(プロセッサー)があり、そのうちの 1 つについて、アプリケーションの起動の一部として起動したくないとします。このような状況の例を次に示します。
@Bean
public Function<KStream<?, ?>, KStream<?, ?>> process1() {
}
@Bean
public Consumer<KStream<?, ?>> process2() {
}
@Bean
public BiFunction<KStream<?, ?>, KTable<?, ?>, KStream<?, ?>> process3() {
}
上記のこのシナリオでは、spring.kafka.streams.auto-startup
を false
に設定すると、アプリケーションの起動中にどのプロセッサーも自動起動しなくなります。その場合、基礎となる StreamsBuilderFactoryManager
で start()
を呼び出すことにより、上記のようにプログラムで開始する必要があります。ただし、1 つのプロセッサーのみを選択的に無効にするユースケースがある場合は、そのプロセッサーの個々のバインディングに auto-startup
を設定する必要があります。process3
関数を自動起動させたくないと仮定しましょう。これは、process3-in-0
と process3-in-1
の 2 つの入力バインディングを持つ BiFunction
です。このプロセッサーの自動開始を回避するために、これらの入力バインディングのいずれかを選択して、それらに auto-startup
を設定することができます。どのバインディングを選択するかは問題ではありません。必要に応じて、両方で auto-startup
を false
に設定できますが、1 つで十分です。これらは同じファクトリ Bean を共有しているため、両方のバインディングで autoStartup を false に設定する必要はありませんが、わかりやすくするために、そうすることはおそらく理にかなっています。
このプロセッサーの自動起動を無効にするために使用できる Spring Cloud Stream プロパティは次のとおりです。
spring.cloud.stream.bindings.process3-in-0.consumer.auto-startup: false
または
spring.cloud.stream.bindings.process3-in-1.consumer.auto-startup: false
次に、以下に示すように、REST エンドポイントまたは BindingsEndpoint
API を使用して、プロセッサーを手動で起動できます。このためには、Spring Boot アクチュエーターがクラスパスに依存していることを確認する必要があります。
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process3-in-0
または
@Autowired
BindingsEndpoint endpoint;
@Bean
public ApplicationRunner runner() {
return args -> {
endpoint.changeState("process3-in-0", State.STARTED);
};
}
このメカニズムの詳細については、リファレンスドキュメントのこのセクションを参照してください。
このセクションに従って auto-startup を無効にしてバインディングを制御する場合、これはコンシューマーバインディングでのみ使用可能であることに注意してください。つまり、プロデューサーバーインディング process3-out-0 を使用する場合、このプロデューサーバーインディングはコンシューマーバインディングと同じ StreamsBuilderFactoryBean を使用しますが、プロセッサーの自動起動を無効にすることに関しては何の効果もありません。 |