Kafka バインダーによるパーティショニング

Apache Kafka はトピックのパーティション分割をネイティブにサポートします。

たとえば、メッセージ処理を厳密に並べ替える場合など、特定のパーティションにデータを送信すると有利な場合があります(特定の顧客宛てのすべてのメッセージは同じパーティションに送信する必要があります)。

次の例は、プロデューサー側とコンシューマー側を構成する方法を示しています。

@SpringBootApplication
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
Apache Kafka はネイティブでパーティション分割をサポートしているため、例のようにカスタムパーティションキーを使用しているか、ペイロード自体を含む式を使用している場合を除き、上記のバインダーパーティション分割に依存する必要がないことに留意することが重要です。バインダーが提供するパーティション分割の選択は、ネイティブパーティション分割をサポートしていないミドルウェアテクノロジを対象としています。上記の例では、パーティションの決定要因となる partitionKey というカスタムキーを使用していることに注意してください。この場合はバインダーパーティション分割を使用するのが適切です。ネイティブ Kafka パーティション分割を使用する場合、つまり partition-key-expression を提供しない場合は、Apache Kafka によってパーティションが選択されます。デフォルトでは、使用可能なパーティション数に対するレコードキーのハッシュ値になります。送信レコードにキーを追加するには、spring-messaging Message<?> で KafkaHeaders.KEY ヘッダーを目的のキー値に設定します。デフォルトでは、レコードキーが指定されていない場合、Apache Kafka は Apache Kafka ドキュメント (英語) に記述されているロジックに基づいてパーティションを選択します。
トピックは、すべてのコンシューマーグループに必要な同時実行性を実現するのに十分なパーティションを持つようにプロビジョニングする必要があります。上記の構成は、最大 12 のコンシューマーインスタンスをサポートします(concurrency が 2, 4 の場合は 6、同時実行性が 3 の場合など)。一般に、将来のコンシューマーまたは同時実行性の増加を考慮して、パーティションを「オーバープロビジョニング」するのが最善です。
前述の構成では、デフォルトのパーティショニング (key.hashCode() % partitionCount) を使用しています。これは、キー値に応じて、適切にバランスの取れたアルゴリズムを提供する場合と提供しない場合があります。特に、このパーティショニング戦略は、スタンドアロンの Kafka プロデューサー (Kafka ストリームで使用されるものなど) で使用されるデフォルトとは異なることに注意してください。つまり、同じキー値が、これらのクライアントによって生成されたときに、パーティション間で異なるバランスをとる可能性があることを意味します。partitionSelectorExpression または partitionSelectorClass プロパティを使用して、このデフォルトをオーバーライドできます。

パーティションは Kafka によってネイティブに処理されるため、コンシューマー側で特別な構成は必要ありません。Kafka は、インスタンス全体にパーティションを割り当てます。

kafka トピックの PartitionCount は、実行時に変更される可能性があります (管理タスクなどにより)。その後、計算されたパーティションは異なります (たとえば、新しいパーティションが使用されます)。Spring Cloud Stream ランタイムの 4.0.3 以降、パーティション数の変更がサポートされるようになります。更新間隔を設定するには、パラメーター "spring.kafka.producer.properties.metadata.max.age.ms" も参照してください。いくつかの制限により、メッセージの「ペイロード」を参照する「パーティションキー式」を使用することはできません。その場合、メカニズムは無効になります。全体的な動作はデフォルトでは無効になっていますが、構成パラメーター "Producer.dynamicPartitionUpdatesEnabled=true" を使用して有効にできます。

次の Spring Boot アプリケーションは、Kafka ストリームをリッスンし、各メッセージの送信先のパーティション ID を(コンソールに)出力します。

@SpringBootApplication
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            System.out.println(message + " received from partition " + partition);
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup

必要に応じてインスタンスを追加できます。Kafka は、パーティション割り当てのバランスを取り直します。インスタンス数(または instance count * concurrency)がパーティションの数を超えると、一部のコンシューマーはアイドル状態になります。