Apache Kafka ストリームのサポート

バージョン 1.1.4 以降、Spring for Apache Kafka は Kafka ストリーム [Apache] (英語) のファーストクラスのサポートを提供します。Spring アプリケーションから使用するには、kafka-streams jar がクラスパスに存在する必要があります。これは Spring for Apache Kafka プロジェクトのオプションの依存関係であり、推移的にダウンロードされることはありません。

基本

リファレンス Apache Kafka Streams ドキュメントでは、API を使用する次の方法が提案されています。

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

2 つの主要なコンポーネントがあります。

  • StreamsBuilderKStream (または KTable) インスタンスを構築するための API を使用します。

  • KafkaStreams: それらのインスタンスのライフサイクルを管理するため。

単一の StreamsBuilder によって KafkaStreams インスタンスに公開されるすべての KStream インスタンスは、ロジックが異なっていても、同時に開始および停止されます。つまり、StreamsBuilder によって定義されるすべてのストリームは、単一のライフサイクルコントロールに関連付けられます。KafkaStreams インスタンスが streams.close() によって閉じられると、再起動することはできません。代わりに、ストリーム処理を再開するための新しい KafkaStreams インスタンスを作成する必要があります。

Spring 管理

Spring アプリケーションコンテキストの観点から Kafka ストリームの使用を簡素化し、コンテナーを介したライフサイクル管理を使用するために、Spring for Apache Kafka には StreamsBuilderFactoryBean が導入されています。これは、StreamsBuilder シングルトンインスタンスを Bean として公開する AbstractFactoryBean 実装です。次の例では、そのような Bean を作成します。

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
バージョン 2.2 以降、ストリーム設定は StreamsConfig ではなく KafkaStreamsConfiguration オブジェクトとして提供されるようになりました。

StreamsBuilderFactoryBean は、内部 KafkaStreams インスタンスのライフサイクルを管理するために SmartLifecycle も実装します。Kafka ストリーム API と同様に、KafkaStreams を開始する前に KStream インスタンスを定義する必要があります。これは、Kafka ストリームの Spring API にも当てはまります。StreamsBuilderFactoryBean でデフォルトの autoStartup = true を使用する場合、アプリケーションコンテキストがリフレッシュされる前に StreamsBuilder で KStream インスタンスを宣言する必要があります。例: KStream は通常の Bean 定義にすることができますが、Kafka ストリーム API は影響なしに使用されます。次の例は、その方法を示しています。

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

ライフサイクルを手動で制御する場合 (たとえば、何らかの条件による停止と開始)、ファクトリ Bean (&) プレフィックスを使用して、StreamsBuilderFactoryBean Bean を直接参照できます。StreamsBuilderFactoryBean は内部 KafkaStreams インスタンスを使用するため、安全に停止して再起動できます。新しい KafkaStreams が各 start() 上に作成されます。KStream インスタンスのライフサイクルを個別に制御したい場合は、別の StreamsBuilderFactoryBean インスタンスの使用を検討することもできます。

StreamsBuilderFactoryBean で KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener オプションを指定することもできます。これは、内部 KafkaStreams インスタンスに委譲されます。また、これらのオプションを StreamsBuilderFactoryBean で間接的に設定する以外に、バージョン 2.1.5 以降、KafkaStreamsCustomizer コールバックインターフェースを使用して内部 KafkaStreams インスタンスを構成できます。KafkaStreamsCustomizer は、StreamsBuilderFactoryBean によって提供されるオプションをオーバーライドすることに注意してください。一部の KafkaStreams 操作を直接実行する必要がある場合は、StreamsBuilderFactoryBean.getKafkaStreams() を使用してその内部 KafkaStreams インスタンスにアクセスできます。StreamsBuilderFactoryBean Bean を型別にオートワイヤーできますが、次の例に示すように、Bean 定義で完全な型を使用する必要があります。

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

または、インターフェース Bean 定義を使用する場合は、名前で @Qualifier をインジェクション用に追加できます。次の例は、その方法を示しています。

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

バージョン 2.4.1 以降、ファクトリ Bean には型 KafkaStreamsInfrastructureCustomizer の新しいプロパティ infrastructureCustomizer があります。これにより、ストリームが作成される前に、StreamsBuilder (状態ストアを追加するなど) および / または Topology のカスタマイズが可能になります。

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

デフォルトの no-op 実装は、どちらかが必要でない場合に両方のメソッドを実装する必要がないように提供されています。

複数のカスタマイザを適用する必要がある場合のために、CompositeKafkaStreamsInfrastructureCustomizer が提供されます。

KafkaStreams Micrometer サポート

バージョン 2.5.3 で導入され、ファクトリ Bean によって管理される KafkaStreams オブジェクトの micrometer メーターを自動的に登録するように KafkaStreamsMicrometerListener を構成できます。

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

JSON の直列化と逆直列化をストリーミングします

JSON 形式でトピックまたは状態ストアを読み書きするときにデータをシリアライズおよびデシリアライズするために、Spring for Apache Kafka は、JSON を使用する JsonSerde 実装を提供し、直列化、逆直列化、メッセージ変換で説明されている JsonSerializer および JsonDeserializer に委譲します。JsonSerde 実装は、コンストラクター (ターゲット型または ObjectMapper) を通じて同じ構成オプションを提供します。次の例では、JsonSerde を使用して Kafka ストリームの Cat ペイロードをシリアライズおよびデシリアライズします (インスタンスが必要な場合はいつでも JsonSerde を同様の方法で使用できます)。

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

プロデューサー / コンシューマーファクトリで使用するためにシリアライザー / デシリアライザーをプログラムで構築する場合、バージョン 2.3 以降、流れるような API を使用できるため、構成が簡単になります。

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

KafkaStreamBrancher を使用する

KafkaStreamBrancher クラスは、KStream の上に条件付き ブランチ を構築するためのより便利な方法を導入します。

KafkaStreamBrancher を使用しない次の例を考えてみましょう。

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

次の例では、KafkaStreamBrancher を使用しています。

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

構成

Kafka ストリーム環境を構成するには、StreamsBuilderFactoryBean に KafkaStreamsConfiguration インスタンスが必要です。すべての可能なオプションについては、Apache Kafka のドキュメント [Apache] (英語) を参照してください。

バージョン 2.2 以降、ストリーム設定は StreamsConfig ではなく KafkaStreamsConfiguration オブジェクトとして提供されるようになりました。

特にマイクロサービスを開発する場合、ほとんどの場合、定型コードを回避するために、Spring for Apache Kafka は @EnableKafkaStreams アノテーションを提供します。これは、@Configuration クラスに配置する必要があります。必要なのは、defaultKafkaStreamsConfig という名前の KafkaStreamsConfiguration Bean を宣言することだけです。defaultKafkaStreamsBuilder という名前の StreamsBuilderFactoryBean Bean は、アプリケーションコンテキストで自動的に宣言されます。追加の StreamsBuilderFactoryBean Bean を宣言して使用することもできます。StreamsBuilderFactoryBeanConfigurer を実装する Bean を提供することにより、その Bean の追加のカスタマイズを実行できます。そのような Bean が複数ある場合、Ordered.order プロパティに従って適用されます。

クリーンアップと停止の構成

ファクトリが停止すると、KafkaStreams.close() が 2 つのパラメーターで呼び出されます。

  • closeTimeout : スレッドがシャットダウンするまで待機する時間 (デフォルトでは DEFAULT_CLOSE_TIMEOUT が 10 秒に設定されています)。StreamsBuilderFactoryBean.setCloseTimeout() を使用して構成できます。

  • グループを閉じる: グループからのコンシューマーの離脱呼び出しをトリガーします (デフォルトは false)。StreamsBuilderFactoryBean.setLeaveGroupOnClose() を使用して構成できます。

デフォルトでは、ファクトリ Bean が停止すると、KafkaStreams.cleanUp() メソッドが呼び出されます。バージョン 2.1.2 以降、ファクトリ Bean には追加のコンストラクターがあり、start() または stop() 中に cleanUp() メソッドを呼び出すかどうかを制御できるプロパティを持つ CleanupConfig オブジェクトを取得します。バージョン 2.7 以降、デフォルトではローカル状態をクリーンアップしません。

ヘッダーエンリッチャー

バージョン 3.0 は ContextualProcessor の HeaderEnricherProcessor 拡張を追加しました。非推奨の Transformer インターフェースを実装した非推奨の HeaderEnricher と同じ機能を提供します。これは、ストリーム処理内でヘッダーを追加するために使用できます。ヘッダー値は SpEL 式です。式評価のルートオブジェクトには 3 つのプロパティがあります。

  • record - org.apache.kafka.streams.processor.api.Record (keyvaluetimestampheaders)

  • key - 現在のレコードのキー

  • value - 現在のレコードの値

  • context - ProcessorContext、現在のレコードのメタデータへのアクセスを許可します

式は byte[] または String ( UTF-8 を使用して byte[] に変換される) を返す必要があります。

ストリーム内でエンリッチャーを使用するには:

.process(() -> new HeaderEnricherProcessor(expressions))

プロセッサーは key または value を変更しません。ヘッダーを追加するだけです。

レコードごとに新しいインスタンスが必要です。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

これは簡単な例で、1 つのリテラルヘッダーと 1 つの変数を追加します。

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

バージョン 3.0 では、ContextualProcessor の MessagingProcessor 拡張機能が追加され、非推奨の Transformer インターフェースを実装した非推奨の MessagingTransformer と同じ機能が提供されました。これにより、Kafka ストリームトポロジが Spring Integration フローなどの Spring メッセージングコンポーネントと対話できるようになります。トランスには MessagingFunction の実装が必要です。

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration は、その GatewayProxyFactoryBean を使用した実装を自動的に提供します。また、キー、値、メタデータ (ヘッダーを含む) を Spring メッセージング Message<?> との間で変換するために MessagingMessageConverter が必要です。詳細については、【 KStream から Spring Integration フローを呼び出す ] を参照してください。

デシリアライズ例外からの回復

バージョン 2.3 では、逆直列化例外が発生したときに何らかのアクションを実行できる RecoveringDeserializationExceptionHandler が導入されました。RecoveringDeserializationExceptionHandler が実装である DeserializationExceptionHandler については、Kafka のドキュメントを参照してください。RecoveringDeserializationExceptionHandler は ConsumerRecordRecoverer 実装で構成されます。フレームワークは、失敗したレコードを配信不能トピックに送信する DeadLetterPublishingRecoverer を提供します。この回復プログラムの詳細については、デッドレターレコードの公開を参照してください。

回復者を構成するには、次のプロパティをストリーム構成に追加します。

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

もちろん、recoverer() Bean は ConsumerRecordRecoverer の独自の実装にすることができます。

インタラクティブクエリのサポート

バージョン 3.2 以降、Spring for Apache Kafka は Kafka Streams の対話型クエリに必要な基本機能を提供します。対話型クエリは、アプリケーション内のステートフルストアを継続的にクエリする方法を提供するため、ステートフル Kafka Streams アプリケーションで役立ちます。アプリケーションが対象システムの現在のビューを具体化する必要がある場合、対話型クエリはその方法を提供します。対話型クエリの詳細については、この記事 [Apache] (英語) を参照してください。Spring for Apache Kafka のサポートは、KafkaStreamsInteractiveQueryService と呼ばれる API を中心に展開されています。これは、Kafka Streams ライブラリの対話型クエリ API のファサードです。アプリケーションは、このサービスのインスタンスを Bean として作成し、後でそれを使用して名前で状態ストアを取得できます。

次のコードスニペットに例を示します。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

Kafka Streams アプリケーションに app-store という状態ストアがあると仮定すると、そのストアは以下に示すように KafkStreamsInteractiveQuery API を介して取得できます。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

アプリケーションが状態ストアにアクセスできるようになると、そこからキーと値の情報を照会できるようになります。

この場合、アプリケーションが使用する状態ストアは読み取り専用のキー値ストアです。Kafka Streams アプリケーションが使用できる状態ストアには、他にも種類があります。たとえば、アプリケーションがウィンドウベースのストアをクエリする場合、Kafka Streams アプリケーションのビジネスロジックでそのストアを構築し、後で取得することができます。このため、KafkaStreamsInteractiveQueryService でクエリ可能なストアを取得するための API には汎用ストア型 シグネチャーがあり、エンドユーザーが適切な型を割り当てることができます。

以下は API からの型署名です。

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

このメソッドを呼び出すとき、ユーザーは上記の例のように適切な状態ストア型を具体的に要求できます。

状態ストアの取得を再試行しています

KafkaStreamsInteractiveQueryService を使用して状態ストアを取得しようとすると、さまざまな理由で状態ストアが見つからない可能性があります。これらの理由が一時的なものである場合、KafkaStreamsInteractiveQueryService はカスタム RetryTemplate を挿入できるようにすることで状態ストアの取得を再試行するオプションを提供します。デフォルトでは、KafkaStreamsInteractiveQueryService で使用される RetryTemmplate は、1 秒の固定バックオフで最大 3 回の試行を使用します。

最大試行回数 10 回でカスタム RetryTemmplate を KafkaStreamsInteractiveQueryService に挿入する方法を次に示します。

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

リモート 状態ストアのクエリ

状態ストア retrieveQueryableStore を取得するための上記の API は、ローカルで使用可能なキー値状態ストアを対象としています。本番環境では、Kafka Streams アプリケーションはパーティションの数に基づいて分散される可能性が最も高くなります。トピックに 4 つのパーティションがあり、同じ Kafka Streams プロセッサーのインスタンスが 4 つ実行されている場合、各インスタンスはトピックの 1 つのパーティションの処理を担当する可能性があります。このシナリオでは、retrieveQueryableStore を呼び出しても、有効なストアが返される可能性はありますが、インスタンスが探している正しい結果が得られない可能性があります。4 つのパーティションがあるトピックにさまざまなキーに関するデータがあり、1 つのパーティションが常に特定のキーを担当していると仮定します。retrieveQueryableStore を呼び出しているインスタンスが、このインスタンスがホストしていないキーに関する情報を探している場合、データは受信されません。これは、現在の Kafka Streams インスタンスがこのキーについて何も知らないためです。これを修正するには、呼び出し元のインスタンスがまず、特定のキーがホストされている Kafka Streams プロセッサーインスタンスのホスト情報を持っていることを確認する必要があります。これは、以下のように、同じ application.id にある任意の Kafka ストリームインスタンスから取得できます。

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

上記のサンプルコードでは、呼び出しインスタンスが app-store という状態ストアから特定のキー 12345 を照会しています。API には対応するキーシリアライザーも必要で、この場合は IntegerSerializer です。Kafka Streams は、同じ application.id にあるすべてのインスタンスを調べて、どのインスタンスがこの特定のキーをホストしているかを見つけようとします。見つかったら、そのホスト情報を HostInfo オブジェクトとして返します。

API は次のようになります。

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

このように分散方式で同じ application.id の Kafka ストリームプロセッサーの複数のインスタンスを使用する場合、アプリケーションは、REST エンドポイントなどの RPC エンドポイントを介して状態ストアをクエリできる RPC レイヤーを提供する必要があります。詳細については、この記事 [Apache] (英語) を参照してください。Spring for Apache Kafka を使用する場合、spring-web テクノロジを使用して Spring ベースの REST エンドポイントを追加するのは非常に簡単です。REST エンドポイントが存在すると、キーがホストされている HostInfo がインスタンスに認識されていれば、そのエンドポイントを使用して任意の Kafka ストリームインスタンスから状態ストアをクエリできます。

インスタンスをホストしているキーが現在のインスタンスである場合、アプリケーションは RPC メカニズムを呼び出す必要はなく、JVM 内で呼び出しを行う必要があります。ただし、課題は、特定のサーバーがコンシューマーの再バランスのためにパーティションを失う可能性があるため、呼び出しを行っているインスタンスがキーがホストされている場所であることをアプリケーションが認識できない可能性があることです。この課題を解決するために、KafkaStreamsInteractiveQueryService は、現在の HostInfo を返す API メソッド getCurrentKafkaStreamsApplicationHostInfo() を介して現在のホスト情報を照会するための便利な API を提供します。その考え方は、アプリケーションが最初にキーが保持されている場所に関する情報を取得し、次に HostInfo を現在のインスタンスに関する情報と比較するというものです。HostInfo データが一致する場合は、retrieveQueryableStore を介して単純な JVM 呼び出しを続行でき、それ以外の場合は RPC オプションに進みます。

Kafka ストリームの例

次の例は、この章で説明したさまざまなトピックを組み合わせたものです。

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}