スレッド化と非同期コンシューマー

非同期コンシューマーには、さまざまなスレッドが関係しています。

SimpleMessageListenerContainer で構成された TaskExecutor からのスレッドは、新しいメッセージが RabbitMQ Client によって配信されるときに MessageListener を呼び出すために使用されます。構成されていない場合は、SimpleAsyncTaskExecutor が使用されます。プールされたエグゼキューターを使用する場合は、構成された同時実行を処理するのに十分なプールサイズを確保する必要があります。DirectMessageListenerContainer では、MessageListener は RabbitMQ Client スレッドで直接呼び出されます。この場合、taskExecutor は、コンシューマーを監視するタスクに使用されます。

デフォルトの SimpleAsyncTaskExecutor を使用すると、リスナーが呼び出されるスレッドに対して、リスナーコンテナー beanName が threadNamePrefix で使用されます。これは、ログ分析に役立ちます。通常、ロギングアペンダー構成に常にスレッド名を含めることをお勧めします。コンテナーの taskExecutor プロパティによって TaskExecutor が明示的に提供されている場合、変更されずにそのまま使用されます。同様の手法を使用して、カスタム TaskExecutor Bean 定義によって作成されたスレッドに名前を付け、ログメッセージでスレッドを識別しやすくすることをお勧めします。

CachingConnectionFactory で構成された Executor は、接続の作成時に RabbitMQ Client に渡され、そのスレッドを使用して新しいメッセージがリスナーコンテナーに配信されます。これが構成されていない場合、クライアントは (執筆時点で) 接続ごとに Runtime.getRuntime().availableProcessors() * 2 のプールサイズを持つ内部スレッドプールエグゼキューターを使用します。

多数のファクトリがある場合、または CacheMode.CONNECTION を使用している場合は、ワークロードを満たすのに十分なスレッドを持つ共有 ThreadPoolTaskExecutor の使用を検討することをお勧めします。

DirectMessageListenerContainer では、コネクションファクトリが、そのファクトリを使用するすべてのリスナーコンテナーで目的の同時実行をサポートするのに十分なスレッドを持つタスクエグゼキュータで構成されていることを確認する必要があります。デフォルトのプールサイズ (執筆時点) は Runtime.getRuntime().availableProcessors() * 2 です。

RabbitMQ client は ThreadFactory を使用して、低レベルの I/O (ソケット) 操作用のスレッドを作成します。このファクトリを変更するには、基礎となるクライアント接続ファクトリの構成に従って、基礎となる RabbitMQ ConnectionFactory を構成する必要があります。