最新の安定バージョンについては、spring-cloud-stream 4.2.1 を使用してください。 |
バッチメッセージの受信
RabbitMQ バインダーでは、コンシューマーバインディングによって処理されるバッチには次の 2 つの型があります。
プロデューサーによって作成されたバッチ
通常、プロデューサーバーインディングに batch-enabled=true
(Rabbit プロデューサーのプロパティを参照)がある場合、またはメッセージが BatchingRabbitTemplate
によって作成される場合、バッチの要素はリスナーメソッドへの個別の呼び出しとして返されます。バージョン 3.0 以降、spring.cloud.stream.bindings.<name>.consumer.batch-mode
が true
に設定されている場合、そのようなバッチは List<?>
としてリスナーメソッドに提示できます。
コンシューマー側のバッチ処理
バージョン 3.1 以降、コンシューマーは、変換されたペイロードの List<?>
としてアプリケーションに提示されるバッチに複数の受信メッセージをアセンブルするように構成できます。次の簡単なアプリケーションは、この手法の使用方法を示しています。
spring.cloud.stream.bindings.input-in-0.group=someGroup
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.enable-batching=true
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.batch-size=10
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.receive-timeout=200
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<List<Thing>> input() {
return list -> {
System.out.println("Received " + list.size());
list.forEach(thing -> {
System.out.println(thing);
// ...
});
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}");
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}");
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getField() {
return this.field;
}
public void setField(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1]
Thing [field=value2]
バッチ内のメッセージの数は、batch-size
および receive-timeout
プロパティによって指定されます。receive-timeout
が新しいメッセージなしで経過した場合、「短い」バッチが配信されます。
コンシューマー側のバッチ処理は、container-type=simple (デフォルト)でのみサポートされます。 |
コンシューマー側のバッチメッセージのヘッダーを調べたい場合は、Message<List<?>>
を使用する必要があります。ヘッダーは、ヘッダー AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS
内の List<Map<String, Object>>
であり、対応するインデックス内の各ペイロード要素のヘッダーがあります。繰り返しますが、ここに簡単な例があります:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
Consumer<Message<List<Thing>>> input() {
return msg -> {
List<Thing> things = msg.getPayload();
System.out.println("Received " + things.size());
List<Map<String, Object>> headers =
(List<Map<String, Object>>) msg.getHeaders().get(AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS);
for (int i = 0; i < things.size(); i++) {
System.out.println(things.get(i) + " myHeader=" + headers.get(i).get("myHeader"));
// ...
}
};
}
@Bean
public ApplicationRunner runner(RabbitTemplate template) {
return args -> {
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value1\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue1");
return msg;
});
template.convertAndSend("input-in-0.someGroup", "{\"field\":\"value2\"}", msg -> {
msg.getMessageProperties().setHeader("myHeader", "headerValue2");
return msg;
});
};
}
public static class Thing {
private String field;
public Thing() {
}
public Thing(String field) {
this.field = field;
}
public String getfield() {
return this.field;
}
public void setfield(String field) {
this.field = field;
}
@Override
public String toString() {
return "Thing [field=" + this.field + "]";
}
}
}
Received 2
Thing [field=value1] myHeader=headerValue1
Thing [field=value2] myHeader=headerValue2