送信でのパーティションサポート

Kafka ストリームプロセッサーは通常、処理された出力を送信 Kafka トピックに送信します。送信トピックがパーティション化されており、プロセッサーが送信データを特定のパーティションに送信する必要がある場合、アプリケーションは型 StreamPartitioner の Bean を提供する必要があります。詳細については、StreamPartitioner [Apache] (英語) を参照してください。いくつかの例を見てみましょう。

これは、すでに何度も見たのと同じプロセッサーです。

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> process() {

    ...
}

出力バインディングの宛先は次のとおりです。

spring.cloud.stream.bindings.process-out-0.destination: outputTopic

トピック outputTopic に 4 つのパーティションがある場合、パーティション化戦略を提供しないと、Kafka ストリームはデフォルトのパーティション化戦略を使用しますが、特定のユースケースによっては希望する結果にならない場合があります。たとえば、spring に一致するすべてのキーをパーティション 0 に、cloud をパーティション 1 に、stream をパーティション 2 に、その他すべてをパーティション 3 に送信するとします。これはアプリケーションで行う必要があることです。

@Bean
public StreamPartitioner<String, WordCount> streamPartitioner() {
    return (t, k, v, n) -> {
        if (k.equals("spring")) {
            return 0;
        }
        else if (k.equals("cloud")) {
            return 1;
        }
        else if (k.equals("stream")) {
            return 2;
        }
        else {
            return 3;
        }
    };
}

これは基本的な実装ですが、レコードのキー / 値、トピック名、パーティションの総数にアクセスできます。必要に応じて複雑なパーティショニング戦略を実装できます。

また、この Bean 名をアプリケーション構成とともに提供する必要があります。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.streamPartitionerBeanName: streamPartitioner

アプリケーションの各出力トピックは、このように個別に構成する必要があります。