プログラミングモデル

Kafka Streams バインダーによって提供されるプログラミングモデルを使用する場合、高レベルの DSL をストリーミング (英語) と、高レベルと低レベルの両方のプロセッサー -API (英語) の組み合わせの両方をオプションとして使用できます。高レベルと低レベルの両方の API を混在させる場合、これは通常、KStream で transform または process API メソッドを呼び出すことによって実現されます。

関数スタイル

Spring Cloud Stream 3.0.0 以降、Kafka Streams バインダーを使用すると、Java 8 で使用可能な関数型プログラミングスタイルを使用してアプリケーションを設計および開発できます。これは、アプリケーションを java.util.function.Function 型または java.util.function.Consumer 型のラムダ式として簡潔に表すことができることを意味します。

非常に基本的な例を見てみましょう。

@SpringBootApplication
public class SimpleConsumerApplication {

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input ->
                input.foreach((key, value) -> {
                    System.out.println("Key: " + key + " Value: " + value);
                });
    }
}

シンプルではありますが、これは完全なスタンドアロン Spring Boot アプリケーションであり、ストリーム処理に Kafka ストリームを活用しています。これは、送信バインディングがなく、受信バインディングが 1 つしかないコンシューマーアプリケーションです。アプリケーションはデータを消費し、KStream キーと値からの情報を標準出力に記録するだけです。アプリケーションには、SpringBootApplication アノテーションと Bean としてマークされたメソッドが含まれています。Bean メソッドは、KStream でパラメーター化された型 java.util.function.Consumer です。次に、実装では、本質的にラムダ式であるコンシューマーオブジェクトを返します。ラムダ式の中には、データを処理するためのコードが含まれています。

このアプリケーションでは、型 KStream の単一の入力バインディングがあります。バインダーは、process-in-0 という名前でアプリケーションのこのバインディングを作成します。つまり、関数 Bean 名の後にダッシュ文字(-)が続き、リテラル in の後に別のダッシュが続き、パラメーターの順序位置が続きます。このバインディング名を使用して、宛先などの他のプロパティを設定します。例: spring.cloud.stream.bindings.process-in-0.destination=my-topic

宛先プロパティがバインディングに設定されていない場合(アプリケーションに十分な権限がある場合)、トピックはバインディングと同じ名前で作成されるか、そのトピックはすでに使用可能であると予想されます。

uber-jar(kstream-consumer-app.jar など)としてビルドしたら、次のように上記の例を実行できます。

アプリケーションが Spring の Component アノテーションを使用して機能 Bean を定義することを選択した場合、バインダーはそのモデルもサポートします。上記の 関数 Bean は、次のように書き換えることができます。

@Component(name = "process")
public class SimpleConsumer implements java.util.function.Consumer<KStream<Object, String>> {

    @Override
    public void accept(KStream<Object, String> input) {
        input.foreach((key, value) -> {
            System.out.println("Key: " + key + " Value: " + value);
        });
    }
}
java -jar kstream-consumer-app.jar --spring.cloud.stream.bindings.process-in-0.destination=my-topic

これは別の例で、入力バインディングと出力バインディングの両方を備えたフルプロセッサーです。これは、アプリケーションがトピックからデータを受信し、各単語の出現回数がタンブリングタイムウィンドウで計算される典型的な単語カウントの例です。

@SpringBootApplication
public class WordCountProcessorApplication {

  @Bean
  public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    return input -> input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .map((key, value) -> new KeyValue<>(value, value))
                .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(5000))
                .count(Materialized.as("word-counts-state-store"))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(), new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))));
  }

	public static void main(String[] args) {
		SpringApplication.run(WordCountProcessorApplication.class, args);
	}
}

ここでも、これは完全な Spring Boot アプリケーションです。ここでの最初のアプリケーションとの違いは、Bean メソッドの型が java.util.function.Function であるということです。Function の最初のパラメーター化された型は入力 KStream 用で、2 番目の型は出力用です。メソッド本体には、Function 型のラムダ式が提供され、実装として実際のビジネスロジックが提供されます。前述のコンシューマーベースのアプリケーションと同様に、ここでの入力バインディングは、デフォルトで process-in-0 という名前が付けられています。出力の場合、バインディング名も自動的に process-out-0 に設定されます。

uber-jar(wordcount-processor.jar など)としてビルドしたら、次のように上記の例を実行できます。

java -jar wordcount-processor.jar --spring.cloud.stream.bindings.process-in-0.destination=words --spring.cloud.stream.bindings.process-out-0.destination=counts

このアプリケーションは、Kafka トピック words からのメッセージを消費し、計算結果は出力トピック counts に公開されます。

Spring Cloud Stream は、受信トピックと発信トピックの両方からのメッセージが KStream オブジェクトとして自動的にバインドされることを保証します。開発者は、コードのビジネス面、つまりプロセッサーに必要なロジックの記述に専念できます。Kafka Streams インフラストラクチャに必要な Kafka Streams 固有の構成のセットアップは、フレームワークによって自動的に処理されます。

上で見た 2 つの例には、単一の KStream 入力バインディングがあります。どちらの場合も、バインディングは単一のトピックからレコードを受け取りました。複数のトピックを単一の KStream バインディングに多重化する場合は、以下の宛先としてコンマ区切りの Kafka トピックを指定できます。

spring.cloud.stream.bindings.process-in-0.destination=topic-1,topic-2,topic-3

さらに、トピックを通常の表現と照合する場合は、トピックパターンを宛先として提供することもできます。

spring.cloud.stream.bindings.process-in-0.destination=input.*

複数の入力バインディング

多くの重要な Kafka Streams アプリケーションは、複数のバインディングを介して複数のトピックからのデータを消費することがよくあります。たとえば、1 つのトピックは Kstream として消費され、別のトピックは KTable または GlobalKTable として消費されます。アプリケーションがデータをテーブル型として受け取りたいと思う理由はたくさんあります。基になるトピックがデータベースからの変更データキャプチャー(CDC)メカニズムを介して入力される、またはアプリケーションがダウンストリーム処理の最新の更新のみを気にするユースケースを考えてみてください。データを KTable または GlobalKTable としてバインドする必要があるとアプリケーションが指定した場合、Kafka Streams バインダーは、宛先を KTable または GlobalKTable に適切にバインドし、アプリケーションが操作できるようにします。Kafka Streams バインダーで複数の入力バインディングがどのように処理されるかについていくつかの異なるシナリオを見ていきます。

Kafka ストリームバインダーの BiFunction

これは、2 つの入力と 1 つの出力がある例です。この場合、アプリケーションは java.util.function.BiFunction を活用できます。

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicksStream, userRegionsTable) -> (userClicksStream
            .leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
                            "UNKNOWN" : region, clicks),
                    Joined.with(Serdes.String(), Serdes.Long(), null))
            .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(),
                    regionWithClicks.getClicks()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            .reduce(Long::sum)
            .toStream());
}

ここでも、基本的なテーマは前の例と同じですが、ここでは 2 つの入力があります。Java の BiFunction サポートは、入力を目的の宛先にバインドするために使用されます。入力用にバインダーによって生成されるデフォルトのバインディング名は、それぞれ process-in-0 と process-in-1 です。デフォルトの出力バインディングは process-out-0 です。この例では、BiFunction の最初のパラメーターは最初の入力の KStream としてバインドされ、2 番目のパラメーターは 2 番目の入力の KTable としてバインドされます。

Kafka ストリームバインダーの BiConsumer

入力が 2 つあるが出力がない場合は、以下に示すように java.util.function.BiConsumer を使用できます。

@Bean
public BiConsumer<KStream<String, Long>, KTable<String, String>> process() {
    return (userClicksStream, userRegionsTable) -> {}
}

2 つの入力を超えて

3 つ以上の入力がある場合はどうなるでしょうか? 3 つ以上の入力が必要な場合があります。その場合、バインダーを使用すると、チェーンの部分関数を実行できます。関数型プログラミングの専門用語では、この手法は一般にカリー化として知られています。Java 8 の一部として関数型プログラミングのサポートが追加されたことで、Java でカレー関数を記述できるようになりました。Spring Cloud Stream Kafka Streams バインダーは、この機能を利用して、複数の入力バインディングを有効にすることができます。

例を見てみましょう。

@Bean
public Function<KStream<Long, Order>,
        Function<GlobalKTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    return orders -> (
              customers -> (
                    products -> (
                        orders.join(customers,
                            (orderId, order) -> order.getCustomerId(),
                                (order, customer) -> new CustomerOrder(customer, order))
                                .join(products,
                                        (orderId, customerOrder) -> customerOrder
                                                .productId(),
                                        (customerOrder, product) -> {
                                            EnrichedOrder enrichedOrder = new EnrichedOrder();
                                            enrichedOrder.setProduct(product);
                                            enrichedOrder.setCustomer(customerOrder.customer);
                                            enrichedOrder.setOrder(customerOrder.order);
                                            return enrichedOrder;
                                        })
                        )
                )
    );
}

上記のバインディングモデルの詳細を見てみましょう。このモデルでは、受信に 3 つの部分的に適用された関数があります。f(x)f(y)f(z) と呼びましょう。これらの関数を真の数学関数の意味で拡張すると、f(x) → (fy) → f(z) → KStream<Long, EnrichedOrder> のようになります。x 変数は KStream<Long, Order> を表し、y 変数は GlobalKTable<Long, Customer> を表し、z 変数は GlobalKTable<Long, Product> を表します。最初の関数 f(x) には、アプリケーションの最初の入力バインディング(KStream<Long, Order>)があり、その出力は関数 f(y)です。関数 f(y) には、アプリケーションの 2 番目の入力バインディング(GlobalKTable<Long, Customer>)があり、その出力はさらに別の関数 f(z) です。関数 f(z) の入力は、アプリケーションの 3 番目の入力(GlobalKTable<Long, Product>)であり、その出力は、アプリケーションの最終出力バインディングである KStream<Long, EnrichedOrder> です。それぞれ KStreamGlobalKTableGlobalKTable である 3 つの部分関数からの入力は、ラムダ式の一部としてビジネスロジックを実装するためのメソッド本体で利用できます。

入力バインディングには、それぞれ enrichOrder-in-0enrichOrder-in-1enrichOrder-in-2 という名前が付けられています。出力バインディングの名前は enrichOrder-out-0 です。

カレー関数を使用すると、事実上任意の数の入力を持つことができます。ただし、Java で上記のように入力の数が少なく、それらに部分的に適用された関数を超えると、コードが読み取れなくなる可能性があることに注意してください。Kafka Streams アプリケーションが必要とする入力バインディングの数が適度に少なく、この機能モデルを使用する場合は、設計を再考し、アプリケーションを適切に分解することをお勧めします。

出力バインディング

Kafka Streams バインダーは、出力バインディングとして KStream または KTable のいずれかの型を許可します。バックグラウンドでは、バインダーは KStream で to メソッドを使用して、結果のレコードを出力トピックに送信します。アプリケーションが関数の出力として KTable を提供する場合でも、バインダーは KStream の to メソッドに委譲することにより、この手法を使用します。

たとえば、以下の両方の機能が機能します。

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

複数の出力バインディング

Kafka Streams を使用すると、送信データを複数のトピックに書き込むことができます。この機能は、Kafka ストリームでは分岐として知られています。複数の出力バインディングを使用する場合は、送信戻り値の型として KStream の配列(KStream[])を指定する必要があります。

次に例を示します。

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>[]> process() {

    Predicate<Object, WordCount> isEnglish = (k, v) -> v.word.equals("english");
    Predicate<Object, WordCount> isFrench = (k, v) -> v.word.equals("french");
    Predicate<Object, WordCount> isSpanish = (k, v) -> v.word.equals("spanish");

    return input -> {
        final Map<String, KStream<Object, WordCount>> stringKStreamMap = input
                .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
                .groupBy((key, value) -> value)
                .windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
                .count(Materialized.as("WordCounts-branch"))
                .toStream()
                .map((key, value) -> new KeyValue<>(null, new WordCount(key.key(), value,
                        new Date(key.window().start()), new Date(key.window().end()))))
                .split()
                .branch(isEnglish)
                .branch(isFrench)
                .branch(isSpanish)
                .noDefaultBranch();

        return stringKStreamMap.values().toArray(new KStream[0]);
    };
}

プログラミングモデルは同じままですが、送信のパラメーター化された型は KStream[] です。上記の関数のデフォルトの出力バインディング名は、それぞれ process-out-0process-out-1process-out-2 です。バインダーが 3 つの出力バインディングを生成する理由は、返された KStream 配列の長さを 3 として検出するためです。この例では、noDefaultBranch() を提供していることに注意してください。代わりに defaultBranch() を使用した場合、追加の出力バインディングが必要になり、基本的に長さ 4 の KStream 配列が返されます。

Kafka ストリームの関数ベースのプログラミングスタイルの概要

要約すると、次の表は、機能パラダイムで使用できるさまざまなオプションを示しています。

入力数 出力数 使用するコンポーネント

1

0

java.util.function.Consumer

2

0

java.util.function.BiConsumer

1

1..n

java.util.function.Function

2

1..n

java.util.function.BiFunction

> = 3

0..n

カレー関数を使用する

  • この表に複数の出力がある場合、型は単に KStream[] になります。

Kafka Streams バインダーの関数合成

Kafka Streams バインダーは、線形トポロジーの最小限の形式の機能合成をサポートします。Java 関数型 API サポートを使用すると、複数の関数を記述し、andThen メソッドを使用して独自に作成できます。例: 次の 2 つの関数があると仮定します。

@Bean
public Function<KStream<String, String>, KStream<String, String>> foo() {
    return input -> input.peek((s, s2) -> {});
}

@Bean
public Function<KStream<String, String>, KStream<String, Long>> bar() {
    return input -> input.peek((s, s2) -> {});
}

バインダーに機能合成のサポートがなくても、次のようにこれら 2 つの機能を合成できます。

@Bean
public Function<KStream<String, String>, KStream<String, Long>> composed() {
    foo().andThen(bar());
}

次に、フォーム spring.cloud.function.definition=foo;bar;composed の定義を提供できます。バインダーで関数合成がサポートされているため、明示的な関数合成を行っているこの 3 番目の関数を記述する必要はありません。

代わりにこれを行うことができます:

spring.cloud.function.definition=foo|bar

あなたもこれを行うことができます:

spring.cloud.function.definition=foo|bar;foo;bar

この例の合成関数のデフォルトのバインディング名は、foobar-in-0 と foobar-out-0 になります。

Kafka ストリームビンサーの機能構成の制限

java.util.function.Function Bean をお持ちの場合は、別の機能または複数の機能で構成できます。同じ機能 Bean を java.util.function.Consumer で構成することもできます。この場合、コンシューマーが最後に構成されたコンポーネントです。関数は複数の関数で構成でき、java.util.function.Consumer Bean で終了することもできます。

型 java.util.function.BiFunction の Bean を構成する場合、BiFunction は定義の最初の関数である必要があります。構成されたエンティティは、型 java.util.function.Function または java.util.funciton.Consumer のいずれかである必要があります。つまり、BiFunction Bean を取得してから、別の BiFunction で構成することはできません。

BiConsumer の型または Consumer が最初のコンポーネントである定義で構成することはできません。これが定義の最後のコンポーネントでない限り、出力が配列(分岐の場合は KStream[])である関数で構成することもできません。

関数定義の BiFunction の最初の Function も、カレー形式を使用する場合があります。例: 以下が可能です。

@Bean
public Function<KStream<String, String>, Function<KTable<String, String>, KStream<String, String>>> curriedFoo() {
    return a -> b ->
            a.join(b, (value1, value2) -> value1 + value2);
}

@Bean
public Function<KStream<String, String>, KStream<String, String>> bar() {
    return input -> input.mapValues(value -> value + "From-anotherFooFunc");
}

関数定義は curriedFoo|bar である可能性があります。バックグラウンドでは、バインダーはカリー化された関数の 2 つの入力バインディングと、定義の最後の関数に基づいた出力バインディングを作成します。この場合のデフォルトの入力バインディングは、curriedFoobar-in-0 と curriedFoobar-in-1 になります。この例のデフォルトの出力バインディングは curriedFoobar-out-0 になります。

関数合成の出力として KTable を使用する際の特記事項

次の 2 つの機能があるとしましょう。

@Bean
public Function<KStream<String, String>, KTable<String, String>> foo() {
    return KStream::toTable;
    };
}

@Bean
public Function<KTable<String, String>, KStream<String, String>> bar() {
    return KTable::toStream;
}

それらを foo|bar として構成できますが、最初の関数(foo)は出力として KTable を持っているため、2 番目の関数(この場合は bar)は入力として KTable を持っている必要があることに注意してください。