バッチ処理を伴う @RabbitListener
バッチのメッセージを受信すると、通常はコンテナーによってデバッチ処理が実行され、リスナーは一度に 1 つのメッセージで呼び出されます。バージョン 2.2 以降では、リスナーコンテナーファクトリとリスナーを構成して、1 回の呼び出しでバッチ全体を受信することができます。ファクトリの batchListener
プロパティを設定し、メソッドのペイロードパラメーターを List
または Collection
にします。
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
batchListener
プロパティを true に設定すると、ファクトリが作成するコンテナーの deBatchingEnabled
コンテナープロパティが自動的にオフになります (consumerBatchEnabled
が true
でない場合 - 以下を参照)。事実上、デバッチはコンテナーからリスナーアダプターに移動され、アダプターはリスナーに渡されるリストを作成します。
バッチ対応のファクトリは、マルチメソッドリスナーでは使用できません。
また、バージョン 2.2 以降。バッチ処理されたメッセージを一度に 1 つずつ受信すると、最後のメッセージには true
に設定されたブールヘッダーが含まれます。このヘッダーは、リスナーメソッドに @Header(AmqpHeaders.LAST_IN_BATCH)
boolean last` パラメーターを追加することで取得できます。ヘッダーは MessageProperties.isLastInBatch()
からマップされます。さらに、AmqpHeaders.BATCH_SIZE
には、すべてのメッセージフラグメントのバッチのサイズが入力されます。
さらに、新しいプロパティ consumerBatchEnabled
が SimpleMessageListenerContainer
に追加されました。これが true の場合、コンテナーは batchSize
までのメッセージのバッチを作成します。新しいメッセージが到着しないまま receiveTimeout
が経過すると、部分的なバッチが配信されます。プロデューサーが作成したバッチを受信した場合、バッチはデバッチされ、コンシューマー側のバッチに追加されます。配信された実際のメッセージ数は、ブローカから受信したメッセージ数を表す batchSize
を超える場合があります。consumerBatchEnabled
が真の場合、deBatchingEnabled
は真でなければなりません。コンテナーファクトリはこの要件を適用します。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
@RabbitListener
で consumerBatchEnabled
を使用する場合:
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
1 つ目は、受信した生の変換されていない
org.springframework.amqp.core.Message
で呼び出されます。2 つ目は、変換されたペイロードとマップされたヘッダー / プロパティを使用して
org.springframework.messaging.Message<?>
で呼び出されます。3 つ目は、ヘッダー / プロパティにアクセスせずに、変換されたペイロードで呼び出されます。
MANUAL
ack モードを使用するときによく使用される Channel
パラメーターを追加することもできます。delivery_tag
プロパティにアクセスできないため、これは 3 番目の例ではあまり役に立ちません。
Spring Boot は、consumerBatchEnabled
および batchSize
の構成プロパティを提供しますが、batchListener
には提供しません。バージョン 3.0 以降、コンテナーファクトリで consumerBatchEnabled
を true
に設定すると、batchListener
も true
に設定されます。consumerBatchEnabled
が true
の場合、リスナーはバッチリスナーである必要があります。
バージョン 3.0 以降、リスナーメソッドは Collection<?>
または List<?>
を使用できます。
バッチモードのリスナーは、バッチ内のメッセージと生成される単一の応答の間に相関関係がない可能性があるため、応答をサポートしません。非同期の戻り値の型は、バッチリスナーでも引き続きサポートされます。 |