AMQP

Advanced Message Queuing Protocol(AMQP)は、メッセージ指向ミドルウェア向けのプラットフォームに依存しないワイヤーレベルのプロトコルです。Spring AMQP プロジェクトは、コア Spring コンセプトを AMQP ベースのメッセージングソリューションの開発に適用します。Spring Boot は、spring-boot-starter-amqp 「スターター」など、RabbitMQ を介して AMQP を操作するためのいくつかの便利な機能を提供します。

RabbitMQ のサポート

RabbitMQ (英語) は、AMQP プロトコルに基づく、軽量で信頼性が高く、スケーラブルでポータブルなメッセージブローカーです。Spring は、RabbitMQ を使用して AMQP プロトコルを介して通信します。

RabbitMQ 構成は、spring.rabbitmq.*外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます。

  • プロパティ

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

または、addresses 属性を使用して同じ接続を構成できます。

  • プロパティ

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
その方法でアドレスを指定する場合、host および port プロパティは無視されます。アドレスが amqps プロトコルを使用している場合、SSL サポートは自動的に有効になります。

サポートされているプロパティベースの構成オプションの詳細については、RabbitProperties (Javadoc) を参照してください。Spring AMQP で使用される RabbitMQ ConnectionFactory の下位レベルの詳細を構成するには、ConnectionFactoryCustomizer Bean を定義します。

ConnectionNameStrategy Bean がコンテキストに存在する場合、自動構成された CachingConnectionFactory によって作成された接続に名前を付けるために自動的に使用されます。

アプリケーション全体の追加のカスタマイズを RabbitTemplate に行うには、RabbitTemplateCustomizer Bean を使用します。

詳細については、RabbitMQ で使用されるプロトコルである AMQP を理解する (英語) を参照してください。

メッセージの送信

Spring の AmqpTemplate および AmqpAdmin は自動構成され、次の例に示すように、独自の Bean に直接オートワイヤーできます。

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate (Javadoc) は、同様の方法で注入できます。MessageConverter Bean が定義されている場合、自動構成された AmqpTemplate に自動的に関連付けられます。

必要に応じて、Bean として定義されている org.springframework.amqp.core.Queue は、RabbitMQ インスタンスで対応するキューを宣言するために自動的に使用されます。

操作を再試行するには、AmqpTemplate で再試行を有効にします(たとえば、ブローカー接続が失われた場合)。

  • プロパティ

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

再試行はデフォルトで無効になっています。RabbitRetryTemplateCustomizer Bean を宣言することにより、プログラムで RetryTemplate をカスタマイズすることもできます。

さらに RabbitTemplate インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitTemplateConfigurer Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitTemplate を初期化できます。

ストリームへのメッセージの送信

特定のストリームにメッセージを送信するには、次の例に示すように、ストリームの名前を指定します。

  • プロパティ

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

MessageConverterStreamMessageConverterProducerCustomizer Bean が定義されている場合、自動構成された RabbitStreamTemplate に自動的に関連付けられます。

さらに RabbitStreamTemplate インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は RabbitStreamTemplateConfigurer Bean を提供します。これを使用して、自動構成で使用されるファクトリと同じ設定で RabbitStreamTemplate を初期化できます。

メッセージの受信

Rabbit インフラストラクチャが存在する場合、任意の Bean に @RabbitListener のアノテーションを付けて、リスナーエンドポイントを作成できます。RabbitListenerContainerFactory が定義されていない場合、デフォルトの SimpleRabbitListenerContainerFactory が自動的に構成され、spring.rabbitmq.listener.type プロパティを使用して直接コンテナーに切り替えることができます。MessageConverter または MessageRecoverer Bean が定義されている場合、デフォルトのファクトリに自動的に関連付けられます。

次のサンプルコンポーネントは、someQueue キューにリスナーエンドポイントを作成します。

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
詳細については、@EnableRabbit の Javadoc を参照してください。

さらに RabbitListenerContainerFactory インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は SimpleRabbitListenerContainerFactoryConfigurer および DirectRabbitListenerContainerFactoryConfigurer を提供します。これらを使用して、自動構成で使用されるファクトリと同じ設定で SimpleRabbitListenerContainerFactory および DirectRabbitListenerContainerFactory を初期化できます。

選択したコンテナーの種類は関係ありません。これらの 2 つの Bean は、自動構成によって公開されます。

たとえば、次の構成クラスは、特定の MessageConverter を使用する別のファクトリを公開します。

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory? {
		return ...
	}

}

次に、次のように、@RabbitListener アノテーション付きメソッドでファクトリを使用できます。

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

再試行を有効にして、リスナーが例外をスローする状況を処理できます。デフォルトでは、RejectAndDontRequeueRecoverer が使用されますが、独自の MessageRecoverer を定義できます。再試行が使い果たされると、ブローカーがそうするように構成されている場合、メッセージは拒否され、送達不能交換にドロップまたはルーティングされます。デフォルトでは、再試行は無効になっています。RabbitRetryTemplateCustomizer Bean を宣言することにより、RetryTemplate をプログラムでカスタマイズすることもできます。

デフォルトでは、再試行が無効になっていて、リスナーが例外をスローした場合、配信は無期限に再試行されます。この動作は次の 2 つの方法で変更できます。defaultRequeueRejected プロパティを false に設定して再配信をゼロにするか、AmqpRejectAndDontRequeueException をスローしてメッセージを拒否することを通知します。後者は、再試行が有効で、配信試行の最大数に達したときに使用されるメカニズムです。