高レベル DSL と低レベルプロセッサー API の混合
Kafka Streams は、API の 2 つのバリアントを提供します。API のような高レベルの DSL があり、多くの関数型プログラマーに馴染みのあるさまざまな操作をチェーンで実行できます。Kafka Streams は、低レベルのプロセッサー API へのアクセスも提供します。プロセッサー API は非常に強力であり、はるかに低いレベルで物事を制御する機能を提供しますが、本質的に不可欠です。Spring Cloud Stream 用の Kafka Streams バインダーを使用すると、高レベル DSL を使用するか、DSL とプロセッサー API の両方を混在させることができます。これらのバリアントの両方を組み合わせると、アプリケーションのさまざまなユースケースを制御するための多くのオプションが得られます。アプリケーションは、transform
または process
メソッド API 呼び出しを使用して、プロセッサー API にアクセスできます。
これは、process
API を使用して Spring Cloud Stream アプリケーションで DSL とプロセッサー API の両方を組み合わせる方法を示しています。
@Bean
public Consumer<KStream<Object, String>> process() {
return input ->
input.process(() -> new Processor<Object, String>() {
@Override
@SuppressWarnings("unchecked")
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void process(Object key, String value) {
//business logic
}
@Override
public void close() {
});
}
これは、transform
API を使用した例です。
@Bean
public Consumer<KStream<Object, String>> process() {
return (input, a) ->
input.transform(() -> new Transformer<Object, String, KeyValue<Object, String>>() {
@Override
public void init(ProcessorContext context) {
}
@Override
public void close() {
}
@Override
public KeyValue<Object, String> transform(Object key, String value) {
// business logic - return transformed KStream;
}
});
}
process
API メソッド呼び出しは終端記号操作ですが、transform
API は非終端記号であり、DSL またはプロセッサー API のいずれかを使用してさらに処理を続行できる潜在的に変換された KStream
を提供します。