リスナーの同時実行

SimpleMessageListenerContainer

デフォルトでは、リスナーコンテナーは、キューからメッセージを受信する単一のコンシューマーを開始します。

前のセクションの表を調べると、同時実行を制御する多くのプロパティと属性を確認できます。最も単純なのは concurrentConsumers で、メッセージを同時に処理する (一定の) 数のコンシューマーを作成します。

バージョン 1.3.0 より前では、これが使用可能な唯一の設定であり、設定を変更するにはコンテナーを停止してから再度開始する必要がありました。

バージョン 1.3.0 以降、concurrentConsumers プロパティを動的に調整できるようになりました。コンテナーの実行中に変更された場合、新しい設定に合わせてコンシューマーが必要に応じて追加または削除されます。

さらに、maxConcurrentConsumers という新しいプロパティが追加され、コンテナーはワークロードに基づいて同時実行を動的に調整します。これは、4 つの追加プロパティ consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval と連携して機能します。デフォルト設定では、コンシューマーを増やすアルゴリズムは次のように機能します。

maxConcurrentConsumers に達しておらず、既存のコンシューマーが 10 サイクル連続してアクティブであり、最後のコンシューマーが開始されてから少なくとも 10 秒が経過した場合、新しいコンシューマーが開始されます。batchSize * receiveTimeout ミリ秒で少なくとも 1 つのメッセージを受信した場合、コンシューマーはアクティブであると見なされます。

デフォルト設定では、コンシューマーを減らすアルゴリズムは次のように機能します。

実行中の concurrentConsumers を超える数があり、コンシューマーが 10 回連続してタイムアウト (アイドル) を検出し、かつ最後のコンシューマーが少なくとも 60 秒前に停止された場合、コンシューマーは停止します。タイムアウトは、receiveTimeout および batchSize プロパティによって異なります。batchSize * receiveTimeout ミリ秒以内にメッセージを受信しない場合、コンシューマーはアイドル状態であると見なされます。デフォルトのタイムアウト (1 秒) と batchSize が 4 の場合、コンシューマーの停止は 40 秒のアイドル時間後に考慮されます (4 つのタイムアウトは 1 つのアイドル検出に対応します)。

実際には、コンテナー全体がしばらくアイドル状態の場合にのみ、コンシューマーを停止できます。これは、ブローカーがすべてのアクティブなコンシューマー間で作業を共有するためです。

構成されたキューの数に関係なく、各コンシューマーは 1 つのチャネルを使用します。

バージョン 2.0 以降では、concurrentConsumers および maxConcurrentConsumers プロパティを concurrency プロパティで設定できます (例: 2-4)。

DirectMessageListenerContainer を使用する

このコンテナーでは、同時実行は構成されたキューと consumersPerQueue に基づいています。各キューの各コンシューマーは個別のチャネルを使用し、同時実行は rabbit クライアントライブラリによって制御されます。デフォルトでは、執筆時点では、DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2 スレッドのプールを使用しています。

必要な最大同時実行性を提供するように taskExecutor を構成できます。