スレッド化と非同期コンシューマー
非同期コンシューマーには、さまざまなスレッドが関係しています。
SimpleMessageListenerContainer
で構成された TaskExecutor
からのスレッドは、新しいメッセージが RabbitMQ Client
によって配信されるときに MessageListener
を呼び出すために使用されます。構成されていない場合は、SimpleAsyncTaskExecutor
が使用されます。プールされたエグゼキューターを使用する場合は、構成された同時実行を処理するのに十分なプールサイズを確保する必要があります。DirectMessageListenerContainer
では、MessageListener
は RabbitMQ Client
スレッドで直接呼び出されます。この場合、taskExecutor
は、コンシューマーを監視するタスクに使用されます。
デフォルトの SimpleAsyncTaskExecutor を使用すると、リスナーが呼び出されるスレッドに対して、リスナーコンテナー beanName が threadNamePrefix で使用されます。これは、ログ分析に役立ちます。通常、ロギングアペンダー構成に常にスレッド名を含めることをお勧めします。コンテナーの taskExecutor プロパティによって TaskExecutor が明示的に提供されている場合、変更されずにそのまま使用されます。同様の手法を使用して、カスタム TaskExecutor Bean 定義によって作成されたスレッドに名前を付け、ログメッセージでスレッドを識別しやすくすることをお勧めします。 |
CachingConnectionFactory
で構成された Executor
は、接続の作成時に RabbitMQ Client
に渡され、そのスレッドを使用して新しいメッセージがリスナーコンテナーに配信されます。これが構成されていない場合、クライアントは (執筆時点で) 接続ごとに Runtime.getRuntime().availableProcessors() * 2
のプールサイズを持つ内部スレッドプールエグゼキューターを使用します。
多数のファクトリがある場合、または CacheMode.CONNECTION
を使用している場合は、ワークロードを満たすのに十分なスレッドを持つ共有 ThreadPoolTaskExecutor
の使用を検討することをお勧めします。
DirectMessageListenerContainer では、コネクションファクトリが、そのファクトリを使用するすべてのリスナーコンテナーで目的の同時実行をサポートするのに十分なスレッドを持つタスクエグゼキュータで構成されていることを確認する必要があります。デフォルトのプールサイズ (執筆時点) は Runtime.getRuntime().availableProcessors() * 2 です。 |
RabbitMQ client
は ThreadFactory
を使用して、低レベルの I/O (ソケット) 操作用のスレッドを作成します。このファクトリを変更するには、基礎となるクライアント接続ファクトリの構成に従って、基礎となる RabbitMQ ConnectionFactory
を構成する必要があります。