バッチ処理を伴う @RabbitListener

バッチのメッセージを受信すると、通常はコンテナーによってデバッチ処理が実行され、リスナーは一度に 1 つのメッセージで呼び出されます。バージョン 2.2 以降では、リスナーコンテナーファクトリとリスナーを構成して、1 回の呼び出しでバッチ全体を受信することができます。ファクトリの batchListener プロパティを設定し、メソッドのペイロードパラメーターを List または Collection にします。

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

batchListener プロパティを true に設定すると、ファクトリが作成するコンテナーの deBatchingEnabled コンテナープロパティが自動的にオフになります (consumerBatchEnabled が true でない場合 - 以下を参照)。事実上、デバッチはコンテナーからリスナーアダプターに移動され、アダプターはリスナーに渡されるリストを作成します。

バッチ対応のファクトリは、マルチメソッドリスナーでは使用できません。

また、バージョン 2.2 以降。バッチ処理されたメッセージを一度に 1 つずつ受信すると、最後のメッセージには true に設定されたブールヘッダーが含まれます。このヘッダーは、リスナーメソッドに @Header(AmqpHeaders.LAST_IN_BATCH) boolean last` パラメーターを追加することで取得できます。ヘッダーは MessageProperties.isLastInBatch() からマップされます。さらに、AmqpHeaders.BATCH_SIZE には、すべてのメッセージフラグメントのバッチのサイズが入力されます。

さらに、新しいプロパティ consumerBatchEnabled が SimpleMessageListenerContainer に追加されました。これが true の場合、コンテナーは batchSize までのメッセージのバッチを作成します。新しいメッセージが到着しないまま receiveTimeout が経過すると、部分的なバッチが配信されます。プロデューサーが作成したバッチを受信した場合、バッチはデバッチされ、コンシューマー側のバッチに追加されます。配信された実際のメッセージ数は、ブローカから受信したメッセージ数を表す batchSize を超える場合があります。consumerBatchEnabled が真の場合、deBatchingEnabled は真でなければなりません。コンテナーファクトリはこの要件を適用します。

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

@RabbitListener で consumerBatchEnabled を使用する場合:

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 1 つ目は、受信した生の変換されていない org.springframework.amqp.core.Message で呼び出されます。

  • 2 つ目は、変換されたペイロードとマップされたヘッダー / プロパティを使用して org.springframework.messaging.Message<?> で呼び出されます。

  • 3 つ目は、ヘッダー / プロパティにアクセスせずに、変換されたペイロードで呼び出されます。

MANUAL ack モードを使用するときによく使用される Channel パラメーターを追加することもできます。delivery_tag プロパティにアクセスできないため、これは 3 番目の例ではあまり役に立ちません。

Spring Boot は、consumerBatchEnabled および batchSize の構成プロパティを提供しますが、batchListener には提供しません。バージョン 3.0 以降、コンテナーファクトリで consumerBatchEnabled を true に設定すると、batchListener も true に設定されます。consumerBatchEnabled が true の場合、リスナーはバッチリスナーである必要があります。

バージョン 3.0 以降、リスナーメソッドは Collection<?> または List<?> を使用できます。

バッチモードのリスナーは、バッチ内のメッセージと生成される単一の応答の間に相関関係がない可能性があるため、応答をサポートしません。非同期の戻り値の型は、バッチリスナーでも引き続きサポートされます。