RabbitMQ バインダーを使用したパーティション分割
RabbitMQ はネイティブでのパーティショニングをサポートしていません。
特定のパーティションにデータを送信すると有利な場合があります。たとえば、メッセージ処理を厳密に並べ替える場合は、特定の顧客宛てのすべてのメッセージを同じパーティションに送信する必要があります。
RabbitMessageChannelBinder
は、各パーティションのキューを宛先交換にバインドすることにより、パーティション化を提供します。
次の Java と YAML の例は、プロデューサーを構成する方法を示しています。
プロデューサー
@SpringBootApplication
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
前の例の構成では、デフォルトのパーティショニング(
|
次の構成は、トピック交換をプロビジョニングします。
次のキューがその交換にバインドされています。
次のバインディングは、キューを交換に関連付けます。
次の Java と YAML の例は、前の例を継続し、コンシューマーを構成する方法を示しています。
コンシューマー
@SpringBootApplication
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
RabbitMessageChannelBinder は動的スケーリングをサポートしていません。パーティションごとに少なくとも 1 つのコンシューマーが必要です。コンシューマーの instanceIndex は、どのパーティションが消費されているかを示すために使用されます。Cloud Foundry などのプラットフォームは、instanceIndex を持つインスタンスを 1 つだけ持つことができます。 |