RabbitMQ バインダーを使用したパーティション分割

RabbitMQ はネイティブでのパーティショニングをサポートしていません。

特定のパーティションにデータを送信すると有利な場合があります。たとえば、メッセージ処理を厳密に並べ替える場合は、特定の顧客宛てのすべてのメッセージを同じパーティションに送信する必要があります。

RabbitMessageChannelBinder は、各パーティションのキューを宛先交換にバインドすることにより、パーティション化を提供します。

次の Java と YAML の例は、プロデューサーを構成する方法を示しています。

プロデューサー
@SpringBootApplication
public class RabbitPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "abc1", "def1", "qux1",
            "abc2", "def2", "qux2",
            "abc3", "def3", "qux3",
            "abc4", "def4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            generate-out-0:
              destination: partitioned.destination
              producer:
                partitioned: true
                partition-key-expression: headers['partitionKey']
                partition-count: 2
                required-groups:
                - myGroup

前の例の構成では、デフォルトのパーティショニング(key.hashCode() % partitionCount)を使用しています。これは、キー値に応じて、適切にバランスの取れたアルゴリズムを提供する場合と提供しない場合があります。partitionSelectorExpression または partitionSelectorClass プロパティを使用して、このデフォルトをオーバーライドできます。

required-groups プロパティは、プロデューサーのデプロイ時にコンシューマーキューをプロビジョニングする必要がある場合にのみ必要です。そうしないと、対応するコンシューマーがデプロイされるまで、パーティションに送信されたメッセージはすべて失われます。

次の構成は、トピック交換をプロビジョニングします。

part exchange

次のキューがその交換にバインドされています。

part queues

次のバインディングは、キューを交換に関連付けます。

part bindings

次の Java と YAML の例は、前の例を継続し、コンシューマーを構成する方法を示しています。

コンシューマー
@SpringBootApplication
public class RabbitPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
            System.out.println(in + " received from queue " + queue);
        };
    }

}
application.yml
    spring:
      cloud:
        stream:
          bindings:
            listen-in-0:
              destination: partitioned.destination
              group: myGroup
              consumer:
                partitioned: true
                instance-index: 0
RabbitMessageChannelBinder は動的スケーリングをサポートしていません。パーティションごとに少なくとも 1 つのコンシューマーが必要です。コンシューマーの instanceIndex は、どのパーティションが消費されているかを示すために使用されます。Cloud Foundry などのプラットフォームは、instanceIndex を持つインスタンスを 1 つだけ持つことができます。