トランザクションサポート

この章では、Spring Integration のトランザクションのサポートについて説明します。次のトピックについて説明します。

メッセージフローのトランザクションについて

Spring Integration は、メッセージフローのトランザクションニーズに対処するためのいくつかのフックを公開します。これらのフックとそれらの利点をよりよく理解するには、最初にメッセージフローを開始するために使用できる 6 つのメカニズムを再検討し、これらの各メカニズム内でこれらのフローのトランザクションニーズに対処する方法を確認する必要があります。

次の 6 つのメカニズムがメッセージフローを開始します(各マニュアルの詳細は、このマニュアル全体で提供されています)。

  • ゲートウェイプロキシ: 基本的なメッセージングゲートウェイ。

  • メッセージチャンネル: MessageChannel メソッド(たとえば channel.send(message))との直接相互作用。

  • メッセージ発行者: Spring Bean でのメソッド呼び出しの副産物としてメッセージフローを開始する方法。

  • 受信チャネルアダプターとゲートウェイ: サードパーティシステムと Spring Integration メッセージングシステム(たとえば、[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel)の接続に基づいてメッセージフローを開始する方法。

  • スケジューラー: 事前に構成されたスケジューラーによって配布されたスケジューリングイベントに基づいてメッセージフローを開始する方法。

  • ポーラー: スケジューラと同様に、これは、事前に構成されたポーラーによって配信されるスケジューリングまたは間隔ベースのイベントに基づいてメッセージフローを開始する方法です。

これらの 6 つのメカニズムを 2 つの一般的なカテゴリに分割できます。

  • ユーザープロセスによって開始されるメッセージフロー: このカテゴリのシナリオ例は、ゲートウェイメソッドを呼び出すか、Message を MessageChannel に明示的に送信します。つまり、これらのメッセージフローは、開始するサードパーティプロセス(記述したコードなど)に依存します。

  • デーモンプロセスによって開始されるメッセージフロー: このカテゴリのシナリオ例には、ポーリングされたメッセージで新しいメッセージフローを開始するメッセージキューをポーリングするポーラー、または新しいメッセージを作成して事前定義された時間にメッセージフローを開始することでプロセスをスケジュールするスケジューラーが含まれます。

明らかに、ゲートウェイプロキシ、MessageChannel.send(…​) および MessagePublisher はすべて最初のカテゴリに属し、受信アダプターとゲートウェイ、スケジューラ、ポーラーは 2 番目のカテゴリに属します。

それでは、各カテゴリ内のさまざまなシナリオでトランザクションニーズにどのように対処できますか。また、Spring Integration が特定のシナリオのトランザクションに関して明確な何かを提供する必要があるのでしょうか。または、代わりに Spring のトランザクションサポートを使用できますか?

Spring 自体は、トランザクション管理のファーストクラスサポートを提供します。ここでのゴールは新しいものを提供することではなく、Spring を使用してトランザクションの既存のサポートを活用することです。つまり、フレームワークとして、Spring のトランザクション管理機能へのフックを公開する必要があります。ただし、Spring Integration 構成は Spring 構成に基づいているため、Spring はすでに公開しているため、これらのフックを常に公開する必要はありません。結局のところ、すべての Spring Integration コンポーネントは Spring Bean です。

このゴールを念頭に置いて、ユーザープロセスによって開始されるメッセージフローとデーモンによって開始されるメッセージフローの 2 つのシナリオを再度検討できます。

ユーザープロセスによって開始され、Spring アプリケーションコンテキストで構成されたメッセージフローは、そのようなプロセスの通常のトランザクション構成に従います。トランザクションをサポートするために Spring Integration で明示的に構成する必要はありません。トランザクションは、Spring の標準トランザクションサポートを通じて開始できます。Spring Integration メッセージフロー自体は Spring によって構成されているため、コンポーネントのトランザクションセマンティクスを自然に尊重します。例: ゲートウェイまたはサービスアクティベーターメソッドに @Transactional アノテーションを付けるか、TransactionInterceptor を、トランザクションである必要がある特定のメソッドを指すポイントカット式を使用して XML 構成で定義できます。つまり、これらのシナリオでは、トランザクションの構成と境界を完全に制御できます。

ただし、デーモンプロセスによって開始されるメッセージフローに関しては、状況が少し異なります。開発者によって構成されますが、これらのフローには、開始される人間またはその他のプロセスが直接関与することはありません。これらは、プロセスの構成に基づいてトリガープロセス(デーモンプロセス)によって開始されるトリガーベースのフローです。例: 毎週金曜日の夜にスケジューラーにメッセージフローを開始させることができます。また、毎秒メッセージフローを開始するトリガーを構成することもできます。その結果、これらのトリガーベースのプロセスに、結果のメッセージフローをトランザクションにする意図を知らせて、新しいメッセージフローが開始されるたびにトランザクションコンテキストを作成できるようにする方法が必要です。言い換えると、いくつかのトランザクション構成を公開する必要がありますが、Spring によってすでに提供されているトランザクションサポートに委譲するのに十分なだけです(他のシナリオで行うように)。

ポーラートランザクションサポート

Spring Integration は、ポーラーのトランザクションサポートを提供します。ポーラーは特殊な型のコンポーネントです。ポーラータスク内で、それ自体がトランザクションであるリソースに対して receive() を呼び出すことができるため、トランザクションの境界に receive() 呼び出しを含めることができます。これにより、タスクが失敗した場合にロールバックできます。チャネルに同じサポートを追加する場合、追加されたトランザクションは、send() 呼び出しで始まるすべてのダウンストリームコンポーネントに影響します。これは、特に Spring がダウンストリームのコンポーネントのトランザクションニーズに対処するためのいくつかの方法をすでに提供している場合は特に、強い理由なしにトランザクション境界のかなり広い範囲を提供します。ただし、トランザクション境界に含まれている receive() メソッドは、ポーラーの「強力な理由」です。

次の例に示すように、ポーラーを構成するたびに、transactional 子要素とその属性を使用してトランザクション構成を提供できます。

<int:poller max-messages-per-poll="1" fixed-rate="1000">
    <transactional transaction-manager="txManager"
                   isolation="DEFAULT"
                   propagation="REQUIRED"
                   read-only="true"
                   timeout="1000"/>
</poller>

上記の構成は、ネイティブ Spring トランザクション構成に似ています。トランザクションマネージャーへの参照を提供し、トランザクション属性を指定するか、デフォルトに依存する必要があります(たとえば、'transaction-manager' 属性が指定されていない場合、デフォルトは 'transactionManager' という名前の Bean になります)。内部的に、プロセスは Spring のネイティブトランザクションにラップされており、TransactionInterceptor がトランザクションの処理を担当しています。トランザクションマネージャーの構成方法、トランザクションマネージャーの型(JTA、Datasource など)、トランザクション構成に関連するその他の詳細については、Spring Framework リファレンスガイドを参照してください。

上記の構成では、このポーラーによって開始されたすべてのメッセージフローはトランザクションです。ポーラーのトランザクション構成の詳細と詳細については、ポーリングとトランザクションを参照してください。

トランザクションとともに、ポーラーを実行するときに、いくつかの横断的な問題に対処する必要がある場合があります。これを支援するために、ポーラー要素は <advice-chain> 子要素を受け入れます。これにより、ポーラーに適用されるアドバイスインスタンスのカスタムチェーンを定義できます。(詳細については、ポーリング可能なメッセージソースを参照してください)Spring Integration 2.0 では、ポーラーはリファクタリング作業を行い、プロキシメカニズムを使用してトランザクションの問題や他の横断的問題に対処しています。この取り組みから進化した重要な変更の 1 つは、<transactional> 要素と <advice-chain> 要素を相互に排他的にしたことです。これの背後にある理論的根拠は、複数のアドバイスが必要で、そのうちの 1 つがトランザクションアドバイスである場合、以前と同じ便利さで <advice-chain> に含めることができますが、これにより、ご希望の順番でアドバイス。次の例は、その方法を示しています。

<int:poller max-messages-per-poll="1" fixed-rate="10000">
  <advice-chain>
    <ref bean="txAdvice"/>
    <ref bean="someOtherAdviceBean" />
    <beans:bean class="foo.bar.SampleAdvice"/>
  </advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
  <tx:attributes>
    <tx:method name="get*" read-only="true"/>
    <tx:method name="*"/>
  </tx:attributes>
</tx:advice>

上記の例は、Spring トランザクションアドバイス(txAdvice)の基本的な XML ベースの構成を示しており、ポーラーによって定義された <advice-chain> に含まれています。ポーラーのトランザクションの問題のみに対処する必要がある場合でも、便宜上 <transactional> 要素を使用できます。

トランザクション境界

もう 1 つの重要な要素は、メッセージフロー内のトランザクションの境界です。トランザクションが開始されると、トランザクションコンテキストは現在のスレッドにバインドされます。メッセージフローにあるエンドポイントとチャネルの数に関係なく、フローが同じスレッドで継続することを保証している限り、トランザクションコンテキストは保持されます。ポーリング可能チャネルまたはエグゼ キュータチャネルを導入してそれを中断するか、一部のサービスで手動で新しいスレッドを開始すると、トランザクション境界も同様に中断されます。基本的に、トランザクションはそこですぐに終了し、スレッド間でハンドオフが成功すると、フローは成功したと見なされ、フローが継続し、ダウンストリームのどこかで例外が発生する可能性があるにもかかわらず、COMMIT シグナルが送信されます。そのようなフローが同期である場合、その例外は、トランザクションコンテキストのイニシエーターでもあるメッセージフローのイニシエーターにスローバックされ、トランザクションは ROLLBACK になります。中間点は、スレッド境界が壊れている任意のポイントでトランザクションチャネルを使用することです。例: トランザクション MessageStore 戦略に委譲するキューバックアップチャネルを使用できます。または、JMS バックアップチャネルを使用できます。

トランザクションの同期

一部の環境では、フロー全体を含むトランザクションと操作を同期できます。例: 多数のデータベース更新を実行するフローの開始時に <file:inbound-channel-adapter/> を検討します。トランザクションがコミットする場合、ファイルを success ディレクトリに移動し、トランザクションがロールバックする場合は failure ディレクトリに移動することができます。

Spring Integration 2.2 は、これらの操作をトランザクションと同期する機能を導入しました。さらに、「実際の」トランザクションがなくても、成功または失敗時にさまざまなアクションを実行したい場合は、PseudoTransactionManager を構成できます。詳細については、疑似トランザクションを参照してください。

次のリストは、この機能の主要な戦略インターフェースを示しています。

public interface TransactionSynchronizationFactory {

    TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

    void processBeforeCommit(IntegrationResourceHolder holder);

    void processAfterCommit(IntegrationResourceHolder holder);

    void processAfterRollback(IntegrationResourceHolder holder);

}

ファクトリは TransactionSynchronization (Javadoc) オブジェクトの作成を担当します。独自に実装することも、フレームワークが提供する DefaultTransactionSynchronizationFactory を使用することもできます。この実装は、TransactionSynchronizationProcessor のデフォルト実装である ExpressionEvaluatingTransactionSynchronizationProcessor に委譲する TransactionSynchronization を返します。このプロセッサーは、beforeCommitExpressionafterCommitExpressionafterRollbackExpression という 3 つの SpEL 式をサポートしています。

これらのアクションは、トランザクションに精通している人には自明のはずです。いずれの場合も、#root 変数は元の Message です。場合によっては、ポーラーによってポーリングされている MessageSource に応じて、他の SpEL 変数が使用可能になります。例: MongoDbMessageSource は、メッセージソースの MongoTemplate を参照する #mongoTemplate 変数を提供します。同様に、RedisStoreMessageSource は、ポーリングによって作成された RedisStore を参照する #store 変数を提供します。

特定のポーラーの機能を有効にするには、synchronization-factory 属性を使用して、ポーラーの <transactional/> 要素で TransactionSynchronizationFactory への参照を提供できます。

バージョン 5.0 以降、Spring Integration は PassThroughTransactionSynchronizationFactory を提供します。PassThroughTransactionSynchronizationFactory は、TransactionSynchronizationFactory が構成されていないが、アドバイスチェーンに型 TransactionInterceptor のアドバイスが存在する場合、デフォルトでポーリングエンドポイントに適用されます。すぐに使用可能な TransactionSynchronizationFactory 実装を使用する場合、ポーリングエンドポイントは、ポーリングされたメッセージを現在のトランザクションコンテキストにバインドし、トランザクションアドバイスの後に例外がスローされた場合、MessagingException の failedMessage として提供します。TransactionInterceptor を実装しないカスタムトランザクションアドバイスを使用する場合、PassThroughTransactionSynchronizationFactory を明示的に構成してこの動作を実現できます。いずれの場合でも、MessagingException は errorChannel に送信される ErrorMessage のペイロードになり、原因はアドバイスによってスローされる生の例外です。以前は、ErrorMessage には、アドバイスによってスローされる未加工の例外であるペイロードがあり、failedMessage 情報への参照を提供していなかったため、トランザクションコミットの問題の理由を判断することは困難でした。

これらのコンポーネントの構成を簡素化するために、Spring Integration はデフォルトのファクトリの名前空間サポートを提供します。次の例は、名前空間を使用してファイル受信チャネルアダプターを構成する方法を示しています。

<int-file:inbound-channel-adapter id="inputDirPoller"
    channel="someChannel"
    directory="/foo/bar"
    filter="filter"
    comparator="testComparator">
    <int:poller fixed-rate="5000">
        <int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
    </int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

SpEL 評価の結果は、ペイロードとして committedChannel または rolledBackChannel のいずれかに送信されます(この場合、これは Boolean.TRUE または Boolean.FALSE — java.io.File.renameTo() メソッド呼び出しの結果)。

さらに Spring Integration 処理のためにペイロード全体を送信する場合は、"payload" 式を使用します。

これにより、アクションがトランザクションと同期されることを理解することが重要です。本質的にトランザクションではないリソースを実際にトランザクションにすることはありません。代わりに、トランザクション(JDBC またはそれ以外)がポーリングの前に開始され、フローの補完時にコミットまたはロールバックされ、その後に同期アクションが続きます。

カスタム TransactionSynchronizationFactory を提供する場合、トランザクションの補完時にバインドされたリソースが自動的にバインド解除されるリソース同期を作成する必要があります。デフォルトの TransactionSynchronizationFactory は、ResourceHolderSynchronization のサブクラスを返すことでそうします。デフォルトの shouldUnbindAtCompletion() は true を返します。

after-commit および after-rollback 式に加えて、before-commit もサポートされています。その場合、評価(またはダウンストリーム処理)が例外をスローすると、トランザクションはコミットされる代わりにロールバックされます。

疑似トランザクション

トランザクションの同期セクションを読んだ後、ポーラーの下流に「実際の」トランザクションリソース(JDBC など)がない場合でも、フローの補完時にこれらの「成功」または「失敗」アクションを実行すると便利だと思うかもしれません。例: "<file:inbound-channel-adapter/>" に続いて "<ftp:outbout-channel-adapter/>" を検討します。これらのコンポーネントはどちらもトランザクションではありませんが、FTP 転送の成功または失敗に基づいて、入力ファイルを別のディレクトリに移動したい場合があります。

この機能を提供するために、フレームワークは PseudoTransactionManager を提供し、実際のトランザクションリソースがない場合でも上記の構成を可能にします。フローが正常に完了すると、beforeCommit および afterCommit 同期が呼び出されます。失敗すると、afterRollback 同期が呼び出されます。実際のトランザクションではないため、実際のコミットやロールバックは発生しません。擬似トランザクションは、同期機能を有効にするために使用される手段です。

PseudoTransactionManager を使用するには、実際のトランザクションマネージャーを構成するのと同じ方法で、<Bean/> として定義できます。次の例は、その方法を示しています。

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

リアクティブトランザクション

バージョン 5.3 以降、ReactiveTransactionManager は、リアクティブ型を返すエンドポイントの TransactionInterceptor アドバイスと共に使用することもできます。これには、Flux または Mono ペイロードでメッセージを生成する MessageSource および ReactiveMessageHandler 実装(ReactiveMongoDbMessageSource など)が含まれます。他のすべての応答生成メッセージハンドラーの実装は、応答ペイロードも何らかのリアクティブ型である場合、ReactiveTransactionManager に依存できます。