AmqpTemplate

Spring Framework および関連プロジェクトによって提供される他の多くの高レベルの抽象化と同様に、Spring AMQP は中心的なロールを果たす「テンプレート」を提供します。主な操作を定義するインターフェースは AmqpTemplate と呼ばれます。これらの操作は、メッセージの送受信に関する一般的な動作をカバーしています。言い換えれば、それらはどの実装にも固有のものではないため、名前に "AMQP" が含まれています。一方、AMQP プロトコルの実装に関連付けられているそのインターフェースの実装があります。インターフェースレベルの API である JMS とは異なり、AMQP はワイヤレベルのプロトコルです。そのプロトコルの実装は独自のクライアントライブラリを提供するため、テンプレートインターフェースの各実装は特定のクライアントライブラリに依存します。現在、実装は RabbitTemplate のみです。以下の例では、しばしば AmqpTemplate を使用します。ただし、テンプレートがインスタンス化されているか setter が呼び出されている構成例またはコードの抜粋を見ると、実装型 (たとえば、RabbitTemplate) がわかります。

前述したように、AmqpTemplate インターフェースは、メッセージを送受信するためのすべての基本操作を定義します。メッセージ送信メッセージの受信でそれぞれメッセージの送信と受信を調べます。

非同期 Rabbit テンプレートも参照してください。

再試行機能の追加

バージョン 1.3 から、RabbitTemplate を構成して RetryTemplate を使用し、ブローカー接続の問題を処理できるようになりました。完全な情報については、spring-retry [GitHub] (英語) プロジェクトを参照してください。以下は、指数バックオフポリシーと、呼び出し元に例外をスローする前に 3 回試行するデフォルトの SimpleRetryPolicy を使用する 1 つの例にすぎません。

次の例では、XML 名前空間を使用しています。

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>

次の例では、Java で @Configuration アノテーションを使用しています。

@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}

バージョン 1.4 以降、retryTemplate プロパティに加えて、recoveryCallback オプションが RabbitTemplate でサポートされています。RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) の 2 番目の引数として使用されます。

再試行コンテキストには lastThrowable フィールドのみが含まれるという点で、RecoveryCallback は多少制限されています。より高度な使用例では、外部 RetryTemplate を使用して、コンテキストの属性を介して RecoveryCallback に追加情報を伝達できるようにする必要があります。次の例は、その方法を示しています。
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

この場合、RetryTemplate を RabbitTemplate に注入しません。

公開は非同期です — 成功と失敗を検出する方法

メッセージの発行は非同期メカニズムであり、デフォルトでは、ルーティングできないメッセージは RabbitMQ によってドロップされます。パブリッシュが成功した場合は、相関するパブリッシャーの確認と return に従って、非同期の確認を受け取ることができます。次の 2 つの障害シナリオを検討してください。

  • 取引所に発行しますが、一致する宛先キューがありません。

  • 存在しない取引所に公開します。

最初のケースは、相関するパブリッシャーの確認と return に従って、発行元の return によってカバーされます。

2 番目のケースでは、メッセージは破棄され、戻り値は生成されません。基になるチャネルは例外で閉じられています。デフォルトでは、この例外はログに記録されますが、ChannelListener を CachingConnectionFactory に登録して、そのようなイベントの通知を取得できます。次の例は、ConnectionListener を追加する方法を示しています。

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

信号の reason プロパティを調べて、発生した問題を特定できます。

送信スレッドで例外を検出するには、RabbitTemplate で setChannelTransacted(true) を実行すると、例外が txCommit() で検出されます。ただし、トランザクションはパフォーマンスを大幅に低下させるため、この 1 つのユースケースだけでトランザクションを有効にする前に、このことを慎重に検討してください。

相関するパブリッシャーの確認と return

AmqpTemplate の RabbitTemplate 実装は、発行者の確認と return をサポートします。

返されたメッセージの場合、テンプレートの mandatory プロパティを true に設定するか、mandatory-expression を特定のメッセージの true に評価する必要があります。この機能には、publisherReturns プロパティが true に設定されている CachingConnectionFactory が必要です ( パブリッシャーの確認と return を参照)。クライアントが setReturnsCallback(ReturnsCallback callback) を呼び出して RabbitTemplate.ReturnsCallback を登録することにより、戻り値がクライアントに送信されます。コールバックは、次のメソッドを実装する必要があります。

void returnedMessage(ReturnedMessage returned);

ReturnedMessage には次のプロパティがあります。

  • message - 返されたメッセージ自体

  • replyCode - return 理由を示すコード

  • replyText - return のテキストによる理由 - 例: NO_ROUTE

  • exchange - メッセージが送信された取引所

  • routingKey - 使用されたルーティングキー

各 RabbitTemplate でサポートされる ReturnsCallback は 1 つだけです。返信タイムアウトも参照してください。

パブリッシャーの確認 (パブリッシャーの承認とも呼ばれます) の場合、テンプレートには publisherConfirm プロパティが ConfirmType.CORRELATED に設定された CachingConnectionFactory が必要です。クライアントが setConfirmCallback(ConfirmCallback callback) を呼び出して RabbitTemplate.ConfirmCallback を登録することにより、確認がクライアントに送信されます。コールバックはこのメソッドを実装する必要があります:

void confirm(CorrelationData correlationData, boolean ack, String cause);

CorrelationData は、元のメッセージを送信するときにクライアントによって提供されるオブジェクトです。ack は ack に対して true であり、nack に対して false です。nack インスタンスの場合、nack が生成されたときに使用可能な場合、原因には nack の理由が含まれる場合があります。例は、存在しない交換にメッセージを送信する場合です。その場合、ブローカーはチャネルを閉じます。閉鎖の理由は cause に含まれています。cause はバージョン 1.4 で追加されました。

RabbitTemplate でサポートされる ConfirmCallback は 1 つだけです。

rabbit テンプレートの送信操作が完了すると、チャネルが閉じられます。これにより、コネクションファクトリキャッシュがいっぱいの場合、確認または戻りの受信が妨げられます (キャッシュにスペースがある場合、チャネルは物理的に閉じられておらず、戻りと確認は正常に進行します)。キャッシュがいっぱいになると、フレームワークはクローズを最大 5 秒間延期して、確認とリターンを受信する時間を確保します。確認を使用する場合、最後の確認が受信されたときにチャネルが閉じられます。リターンのみを使用する場合、チャネルは 5 秒間開いたままになります。通常、接続ファクトリの channelCacheSize を十分に大きな値に設定して、メッセージが発行されたチャネルが閉じられるのではなくキャッシュに返されるようにすることをお勧めします。RabbitMQ 管理プラグインを使用して、チャネルの使用状況を監視できます。チャネルが頻繁に開いたり閉じたりする場合は、キャッシュサイズを増やしてサーバーのオーバーヘッドを減らすことを検討してください。
バージョン 2.1 より前では、パブリッシャーの確認が有効になっているチャネルは、確認が受信される前にキャッシュに返されていました。他のプロセスがチャネルをチェックアウトし、存在しない交換にメッセージを公開するなど、チャネルを閉じる操作を実行する可能性があります。これにより、確認が失われる可能性があります。バージョン 2.1 以降では、確認が未処理である間、チャネルがキャッシュに返されなくなりました。RabbitTemplate は、各操作の後、チャネルで論理 close() を実行します。一般に、これは、一度に 1 つのチャネルで未解決の確認が 1 つだけであることを意味します。
バージョン 2.2 以降、コールバックは接続ファクトリの executor スレッドの 1 つで呼び出されます。これは、コールバック内から Rabbit 操作を実行する場合にデッドロックが発生する可能性を回避するためです。以前のバージョンでは、コールバックは amqp-client 接続 I/O スレッドで直接呼び出されていました。I/O スレッドが結果を待ってブロックするため、何らかの RPC 操作 (新しいチャネルを開くなど) を実行するとデッドロックが発生しますが、結果は I/O スレッド自体によって処理される必要があります。これらのバージョンでは、作業 (メッセージの送信など) をコールバック内の別のスレッドに引き渡す必要がありました。フレームワークがコールバック呼び出しをエグゼキュータに渡すようになったため、これは不要になりました。
リターンコールバックが 60 秒以内に実行される限り、ACK の前に返されたメッセージを受信するという保証は維持されます。確認は、リターンコールバックが終了した後、または 60 秒後に配信されるようにスケジュールされています。

CorrelationData オブジェクトには、テンプレートで ConfirmCallback を使用する代わりに、結果を取得するために使用できる CompletableFuture があります。次の例は、CorrelationData インスタンスを構成する方法を示しています。

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...

これは CompletableFuture<Confirm> であるため、準備ができたら結果を get() にするか、非同期コールバックに whenComplete() を使用できます。Confirm オブジェクトは、ack と reason (nack インスタンスの場合) の 2 つのプロパティを持つ単純な Bean です。ブローカーが生成した nack インスタンスの理由は入力されません。これは、フレームワークによって生成された nack インスタンスに対して設定されます (たとえば、ack インスタンスが未解決である間に接続を閉じるなど)。

さらに、確認と返信の両方が有効になっている場合、どのキューにもルーティングできなかった場合、CorrelationDatareturn プロパティに返されたメッセージが入力されます。ack でフューチャが設定される前に、返されたメッセージプロパティが設定されることが保証されます。CorrelationData.getReturn() は、次のプロパティを持つ ReturnMessage を返します。

  • メッセージ (返されたメッセージ)

  • replyCode

  • replyText

  • 両替

  • ルーティングキー

パブリッシャーの確認を待つためのより単純なメカニズムについては、範囲指定された操作も参照してください。

範囲指定された操作

通常、テンプレートを使用する場合、Channel はキャッシュからチェックアウト (または作成) され、操作に使用され、再利用のためにキャッシュに戻されます。マルチスレッド環境では、次の操作が同じチャネルを使用するという保証はありません。ただし、チャネルの使用をより詳細に制御し、多くの操作がすべて同じチャネルで実行されるようにしたい場合があります。

バージョン 2.0 以降、invoke と呼ばれる新しいメソッドが OperationsCallback とともに提供されます。コールバックのスコープ内で指定された RabbitOperations 引数に対して実行されるすべての操作は、同じ専用の Channel を使用し、最後に閉じられます (キャッシュには返されません)。チャネルが PublisherCallbackChannel の場合、すべての確認が受信された後にキャッシュに戻されます ( 相関するパブリッシャーの確認と return を参照)。

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

これが必要になる理由の一例は、基礎となる Channel で waitForConfirms() メソッドを使用したい場合です。前述のように、通常、チャネルはキャッシュされて共有されるため、このメソッドは以前は Spring API によって公開されていませんでした。RabbitTemplate は、OperationsCallback のスコープ内で使用される専用チャネルに委譲する waitForConfirms(long timeout) および waitForConfirmsOrDie(long timeout) を提供するようになりました。明らかな理由から、メソッドをその範囲外で使用することはできません。

確認をリクエストに関連付けることができる高レベルの抽象化が別の場所で提供されていることに注意してください ( 相関するパブリッシャーの確認と return を参照)。ブローカーが配信を確認するまで待ちたい場合は、次の例に示す手法を使用できます。

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

OperationsCallback のスコープ内の同じチャネルで RabbitAdmin 操作を呼び出す場合は、invoke 操作に使用されたのと同じ RabbitTemplate を使用して、管理者が構築されている必要があります。

テンプレート操作が既存のトランザクションのスコープ内ですでに実行されている場合 (たとえば、トランザクション化されたリスナーコンテナースレッドで実行され、トランザクション化されたテンプレートで操作を実行する場合)、上記の説明は意味がありません。その場合、操作はそのチャネルで実行され、スレッドがコンテナーに戻るときにコミットされます。そのシナリオでは、invoke を使用する必要はありません。

このように確認を使用する場合、確認をリクエストに関連付けるためにセットアップされたインフラストラクチャの多くは、実際には必要ありません (return も有効でない限り)。バージョン 2.2 以降、接続ファクトリは publisherConfirmType と呼ばれる新しいプロパティをサポートします。これを ConfirmType.SIMPLE に設定すると、インフラストラクチャーが回避され、確認処理がより効率的になります。

さらに、RabbitTemplate は、送信されたメッセージ MessageProperties に publisherSequenceNumber プロパティを設定します。特定の確認を確認 (またはログ記録またはその他の方法で使用) したい場合は、次の例に示すように、オーバーロードされた invoke メソッドを使用して行うことができます。

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
これらの ConfirmCallback オブジェクト (ack および nack インスタンス用) は、テンプレートコールバックではなく、Rabbit クライアントコールバックです。

次の例では、ack および nack インスタンスをログに記録します。

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
スコープ操作はスレッドにバインドされます。マルチスレッド環境での厳密な順序付けについては、マルチスレッド環境での厳密なメッセージの順序付けを参照してください。

マルチスレッド環境での厳密なメッセージの順序付け

範囲指定された操作の説明は、操作が同じスレッドで実行される場合にのみ適用されます。

次の状況を考慮してください。

  • thread-1 はメッセージをキューに送信し、作業を thread-2 に渡します

  • thread-2 は同じキューにメッセージを送信します

RabbitMQ の非同期性とキャッシュされたチャネルの使用のため。同じチャネルが使用されるとは限らないため、メッセージがキューに到着する順序は保証されません。(ほとんどの場合、順番に届きますが、順不同で届く確率はゼロではありません)。このユースケースを解決するには、サイズが 1 のバウンドチャネルキャッシュを ( channelCheckoutTimeout と一緒に) 使用して、メッセージが常に同じチャネルでパブリッシュされ、順序が保証されるようにすることができます。これを行うには、コンシューマーなどの接続ファクトリに他の用途がある場合は、テンプレートに専用の接続ファクトリを使用するか、メインの接続ファクトリに埋め込まれたパブリッシャー接続ファクトリを使用するようにテンプレートを構成する必要があります ( 別の接続を使用するを参照)。

これは、単純な Spring Boot アプリケーションで最もよく説明されています。

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

パブリッシングは 2 つの異なるスレッドで実行されますが、キャッシュは 1 つのチャネルに制限されているため、どちらも同じチャネルを使用します。

バージョン 2.3.7 以降、ThreadChannelConnectionFactory は、prepareContextSwitch および switchContext メソッドを使用して、スレッドのチャネルを別のスレッドに転送することをサポートしています。最初のメソッドは、2 番目のメソッドを呼び出す 2 番目のスレッドに渡されるコンテキストを返します。スレッドには、非トランザクションチャネルまたはトランザクションチャネル (またはそれぞれの 1 つ) をバインドできます。2 つの接続ファクトリを使用しない限り、個別に転送することはできません。以下に例を示します。

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}
prepareSwitchContext が呼び出されると、現在のスレッドがさらに操作を実行すると、新しいチャネルで実行されます。スレッドにバインドされたチャネルが不要になったら、閉じることが重要です。

メッセージングの統合

バージョン 1.4 以降、RabbitMessagingTemplate ( RabbitTemplate 上に構築) は、Spring Framework メッセージング抽象化、つまり org.springframework.messaging.Message との統合を提供します。これにより、spring-messagingMessage<?> 抽象化を使用してメッセージを送受信できるようになります。この抽象化は、Spring Integration や Spring の STOMP サポートなど、他の Spring プロジェクトで使用されます。関連するメッセージコンバーターは 2 つあります。1 つは spring-messaging Message<?> と Spring AMQP の Message 抽象化の間で変換し、もう 1 つは Spring AMQP の Message 抽象化と、基礎となる RabbitMQ クライアントライブラリで必要な形式との間で変換します。デフォルトでは、メッセージペイロードは、提供された RabbitTemplate インスタンスのメッセージコンバーターによって変換されます。あるいは、次の例に示すように、他のペイロードコンバーターを使用してカスタム MessagingMessageConverter を挿入することもできます。

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

検証済みのユーザー ID

バージョン 1.6 以降、テンプレートは user-id-expression (Java 構成を使用する場合は userIdExpression ) をサポートするようになりました。メッセージが送信される場合、この式を評価した後、ユーザー ID プロパティが設定されます (まだ設定されていない場合)。評価のルートオブジェクトは、送信されるメッセージです。

次の例は、user-id-expression 属性の使用方法を示しています。

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

最初の例はリテラル式です。2 つ目は、アプリケーションコンテキストの接続ファクトリ Bean から username プロパティを取得します。

別の接続を使用する

バージョン 2.0.2 以降では、可能であれば、usePublisherConnection プロパティを true に設定して、リスナーコンテナーで使用される接続とは別の接続を使用できます。これは、何らかの理由でプロデューサーがブロックされたときにコンシューマーがブロックされないようにするためです。接続ファクトリは、この目的のために 2 番目の内部接続ファクトリを維持します。デフォルトではメインファクトリと同じ型ですが、公開に別のファクトリ型を使用する場合は明示的に設定できます。リスナーコンテナーによって開始されたトランザクションで rabbit テンプレートが実行されている場合、この設定に関係なく、コンテナーのチャネルが使用されます。

一般に、これが true に設定されているテンプレートで RabbitAdmin を使用しないでください。接続ファクトリを取る RabbitAdmin コンストラクターを使用します。テンプレートを受け取る他のコンストラクターを使用する場合は、テンプレートのプロパティが false であることを確認してください。これは、多くの場合、リスナーコンテナーのキューを宣言するために管理者が使用されるためです。プロパティが true に設定されたテンプレートを使用すると、排他キュー ( AnonymousQueue など) が、リスナーコンテナーによって使用される接続とは別の接続で宣言されることになります。その場合、コンテナーはキューを使用できません。