最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。 |
ステートストア
ステートストアは、高レベル DSL が使用され、ステートストアをトリガーする適切な呼び出しが行われると、Kafka ストリームによって自動的に作成されます。
受信 KTable
バインディングを名前付き状態ストアとして実体化する場合は、次の戦略を使用して実行できます。
次の機能があるとしましょう。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
次に、次のプロパティを設定することにより、受信 KTable
データが指定された状態ストアにマテリアライズされます。
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.materializedAs: incoming-store
アプリケーションでカスタム状態ストアを Bean として定義すると、それらが検出され、バインダーによって Kafka Streams ビルダーに追加されます。特にプロセッサー API を使用する場合は、状態ストアを手動で登録する必要があります。そのために、アプリケーションで StateStore を Bean として作成できます。このような Bean の定義例を次に示します。
@Bean
public StoreBuilder myStore() {
return Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("my-store"), Serdes.Long(),
Serdes.Long());
}
@Bean
public StoreBuilder otherStore() {
return Stores.windowStoreBuilder(
Stores.persistentWindowStore("other-store",
1L, 3, 3L, false), Serdes.Long(),
Serdes.Long());
}
これらの状態ストアには、アプリケーションから直接アクセスできます。
ブートストラップ中に、上記の Bean はバインダーによって処理され、Streams ビルダーオブジェクトに渡されます。
ステートストアへのアクセス:
Processor<Object, Product>() {
WindowStore<Object, String> state;
@Override
public void init(ProcessorContext processorContext) {
state = (WindowStore)processorContext.getStateStore("mystate");
}
...
}
これは、グローバルステートストアの登録に関しては機能しません。グローバルステートストアを登録するには、StreamsBuilderFactoryBean
のカスタマイズに関する以下のセクションを参照してください。