トランザクション

Spring Rabbit フレームワークは、Spring トランザクションの既存のユーザーによく知られているように、宣言的に選択できるさまざまなセマンティクスを使用して、同期および非同期のユースケースで自動トランザクション管理をサポートしています。これにより、ほとんどではないにしても、多くの一般的なメッセージングパターンを簡単に実装できます。

目的のトランザクションセマンティクスをフレームワークに通知するには、2 つの方法があります。RabbitTemplate と SimpleMessageListenerContainer の両方にフラグ channelTransacted があり、true の場合、トランザクションチャネルを使用し、すべての操作 (送信または受信) をコミットまたはロールバック (結果に応じて) で終了するようにフレームワークに指示しますが、例外があります。ロールバックを通知します。もう 1 つのシグナルは、進行中の操作のコンテキストとして、Spring の PlatformTransactionManager 実装の 1 つを使用して外部トランザクションを提供することです。フレームワークがメッセージを送信または受信しているときに進行中のトランザクションがすでに存在し、channelTransacted フラグが true である場合、メッセージングトランザクションのコミットまたはロールバックは、現在のトランザクションが終了するまで延期されます。channelTransacted フラグが false の場合、トランザクションセマンティクスはメッセージング操作に適用されません (自動応答されます)。

channelTransacted フラグは構成時の設定です。AMQP コンポーネントの作成時 (通常はアプリケーションの起動時) に宣言および処理されます。外部トランザクションは、システムが実行時に現在のスレッド状態に応答するため、原則としてより動的です。ただし、実際には、トランザクションが宣言的にアプリケーションに階層化されている場合、構成設定でもあることがよくあります。

RabbitTemplate を使用した同期ユースケースの場合、好みに応じて宣言的または命令的に外部トランザクションが呼び出し元によって提供されます (通常の Spring トランザクションモデル)。次の例は、テンプレートが channelTransacted=true で構成されている宣言型アプローチ (非侵襲的であるため、通常は推奨される) を示しています。

@Transactional
public void doSomething() {
    String incoming = rabbitTemplate.receiveAndConvert();
    // do some more database processing...
    String outgoing = processInDatabaseAndExtractReply(incoming);
    rabbitTemplate.convertAndSend(outgoing);
}

前の例では、String ペイロードが受信され、変換され、@Transactional としてマークされたメソッド内のメッセージ本文として送信されます。データベース処理が例外で失敗した場合、受信メッセージはブローカーに返され、発信メッセージは送信されません。これは、トランザクションメソッドの チェーン 内で RabbitTemplate を使用するすべての操作に適用されます (たとえば、トランザクションを早期にコミットするために Channel が直接操作されない限り)。

SimpleMessageListenerContainer を使用した非同期ユースケースでは、外部トランザクションが必要な場合、リスナーを設定するときにコンテナーによってリクエストされる必要があります。外部トランザクションが必要であることを知らせるために、ユーザーは構成時に PlatformTransactionManager の実装をコンテナーに提供します。次の例は、その方法を示しています。

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

    @Bean
    public SimpleMessageListenerContainer messageListenerContainer() {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(rabbitConnectionFactory());
        container.setTransactionManager(transactionManager());
        container.setChannelTransacted(true);
        container.setQueueName("some.queue");
        container.setMessageListener(exampleListener());
        return container;
    }

}

前の例では、別の Bean 定義 (表示されていません) から注入された依存関係としてトランザクションマネージャーが追加され、channelTransacted フラグも true に設定されます。その結果、リスナーが例外で失敗した場合、トランザクションはロールバックされ、メッセージもブローカーに返されます。重要なのは、トランザクションがコミットに失敗した場合 (たとえば、データベースの制約エラーや接続の問題が原因で)、AMQP トランザクションもロールバックされ、メッセージがブローカーに返されることです。これは「ベストエフォート 1 フェーズコミット」と呼ばれることもあり、信頼性の高いメッセージングの非常に強力なパターンです。前の例で channelTransacted フラグが false (デフォルト) に設定されている場合、外部トランザクションは引き続きリスナーに提供されますが、すべてのメッセージング操作は自動確認応答されるため、その効果は、業務のロールバック。

条件付きロールバック

バージョン 1.6.6 より前のバージョンでは、外部トランザクションマネージャー (JDBC など) を使用しているときにコンテナーの transactionAttribute にロールバックルールを追加しても効果がありませんでした。例外は常にトランザクションをロールバックします。

また、コンテナーのアドバイスチェーン でトランザクションアドバイスを使用する場合、すべてのリスナー例外が ListenerExecutionFailedException にラップされるため、条件付きロールバックはあまり役に立ちませんでした。

最初の問題は修正され、ルールが適切に適用されるようになりました。さらに、ListenerFailedRuleBasedTransactionAttribute が提供されるようになりました。これは RuleBasedTransactionAttribute のサブクラスですが、唯一の違いは、ListenerExecutionFailedException を認識し、そのような例外の原因をルールに使用することです。このトランザクション属性は、コンテナー内で直接使用するか、トランザクションアドバイスを通じて使用できます。

次の例では、このルールを使用しています。

@Bean
public AbstractMessageListenerContainer container() {
    ...
    container.setTransactionManager(transactionManager);
    RuleBasedTransactionAttribute transactionAttribute =
        new ListenerFailedRuleBasedTransactionAttribute();
    transactionAttribute.setRollbackRules(Collections.singletonList(
        new NoRollbackRuleAttribute(DontRollBackException.class)));
    container.setTransactionAttribute(transactionAttribute);
    ...
}

受信メッセージのロールバックに関する注意

AMQP トランザクションは、ブローカーに送信されたメッセージと ack にのみ適用されます。その結果、Spring トランザクションのロールバックがあり、メッセージが受信された場合、Spring AMQP はトランザクションをロールバックするだけでなく、メッセージを手動で拒否する必要があります (ナックのようなものですが、仕様ではそう呼ばれていません)。メッセージの拒否時に実行されるアクションは、トランザクションとは無関係であり、defaultRequeueRejected プロパティ (デフォルト: true) に依存します。失敗したメッセージの拒否の詳細については、メッセージリスナーと非同期ケースを参照してください。

RabbitMQ トランザクションとその制限の詳細については、RabbitMQ ブローカーのセマンティクス (英語) を参照してください。

RabbitMQ 2.7.0 より前は、そのようなメッセージ (および、チャネルが閉じられているか中止されたときに確認応答されないメッセージ) は、Rabbit ブローカーのキューの後ろに送られていました。2.7.0 以降、JMS ロールバックメッセージと同様に、拒否されたメッセージはキューの先頭に移動します。
以前は、ローカルトランザクションと TransactionManager が提供された場合とで、トランザクションロールバック時のメッセージの再キューイングに一貫性がありませんでした。前者の場合、通常の再キューイングロジック (AmqpRejectAndDontRequeueException または defaultRequeueRejected=false) が適用されます ( メッセージリスナーと非同期ケースを参照)。トランザクションマネージャーでは、ロールバック時にメッセージが無条件に再キューイングされていました。バージョン 2.0 以降、動作は一貫しており、通常の再キューイングロジックがどちらの場合にも適用されます。以前の動作に戻すには、コンテナーの alwaysRequeueWithTxManagerRollback プロパティを true に設定します。メッセージリスナーコンテナーの設定を参照してください。

RabbitTransactionManager を使用する

RabbitTransactionManager (Javadoc) は、外部トランザクション内で Rabbit 操作を実行し、外部トランザクションと同期する代わりに使用できます。このトランザクションマネージャーは PlatformTransactionManager (Javadoc) インターフェースの実装であり、単一の Rabbit ConnectionFactory と共に使用する必要があります。

この戦略では、XA トランザクションを提供できません。たとえば、メッセージングとデータベースアクセスの間でトランザクションを共有するためです。

後でチャネルを作成する標準の Connection.createChannel() 呼び出しの代わりに、ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) を介してトランザクション Rabbit リソースを取得するには、アプリケーションコードが必要です。Spring AMQP の RabbitTemplate (Javadoc) を使用すると、スレッドにバインドされたチャネルを自動検出し、そのトランザクションに自動的に参加します。

Java 構成では、次の Bean を使用して新しい RabbitTransactionManager をセットアップできます。

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
    return new RabbitTransactionManager(connectionFactory);
}

XML 構成を希望する場合は、XML アプリケーションコンテキストファイルで次の Bean を宣言できます。

<bean id="rabbitTxManager"
      class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
    <property name="connectionFactory" ref="connectionFactory"/>
</bean>

トランザクションの同期

RabbitMQ トランザクションを他の (DBMS などの) トランザクションと同期すると、"Best Effort One Phase Commit" セマンティクスが提供されます。トランザクション同期の補完後フェーズで、RabbitMQ トランザクションがコミットに失敗する可能性があります。これは、spring-tx インフラストラクチャによってエラーとして記録されますが、呼び出し元のコードに例外はスローされません。バージョン 2.3.10 以降では、トランザクションを処理したのと同じスレッドでトランザクションがコミットされた後に、ConnectionUtils.checkAfterCompletion() を呼び出すことができます。例外が発生しなかった場合は単純に戻ります。それ以外の場合は、完了の同期ステータスを表すプロパティを持つ AfterCompletionFailedException がスローされます。

ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) を呼び出して、この機能を有効にします。これはグローバルフラグであり、すべてのスレッドに適用されます。