最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。

ステートストア

ステートストアは、高レベル DSL が使用され、ステートストアをトリガーする適切な呼び出しが行われると、Kafka ストリームによって自動的に作成されます。

受信 KTable バインディングを名前付き状態ストアとして実体化する場合は、次の戦略を使用して実行できます。

次の機能があるとしましょう。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
   ...
}

次に、次のプロパティを設定することにより、受信 KTable データが指定された状態ストアにマテリアライズされます。

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store

アプリケーションでカスタム状態ストアを Bean として定義すると、それらが検出され、バインダーによって Kafka Streams ビルダーに追加されます。特にプロセッサー API を使用する場合は、状態ストアを手動で登録する必要があります。そのために、アプリケーションで StateStore を Bean として作成できます。このような Bean の定義例を次に示します。

@Bean
public StoreBuilder myStore() {
    return Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
            Serdes.Long());
}

@Bean
public StoreBuilder otherStore() {
    return Stores.windowStoreBuilder(
            Stores.persistentWindowStore("other-store",
                    1L, 3, 3L, false), Serdes.Long(),
            Serdes.Long());
}

これらの状態ストアには、アプリケーションから直接アクセスできます。

ブートストラップ中に、上記の Bean はバインダーによって処理され、Streams ビルダーオブジェクトに渡されます。

ステートストアへのアクセス:

Processor<Object, Product>() {

    WindowStore<Object, String> state;

    @Override
    public void init(ProcessorContext processorContext) {
        state = (WindowStore)processorContext.getStateStore("mystate");
    }
    ...
}

これは、グローバルステートストアの登録に関しては機能しません。グローバルステートストアを登録するには、StreamsBuilderFactoryBean のカスタマイズに関する以下のセクションを参照してください。