インタラクティブクエリ

Kafka Streams バインダー API は、InteractiveQueryService と呼ばれるクラスを公開して、状態ストアをインタラクティブにクエリします。これは、アプリケーションで Spring Bean としてアクセスできます。アプリケーションからこの Bean にアクセスする簡単な方法は、Bean を autowire することです。

@Autowired
private InteractiveQueryService interactiveQueryService;

この Bean にアクセスできるようになると、関心のある特定の状態ストアを照会できます。下記参照。

ReadOnlyKeyValueStore<Object, Object> keyValueStore =
						interactiveQueryService.getQueryableStoreType("my-store", QueryableStoreTypes.keyValueStore());

起動時に、ストアを取得するための上記のメソッド呼び出しが失敗する場合があります。例: まだ状態ストアの初期化の途中である可能性があります。このような場合は、この操作を再試行すると便利です。Kafka Streams バインダーは、これに対応するための単純な再試行メカニズムを提供します。

以下は、この再試行を制御するために使用できる 2 つのプロパティです。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.maxAttempts - デフォルトは 1 です。

  • spring.cloud.stream.kafka.streams.binder.stateStoreRetry.backOffInterval - デフォルトは 1000 ミリ秒です。

kafka ストリームアプリケーションの複数のインスタンスが実行されている場合、インタラクティブにクエリする前に、クエリしている特定のキーをホストしているアプリケーションインスタンスを特定する必要があります。InteractiveQueryService API は、ホスト情報を識別するためのメソッドを提供します。

これを機能させるには、プロパティ application.server を次のように構成する必要があります。

spring.cloud.stream.kafka.streams.binder.configuration.application.server: <server>:<port>

コードスニペットは次のとおりです。

org.apache.kafka.streams.state.HostInfo hostInfo = interactiveQueryService.getHostInfo("store-name",
						key, keySerializer);

if (interactiveQueryService.getCurrentHostInfo().equals(hostInfo)) {

    //query from the store that is locally available
}
else {
    //query from the remote host
}

これらのホスト検索メソッドの詳細については、メソッドに関する Javadoc を参照してください。これらのメソッドでも、起動時に、基になる KafkaStreams オブジェクトの準備ができていない場合、例外がスローされる可能性があります。前述の再試行プロパティは、これらのメソッドにも適用できます。

InteractiveQueryService を通じて利用可能な他の API メソッド

次の API メソッドを使用して、指定されたストアとキーの組み合わせに関連付けられた KeyQueryMetadata オブジェクトを取得します。

public <K> KeyQueryMetadata getKeyQueryMetadata(String store, K key, Serializer<K> serializer)

次の API メソッドを使用して、指定されたストアとキーの組み合わせに関連付けられた KakfaStreams オブジェクトを取得します。

public <K> KafkaStreams getKafkaStreams(String store, K key, Serializer<K> serializer)

ストアクエリパラメーターのカスタマイズ

InteractiveQueryService を介してストアにクエリを実行する前に、ストアクエリパラメーターを微調整する必要がある場合があります。この目的のために、バインダーの 4.0.1 バージョンから始めて、StoreQueryParameter を引数として取る customize メソッドとの関数インターフェースである StoreQueryParametersCustomizer 用の Bean を提供できます。これがそのメソッドシグネチャーです。

StoreQueryParameters<T> customize(StoreQueryParameters<T> storeQueryParameters);

このアプローチを使用すると、アプリケーションは古いストアを有効にするなど、StoreQueryParameters をさらにカスタマイズできます。

この Bean がこのアプリケーションに存在する場合、InteractiveQueryService は状態ストアを照会する前にその customize メソッドを呼び出します。

アプリケーションで使用可能な StoreQueryParametersCustomizer 用の一意の Bean が必要であることに注意してください。