AMQP

Advanced Message Queuing Protocol (AMQP) は、メッセージ指向ミドルウェア用のプラットフォーム中立のワイヤレベルプロトコルです。Spring AMQP プロジェクトは、Spring のコア概念を AMQP ベースのメッセージングソリューションの開発に適用します。Spring Boot は、spring-boot-starter-amqp スターターなど、RabbitMQ を介して AMQP を操作するための便利な機能をいくつか提供します。

RabbitMQ のサポート

RabbitMQ (英語) は、AMQP プロトコルに基づく、軽量で信頼性が高く、スケーラブルでポータブルなメッセージブローカーです。Spring は、RabbitMQ を使用して AMQP プロトコルを介して通信します。

RabbitMQ 構成は、spring.rabbitmq.*外部構成プロパティによって制御されます。例: application.properties で次のセクションを宣言できます。

  • プロパティ

  • YAML

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

または、addresses 属性を使用して同じ接続を構成できます。

  • プロパティ

  • YAML

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
その方法でアドレスを指定する場合、host および port プロパティは無視されます。アドレスが amqps プロトコルを使用している場合、SSL サポートは自動的に有効になります。

サポートされているプロパティベースの構成オプションの詳細については、RabbitProperties (Javadoc) を参照してください。Spring AMQP で使用される RabbitMQ ConnectionFactory (英語) の下位レベルの詳細を構成するには、ConnectionFactoryCustomizer (Javadoc) Bean を定義します。

コンテキスト内に ConnectionNameStrategy (Javadoc) Bean が存在する場合、自動構成された CachingConnectionFactory (Javadoc) によって作成された接続の名前として自動的に使用されます。

RabbitTemplate (Javadoc) に対してアプリケーション全体の追加カスタマイズを行うには、RabbitTemplateCustomizer (Javadoc) Bean を使用します。

詳細については、RabbitMQ で使用されるプロトコルである AMQP を理解する (英語) を参照してください。

メッセージの送信

Spring の AmqpTemplate (Javadoc) AmqpAdmin (Javadoc) は自動構成されており、次の例に示すように、独自の Bean に直接自動接続できます。

  • Java

  • Kotlin

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
RabbitMessagingTemplate (Javadoc) も同様の方法で注入できます。MessageConverter (Javadoc) Bean が定義されている場合、自動構成された AmqpTemplate (Javadoc) に自動的に関連付けられます。

必要に応じて、Bean として定義されている Queue (Javadoc) は、RabbitMQ インスタンス上の対応するキューを宣言するために自動的に使用されます。

操作を再試行するには、AmqpTemplate (Javadoc) で再試行を有効にします (たとえば、ブローカー接続が失われた場合など)。

  • プロパティ

  • YAML

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

再試行はデフォルトで無効になっています。RabbitRetryTemplateCustomizer (Javadoc) Bean を宣言して、プログラムで RetryTemplate (Javadoc) をカスタマイズすることもできます。

さらに多くの RabbitTemplate (Javadoc) インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は、自動構成で使用されるファクトリと同じ設定で RabbitTemplate (Javadoc) を初期化するために使用できる RabbitTemplateConfigurer (Javadoc) Bean を提供します。

ストリームへのメッセージの送信

特定のストリームにメッセージを送信するには、次の例に示すように、ストリームの名前を指定します。

  • プロパティ

  • YAML

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

MessageConverter (Javadoc) StreamMessageConverter (Javadoc) ProducerCustomizer (Javadoc) Bean が定義されている場合、自動構成された RabbitStreamTemplate (Javadoc) に自動的に関連付けられます。

さらに多くの RabbitStreamTemplate (Javadoc) インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は、自動構成で使用されるファクトリと同じ設定で RabbitStreamTemplate (Javadoc) を初期化するために使用できる RabbitStreamTemplateConfigurer (Javadoc) Bean を提供します。

メッセージの受信

Rabbit インフラストラクチャが存在する場合、任意の Bean に @RabbitListener (Javadoc) のアノテーションを付けてリスナーエンドポイントを作成できます。RabbitListenerContainerFactory (Javadoc) が定義されていない場合は、デフォルトの SimpleRabbitListenerContainerFactory (Javadoc) が自動的に構成され、spring.rabbitmq.listener.type プロパティを使用して直接コンテナーに切り替えることができます。MessageConverter (Javadoc) または MessageRecoverer (Javadoc) Bean が定義されている場合は、デフォルトのファクトリに自動的に関連付けられます。

次のサンプルコンポーネントは、someQueue キューにリスナーエンドポイントを作成します。

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
詳細については、@EnableRabbit (Javadoc) を参照してください。

さらに多くの RabbitListenerContainerFactory (Javadoc) インスタンスを作成する必要がある場合、またはデフォルトをオーバーライドする場合、Spring Boot は、自動構成で使用されるファクトリと同じ設定で SimpleRabbitListenerContainerFactory (Javadoc) および DirectRabbitListenerContainerFactory (Javadoc) を初期化するために使用できる SimpleRabbitListenerContainerFactoryConfigurer (Javadoc) および DirectRabbitListenerContainerFactoryConfigurer (Javadoc) を提供します。

選択したコンテナーの種類は関係ありません。これらの 2 つの Bean は、自動構成によって公開されます。

たとえば、次の構成クラスは、特定の MessageConverter (Javadoc) を使用する別のファクトリを公開します。

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory? {
		return ...
	}

}

次に、次のように、任意の @RabbitListener (Javadoc) アノテーション付きメソッドでファクトリを使用できます。

  • Java

  • Kotlin

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

リスナーが例外をスローする状況に対処するために、再試行を有効にすることができます。デフォルトでは RejectAndDontRequeueRecoverer (Javadoc) が使用されますが、独自の MessageRecoverer (Javadoc) を定義することもできます。再試行が失敗すると、メッセージは拒否され、ドロップされるか、ブローカーがそうするように構成されている場合はデッドレター交換にルーティングされます。デフォルトでは、再試行は無効になっています。RabbitRetryTemplateCustomizer (Javadoc) Bean を宣言して、プログラムで RetryTemplate (Javadoc) をカスタマイズすることもできます。

デフォルトでは、再試行が無効でリスナーが例外をスローすると、配信は無期限に再試行されます。この動作は、次の 2 つの方法で変更できます。defaultRequeueRejected プロパティを false に設定して再配信を 0 回試行するか、AmqpRejectAndDontRequeueException (Javadoc) をスローしてメッセージを拒否するように通知します。後者は、再試行が有効で配信試行の最大回数に達した場合に使用されるメカニズムです。