高レベル DSL と低レベルプロセッサー API の混合

Kafka Streams は、API の 2 つのバリアントを提供します。API のような高レベルの DSL があり、多くの関数型プログラマーに馴染みのあるさまざまな操作をチェーンで実行できます。Kafka Streams は、低レベルのプロセッサー API へのアクセスも提供します。プロセッサー API は非常に強力であり、はるかに低いレベルで物事を制御する機能を提供しますが、本質的に不可欠です。Spring Cloud Stream 用の Kafka Streams バインダーを使用すると、高レベル DSL を使用するか、DSL とプロセッサー API の両方を混在させることができます。これらのバリアントの両方を組み合わせると、アプリケーションのさまざまなユースケースを制御するための多くのオプションが得られます。アプリケーションは、transform または process メソッド API 呼び出しを使用して、プロセッサー API にアクセスできます。

これは、process API を使用して Spring Cloud Stream アプリケーションで DSL とプロセッサー API の両方を組み合わせる方法を示しています。

@Bean
public Consumer<KStream<Object, String>> process() {
    return input ->
        input.process(() -> new Processor<Object, String>() {
            @Override
            @SuppressWarnings("unchecked")
            public void init(ProcessorContext context) {
               this.context = context;
            }

            @Override
            public void process(Object key, String value) {
                //business logic
            }

            @Override
            public void close() {

        });
}

これは、transform API を使用した例です。

@Bean
public Consumer<KStream<Object, String>> process() {
    return (input, a) ->
        input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
            @Override
            public void init(ProcessorContext context) {

            }

            @Override
            public void close() {

            }

            @Override
            public KeyValue<Object, String> transform(Object key, String value) {
                // business logic - return transformed KStream;
            }
        });
}

process API メソッド呼び出しは終端記号操作ですが、transform API は非終端記号であり、DSL またはプロセッサー API のいずれかを使用してさらに処理を続行できる潜在的に変換された KStream を提供します。