接続とリソースの管理
前のセクションで説明した AMQP モデルは一般的であり、すべての実装に適用できますが、リソースの管理に入ると、詳細はブローカーの実装に固有のものになります。このセクションでは、"spring-rabbit" モジュール内にのみ存在するコードに焦点を当てます。こでは、RabbitMQ がサポートされている唯一の実装であるためです。
RabbitMQ ブローカーへの接続を管理する中心的なコンポーネントは、ConnectionFactory インターフェースです。ConnectionFactory 実装の責任は、com.rabbitmq.client.Connection のラッパーである org.springframework.amqp.rabbit.connection.Connection のインスタンスを提供することです。
接続ファクトリの選択
3 つの接続ファクトリから選択できます
PooledChannelConnectionFactoryThreadChannelConnectionFactoryCachingConnectionFactory
最初の 2 つはバージョン 2.3 で追加されました。
ほとんどの使用例では、CachingConnectionFactory を使用する必要があります。ThreadChannelConnectionFactory は、範囲指定された操作を使用せずにメッセージの厳密な順序付けを保証したい場合に使用できます。PooledChannelConnectionFactory は、単一の接続とチャネルのプールを使用するという点で CachingConnectionFactory に似ています。実装はより簡単ですが、関連付けられたパブリッシャーの確認はサポートされていません。
簡単な発行元の確認は、3 つのファクトリすべてでサポートされています。
別の接続を使用するように RabbitTemplate を構成する場合、バージョン 2.3.2 以降では、公開接続ファクトリを別の型に構成できるようになりました。デフォルトでは、パブリッシングファクトリは同じ型であり、メインファクトリに設定されたすべてのプロパティもパブリッシングファクトリに伝達されます。
バージョン 3.1 以降、AbstractConnectionFactory には、接続モジュールのバックオフポリシーをサポートする connectionCreatingBackOff プロパティが含まれています。現在、createChannel() の動作では、channelMax 制限に達したときに発生する例外を処理するサポートがあり、試行と間隔に基づいたバックオフ戦略が実装されています。
PooledChannelConnectionFactory
このファクトリは、Apache Pool2 に基づいて、単一の接続と 2 つのチャネルプールを管理します。1 つのプールはトランザクションチャネル用で、もう 1 つは非トランザクションチャネル用です。プールは、デフォルト構成の GenericObjectPool です。プールを構成するためのコールバックが提供されています。詳細については、Apache のドキュメントを参照してください。
このファクトリを使用するには、Apache commons-pool2 jar がクラスパス上に存在している必要があります。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}ThreadChannelConnectionFactory
このファクトリは、1 つの接続と 2 つの ThreadLocal を管理します。1 つはトランザクションチャネル用で、もう 1 つは非トランザクションチャネル用です。このファクトリは、同じスレッドに対するすべての操作が同じチャネルを使用することを保証します (開いている限り)。これにより、範囲指定された操作を必要とせずに厳密なメッセージの順序付けが容易になります。アプリケーションが有効期間の短いスレッドを多数使用する場合、メモリリークを回避するには、ファクトリの closeThreadChannel() を呼び出してチャネルリソースを解放する必要があります。バージョン 2.3.7 以降、スレッドはそのチャネルを別のスレッドに転送できます。詳細については、マルチスレッド環境での厳密なメッセージの順序付けを参照してください。
CachingConnectionFactory
提供される 3 番目の実装は CachingConnectionFactory です。これは、デフォルトで、アプリケーションで共有できる単一の接続プロキシを確立します。AMQP とのメッセージングの「作業単位」は実際には「チャネル」であるため、接続の共有が可能です (ある意味では、これは JMS における接続とセッションの関連に似ています)。接続インスタンスは createChannel メソッドを提供します。CachingConnectionFactory 実装は、これらのチャネルのキャッシングをサポートし、チャネルがトランザクションかどうかに基づいて、チャネルごとに個別のキャッシュを維持します。CachingConnectionFactory のインスタンスを作成する場合、コンストラクターを介して「ホスト名」を指定できます。また、'username' および 'password' プロパティも指定する必要があります。チャネルキャッシュのサイズ (デフォルトは 25) を構成するには、setChannelCacheSize() メソッドを呼び出します。
バージョン 1.3 以降では、CachingConnectionFactory を構成して、チャネルだけでなく接続もキャッシュすることができます。この場合、createConnection() を呼び出すたびに、新しい接続が作成されます (または、アイドル状態の接続がキャッシュから取得されます)。接続を閉じると、接続がキャッシュに戻されます (キャッシュサイズに達していない場合)。このような接続で作成されたチャネルもキャッシュされます。別の接続の使用は、HA クラスターから消費する場合や、ロードバランサーと組み合わせて異なるクラスターメンバーに接続する場合など、一部の環境で役立つ場合があります。接続をキャッシュするには、cacheMode を CacheMode.CONNECTION に設定します。
| これは、接続数を制限しません。むしろ、許可されるアイドル状態のオープン接続の数を指定します。 |
バージョン 1.5.5 以降、connectionLimit と呼ばれる新しいプロパティが提供されます。このプロパティを設定すると、許可される接続の総数が制限されます。設定すると、制限に達した場合、接続がアイドル状態になるのを待機するために channelCheckoutTimeLimit が使用されます。時間を超えると、AmqpTimeoutException がスローされます。
キャッシュモードが また、このドキュメントの記載時点では、 |
キャッシュサイズは (デフォルトでは) 制限ではなく、単にキャッシュできるチャネルの数であることを理解することが重要です。たとえば 10 のキャッシュサイズでは、実際には任意の数のチャネルを使用できます。10 を超えるチャネルが使用されていて、それらがすべてキャッシュに返される場合、10 がキャッシュに入ります。残りは物理的に閉鎖されています。
バージョン 1.6 から、デフォルトのチャネルキャッシュサイズが 1 から 25 に増加しました。大量のマルチスレッド環境では、キャッシュが小さいということは、チャネルが高速で作成され、閉じられることを意味します。デフォルトのキャッシュサイズを大きくすると、このオーバーヘッドを回避できます。RabbitMQ 管理 UI を使用して使用中のチャネルを監視し、多くのチャネルが作成されて閉じられている場合は、キャッシュサイズをさらに増やすことを検討する必要があります。キャッシュは (アプリケーションの同時実行要件に合わせて) オンデマンドでのみ拡張されるため、この変更は既存の少量のアプリケーションには影響しません。
バージョン 1.4.2 以降、CachingConnectionFactory には channelCheckoutTimeout というプロパティがあります。このプロパティが 0 より大きい場合、channelCacheSize は、接続で作成できるチャネル数の制限になります。制限に達すると、チャネルが使用可能になるか、このタイムアウトに達するまでスレッドの呼び出しがブロックされます。この場合、AmqpTimeoutException がスローされます。
フレームワーク内で使用されるチャネル ( RabbitTemplate など) は、確実にキャッシュに返されます。フレームワークの外部でチャネルを作成する場合 (たとえば、接続に直接アクセスして createChannel() を呼び出すことによって)、チャネルが不足しないように、おそらく finally ブロックで (閉じることによって) 確実に返す必要があります。 |
次の例は、新しい connection を作成する方法を示しています。
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();XML を使用する場合、構成は次の例のようになります。
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean> フレームワークの単体テストコードでのみ使用できる SingleConnectionFactory 実装もあります。チャネルをキャッシュしないため、CachingConnectionFactory よりも単純ですが、パフォーマンスと回復力が不足しているため、単純なテスト以外での実用的な使用は意図されていません。なんらかの理由で独自の ConnectionFactory を実装する必要がある場合、AbstractConnectionFactory 基底クラスが適切な出発点となる場合があります。 |
ConnectionFactory は、次のように rabbit 名前空間を使用して迅速かつ便利に作成できます。
<rabbit:connection-factory id="connectionFactory"/> ほとんどの場合、フレームワークが最適なデフォルトを選択できるため、このアプローチが推奨されます。作成されたインスタンスは CachingConnectionFactory です。チャネルのデフォルトのキャッシュサイズは 25 であることに注意してください。より多くのチャネルをキャッシュする場合は、"channelCacheSize" プロパティを設定して、より大きな値を設定します。XML では次のようになります。
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>また、名前空間を使用して、次のように 'channel-cache-size' 属性 を追加できます。
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/> デフォルトのキャッシュモードは CHANNEL ですが、代わりに接続をキャッシュするように構成できます。次の例では、connection-cache-size を使用します。
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>次のように、名前空間を使用してホストとポートの属性を指定できます。
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>あるいは、クラスター環境で実行している場合は、次のように addresses 属性を使用できます。
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>address-shuffle-mode については、クラスターへの接続を参照してください。
次の例では、スレッド名の前に rabbitmq- を付けるカスタムスレッドファクトリを使用しています。
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>AddressResolver
バージョン 2.1.15 から、AddressResolver を使用して接続アドレスを解決できるようになりました。これにより、addresses および host/port プロパティの設定が上書きされます。
命名接続
バージョン 1.7 以降、AbstractionConnectionFactory への注入用に ConnectionNameStrategy が提供されています。生成された名前は、ターゲット RabbitMQ 接続のアプリケーション固有の識別に使用されます。RabbitMQ サーバーがサポートしている場合、接続名は管理 UI に表示されます。この値は一意である必要はなく、HTTP API リクエストなどで接続識別子として使用することはできません。この値は、人間が判読できると想定されており、connection_name キーの ClientProperties の一部です。次のように、単純な Lambda を使用できます。
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");ConnectionFactory 引数を使用して、ロジックによってターゲット接続名を区別できます。デフォルトでは、AbstractConnectionFactory の beanName、オブジェクトを表す 16 進文字列、および内部カウンターを使用して connection_name が生成されます。<rabbit:connection-factory> 名前空間コンポーネントには、connection-name-strategy 属性も提供されます。
SimplePropertyValueConnectionNameStrategy の実装は、接続名をアプリケーションプロパティに設定します。次の例に示すように、これを @Bean として宣言し、接続ファクトリに挿入できます。
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
} プロパティは、アプリケーションコンテキストの Environment に存在する必要があります。
Spring Boot とその自動構成された接続ファクトリを使用する場合、ConnectionNameStrategy または @Bean を宣言するだけで済みます。Spring Boot は Bean を自動検出し、ファクトリに接続します。 |
ブロックされた接続とリソースの制約
メモリアラーム (英語) に対応するブローカーからの対話に対して、接続がブロックされている可能性があります。バージョン 2.0 から、org.springframework.amqp.rabbit.connection.Connection に com.rabbitmq.client.BlockedListener インスタンスを提供して、接続のブロックおよびブロック解除のイベントを通知できます。さらに、AbstractConnectionFactory は、内部の BlockedListener 実装を通じて、それぞれ ConnectionBlockedEvent と ConnectionUnblockedEvent を発行します。これらを使用すると、アプリケーションロジックを提供して、ブローカーの問題に適切に対応し、(たとえば) 修正アクションを実行できます。
アプリケーションが単一の CachingConnectionFactory で構成されている場合、デフォルトでは Spring Boot 自動構成で構成されているため、ブローカーによって接続がブロックされると、アプリケーションは動作を停止します。また、ブローカーによってブロックされると、そのクライアントのいずれかが機能しなくなります。同じアプリケーションにプロデューサーとコンシューマーがある場合、プロデューサーが接続をブロックし (ブローカーにリソースがなくなったため)、コンシューマーが解放できない (接続がブロックされているため) 場合、デッドロックが発生する可能性があります。この問題を軽減するために、同じオプション (プロデューサー用とコンシューマー用) を持つ別の CachingConnectionFactory インスタンスをもう 1 つ用意することをお勧めします。コンシューマースレッドで実行されるトランザクションプロデューサーでは、コンシューマートランザクションに関連付けられた Channel を再利用する必要があるため、別個の CachingConnectionFactory は使用できません。 |
バージョン 2.0.2 以降、RabbitTemplate には、トランザクションが使用されていない限り、2 番目の接続ファクトリを自動的に使用する構成オプションがあります。詳細については、別の接続を使用するを参照してください。パブリッシャー接続の ConnectionNameStrategy は、メソッドの呼び出し結果に .publisher が追加されたプライマリ戦略と同じです。
バージョン 1.7.7 から、SimpleConnection.createChannel() が Channel を作成できない場合にスローされる AmqpResourceNotAvailableException が提供されます (たとえば、channelMax 制限に達し、キャッシュに使用可能なチャネルがないため)。RetryPolicy でこの例外を使用して、何らかのバックオフ後に操作を回復できます。
基礎となるクライアント接続ファクトリの構成
CachingConnectionFactory は、Rabbit クライアント ConnectionFactory のインスタンスを使用します。CachingConnectionFactory で同等のプロパティを設定する場合、多くの構成プロパティ (host、port、userName、password、requestedHeartBeat、connectionTimeout など) が渡されます。他のプロパティ (clientProperties など) を設定するには、Rabbit ファクトリのインスタンスを定義し、CachingConnectionFactory の適切なコンストラクターを使用してそのインスタンスへの参照を提供します。ネームスペースを使用する場合 ( 前述のとおり )、connection-factory 属性で構成されたファクトリへの参照を提供する必要があります。便宜上、次のセクションで説明するように、Spring アプリケーションコンテキストでの接続ファクトリの構成を支援するファクトリ Bean が提供されています。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>4.0.x クライアントは、デフォルトで自動回復を有効にします。この機能と互換性がありますが、Spring AMQP には独自の回復メカニズムがあり、通常、クライアント回復機能は必要ありません。ブローカーが使用可能であるが接続がまだ回復していない場合に AutoRecoverConnectionNotCurrentlyOpenException インスタンスを取得しないように、amqp-client 自動回復を無効にすることをお勧めします。たとえば、RetryTemplate が RabbitTemplate で構成されている場合、クラスター内の別のブローカーにフェイルオーバーする場合でも、この例外に気付く場合があります。自動回復接続はタイマーで回復するため、Spring AMQP の回復メカニズムを使用すると、接続をより迅速に回復できます。バージョン 1.7.1 以降、Spring AMQP は、独自の RabbitMQ 接続ファクトリを明示的に作成して CachingConnectionFactory に提供しない限り、amqp-client 自動回復を無効にします。RabbitConnectionFactoryBean によって作成された RabbitMQ ConnectionFactory インスタンスでも、デフォルトでオプションが無効になっています。 |
RabbitConnectionFactoryBean と SSL の構成
バージョン 1.4 以降では、依存性注入を使用して、基になるクライアント接続ファクトリで SSL プロパティを簡単に構成できる便利な RabbitConnectionFactoryBean が提供されています。その他の setter は、基になるファクトリにデリゲートします。以前は、SSL オプションをプログラムで構成する必要がありました。次の例は、RabbitConnectionFactoryBean を構成する方法を示しています。
Java
XML
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
<bean id="rabbitConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
<rabbit:connection-factory id="connectionFactory"
connection-factory="rabbitConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
Spring Boot アプリケーションファイル (.yaml または .properties)
プロパティ
YAML
spring.rabbitmq.host=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.enabled=truespring:
rabbitmq:
host: ...
ssl:
keyStoreType: jks
trustStoreType: jks
keyStore: ...
trustStore: ...
trustStorePassword: ...
keyStorePassword: ...
enabled: trueSSL の構成については、RabbitMQ ドキュメント (英語) を参照してください。keyStore および trustStore 構成を省略して、証明書の検証なしで SSL 経由で接続します。次の例は、キーとトラストストアの構成を提供する方法を示しています。
sslPropertiesLocation プロパティは、次のキーを含むプロパティファイルを指す Spring Resource です。
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secretkeyStore および truststore は、ストアを指す Spring Resources です。通常、このプロパティファイルはオペレーティングシステムによって保護され、アプリケーションは読み取りアクセス権を持っています。
Spring AMQP バージョン 1.5 以降では、提供時の Bean でこれらのプロパティを直接設定できます。離散プロパティと sslPropertiesLocation の両方が指定されている場合、後者のプロパティが離散値をオーバーライドします。
バージョン 2.0 以降、サーバー証明書はより安全であるため、デフォルトで検証されます。何らかの理由でこの検証をスキップしたい場合は、ファクトリ Bean の skipServerCertificateValidation プロパティを true に設定してください。バージョン 2.1 から、RabbitConnectionFactoryBean はデフォルトで enableHostnameVerification() を呼び出すようになりました。以前の動作に戻すには、enableHostnameVerification プロパティを false に設定します。 |
バージョン 2.2.5 以降、ファクトリ Bean は常にデフォルトで TLS v1.2 を使用します。以前は、場合によっては v1.1 を使用し、他の場合には v1.2 を使用していました (他のプロパティによって異なります)。何らかの理由で v1.1 を使用する必要がある場合は、sslAlgorithm プロパティを設定します: setSslAlgorithm("TLSv1.1")。 |
クラスターへの接続
クラスターに接続するには、CachingConnectionFactory で addresses プロパティを構成します。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
} バージョン 3.0 から、新しい接続が確立されるたびに、基になる接続ファクトリがランダムなアドレスを選択してホストへの接続を試みます。最初から最後まで接続を試行する以前の動作に戻すには、addressShuffleMode プロパティを AddressShuffleMode.NONE に設定します。
バージョン 2.3 から、INORDER シャッフルモードが追加されました。これは、接続が作成された後に最初のアドレスが最後に移動されることを意味します。すべてのノードのすべてのシャードから消費したい場合は、CacheMode.CONNECTION と適切な同時実行性を備えた RabbitMQ シャーディングプラグイン [GitHub] (英語) でこのモードを使用することをお勧めします。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}ルーティング接続ファクトリ
バージョン 1.3 から、AbstractRoutingConnectionFactory が導入されました。このファクトリは、複数の ConnectionFactories のマッピングを構成し、実行時に一部の lookupKey によってターゲット ConnectionFactory を決定するメカニズムを提供します。通常、実装はスレッドにバインドされたコンテキストをチェックします。便宜上、Spring AMQP は SimpleRoutingConnectionFactory を提供します。これは、現在のスレッドにバインドされた lookupKey を SimpleResourceHolder から取得します。次の例は、XML と Java の両方で SimpleRoutingConnectionFactory を構成する方法を示しています。
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
} 使用後はリソースのバインドを解除することが重要です。詳細については、AbstractRoutingConnectionFactory の JavaDoc (Javadoc) を参照してください。
バージョン 1.4 以降、RabbitTemplate は SpEL sendConnectionFactorySelectorExpression および receiveConnectionFactorySelectorExpression プロパティをサポートします。これらは、各 AMQP プロトコルインタラクション操作 (send、sendAndReceive、receive または receiveAndReply) で評価され、提供された AbstractRoutingConnectionFactory の lookupKey 値に解決されます。式で @vHostResolver.getVHost(#root) などの Bean 参照を使用できます。send 操作の場合、送信されるメッセージはルート評価オブジェクトです。receive 操作の場合、queueName はルート評価オブジェクトです。
ルーティングアルゴリズムは次のとおりです。セレクター式が null であるか、null に評価されるか、指定された ConnectionFactory が AbstractRoutingConnectionFactory のインスタンスではない場合、すべてが以前と同様に機能し、提供された ConnectionFactory 実装に依存します。評価結果が null ではなく、その lookupKey のターゲット ConnectionFactory がなく、AbstractRoutingConnectionFactory が lenientFallback = true で構成されている場合も同様です。AbstractRoutingConnectionFactory の場合、determineCurrentLookupKey() に基づく routing 実装にフォールバックします。ただし、lenientFallback = false の場合は、IllegalStateException がスローされます。
名前空間サポートは、<rabbit:template> コンポーネントの send-connection-factory-selector-expression および receive-connection-factory-selector-expression 属性も提供します。
また、バージョン 1.4 以降では、リスナーコンテナーでルーティング接続ファクトリを構成できます。その場合、キュー名のリストが検索キーとして使用されます。例: コンテナーを setQueueNames("thing1", "thing2") で構成する場合、検索キーは [thing1,thing]" です (キーにスペースがないことに注意してください)。
バージョン 1.6.9 以降では、リスナーコンテナーで setLookupKeyQualifier を使用して、ルックアップキーに修飾子を追加できます。これにより、たとえば、同じ名前で別の仮想ホスト (それぞれに接続ファクトリがある) にあるキューをリッスンできます。
例: ルックアップキー修飾子 thing1 とキュー thing2 をリッスンするコンテナーを使用すると、ターゲット接続ファクトリを登録できるルックアップキーは thing1[thing2] になる可能性があります。
| ターゲット (および提供されている場合はデフォルト) 接続ファクトリは、パブリッシャーの確認と戻りに対して同じ設定を持っている必要があります。パブリッシャーの確認と return を参照してください。 |
バージョン 2.4.4 以降では、この検証を無効にすることができます。確認と return の間の値が等しくない必要がある場合は、AbstractRoutingConnectionFactory#setConsistentConfirmsReturns を使用して検証を無効にすることができます。AbstractRoutingConnectionFactory に追加された最初の接続ファクトリによって、confirms および returns の一般的な値が決定されることに注意してください。
確認したいメッセージが確認 /return され、そうでないメッセージがある場合に便利です。例:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
} こうすることで、ヘッダー x-use-publisher-confirms: true を持つメッセージがキャッシュ接続を通じて送信され、メッセージを確実に配信できます。確実なメッセージ配信の詳細については、"パブリッシャーの確認と return" を参照してください。
キューアフィニティと LocalizedQueueConnectionFactory
クラスター内で HA キューを使用する場合、最高のパフォーマンスを得るために、リードキューが存在する物理ブローカーに接続することができます。CachingConnectionFactory は複数のブローカーアドレスを使用して構成できます。これはフェイルオーバーのためであり、クライアントは構成された AddressShuffleMode 順序に従って接続を試みます。LocalizedQueueConnectionFactory は、管理プラグインによって提供される REST API を使用して、どのノードがキューのリードであるかを決定します。次に、そのノードだけに接続する CachingConnectionFactory を作成 (またはキャッシュから取得) します。接続が失敗した場合、新しいリードノードが決定され、コンシューマーはそこに接続します。LocalizedQueueConnectionFactory は、キューの物理的な場所を特定できない場合に備えて、デフォルトの接続ファクトリを使用して構成されます。この場合、LocalizedQueueConnectionFactory は通常どおりクラスターに接続します。
LocalizedQueueConnectionFactory は RoutingConnectionFactory であり、SimpleMessageListenerContainer は、上記のルーティング接続ファクトリで説明したように、キュー名をルックアップキーとして使用します。
この理由 (ルックアップにキュー名を使用) から、コンテナーが単一のキューをリッスンするように構成されている場合にのみ、LocalizedQueueConnectionFactory を使用できます。 |
| 各ノードで RabbitMQ 管理プラグインを有効にする必要があります。 |
この接続ファクトリは、SimpleMessageListenerContainer で使用される接続など、存続期間の長い接続を対象としています。接続を確立する前に REST API を呼び出すオーバーヘッドがあるため、RabbitTemplate などの短い接続での使用は想定されていません。また、パブリッシュ操作の場合、キューは不明であり、メッセージはすべてのクラスターメンバーにパブリッシュされるため、ノードを検索するロジックにはほとんど価値がありません。 |
次の構成例は、ファクトリを構成する方法を示しています。
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
} 最初の 3 つのパラメーターは addresses、adminUris、nodes の配列であることに注意してください。これらは、コンテナーがキューに接続しようとすると、管理 API を使用してどのノードがキューのリードであるかを判断し、そのノードと同じ配列位置のアドレスに接続するという点で位置的です。
バージョン 3.0 以降、RabbitMQ http-client は Rest API へのアクセスに使用されなくなりました。代わりに、デフォルトでは、spring-webflux がクラスパス上にある場合、Spring Webflux の WebClient が使用されます。それ以外の場合は、RestTemplate が使用されます。 |
WebFlux をクラスパスに追加するには:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>compile 'org.springframework.amqp:spring-rabbit' ローカライズされたキュー接続ファクトリ #setNodeLocator (Javadoc) を実装し、その createClient、restCall、オプションで close メソッドをオーバーライドすることで、他の REST テクノロジを使用することもできます。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public Map<String, Object> restCall(MyClient client, String baseUri, String vhost, String queue) throws URISyntaxException {
...
}
}); フレームワークは WebFluxNodeLocator と RestTemplateNodeLocator を提供し、上記で説明したデフォルトを使用します。
パブリッシャーの確認と return
CachingConnectionFactory プロパティ publisherConfirmType を ConfirmType.CORRELATED に設定し、publisherReturns プロパティを "true" に設定することにより、確認済み (相関あり) および返されたメッセージがサポートされます。
これらのオプションが設定されている場合、ファクトリによって作成された Channel インスタンスは、コールバックを容易にするために使用される PublisherCallbackChannel にラップされます。このようなチャネルを取得すると、クライアントは PublisherCallbackChannel.Listener を Channel に登録できます。PublisherCallbackChannel 実装には、confirm または return を適切なリスナーにルーティングするためのロジックが含まれています。これらの機能については、以降のセクションで詳しく説明します。
範囲指定された操作の相関するパブリッシャーの確認と return および simplePublisherConfirms も参照してください。
| その他の背景情報については、RabbitMQ チームによるパブリッシャー確認の導入 (英語) というタイトルのブログ投稿を参照してください。 |
接続リスナーとチャネルリスナー
接続ファクトリは、ConnectionListener および ChannelListener 実装の登録をサポートしています。これにより、接続およびチャネル関連のイベントの通知を受け取ることができます。(ConnectionListener は、接続が確立されたときに宣言を実行するために RabbitAdmin によって使用されます。詳細については、交換、キュー、バインディングの自動宣言を参照してください)。次のリストは、ConnectionListener インターフェース定義を示しています。
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
} バージョン 2.0 から、org.springframework.amqp.rabbit.connection.Connection オブジェクトに com.rabbitmq.client.BlockedListener インスタンスを指定して、接続のブロックおよびブロック解除のイベントを通知できるようになりました。次の例は、ChannelListener インターフェース定義を示しています。
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}ChannelListener を登録するシナリオの 1 つについては、公開は非同期です — 成功と失敗を検出する方法を参照してください。
チャネルクローズイベントのロギング
バージョン 1.5 では、ユーザーがログレベルを制御できるようにするメカニズムが導入されました。
AbstractConnectionFactory は、デフォルトの戦略を使用して、チャネルの閉鎖を次のように記録します。
通常のチャネルクローズ (200 OK) はログに記録されません。
パッシブキュー宣言の失敗によりチャネルが閉じられた場合、DEBUG レベルでログに記録されます。
排他的コンシューマー条件により
basic.consumeが拒否されたためにチャネルが閉じられた場合、そのチャネルは DEBUG レベル (3.1 以降、以前は INFO) でログに記録されます。その他はすべて ERROR レベルでログに記録されます。
この動作を変更するには、closeExceptionLogger プロパティでカスタム ConditionalExceptionLogger を CachingConnectionFactory に挿入します。
また、AbstractConnectionFactory.DefaultChannelCloseLogger が公開され、サブクラス化できるようになりました。
コンシューマーイベントも参照してください。
ランタイムキャッシュのプロパティ
バージョン 1.6 から始まった CachingConnectionFactory は、getCacheProperties() メソッドを通じてキャッシュ統計を提供するようになりました。これらの統計を使用してキャッシュを調整し、本番環境で最適化できます。例: ハイウォーターマークを使用して、キャッシュサイズを増やす必要があるかどうかを判断できます。キャッシュサイズと等しい場合は、さらに増やすことを検討してください。次の表に、CacheMode.CHANNEL のプロパティを示します。
| プロパティ | 意味 |
|---|---|
connectionName |
|
channelCacheSize | アイドル状態が許可されている現在構成されている最大チャネル数。 |
localPort | 接続用のローカルポート (利用可能な場合)。これを使用して、RabbitMQ 管理 UI の接続とチャネルを関連付けることができます。 |
idleChannelsTx | 現在アイドル状態 (キャッシュされている) のトランザクションチャネルの数。 |
idleChannelsNotTx | 現在アイドル状態 (キャッシュされている) の非トランザクションチャネルの数。 |
idleChannelsTxHighWater | 同時にアイドル (キャッシュ) されたトランザクションチャネルの最大数。 |
idleChannelsNotTxHighWater | 最大数の非トランザクションチャネルが同時にアイドル (キャッシュ) されています。 |
次の表に、CacheMode.CONNECTION のプロパティを示します。
| プロパティ | 意味 |
|---|---|
connectionName:<localPort> |
|
openConnections | ブローカーへの接続を表す接続オブジェクトの数。 |
channelCacheSize | アイドル状態が許可されている現在構成されている最大チャネル数。 |
connectionCacheSize | アイドル状態が許可される現在構成されている最大接続数。 |
idleConnections | 現在アイドル状態の接続の数。 |
idleConnectionsHighWater | 同時にアイドル状態になっている接続の最大数。 |
idleChannelsTx:<localPort> | この接続で現在アイドル状態 (キャッシュされている) のトランザクションチャネルの数。プロパティ名の |
idleChannelsNotTx:<localPort> | この接続で現在アイドル状態 (キャッシュされている) の非トランザクションチャネルの数。プロパティ名の |
idleChannelsTxHighWater:<localPort> | 同時にアイドル状態 (キャッシュ) になっているトランザクションチャネルの最大数。プロパティ名の localPort 部分は、RabbitMQ 管理 UI 上の接続およびチャネルと相関させるために使用できます。 |
idleChannelsNotTxHighWater:<localPort> | 最大数の非トランザクションチャネルが同時にアイドル (キャッシュ) されています。プロパティ名の |
cacheMode プロパティ (CHANNEL または CONNECTION) も含まれます。

RabbitMQ 自動接続 / トポロジリカバリ
Spring AMQP の最初のバージョン以降、フレームワークは、ブローカーに障害が発生した場合に独自の接続とチャネルの回復を提供してきました。また、ブローカーの構成に従って、接続が再確立されると、RabbitAdmin はすべてのインフラストラクチャ Bean (キューなど) を再宣言します。現在 amqp-client ライブラリによって提供されている自動回復 (英語) には依存しません。amqp-client では、デフォルトで自動回復が有効になっています。2 つの回復メカニズムの間にはいくつかの非互換性があるため、デフォルトでは、Spring は基になる RabbitMQ connectionFactory の automaticRecoveryEnabled プロパティを false に設定します。プロパティが true であっても、Spring は回復した接続をすぐに閉じることにより、効果的に無効にします。
| デフォルトでは、Bean として定義されている要素 (キュー、エクスチェンジ、バインディング) のみが、接続の失敗後に再宣言されます。その動作を変更する方法については、自動削除宣言の回復を参照してください。 |