AMQP
Advanced Message Queuing Protocol(AMQP)は、メッセージ指向ミドルウェア向けのプラットフォームに依存しないワイヤーレベルのプロトコルです。Spring AMQP プロジェクトは、コア Spring コンセプトを AMQP ベースのメッセージングソリューションの開発に適用します。Spring Boot は、spring-boot-starter-amqp
「スターター」など、RabbitMQ を介して AMQP を操作するためのいくつかの便利な機能を提供します。
RabbitMQ のサポート
RabbitMQ (英語) は、AMQP プロトコルに基づく、軽量で信頼性が高く、スケーラブルでポータブルなメッセージブローカーです。Spring は、RabbitMQ を使用して AMQP プロトコルを介して通信します。
RabbitMQ 構成は、spring.rabbitmq.*
の外部構成プロパティによって制御されます。例: application.properties
で次のセクションを宣言できます。
プロパティ
YAML
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
または、addresses
属性を使用して同じ接続を構成できます。
プロパティ
YAML
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
その方法でアドレスを指定する場合、host および port プロパティは無視されます。アドレスが amqps プロトコルを使用している場合、SSL サポートは自動的に有効になります。 |
サポートされているプロパティベースの構成オプションの詳細については、RabbitProperties
(Javadoc) を参照してください。Spring AMQP で使用される RabbitMQ ConnectionFactory
の下位レベルの詳細を構成するには、ConnectionFactoryCustomizer
Bean を定義します。
ConnectionNameStrategy
Bean がコンテキストに存在する場合、自動構成された CachingConnectionFactory
によって作成された接続に名前を付けるために自動的に使用されます。
アプリケーション全体の追加のカスタマイズを RabbitTemplate
に行うには、RabbitTemplateCustomizer
Bean を使用します。
詳細については、RabbitMQ で使用されるプロトコルである AMQP を理解する (英語) を参照してください。 |
メッセージの送信
Spring の AmqpTemplate
および AmqpAdmin
は自動構成され、次の例に示すように、独自の Bean に直接オートワイヤーできます。
Java
Kotlin
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}
public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}
}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
// ...
fun someMethod() {
amqpAdmin.getQueueInfo("someQueue")
}
fun someOtherMethod() {
amqpTemplate.convertAndSend("hello")
}
}
RabbitMessagingTemplate (Javadoc) は、同様の方法で注入できます。MessageConverter Bean が定義されている場合、自動構成された AmqpTemplate に自動的に関連付けられます。 |
必要に応じて、Bean として定義されている org.springframework.amqp.core.Queue
は、RabbitMQ インスタンスで対応するキューを宣言するために自動的に使用されます。
操作を再試行するには、AmqpTemplate
で再試行を有効にします(たとえば、ブローカー接続が失われた場合)。
プロパティ
YAML
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
再試行はデフォルトで無効になっています。RabbitRetryTemplateCustomizer
Bean を宣言することにより、プログラムで RetryTemplate
をカスタマイズすることもできます。
さらに RabbitTemplate
インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitTemplateConfigurer
Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitTemplate
を初期化できます。
ストリームへのメッセージの送信
特定のストリームにメッセージを送信するには、次の例に示すように、ストリームの名前を指定します。
プロパティ
YAML
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
MessageConverter
、StreamMessageConverter
、ProducerCustomizer
Bean が定義されている場合、自動構成された RabbitStreamTemplate
に自動的に関連付けられます。
さらに RabbitStreamTemplate
インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitStreamTemplateConfigurer
Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitStreamTemplate
を初期化できます。
メッセージの受信
Rabbit インフラストラクチャが存在する場合、任意の Bean に @RabbitListener
のアノテーションを付けて、リスナーエンドポイントを作成できます。RabbitListenerContainerFactory
が定義されていない場合、デフォルトの SimpleRabbitListenerContainerFactory
が自動的に構成され、spring.rabbitmq.listener.type
プロパティを使用して直接コンテナーに切り替えることができます。MessageConverter
または MessageRecoverer
Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。
次のサンプルコンポーネントは、someQueue
キューにリスナーエンドポイントを作成します。
Java
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
詳細については、@EnableRabbit の Javadoc を参照してください。 |
さらに RabbitListenerContainerFactory
インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は SimpleRabbitListenerContainerFactoryConfigurer
および DirectRabbitListenerContainerFactoryConfigurer
を提供します。これらを使用して、自動構成で使用されるファクトリと同じ設定で SimpleRabbitListenerContainerFactory
および DirectRabbitListenerContainerFactory
を初期化できます。
選択したコンテナーの種類は関係ありません。これらの 2 つの Bean は、自動構成によって公開されます。 |
たとえば、次の構成クラスは、特定の MessageConverter
を使用する別のファクトリを公開します。
Java
Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
次に、次のように、@RabbitListener
アノテーション付きメソッドでファクトリを使用できます。
Java
Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
再試行を有効にして、リスナーが例外をスローする状況を処理できます。デフォルトでは、RejectAndDontRequeueRecoverer
が使用されますが、独自の MessageRecoverer
を定義できます。再試行が使い果たされると、ブローカーがそうするように構成されている場合、メッセージは拒否され、送達不能交換にドロップまたはルーティングされます。デフォルトでは、再試行は無効になっています。RabbitRetryTemplateCustomizer
Bean を宣言することにより、RetryTemplate
をプログラムでカスタマイズすることもできます。
デフォルトでは、再試行が無効になっていて、リスナーが例外をスローした場合、配信は無期限に再試行されます。この動作は次の 2 つの方法で変更できます。defaultRequeueRejected プロパティを false に設定して再配信をゼロにするか、AmqpRejectAndDontRequeueException をスローしてメッセージを拒否することを通知します。後者は、再試行が有効で、配信試行の最大数に達したときに使用されるメカニズムです。 |