Kafka ストリームベースのバインダーと通常の Kafka バインダーを備えたマルチバインダー

通常の Kafka バインダーに基づく関数 / コンシューマー / サプライヤーと Kafka ストリームベースのプロセッサーの両方を備えたアプリケーションを使用できます。ただし、単一の関数またはコンシューマー内で両方を混在させることはできません。

これは、同じアプリケーション内に両方のバインダーベースのコンポーネントがある例です。

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

これは、構成の関連部分です。

spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

上記と同じアプリケーションを使用している場合、状況は少し複雑になりますが、2 つの異なる Kafka クラスターを処理します。たとえば、通常の process は Kafka クラスター 1 とクラスター 2 の両方に作用します(クラスター 1 からデータを受信してクラスターに送信します) -2)Kafka Streams プロセッサーが Kafka クラスター 2 に作用しています。次に、Spring Cloud Stream が提供するマルチバインダー機能を使用する必要があります。

そのシナリオで構成がどのように変更されるかを次に示します。

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

上記の構成に注意してください。2 種類のバインダーがありますが、全部で 3 つのバインダーがあります。最初のバインダーはクラスター 1 に基づく通常の Kafka バインダー(kafka1)、次にクラスター 2 に基づく別の Kafka バインダー(kafka2)、最後に kstream バインダー(kafka3)です。アプリケーションの最初のプロセッサーは、kafka1 からデータを受信し、kafka2 に公開します。ここで、両方のバインダーは通常の Kafka バインダーに基づいていますが、クラスターが異なります。2 番目のプロセッサーである Kafka Streams プロセッサーは、kafka2 と同じクラスターであるがバインダー型が異なる kafka3 からのデータを消費します。

Kafka Streams ファミリーのバインダーには 3 つの異なるバインダー型(kstreamktableglobalktable)があるため、アプリケーションにこれらのバインダーのいずれかに基づく複数のバインディングがある場合は、バインダー型として明示的に指定する必要があります。

たとえば、以下のようなプロセッサーを使用している場合

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

次に、これをマルチバインダーシナリオで次のように構成する必要があります。これは、単一のアプリケーション内で複数のクラスターを処理する複数のプロセッサーが存在する真のマルチバインダーシナリオがある場合にのみ必要であることに注意してください。その場合、他のプロセッサーのバインダー型やクラスターと区別するために、バインダーにバインディングを明示的に提供する必要があります。

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.