Apache Kafka ストリームのサポート

バージョン 1.1.4 以降、Spring for Apache Kafka は Kafka ストリーム [Apache] (英語) のファーストクラスのサポートを提供します。Spring アプリケーションから使用するには、kafka-streams jar がクラスパスに存在する必要があります。これは Spring for Apache Kafka プロジェクトのオプションの依存関係であり、推移的にダウンロードされることはありません。

基本

リファレンス Apache Kafka Streams ドキュメントでは、API を使用する次の方法が提案されています。

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

2 つの主要なコンポーネントがあります。

  • StreamsBuilderKStream (または KTable) インスタンスを構築するための API を使用します。

  • KafkaStreams: それらのインスタンスのライフサイクルを管理するため。

単一の StreamsBuilder によって KafkaStreams インスタンスに公開されるすべての KStream インスタンスは、ロジックが異なっていても、同時に開始および停止されます。つまり、StreamsBuilder によって定義されるすべてのストリームは、単一のライフサイクルコントロールに関連付けられます。KafkaStreams インスタンスが streams.close() によって閉じられると、再起動することはできません。代わりに、ストリーム処理を再開するための新しい KafkaStreams インスタンスを作成する必要があります。

Spring 管理

Spring アプリケーションコンテキストの観点から Kafka ストリームの使用を簡素化し、コンテナーを介したライフサイクル管理を使用するために、Spring for Apache Kafka には StreamsBuilderFactoryBean が導入されています。これは、StreamsBuilder シングルトンインスタンスを Bean として公開する AbstractFactoryBean 実装です。次の例では、そのような Bean を作成します。

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
バージョン 2.2 以降、ストリーム設定は StreamsConfig ではなく KafkaStreamsConfiguration オブジェクトとして提供されるようになりました。

StreamsBuilderFactoryBean は、内部 KafkaStreams インスタンスのライフサイクルを管理するために SmartLifecycle も実装します。Kafka ストリーム API と同様に、KafkaStreams を開始する前に KStream インスタンスを定義する必要があります。これは、Kafka ストリームの Spring API にも当てはまります。StreamsBuilderFactoryBean でデフォルトの autoStartup = true を使用する場合、アプリケーションコンテキストがリフレッシュされる前に StreamsBuilder で KStream インスタンスを宣言する必要があります。例: KStream は通常の Bean 定義にすることができますが、Kafka ストリーム API は影響なしに使用されます。次の例は、その方法を示しています。

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

ライフサイクルを手動で制御する場合 (たとえば、何らかの条件による停止と開始)、ファクトリ Bean (&) プレフィックスを使用して、StreamsBuilderFactoryBean Bean を直接参照できます。StreamsBuilderFactoryBean は内部 KafkaStreams インスタンスを使用するため、安全に停止して再起動できます。新しい KafkaStreams が各 start() 上に作成されます。KStream インスタンスのライフサイクルを個別に制御したい場合は、別の StreamsBuilderFactoryBean インスタンスの使用を検討することもできます。

StreamsBuilderFactoryBean で KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener オプションを指定することもできます。これは、内部 KafkaStreams インスタンスに委譲されます。また、これらのオプションを StreamsBuilderFactoryBean で間接的に設定する以外に、バージョン 2.1.5 以降、KafkaStreamsCustomizer コールバックインターフェースを使用して内部 KafkaStreams インスタンスを構成できます。KafkaStreamsCustomizer は、StreamsBuilderFactoryBean によって提供されるオプションをオーバーライドすることに注意してください。一部の KafkaStreams 操作を直接実行する必要がある場合は、StreamsBuilderFactoryBean.getKafkaStreams() を使用してその内部 KafkaStreams インスタンスにアクセスできます。StreamsBuilderFactoryBean Bean を型別にオートワイヤーできますが、次の例に示すように、Bean 定義で完全な型を使用する必要があります。

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

または、インターフェース Bean 定義を使用する場合は、名前で @Qualifier をインジェクション用に追加できます。次の例は、その方法を示しています。

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

バージョン 2.4.1 以降、ファクトリ Bean には型 KafkaStreamsInfrastructureCustomizer の新しいプロパティ infrastructureCustomizer があります。これにより、ストリームが作成される前に、StreamsBuilder (状態ストアを追加するなど) および / または Topology のカスタマイズが可能になります。

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

デフォルトの no-op 実装は、どちらかが必要でない場合に両方のメソッドを実装する必要がないように提供されています。

複数のカスタマイザを適用する必要がある場合のために、CompositeKafkaStreamsInfrastructureCustomizer が提供されます。

KafkaStreams Micrometer サポート

バージョン 2.5.3 で導入され、ファクトリ Bean によって管理される KafkaStreams オブジェクトの micrometer メーターを自動的に登録するように KafkaStreamsMicrometerListener を構成できます。

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

JSON の直列化と逆直列化をストリーミングします

JSON 形式でトピックまたは状態ストアを読み書きするときにデータをシリアライズおよびデシリアライズするために、Spring for Apache Kafka は、JSON を使用する JsonSerde 実装を提供し、直列化、逆直列化、メッセージ変換で説明されている JsonSerializer および JsonDeserializer に委譲します。JsonSerde 実装は、コンストラクター (ターゲット型または ObjectMapper) を通じて同じ構成オプションを提供します。次の例では、JsonSerde を使用して Kafka ストリームの Cat ペイロードをシリアライズおよびデシリアライズします (インスタンスが必要な場合はいつでも JsonSerde を同様の方法で使用できます)。

stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");

プロデューサー / コンシューマーファクトリで使用するためにシリアライザー / デシリアライザーをプログラムで構築する場合、バージョン 2.3 以降、流れるような API を使用できるため、構成が簡単になります。

stream.through(
    new JsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

KafkaStreamBrancher を使用する

KafkaStreamBrancher クラスは、KStream の上に条件付き ブランチ を構築するためのより便利な方法を導入します。

KafkaStreamBrancher を使用しない次の例を考えてみましょう。

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

次の例では、KafkaStreamBrancher を使用しています。

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

構成

Kafka ストリーム環境を構成するには、StreamsBuilderFactoryBean に KafkaStreamsConfiguration インスタンスが必要です。すべての可能なオプションについては、Apache Kafka のドキュメント [Apache] (英語) を参照してください。

バージョン 2.2 以降、ストリーム設定は StreamsConfig ではなく KafkaStreamsConfiguration オブジェクトとして提供されるようになりました。

特にマイクロサービスを開発する場合、ほとんどの場合、定型コードを回避するために、Spring for Apache Kafka は @EnableKafkaStreams アノテーションを提供します。これは、@Configuration クラスに配置する必要があります。必要なのは、defaultKafkaStreamsConfig という名前の KafkaStreamsConfiguration Bean を宣言することだけです。defaultKafkaStreamsBuilder という名前の StreamsBuilderFactoryBean Bean は、アプリケーションコンテキストで自動的に宣言されます。追加の StreamsBuilderFactoryBean Bean を宣言して使用することもできます。StreamsBuilderFactoryBeanConfigurer を実装する Bean を提供することにより、その Bean の追加のカスタマイズを実行できます。そのような Bean が複数ある場合、Ordered.order プロパティに従って適用されます。

デフォルトでは、ファクトリ Bean が停止すると、KafkaStreams.cleanUp() メソッドが呼び出されます。バージョン 2.1.2 以降、ファクトリ Bean には追加のコンストラクターがあり、start() または stop() 中に cleanUp() メソッドを呼び出すかどうかを制御できるプロパティを持つ CleanupConfig オブジェクトを取得します。バージョン 2.7 以降、デフォルトではローカル状態をクリーンアップしません。

ヘッダーエンリッチャー

バージョン 3.0 は ContextualProcessor の HeaderEnricherProcessor 拡張を追加しました。非推奨の Transformer インターフェースを実装した非推奨の HeaderEnricher と同じ機能を提供します。これは、ストリーム処理内でヘッダーを追加するために使用できます。ヘッダー値は SpEL 式です。式評価のルートオブジェクトには 3 つのプロパティがあります。

  • record - org.apache.kafka.streams.processor.api.Record (keyvaluetimestampheaders)

  • key - 現在のレコードのキー

  • value - 現在のレコードの値

  • context - ProcessorContext、現在のレコードのメタデータへのアクセスを許可します

式は byte[] または String ( UTF-8 を使用して byte[] に変換される) を返す必要があります。

ストリーム内でエンリッチャーを使用するには:

.process(() -> new HeaderEnricherProcessor(expressions))

プロセッサーは key または value を変更しません。ヘッダーを追加するだけです。

レコードごとに新しいインスタンスが必要です。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

これは簡単な例で、1 つのリテラルヘッダーと 1 つの変数を追加します。

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

バージョン 3.0 では、ContextualProcessor の MessagingProcessor 拡張機能が追加され、非推奨の Transformer インターフェースを実装した非推奨の MessagingTransformer と同じ機能が提供されました。これにより、Kafka ストリームトポロジが Spring Integration フローなどの Spring メッセージングコンポーネントと対話できるようになります。トランスには MessagingFunction の実装が必要です。

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration は、その GatewayProxyFactoryBean を使用した実装を自動的に提供します。また、キー、値、メタデータ (ヘッダーを含む) を Spring メッセージング Message<?> との間で変換するために MessagingMessageConverter が必要です。詳細については、【 KStream から Spring Integration フローを呼び出す ] を参照してください。

デシリアライズ例外からの回復

バージョン 2.3 では、逆直列化例外が発生したときに何らかのアクションを実行できる RecoveringDeserializationExceptionHandler が導入されました。RecoveringDeserializationExceptionHandler が実装である DeserializationExceptionHandler については、Kafka のドキュメントを参照してください。RecoveringDeserializationExceptionHandler は ConsumerRecordRecoverer 実装で構成されます。フレームワークは、失敗したレコードを配信不能トピックに送信する DeadLetterPublishingRecoverer を提供します。この回復プログラムの詳細については、デッドレターレコードの公開を参照してください。

回復者を構成するには、次のプロパティをストリーム構成に追加します。

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

もちろん、recoverer() Bean は ConsumerRecordRecoverer の独自の実装にすることができます。

Kafka ストリームの例

次の例は、この章で取り上げたすべてのトピックを組み合わせたものです。

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}