最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。 |
送信でのパーティションサポート
Kafka ストリームプロセッサーは通常、処理された出力を送信 Kafka トピックに送信します。送信トピックがパーティション化されており、プロセッサーが送信データを特定のパーティションに送信する必要がある場合、アプリケーションは型 StreamPartitioner
の Bean を提供する必要があります。詳細については、StreamPartitioner [Apache] (英語) を参照してください。いくつかの例を見てみましょう。
これは、すでに何度も見たのと同じプロセッサーです。
@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {
...
}
出力バインディングの宛先は次のとおりです。
spring.cloud.stream.bindings.process-out-0.destination: outputTopic
トピック outputTopic
に 4 つのパーティションがある場合、パーティション化戦略を提供しないと、Kafka ストリームはデフォルトのパーティション化戦略を使用しますが、特定のユースケースによっては希望する結果にならない場合があります。たとえば、spring
に一致するすべてのキーをパーティション 0 に、cloud
をパーティション 1 に、stream
をパーティション 2 に、その他すべてをパーティション 3 に送信するとします。これはアプリケーションで行う必要があることです。
@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
return (t, k, v, n) -> {
if (k.equals("spring")) {
return 0;
}
else if (k.equals("cloud")) {
return 1;
}
else if (k.equals("stream")) {
return 2;
}
else {
return 3;
}
};
}
これは基本的な実装ですが、レコードのキー / 値、トピック名、パーティションの総数にアクセスできます。必要に応じて複雑なパーティショニング戦略を実装できます。
また、この Bean 名をアプリケーション構成とともに提供する必要があります。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner
アプリケーションの各出力トピックは、このように個別に構成する必要があります。