このバージョンはまだ開発中であり、まだ安定しているとは見なされていません。最新の安定バージョンについては、Spring for Apache Kafka 4.0.5 を使用してください!

Apache Kafka ストリームのサポート

バージョン 1.1.4 以降、Spring for Apache Kafka は Kafka ストリーム [Apache] (英語) のファーストクラスのサポートを提供します。Spring アプリケーションから使用するには、kafka-streams jar がクラスパスに存在する必要があります。これは Spring for Apache Kafka プロジェクトのオプションの依存関係であり、推移的にダウンロードされることはありません。

基本

リファレンス Apache Kafka Streams ドキュメントでは、API を使用する次の方法が提案されています。

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

2 つの主要なコンポーネントがあります。

  • StreamsBuilderKStream (または KTable) インスタンスを構築するための API を使用します。

  • KafkaStreams: それらのインスタンスのライフサイクルを管理するため。

単一の StreamsBuilder によって KafkaStreams インスタンスに公開されるすべての KStream インスタンスは、ロジックが異なっていても、同時に開始および停止されます。つまり、StreamsBuilder によって定義されるすべてのストリームは、単一のライフサイクルコントロールに関連付けられます。KafkaStreams インスタンスが streams.close() によって閉じられると、再起動することはできません。代わりに、ストリーム処理を再開するための新しい KafkaStreams インスタンスを作成する必要があります。

Spring 管理

Spring アプリケーションコンテキストの観点から Kafka ストリームの使用を簡素化し、コンテナーを介したライフサイクル管理を使用するために、Spring for Apache Kafka には StreamsBuilderFactoryBean が導入されています。これは、StreamsBuilder シングルトンインスタンスを Bean として公開する AbstractFactoryBean 実装です。次の例では、そのような Bean を作成します。

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
バージョン 2.2 以降、ストリーム設定は StreamsConfig ではなく KafkaStreamsConfiguration オブジェクトとして提供されるようになりました。

StreamsBuilderFactoryBean は、内部 KafkaStreams インスタンスのライフサイクルを管理するために SmartLifecycle も実装します。Kafka ストリーム API と同様に、KafkaStreams を開始する前に KStream インスタンスを定義する必要があります。これは、Kafka ストリームの Spring API にも当てはまります。StreamsBuilderFactoryBean でデフォルトの autoStartup = true を使用する場合、アプリケーションコンテキストがリフレッシュされる前に StreamsBuilder で KStream インスタンスを宣言する必要があります。例: KStream は通常の Bean 定義にすることができますが、Kafka ストリーム API は影響なしに使用されます。次の例は、その方法を示しています。

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

ライフサイクルを手動で制御する場合 (たとえば、何らかの条件による停止と開始)、ファクトリ Bean (&) プレフィックスを使用して、StreamsBuilderFactoryBean Bean を直接参照できます。StreamsBuilderFactoryBean は内部 KafkaStreams インスタンスを使用するため、安全に停止して再起動できます。新しい KafkaStreams が各 start() 上に作成されます。KStream インスタンスのライフサイクルを個別に制御したい場合は、別の StreamsBuilderFactoryBean インスタンスの使用を検討することもできます。

また、StreamsBuilderFactoryBean で KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener オプションを指定することもできます。これは、内部 KafkaStreams インスタンスに委譲されます。

また、これらのオプションを StreamsBuilderFactoryBean で間接的に設定すること以外に、KafkaStreamsCustomizer コールバックインターフェースを使用して次の操作を行うこともできます。

  1. ( バージョン 2.1.5 から) customize(KafkaStreams) を使用して内部 KafkaStreams インスタンスを構成する

  2. ( バージョン 3.3.0 から) initKafkaStreams(Topology, Properties, KafkaClientSupplier) を使用して KafkaStreams のカスタム実装をインスタンス化します

KafkaStreamsCustomizer は StreamsBuilderFactoryBean によって提供されるオプションをオーバーライドすることに注意してください。

いくつかの KafkaStreams 操作を直接実行する必要がある場合は、StreamsBuilderFactoryBean.getKafkaStreams() を使用してその内部 KafkaStreams インスタンスにアクセスできます。

StreamsBuilderFactoryBean Bean を型別にオートワイヤーできますが、次の例に示すように、Bean 定義では完全な型を使用する必要があります。

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

または、インターフェース Bean 定義を使用する場合は、名前で @Qualifier をインジェクション用に追加できます。次の例は、その方法を示しています。

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

バージョン 2.4.1 以降、ファクトリ Bean には型 KafkaStreamsInfrastructureCustomizer の新しいプロパティ infrastructureCustomizer があります。これにより、ストリームが作成される前に、StreamsBuilder (状態ストアを追加するなど) および / または Topology のカスタマイズが可能になります。

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

デフォルトの no-op 実装は、どちらかが必要でない場合に両方のメソッドを実装する必要がないように提供されています。

複数のカスタマイザを適用する必要がある場合のために、CompositeKafkaStreamsInfrastructureCustomizer が提供されます。

KafkaStreams Micrometer サポート

バージョン 2.5.3 で導入され、ファクトリ Bean によって管理される KafkaStreams オブジェクトの micrometer メーターを自動的に登録するように KafkaStreamsMicrometerListener を構成できます。

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

JSON の直列化と逆直列化をストリーミングします

JSON 形式でトピックまたは状態ストアを読み書きするときにデータをシリアライズおよびデシリアライズするために、Spring for Apache Kafka は、JSON を使用する JacksonJsonSerde 実装を提供し、直列化、逆直列化、メッセージ変換で説明されている JacksonJsonSerializer および JacksonJsonDeserializer に委譲します。JacksonJsonSerde 実装は、コンストラクター (ターゲット型または ObjectMapper) を通じて同じ構成オプションを提供します。次の例では、JacksonJsonSerde を使用して Kafka ストリームの Cat ペイロードをシリアライズおよびデシリアライズします (インスタンスが必要な場合はいつでも JacksonJsonSerde を同様の方法で使用できます)。

stream.through(Serdes.Integer(), new JacksonJsonSerde<>(Cat.class), "cats");

プロデューサー / コンシューマーファクトリで使用するためにシリアライザー / デシリアライザーをプログラムで構築する場合、バージョン 2.3 以降、流れるような API を使用できるため、構成が簡単になります。

stream.through(
    new JacksonJsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JacksonJsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

KafkaStreamBrancher を使用する

KafkaStreamBrancher クラスは、KStream の上に条件付き ブランチを構築するためのより便利な方法を導入します。

KafkaStreamBrancher を使用しない次の例を考えてみましょう。

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

次の例では、KafkaStreamBrancher を使用しています。

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

構成

Kafka ストリーム環境を構成するには、StreamsBuilderFactoryBean に KafkaStreamsConfiguration インスタンスが必要です。すべての可能なオプションについては、Apache Kafka のドキュメント [Apache] (英語) を参照してください。

バージョン 2.2 以降、ストリーム設定は StreamsConfig ではなく KafkaStreamsConfiguration オブジェクトとして提供されるようになりました。

特にマイクロサービスを開発する場合、ほとんどの場合、定型コードを回避するために、Spring for Apache Kafka は @EnableKafkaStreams アノテーションを提供します。これは、@Configuration クラスに配置する必要があります。必要なのは、defaultKafkaStreamsConfig という名前の KafkaStreamsConfiguration Bean を宣言することだけです。defaultKafkaStreamsBuilder という名前の StreamsBuilderFactoryBean Bean は、アプリケーションコンテキストで自動的に宣言されます。追加の StreamsBuilderFactoryBean Bean を宣言して使用することもできます。StreamsBuilderFactoryBeanConfigurer を実装する Bean を提供することにより、その Bean の追加のカスタマイズを実行できます。そのような Bean が複数ある場合、Ordered.order プロパティに従って適用されます。

クリーンアップと停止の構成

ファクトリが停止すると、KafkaStreams.close() が 2 つのパラメーターで呼び出されます。

  • closeTimeout : スレッドがシャットダウンするまでの待機時間(デフォルトは DEFAULT_CLOSE_TIMEOUT で 10 秒に設定されています)。StreamsBuilderFactoryBean.setCloseTimeout() を使用して設定できます。

  • グループを閉じる: グループからのコンシューマーの離脱呼び出しをトリガーします (デフォルトは false)。StreamsBuilderFactoryBean.setLeaveGroupOnClose() を使用して構成できます。

デフォルトでは、ファクトリ Bean が停止すると、KafkaStreams.cleanUp() メソッドが呼び出されます。バージョン 2.1.2 以降、ファクトリ Bean には追加のコンストラクターがあり、start() または stop() 中に cleanUp() メソッドを呼び出すかどうかを制御できるプロパティを持つ CleanupConfig オブジェクトを取得します。バージョン 2.7 以降、デフォルトではローカル状態をクリーンアップしません。

グループプロトコル構成

バージョン 4.1 以降では、groupProtocol プロパティを設定することで、基盤となる Kafka Streams コンシューマーが使用するコンシューマーグループプロトコルを構成できます。これにより、コンシューマーが classic プロトコルに依存するか、KIP-1071 (Streams のサーバー側リバランス、Kafka 4.2 で GA) で導入された新しい streams グループプロトコルを使用するかを明示的に管理できます。これは新しいクラスターではデフォルトで有効になっていますが、既存のクラスターではこのプロパティを使用して有効にすることができます。StreamsBuilderFactoryBean.setGroupProtocol() を使用して構成できます。

以下の例は、StreamsBuilderFactoryBeanConfigurer を使用してグループプロトコルを設定する方法を示しています。

@Bean
public StreamsBuilderFactoryBeanConfigurer groupProtocolConfigurer() {
    return fb -> fb.setGroupProtocol(GroupProtocol.STREAMS);
}

ヘッダーエンリッチャー

バージョン 3.0 は ContextualProcessor の HeaderEnricherProcessor 拡張を追加しました。非推奨の Transformer インターフェースを実装した非推奨の HeaderEnricher と同じ機能を提供します。これは、ストリーム処理内でヘッダーを追加するために使用できます。ヘッダー値は SpEL 式です。式評価のルートオブジェクトには 3 つのプロパティがあります。

  • record - org.apache.kafka.streams.processor.api.Record (keyvaluetimestampheaders)

  • key - 現在のレコードのキー

  • value - 現在のレコードの値

  • context - ProcessorContext、現在のレコードのメタデータへのアクセスを許可します

式は byte[] または String ( UTF-8 を使用して byte[] に変換される) を返す必要があります。

ストリーム内でエンリッチャーを使用するには:

.process(() -> new HeaderEnricherProcessor(expressions))

プロセッサーは key または value を変更しません。ヘッダーを追加するだけです。

レコードごとに新しいインスタンスが必要です。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

これは簡単な例で、1 つのリテラルヘッダーと 1 つの変数を追加します。

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

バージョン 3.0 では、ContextualProcessor の MessagingProcessor 拡張機能が追加され、非推奨の Transformer インターフェースを実装した非推奨の MessagingTransformer と同じ機能が提供されました。これにより、Kafka ストリームトポロジが Spring Integration フローなどの Spring メッセージングコンポーネントと対話できるようになります。トランスには MessagingFunction の実装が必要です。

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration は、その GatewayProxyFactoryBean を使用した実装を自動的に提供します。また、キー、値、メタデータ (ヘッダーを含む) を Spring メッセージング Message<?> との間で変換するために MessagingMessageConverter が必要です。詳細については、KStream から Spring Integration フローを呼び出すを参照してください。

回復戦略

このフレームワークは、同じ回復戦略に従う以下の例外ハンドラーを提供します。

復旧戦略は、優先順位順に以下のとおりです。

  • KafkaStreamsDeadLetterDestinationResolver が定義されている場合は、ストリームを再開し、ネイティブの Kafka Streams DLQ を使用して、失敗したレコードを解決済みのトピックパーティションに転送します。

  • errors.dead.letter.queue.topic.name が定義され、トピック名に設定されている場合、ネイティブの Kafka Streams DLQ を使用してストリームを再開し、失敗したレコードをそのトピックに転送します。

  • If a ConsumerRecordRecoverer implementation is defined, invoke it and resume the stream without producing dead-letter records, as handling is delegated to the ConsumerRecordRecoverer. For example, the provided DeadLetterPublishingRecoverer implementation can be used.

  • Fail the stream without producing dead-letter records.

When a dead-letter record is published to a dead-letter topic, whether through the native Kafka Streams DLQ or via a ConsumerRecordRecoverer implementation, the record is enriched with the Spring for Apache Kafka DLT headers.

デシリアライズ例外からの回復

Version 2.3 introduced the RecoveringDeserializationExceptionHandler, which can take some action when a deserialization exception occurs. It implements the DeserializationExceptionHandler interface (refer to the Kafka documentation for details) and follows the Spring for Apache Kafka recovery strategies.

To configure the RecoveringDeserializationExceptionHandler, add the following property to your streams configuration:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    ...
    return new KafkaStreamsConfiguration(props);
}

Recovery from Processing Exceptions

Version 4.1 introduces the RecoveringProcessingExceptionHandler, which can take some action when an exception occurs during stream processing. It implements the ProcessingExceptionHandler interface (refer to the Kafka documentation for details), introduced by KIP-1033 [Apache] (英語) and follows the Spring for Apache Kafka recovery strategies.

To enable the RecoveringProcessingExceptionHandler, add the following property to your streams configuration:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringProcessingExceptionHandler.class);
    ...
}

Recovery from Production Exceptions

Version 4.1 introduces the RecoveringProductionExceptionHandler, which can take some action when an exception occurs during record production or serialization. It implements the ProductionExceptionHandler interface (refer to the Kafka documentation for details) and follows the Spring for Apache Kafka recovery strategies.

To configure the RecoveringProductionExceptionHandler, add the following property to your streams configuration:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringProductionExceptionHandler.class);
    ...
}

Dead Letter Publishing Recoverer

The framework provides the DeadLetterPublishingRecoverer, which sends the failed record to a dead-letter topic. See Publishing Dead-letter records for more information about this recoverer.

To configure the recoverer for an exception handler, add the following properties to your streams configuration:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

もちろん、recoverer() Bean は ConsumerRecordRecoverer の独自の実装にすることができます。

Dead Letter Destination Resolver

A bean of type KafkaStreamsDeadLetterDestinationResolver can be defined to activate native DLQ routing in the exception handlers.

It determines the DLQ topic name and partition resolution logic to use, based on the error handler context, the input record of the failed processor, and the exception thrown:

@Bean
public KafkaStreamsDeadLetterDestinationResolver resolver() {
    return (context, record, ex) -> {
        if (ex instanceof FooException) return new TopicPartition("dlqTopic1", -1);
        if (record instanceof Foo) return new TopicPartition("dlqTopic2", -1);
        if (context.processorNodeId().equals("processor-1")) return new TopicPartition("dlqTopic3", -1);
        return new TopicPartition("defaultDlqTopic", -1);
    };
}

A negative partition number indicates that the partition will be determined by the default partitioner.

The KafkaStreamsDeadLetterDestinationResolver can then be injected into the exception handlers as follows:

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringProcessingExceptionHandler.class);
    props.put(RecoveringProcessingExceptionHandler.DLQ_DESTINATION_RESOLVER, resolver());
    ...
    return new KafkaStreamsConfiguration(props);
}

Dead Letter Queue Topic Name Property

The Kafka Streams property errors.dead.letter.queue.topic.name can be defined and set to a topic name to activate native DLQ routing in the exception handlers. This directly specifies the DLQ topic to which the enabled exception handlers will route all failed records.

Alternatively, this can be set programmatically through the StreamsBuilderFactoryBean:

@Bean
public StreamsBuilderFactoryBeanConfigurer streamsBuilderFactoryBeanConfigurer() {
    return sfb -> sfb.setDeadLetterTopicName("deadLetterQueueTopic");
}

インタラクティブクエリのサポート

バージョン 3.2 以降、Spring for Apache Kafka は Kafka Streams の対話型クエリに必要な基本機能を提供します。対話型クエリは、アプリケーション内のステートフルストアを継続的にクエリする方法を提供するため、ステートフル Kafka Streams アプリケーションで役立ちます。アプリケーションが対象システムの現在のビューを具体化する必要がある場合、対話型クエリはその方法を提供します。対話型クエリの詳細については、この記事 [Apache] (英語) を参照してください。Spring for Apache Kafka のサポートは、KafkaStreamsInteractiveQueryService と呼ばれる API を中心に展開されています。これは、Kafka Streams ライブラリの対話型クエリ API のファサードです。アプリケーションは、このサービスのインスタンスを Bean として作成し、後でそれを使用して名前で状態ストアを取得できます。

次のコードスニペットに例を示します。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

Kafka ストリームアプリケーションに app-store という状態ストアがあると仮定すると、そのストアは以下に示すように KafkaStreamsInteractiveQuery API を介して取得できます。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

アプリケーションが状態ストアにアクセスできるようになると、そこからキーと値の情報を照会できるようになります。

この場合、アプリケーションが使用する状態ストアは読み取り専用のキー値ストアです。Kafka Streams アプリケーションが使用できる状態ストアには、他にも種類があります。たとえば、アプリケーションがウィンドウベースのストアをクエリする場合、Kafka Streams アプリケーションのビジネスロジックでそのストアを構築し、後で取得することができます。このため、KafkaStreamsInteractiveQueryService でクエリ可能なストアを取得するための API には汎用ストア型 シグネチャーがあり、エンドユーザーが適切な型を割り当てることができます。

以下は API からの型署名です。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

このメソッドを呼び出すとき、ユーザーは上記の例のように適切な状態ストア型を具体的に要求できます。

状態ストアの取得を再試行しています

KafkaStreamsInteractiveQueryService を使用して状態ストアを取得しようとすると、さまざまな理由で状態ストアが見つからない可能性があります。これらの理由が一時的なものである場合、KafkaStreamsInteractiveQueryService はカスタム RetryTemplate を挿入できるようにすることで状態ストアの取得を再試行するオプションを提供します。デフォルトでは、KafkaStreamsInteractiveQueryService で使用される RetryTemplate は、1 秒の固定バックオフで最大 3 回の試行を使用します。

最大試行回数 10 回でカスタム RetryTemplate を KafkaStreamsInteractiveQueryService に挿入する方法を次に示します。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

リモート状態ストアのクエリ

状態ストア retrieveQueryableStore を取得するための上記の API は、ローカルで使用可能なキー値状態ストアを対象としています。本番環境では、Kafka Streams アプリケーションはパーティションの数に基づいて分散される可能性が最も高くなります。トピックに 4 つのパーティションがあり、同じ Kafka Streams プロセッサーのインスタンスが 4 つ実行されている場合、各インスタンスはトピックの 1 つのパーティションの処理を担当する可能性があります。このシナリオでは、retrieveQueryableStore を呼び出しても、有効なストアが返される可能性はありますが、インスタンスが探している正しい結果が得られない可能性があります。4 つのパーティションがあるトピックにさまざまなキーに関するデータがあり、1 つのパーティションが常に特定のキーを担当していると仮定します。retrieveQueryableStore を呼び出しているインスタンスが、このインスタンスがホストしていないキーに関する情報を探している場合、データは受信されません。これは、現在の Kafka Streams インスタンスがこのキーについて何も知らないためです。これを修正するには、呼び出し元のインスタンスがまず、特定のキーがホストされている Kafka Streams プロセッサーインスタンスのホスト情報を持っていることを確認する必要があります。これは、以下のように、同じ application.id にある任意の Kafka ストリームインスタンスから取得できます。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

上記のサンプルコードでは、呼び出しインスタンスが app-store という状態ストアから特定のキー 12345 を照会しています。API には対応するキーシリアライザーも必要で、この場合は IntegerSerializer です。Kafka Streams は、同じ application.id にあるすべてのインスタンスを調べて、どのインスタンスがこの特定のキーをホストしているかを見つけようとします。見つかったら、そのホスト情報を HostInfo オブジェクトとして返します。

API は次のようになります。

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

このように分散方式で同じ application.id の Kafka ストリームプロセッサーの複数のインスタンスを使用する場合、アプリケーションは、REST エンドポイントなどの RPC エンドポイントを介して状態ストアをクエリできる RPC レイヤーを提供する必要があります。詳細については、この記事 [Apache] (英語) を参照してください。Spring for Apache Kafka を使用する場合、spring-web テクノロジを使用して Spring ベースの REST エンドポイントを追加するのは非常に簡単です。REST エンドポイントが存在すると、キーがホストされている HostInfo がインスタンスに認識されていれば、そのエンドポイントを使用して任意の Kafka ストリームインスタンスから状態ストアをクエリできます。

インスタンスをホストしているキーが現在のインスタンスである場合、アプリケーションは RPC メカニズムを呼び出す必要はなく、JVM 内で呼び出しを行う必要があります。ただし、課題は、特定のサーバーがコンシューマーの再バランスのためにパーティションを失う可能性があるため、呼び出しを行っているインスタンスがキーがホストされている場所であることをアプリケーションが認識できない可能性があることです。この課題を解決するために、KafkaStreamsInteractiveQueryService は、現在の HostInfo を返す API メソッド getCurrentKafkaStreamsApplicationHostInfo() を介して現在のホスト情報を照会するための便利な API を提供します。その考え方は、アプリケーションが最初にキーが保持されている場所に関する情報を取得し、次に HostInfo を現在のインスタンスに関する情報と比較するというものです。HostInfo データが一致する場合は、retrieveQueryableStore を介して単純な JVM 呼び出しを続行でき、それ以外の場合は RPC オプションに進みます。

Kafka ストリームの例

次の例は、この章で説明したさまざまなトピックを組み合わせたものです。

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}