最新の安定バージョンについては、spring-cloud-stream 5.0.0 を使用してください。

StreamsBuilderFactoryBean コンフィギュラー

多くの場合、KafkaStreams オブジェクトを作成する StreamsBuilderFactoryBean をカスタマイズすることが必要になります。Spring Kafka によって提供される基礎的なサポートに基づいて、バインダーを使用して StreamsBuilderFactoryBean をカスタマイズできます。StreamsBuilderFactoryBeanConfigurer を使用して、StreamsBuilderFactoryBean 自体をカスタマイズできます。次に、このコンフィギュラーを通じて StreamsBuilderFactoryBean にアクセスできるようになると、KafkaStreamsCustomzier を使用して対応する KafkaStreams をカスタマイズできます。これらのカスタマイザーはどちらも Spring for Apache Kafka プロジェクトの一部です。

StreamsBuilderFactoryBeanConfigurer の使用例を次に示します。

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setStateListener((newState, oldState) -> {
         //Do some action here!
    });
}

上記は、StreamsBuilderFactoryBean をカスタマイズするためにできることの例として示されています。基本的に、StreamsBuilderFactoryBean から使用可能なミューテーション操作を呼び出してカスタマイズできます。このカスタマイザは、ファクトリ Bean が開始される直前にバインダーによって呼び出されます。

StreamsBuilderFactoryBean にアクセスできるようになったら、基になる KafkaStreams オブジェクトをカスタマイズすることもできます。これを行うための青写真があります。

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                });
            }
        });
    };
}

KafkaStreamsCustomizer は、基礎となる KafkaStreams が開始される直前に StreamsBuilderFactoryBeabn によって呼び出されます。

アプリケーション全体に存在できる StreamsBuilderFactoryBeanConfigurer は 1 つだけです。次に、複数の Kafka ストリームプロセッサーはそれぞれ個別の StreamsBuilderFactoryBean オブジェクトによってバックアップされるため、どのように説明しますか? その場合、それらのプロセッサーごとにカスタマイズを変える必要がある場合、アプリケーションはアプリケーション ID に基づいて何らかのフィルターを適用する必要があります。

たとえば

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return factoryBean -> {
        if (factoryBean.getStreamsConfiguration().getProperty(StreamsConfig.APPLICATION_ID_CONFIG)
                .equals("processor1-application-id")) {
            factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
                @Override
                public void customize(KafkaStreams kafkaStreams) {
                    kafkaStreams.setUncaughtExceptionHandler((t, e) -> {

                    });
                }
            });
        }
    };

StreamsBuilderFactoryBeanConfigurer を使用したグローバル状態ストアの登録

上で記述されていたように、バインダーはグローバル状態ストアを機能として登録するための第一級の方法を提供していません。そのためには、StreamsBuilderFactoryBeanConfigurer を介してカスタマイザーを使用する必要があります。その方法は次のとおりです。

@Bean
public StreamsBuilderFactoryBeanConfigurer customizer() {
    return streamsBuilderFactoryBean -> {
        try {
            streamsBuilderFactoryBean.setInfrastructureCustomizer(new KafkaStreamsInfrastructureCustomizer() {
                  @Override
                  public void configureBuilder(StreamsBuilder builder) {
                      builder.addGlobalStore(
                              ...
                      );
                  }
              });
        }
        catch (Exception e) {

        }
    };
}

StreamsBuilder のカスタマイズは、上記のように KafkaStreamsInfrastructureCustomizer を介して行う必要があります。StreamsBuilder オブジェクトにアクセスするために StreamsBuilderFactoryBean#getObject() が呼び出されると、Bean が初期化中である可能性があり、循環依存関係の問題が発生するため、機能しない可能性があります。

複数のプロセッサーがある場合は、上記のようにアプリケーション ID を使用して他の StreamsBuilderFactoryBean オブジェクトを除外し、グローバル状態ストアを適切な StreamsBuilder に接続する必要があります。

StreamsBuilderFactoryBeanConfigurer を使用した運用例外ハンドラーの登録

エラー処理のセクションでは、バインダーが本番例外を処理するためのファーストクラスの方法を提供しないことを示しました。その場合でも、StreamsBuilderFactoryBean カスタマイザーを使用して実動例外ハンドラーを登録することができます。下記参照。

@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                            CustomProductionExceptionHandler.class);
    };
}

繰り返しになりますが、複数のプロセッサーがある場合は、正しい StreamsBuilderFactoryBean に対して適切に設定することをお勧めします。構成プロパティを使用してこのような本番例外ハンドラーを追加することもできますが(詳細については以下を参照)、プログラムによるアプローチを選択する場合はこれがオプションです。