トランザクションサポート
この章では、Spring Integration のトランザクションのサポートについて説明します。次のトピックについて説明します。
メッセージフローのトランザクションについて
Spring Integration は、メッセージフローのトランザクションニーズに対処するためのいくつかのフックを公開します。これらのフックとそれらの利点をよりよく理解するには、最初にメッセージフローを開始するために使用できる 6 つのメカニズムを再検討し、これらの各メカニズム内でこれらのフローのトランザクションニーズに対処する方法を確認する必要があります。
次の 6 つのメカニズムがメッセージフローを開始します(各マニュアルの詳細は、このマニュアル全体で提供されています)。
ゲートウェイプロキシ: 基本的なメッセージングゲートウェイ。
メッセージチャンネル:
MessageChannel
メソッド(たとえばchannel.send(message)
)との直接相互作用。メッセージ発行者: Spring Bean でのメソッド呼び出しの副産物としてメッセージフローを開始する方法。
受信チャネルアダプターとゲートウェイ: サードパーティシステムと Spring Integration メッセージングシステム(たとえば、
[JmsMessage] → Jms Inbound Adapter[SI Message] → SI Channel
)の接続に基づいてメッセージフローを開始する方法。スケジューラー: 事前に構成されたスケジューラーによって配布されたスケジューリングイベントに基づいてメッセージフローを開始する方法。
ポーラー: スケジューラと同様に、これは、事前に構成されたポーラーによって配信されるスケジューリングまたは間隔ベースのイベントに基づいてメッセージフローを開始する方法です。
これらの 6 つのメカニズムを 2 つの一般的なカテゴリに分割できます。
ユーザープロセスによって開始されるメッセージフロー: このカテゴリのシナリオ例は、ゲートウェイメソッドを呼び出すか、
Message
をMessageChannel
に明示的に送信します。つまり、これらのメッセージフローは、開始するサードパーティプロセス(記述したコードなど)に依存します。デーモンプロセスによって開始されるメッセージフロー: このカテゴリのシナリオ例には、ポーリングされたメッセージで新しいメッセージフローを開始するメッセージキューをポーリングするポーラー、または新しいメッセージを作成して事前定義された時間にメッセージフローを開始することでプロセスをスケジュールするスケジューラーが含まれます。
明らかに、ゲートウェイプロキシ、MessageChannel.send(…)
および MessagePublisher
はすべて最初のカテゴリに属し、受信アダプターとゲートウェイ、スケジューラ、ポーラーは 2 番目のカテゴリに属します。
それでは、各カテゴリ内のさまざまなシナリオでトランザクションニーズにどのように対処できますか。また、Spring Integration が特定のシナリオのトランザクションに関して明確な何かを提供する必要があるのでしょうか。または、代わりに Spring のトランザクションサポートを使用できますか?
Spring 自体は、トランザクション管理のファーストクラスサポートを提供します。ここでのゴールは新しいものを提供することではなく、Spring を使用してトランザクションの既存のサポートを活用することです。つまり、フレームワークとして、Spring のトランザクション管理機能へのフックを公開する必要があります。ただし、Spring Integration 構成は Spring 構成に基づいているため、Spring はすでに公開しているため、これらのフックを常に公開する必要はありません。結局のところ、すべての Spring Integration コンポーネントは Spring Bean です。
このゴールを念頭に置いて、ユーザープロセスによって開始されるメッセージフローとデーモンによって開始されるメッセージフローの 2 つのシナリオを再度検討できます。
ユーザープロセスによって開始され、Spring アプリケーションコンテキストで構成されるメッセージフローは、そのようなプロセスの通常のトランザクション構成に従います。トランザクションをサポートするために Spring Integration によって明示的に構成する必要はありません。トランザクションは、Spring の標準トランザクションサポートを通じて開始でき、開始する必要があります。Spring Integration メッセージフローは、それ自体が Spring によって構成されるため、コンポーネントのトランザクションセマンティクスを当然尊重します。例: ゲートウェイまたはサービスアクティベーターメソッドに @Transactional
のアノテーションを付けることができます。または、XML 構成で TransactionInterceptor
を定義して、トランザクションである必要がある特定のメソッドを指すポイントカット式を使用することができます。要するに、これらのシナリオでは、トランザクションの構成と境界を完全に制御できます。
ただし、デーモンプロセスによって開始されるメッセージフローに関しては、状況が少し異なります。開発者によって構成されますが、これらのフローには、開始される人間またはその他のプロセスが直接関与することはありません。これらは、プロセスの構成に基づいてトリガープロセス(デーモンプロセス)によって開始されるトリガーベースのフローです。例: 毎週金曜日の夜にスケジューラーにメッセージフローを開始させることができます。また、毎秒メッセージフローを開始するトリガーを構成することもできます。その結果、これらのトリガーベースのプロセスに、結果のメッセージフローをトランザクションにする意図を知らせて、新しいメッセージフローが開始されるたびにトランザクションコンテキストを作成できるようにする方法が必要です。言い換えると、いくつかのトランザクション構成を公開する必要がありますが、Spring によってすでに提供されているトランザクションサポートに委譲するのに十分なだけです(他のシナリオで行うように)。
ポーラートランザクションサポート
Spring Integration は、ポーラーのトランザクションサポートを提供します。ポーラーは特別な型のコンポーネントです。ポーラータスク内で、それ自体がトランザクションであるリソースに対して receive()
を呼び出すことができるため、トランザクションの境界に receive()
呼び出しを含めて、タスクが失敗した場合にロールバックできるようにするためです。チャネルに同じサポートを追加した場合、追加されたトランザクションは、send()
呼び出しから始まるすべてのダウンストリームコンポーネントに影響します。これは、特に Spring がダウンストリームのコンポーネントのトランザクションのニーズに対処するためのいくつかの方法をすでに提供している場合は特に、強い理由がなくてもトランザクション境界のかなり広い範囲を提供します。ただし、receive()
メソッドがトランザクション境界に含まれていることは、ポーラーにとって「強力な理由」です。
次の例に示すように、ポーラーを構成するたびに、transactional
子要素とその属性を使用してトランザクション構成を提供できます。
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
上記の構成は、ネイティブの Spring トランザクション構成に似ています。トランザクションマネージャーへの参照を提供し、トランザクション属性を指定するか、デフォルトに依存する必要があります (たとえば、'transaction-manager' 属性 が指定されていない場合は、デフォルトで "transactionManager" という名前の Bean になります)。内部的には、プロセスは Spring のネイティブトランザクションにラップされ、TransactionInterceptor
がトランザクションの処理を担当します。トランザクションマネージャーの構成方法、トランザクションマネージャーの種類 (JTA、データソースなど)、トランザクション構成に関連するその他の詳細については、Spring Framework リファレンスガイドを参照してください。
上記の構成では、このポーラーによって開始されたすべてのメッセージフローはトランザクションです。ポーラーのトランザクション構成の詳細と詳細については、ポーリングとトランザクションを参照してください。
トランザクションとともに、ポーラーを実行するときに、いくつかの横断的な問題に対処する必要がある場合があります。これを支援するために、ポーラー要素は <advice-chain>
子要素を受け入れます。これにより、ポーラーに適用されるアドバイスインスタンスのカスタムチェーンを定義できます。(詳細については、ポーリング可能なメッセージソースを参照してください)Spring Integration 2.0 では、ポーラーはリファクタリング作業を行い、プロキシメカニズムを使用してトランザクションの問題や他の横断的問題に対処しています。この取り組みから進化した重要な変更の 1 つは、<transactional>
要素と <advice-chain>
要素を相互に排他的にしたことです。これの背後にある理論的根拠は、複数のアドバイスが必要で、そのうちの 1 つがトランザクションアドバイスである場合、以前と同じ便利さで <advice-chain>
に含めることができますが、これにより、ご希望の順番でアドバイス。次の例は、その方法を示しています。
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
上記の例は、Spring トランザクションアドバイス(txAdvice
)の基本的な XML ベースの構成を示しており、ポーラーによって定義された <advice-chain>
に含まれています。ポーラーのトランザクションの問題のみに対処する必要がある場合でも、便宜上 <transactional>
要素を使用できます。
トランザクション境界
もう 1 つの重要な要素は、メッセージフロー内のトランザクションの境界です。トランザクションが開始されると、トランザクションコンテキストは現在のスレッドにバインドされます。メッセージフローにあるエンドポイントとチャネルの数に関係なく、フローが同じスレッドで継続することを保証している限り、トランザクションコンテキストは保持されます。ポーリング可能チャネルまたはエグゼ キュータチャネルを導入してそれを中断するか、一部のサービスで手動で新しいスレッドを開始すると、トランザクション境界も同様に中断されます。基本的に、トランザクションはそこですぐに終了し、スレッド間でハンドオフが成功すると、フローは成功したと見なされ、フローが継続し、ダウンストリームのどこかで例外が発生する可能性があるにもかかわらず、COMMIT シグナルが送信されます。そのようなフローが同期である場合、その例外は、トランザクションコンテキストのイニシエーターでもあるメッセージフローのイニシエーターにスローバックされ、トランザクションは ROLLBACK になります。中間点は、スレッド境界が壊れている任意のポイントでトランザクションチャネルを使用することです。例: トランザクション MessageStore 戦略に委譲するキューバックアップチャネルを使用できます。または、JMS バックアップチャネルを使用できます。
トランザクションの同期
一部の環境では、フロー全体を網羅するトランザクションと操作を同期すると役立ちます。例: 多数のデータベース更新を実行するフローの開始時に <file:inbound-channel-adapter/>
を検討してください。トランザクションがコミットされた場合は、ファイルを success
ディレクトリに移動したい場合がありますが、トランザクションがロールバックした場合は、ファイルを failure
ディレクトリに移動したい場合があります。
Spring Integration 2.2 は、これらの操作をトランザクションと同期する機能を導入しました。さらに、「実際の」トランザクションがなくても、成功または失敗時にさまざまなアクションを実行したい場合は、PseudoTransactionManager
を構成できます。詳細については、疑似トランザクションを参照してください。
次のリストは、この機能の主要な戦略インターフェースを示しています。
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
ファクトリは TransactionSynchronization
(Javadoc) オブジェクトの作成を担当します。独自に実装することも、フレームワークが提供する DefaultTransactionSynchronizationFactory
を使用することもできます。この実装は、TransactionSynchronizationProcessor
のデフォルト実装である ExpressionEvaluatingTransactionSynchronizationProcessor
に委譲する TransactionSynchronization
を返します。このプロセッサーは、beforeCommitExpression
、afterCommitExpression
、afterRollbackExpression
という 3 つの SpEL 式をサポートしています。
これらのアクションは、トランザクションに精通している人には自明のはずです。いずれの場合も、#root
変数は元の Message
です。場合によっては、ポーラーによってポーリングされている MessageSource
に応じて、他の SpEL 変数が使用可能になります。例: MongoDbMessageSource
は、メッセージソースの MongoTemplate
を参照する #mongoTemplate
変数を提供します。同様に、RedisStoreMessageSource
は、ポーリングによって作成された RedisStore
を参照する #store
変数を提供します。
特定のポーラーの機能を有効にするには、synchronization-factory
属性を使用して、ポーラーの <transactional/>
要素で TransactionSynchronizationFactory
への参照を提供できます。
バージョン 5.0 以降、Spring Integration は PassThroughTransactionSynchronizationFactory
を提供します。PassThroughTransactionSynchronizationFactory
は、TransactionSynchronizationFactory
が構成されていないが、アドバイスチェーンに型 TransactionInterceptor
のアドバイスが存在する場合、デフォルトでポーリングエンドポイントに適用されます。すぐに使用可能な TransactionSynchronizationFactory
実装を使用する場合、ポーリングエンドポイントは、ポーリングされたメッセージを現在のトランザクションコンテキストにバインドし、トランザクションアドバイスの後に例外がスローされた場合、MessagingException
の failedMessage
として提供します。TransactionInterceptor
を実装しないカスタムトランザクションアドバイスを使用する場合、PassThroughTransactionSynchronizationFactory
を明示的に構成してこの動作を実現できます。いずれの場合でも、MessagingException
は errorChannel
に送信される ErrorMessage
のペイロードになり、原因はアドバイスによってスローされる生の例外です。以前は、ErrorMessage
には、アドバイスによってスローされる未加工の例外であるペイロードがあり、failedMessage
情報への参照を提供していなかったため、トランザクションコミットの問題の理由を判断することは困難でした。
これらのコンポーネントの構成を簡素化するために、Spring Integration はデフォルトのファクトリの名前空間サポートを提供します。次の例は、名前空間を使用してファイル受信チャネルアダプターを構成する方法を示しています。
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 評価の結果は、ペイロードとして committedChannel
または rolledBackChannel
のいずれかに送信されます(この場合、これは Boolean.TRUE
または Boolean.FALSE
— java.io.File.renameTo()
メソッド呼び出しの結果)。
さらに Spring Integration 処理のためにペイロード全体を送信する場合は、"payload" 式を使用します。
これにより、アクションがトランザクションと同期されることを理解することが重要です。本質的にトランザクションではないリソースを実際にトランザクションにすることはありません。代わりに、トランザクション(JDBC またはそれ以外)がポーリングの前に開始され、フローの完了時にコミットまたはロールバックされ、その後に同期アクションが続きます。 カスタム |
after-commit
および after-rollback
式に加えて、before-commit
もサポートされています。その場合、評価(またはダウンストリーム処理)が例外をスローすると、トランザクションはコミットされる代わりにロールバックされます。
疑似トランザクション
トランザクションの同期セクションを読んだ後、ポーラーの下流に「実際の」トランザクションリソース(JDBC など)がない場合でも、フローの完了時にこれらの「成功」または「失敗」アクションを実行すると便利だと思うかもしれません。例: "<file:inbound-channel-adapter/>" に続いて "<ftp:outbout-channel-adapter/>" を検討します。これらのコンポーネントはどちらもトランザクションではありませんが、FTP 転送の成功または失敗に基づいて、入力ファイルを別のディレクトリに移動したい場合があります。
この機能を提供するために、フレームワークは PseudoTransactionManager
を提供し、実際のトランザクションリソースがない場合でも上記の構成を可能にします。フローが正常に完了すると、beforeCommit
および afterCommit
同期が呼び出されます。失敗すると、afterRollback
同期が呼び出されます。実際のトランザクションではないため、実際のコミットやロールバックは発生しません。擬似トランザクションは、同期機能を有効にするために使用される手段です。
PseudoTransactionManager
を使用するには、実際のトランザクションマネージャーを構成するのと同じ方法で、<Bean/> として定義できます。次の例は、その方法を示しています。
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />
リアクティブトランザクション
バージョン 5.3 以降、ReactiveTransactionManager
は、リアクティブ型を返すエンドポイントの TransactionInterceptor
アドバイスと共に使用することもできます。これには、Flux
または Mono
ペイロードでメッセージを生成する MessageSource
および ReactiveMessageHandler
実装(ReactiveMongoDbMessageSource
など)が含まれます。他のすべての応答生成メッセージハンドラーの実装は、応答ペイロードも何らかのリアクティブ型である場合、ReactiveTransactionManager
に依存できます。