Kafka ストリームベースのバインダーと通常の Kafka バインダーを備えたマルチバインダー
通常の Kafka バインダーに基づく関数 / コンシューマー / サプライヤーと Kafka ストリームベースのプロセッサーの両方を備えたアプリケーションを使用できます。ただし、単一の関数またはコンシューマー内で両方を混在させることはできません。
これは、同じアプリケーション内に両方のバインダーベースのコンポーネントがある例です。
@Bean
public Function<String, String> process() {
return s -> s;
}
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {
return input -> input;
}
これは、構成の関連部分です。
spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
上記と同じアプリケーションを使用している場合、状況は少し複雑になりますが、2 つの異なる Kafka クラスターを処理します。たとえば、通常の process
は Kafka クラスター 1 とクラスター 2 の両方に作用します(クラスター 1 からデータを受信してクラスターに送信します) -2)Kafka Streams プロセッサーが Kafka クラスター 2 に作用しています。次に、Spring Cloud Stream が提供するマルチバインダー機能を使用する必要があります。
そのシナリオで構成がどのように変更されるかを次に示します。
# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.function.definition=process;kstreamProcess
# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2
# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3
上記の構成に注意してください。2 種類のバインダーがありますが、全部で 3 つのバインダーがあります。最初のバインダーはクラスター 1 に基づく通常の Kafka バインダー(kafka1
)、次にクラスター 2 に基づく別の Kafka バインダー(kafka2
)、最後に kstream
バインダー(kafka3
)です。アプリケーションの最初のプロセッサーは、kafka1
からデータを受信し、kafka2
に公開します。ここで、両方のバインダーは通常の Kafka バインダーに基づいていますが、クラスターが異なります。2 番目のプロセッサーである Kafka Streams プロセッサーは、kafka2
と同じクラスターであるがバインダー型が異なる kafka3
からのデータを消費します。
Kafka Streams ファミリーのバインダーには 3 つの異なるバインダー型(kstream
、ktable
、globalktable
)があるため、アプリケーションにこれらのバインダーのいずれかに基づく複数のバインディングがある場合は、バインダー型として明示的に指定する必要があります。
たとえば、以下のようなプロセッサーを使用している場合
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {
...
}
次に、これをマルチバインダーシナリオで次のように構成する必要があります。これは、単一のアプリケーション内で複数のクラスターを処理する複数のプロセッサーが存在する真のマルチバインダーシナリオがある場合にのみ必要であることに注意してください。その場合、他のプロセッサーのバインダー型やクラスターと区別するために、バインダーにバインディングを明示的に提供する必要があります。
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1 #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2 #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3 #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream
# rest of the configuration is omitted.