Kafka Streams プロセッサーを手動で開始する

Spring Cloud Stream Kafka Streams バインダーは、Spring for Apache Kafka の StreamsBuilderFactoryBean の上に StreamsBuilderFactoryManager と呼ばれる抽象化を提供します。このマネージャー API は、バインダーベースのアプリケーションでプロセッサーごとに複数の StreamsBuilderFactoryBean を制御するために使用されます。バインダーを使用するときに、アプリケーションでさまざまな StreamsBuilderFactoryBean オブジェクトの自動起動を手動で制御する場合は、StreamsBuilderFactoryManager を使用する必要があります。プロセッサーの自動始動をオフにするために、プロパティ spring.kafka.streams.auto-startup を使用し、これを false に設定することができます。次に、アプリケーションで、以下のようなものを使用して、StreamsBuilderFactoryManager を使用してプロセッサーを起動できます。

@Bean
public ApplicationRunner runner(StreamsBuilderFactoryManager sbfm) {
    return args -> {
        sbfm.start();
    };
}

この機能は、アプリケーションをメインスレッドで起動し、Kafka Streams プロセッサーを個別に起動させる場合に便利です。例: 復元する必要のある大きな状態ストアがある場合、デフォルトの場合のようにプロセッサーが正常に起動すると、アプリケーションの起動がブロックされる可能性があります。ある種のライブネスプローブメカニズム(Kubernetes など)を使用している場合、アプリケーションがダウンしていると見なして再起動を試みる場合があります。これを修正するには、spring.kafka.streams.auto-startup を false に設定し、上記のアプローチに従うことができます。

Spring Cloud Stream バインダーを使用する場合、StreamsBuilderFactoryBean オブジェクトはバインダーによって内部的に管理されるため、Spring for Apache Kafka から StreamsBuilderFactoryBean を直接処理するのではなく、StreamsBuilderFactoryManager を処理することに注意してください。