セッションとトランザクション
バージョン 3.6 の時点で、MongoDB はセッションの概念をサポートしています。セッションを使用すると、MongoDB の因果関係の一貫性 (英語) モデルが有効になり、因果関連を考慮した順序で操作が実行されることが保証されます。これらは ServerSession
インスタンスと ClientSession
インスタンスに分割されます。このセクションでは、セッションについて話すときは、ClientSession
を指します。
クライアントセッション内の操作は、セッション外の操作から分離されません。 |
MongoOperations
と ReactiveMongoOperations
は両方とも、ClientSession
を操作に結び付けるためのゲートウェイメソッドを提供します。MongoCollection
および MongoDatabase
は、MongoDB のコレクションおよびデータベースインターフェースを実装するセッションプロキシオブジェクトを使用するため、呼び出しごとにセッションを追加する必要はありません。これは、MongoCollection#find()
への潜在的な呼び出しが MongoCollection#find(ClientSession)
に委譲されることを意味します。
(Reactive)MongoOperations#getCollection などのメソッドは、それ自体が ClientSession の専用メソッドを提供するネイティブ MongoDB Java Driver ゲートウェイオブジェクト ( MongoCollection など) を返します。これらのメソッドはセッションプロキシではありません。MongoOperations の #execute コールバックのいずれかを介さずに、MongoCollection または MongoDatabase と直接対話する場合は、必要に応じて ClientSession を指定する必要があります。 |
ClientSession サポート
次の例は、セッションの使用箇所を示しています。
命令的
リアクティブ
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
ClientSession session = client.startSession(sessionOptions); (1)
template.withSession(() -> session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
Person durzo = action.findOne(query, Person.class); (2)
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
action.insert(azoth); (3)
return azoth;
});
session.close() (4)
1 | サーバーから新しいセッションを取得します。 |
2 | 以前と同様に MongoOperation メソッドを使用します。ClientSession は自動的に適用されます。 |
3 | ClientSession は必ず閉じてください。 |
4 | セッションを閉じます。 |
DBRef インスタンス、特に遅延ロードされるインスタンスを処理する場合、すべてのデータがロードされる前に ClientSession を閉じないことが重要です。それ以外の場合、遅延フェッチは失敗します。 |
ClientSessionOptions sessionOptions = ClientSessionOptions.builder()
.causallyConsistent(true)
.build();
Publisher<ClientSession> session = client.startSession(sessionOptions); (1)
template.withSession(session)
.execute(action -> {
Query query = query(where("name").is("Durzo Blint"));
return action.findOne(query, Person.class)
.flatMap(durzo -> {
Person azoth = new Person("Kylar Stern");
azoth.setMaster(durzo);
return action.insert(azoth); (2)
});
}, ClientSession::close) (3)
.subscribe(); (4)
1 | 新しいセッションを取得するために Publisher を取得します。 |
2 | 以前と同様に ReactiveMongoOperation メソッドを使用します。ClientSession は自動的に取得され、適用されます。 |
3 | ClientSession は必ず閉じてください。 |
4 | 購読するまでは何も起こりません。詳細については、プロジェクト Reactor リファレンスガイド (英語) を参照してください。 |
実際のセッションを提供する Publisher
を使用すると、実際のサブスクリプションの時点までセッションの取得を延期できます。ただし、古いセッションでサーバーが汚染されないように、完了したらセッションを閉じる必要があります。セッションが必要なくなった場合は、execute
の doFinally
フックを使用して ClientSession#close()
を呼び出します。セッション自体をより詳細に制御したい場合は、ドライバーを通じて ClientSession
を取得し、Supplier
を通じて提供できます。
ClientSession のリアクティブな使用は、テンプレート API の使用に限定されます。現在、リアクティブリポジトリとのセッション統合はありません。 |
MongoDB トランザクション
バージョン 4 以降、MongoDB はトランザクション (英語) をサポートします。トランザクションはセッション上に構築されるため、アクティブな ClientSession
が必要です。
アプリケーションコンテキスト内で MongoTransactionManager を指定しない限り、トランザクションサポートは DISABLED です。setSessionSynchronization(ALWAYS) を使用して、進行中の非ネイティブ MongoDB トランザクションに参加できます。 |
トランザクションをプログラムで完全に制御するには、MongoOperations
でセッションコールバックを使用するとよいでしょう。
次の例は、プログラムによるトランザクション制御を示しています。
命令的
リアクティブ
ClientSession session = client.startSession(options); (1)
template.withSession(session)
.execute(action -> {
session.startTransaction(); (2)
try {
Step step = // ...;
action.insert(step);
process(step);
action.update(Step.class).apply(Update.set("state", // ...
session.commitTransaction(); (3)
} catch (RuntimeException e) {
session.abortTransaction(); (4)
}
}, ClientSession::close) (5)
1 | 新しい ClientSession を入手します。 |
2 | トランザクションを開始します。 |
3 | すべてが期待どおりに機能する場合は、変更をコミットします。 |
4 | 何かが壊れたため、すべてをロールバックします。 |
5 | 完了したら、セッションを閉じることを忘れないでください。 |
前述の例では、コールバック内でセッションスコープの MongoOperations
インスタンスを使用して、セッションがすべてのサーバー呼び出しに確実に渡されるようにしながら、トランザクション動作を完全に制御できます。このアプローチに伴うオーバーヘッドの一部を回避するには、TransactionTemplate
を使用して、手動トランザクションフローのノイズの一部を取り除くことができます。
Mono<DeleteResult> result = Mono
.from(client.startSession()) (1)
.flatMap(session -> {
session.startTransaction(); (2)
return Mono.from(collection.deleteMany(session, ...)) (3)
.onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) (4)
.flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) (5)
.doFinally(signal -> session.close()); (6)
});
1 | まず、明らかにセッションを開始する必要があります。 |
2 | ClientSession を手元に用意したら、取引を開始します。 |
3 | ClientSession を操作に渡すことにより、トランザクション内で操作します。 |
4 | 操作が例外的に完了した場合は、トランザクションを停止してエラーを保存する必要があります。 |
5 | もちろん、成功した場合は変更をコミットします。操作結果はまだ保存されています。 |
6 | 最後に、セッションを必ず閉じる必要があります。 |
上記の操作の原因は、commitTransaction()
または abortTransaction()
を介して発行されるトランザクション結果ではなく、メインフロー DeleteResult
を保持することであり、これによりかなり複雑なセットアップが発生します。
アプリケーションコンテキスト内で ReactiveMongoTransactionManager を指定しない限り、トランザクションサポートは DISABLED です。setSessionSynchronization(ALWAYS) を使用して、進行中の非ネイティブ MongoDB トランザクションに参加できます。 |
TransactionTemplate/TransactionalOperator との取引
Spring Data MongoDB トランザクションは、TransactionTemplate
と TransactionalOperator
の両方をサポートします。
TransactionTemplate
/ TransactionalOperator
でのトランザクション 命令的
リアクティブ
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); (2)
txTemplate.execute(new TransactionCallbackWithoutResult() {
@Override
protected void doInTransactionWithoutResult(TransactionStatus status) { (3)
Step step = // ...;
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
}
});
1 | テンプレート API の設定中にトランザクションの同期を有効にします。 |
2 | 提供された PlatformTransactionManager を使用して TransactionTemplate を作成します。 |
3 | コールバック内では、ClientSession とトランザクションはすでに登録されています。 |
実行時に MongoTemplate の状態を変更すると (前のリストの項目 1 で可能だと思われるかもしれませんが)、スレッドと可視性の問題が発生する可能性があります。 |
template.setSessionSynchronization(ALWAYS); (1)
// ...
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
new DefaultTransactionDefinition()); (2)
Step step = // ...;
template.insert(step);
Mono<Void> process(step)
.then(template.update(Step.class).apply(Update.set("state", …))
.as(rxtx::transactional) (3)
.then();
1 | トランザクションに参加するためにトランザクション同期を有効にします。 |
2 | 提供された ReactiveTransactionManager を使用して TransactionalOperator を作成します。 |
3 | TransactionalOperator.transactional(…) は、すべてのアップストリーム操作のトランザクション管理を提供します。 |
MongoTransactionManager および ReactiveMongoTransactionManager とのトランザクション
MongoTransactionManager
/ ReactiveMongoTransactionManager
は、よく知られた Spring トランザクションサポートへのゲートウェイです。これにより、アプリケーションは Spring のマネージドトランザクション機能を使用できるようになります。MongoTransactionManager
は ClientSession
をスレッドにバインドしますが、ReactiveMongoTransactionManager
はこれに ReactorContext
を使用します。MongoTemplate
はセッションを検出し、それに応じてトランザクションに関連付けられたこれらのリソースを操作します。MongoTemplate
は、他の進行中のトランザクションに参加することもできます。次の例は、MongoTransactionManager
を使用してトランザクションを作成および使用する方法を示しています。
MongoTransactionManager
/ ReactiveMongoTransactionManager
でのトランザクション 命令的
リアクティブ
@Configuration
static class Config extends AbstractMongoClientConfiguration {
@Bean
MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { (1)
return new MongoTransactionManager(dbFactory);
}
// ...
}
@Component
public class StateService {
@Transactional
void someBusinessFunction(Step step) { (2)
template.insert(step);
process(step);
template.update(Step.class).apply(Update.set("state", // ...
};
});
1 | MongoTransactionManager をアプリケーションコンテキストに登録します。 |
2 | メソッドをトランザクションとしてマークします。 |
@Transactional(readOnly = true) は、MongoTransactionManager に対して、発信リクエストに ClientSession を追加するトランザクションも開始するようにアドバイスします。 |
@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
@Bean
ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { (1)
return new ReactiveMongoTransactionManager(factory);
}
// ...
}
@Service
public class StateService {
@Transactional
Mono<UpdateResult> someBusinessFunction(Step step) { (2)
return template.insert(step)
.then(process(step))
.then(template.update(Step.class).apply(Update.set("state", …));
};
});
1 | ReactiveMongoTransactionManager をアプリケーションコンテキストに登録します。 |
2 | メソッドをトランザクションとしてマークします。 |
@Transactional(readOnly = true) は、ReactiveMongoTransactionManager に対して、発信リクエストに ClientSession を追加するトランザクションも開始するようにアドバイスします。 |
MongoDB 固有のトランザクションオプションの制御
トランザクションサービスメソッドでは、トランザクションを実行するために特定のトランザクションオプションが必要になる場合があります。Spring Data MongoDB のトランザクションマネージャーは、@Transactional(label = { "mongo:readConcern=available" })
などのトランザクションラベルの評価をサポートします。
デフォルトでは、mongo:
プレフィックスを使用するラベル名前空間は、デフォルトで構成されている MongoTransactionOptionsResolver
によって評価されます。トランザクションラベルは TransactionAttribute
によって提供され、TransactionTemplate
および TransactionalOperator
を介してプログラムによるトランザクション制御に使用できます。@Transactional(label = …)
はその宣言的な性質により、ドキュメントとしても機能する優れた出発点を提供します。
現在、次のオプションがサポートされています。
- 最大コミット時間
commitTransaction 操作のサーバー上での最大実行時間を制御します。値の形式は、
Duration.parse(…)
で使用される ISO-8601 期間形式に対応します。使用方法:
mongo:maxCommitTime=PT1S
- 関心事を読む
トランザクションの読み取り対象を設定します。
使用方法:
mongo:readConcern=LOCAL|MAJORITY|LINEARIZABLE|SNAPSHOT|AVAILABLE
- 読み取り設定
トランザクションの読み取り設定を設定します。
使用方法:
mongo:readPreference=PRIMARY|SECONDARY|SECONDARY_PREFERRED|PRIMARY_PREFERRED|NEAREST
- 関心事を書く
トランザクションの書き込み関心事を設定します。
使用方法:
mongo:writeConcern=ACKNOWLEDGED|W1|W2|W3|UNACKNOWLEDGED|JOURNALED|MAJORITY
外部トランザクションに結合するネストされたトランザクションは、トランザクションがすでに開始されているため、初期トランザクションオプションに影響を与えません。トランザクションオプションは、新しいトランザクションが開始される場合にのみ適用されます。 |
トランザクション内の特別な動作
トランザクション内では、MongoDB サーバーの動作が少し異なります。
接続設定
MongoDB ドライバーは、ドライバーを自動検出モードに切り替える専用のレプリカセット名構成オプションを提供します。このオプションは、トランザクション中のプライマリレプリカセットノードとコマンドルーティングを識別できます。
MongoDB URI には必ず replicaSet を追加してください。詳細については、接続文字列オプション (英語) を参照してください。 |
収集操作
MongoDB は、トランザクション内でのコレクション作成などのコレクション操作をサポートしません。これは、最初の使用時に行われるオンザフライのコレクション作成にも影響します。必要な構造がすべて適切に配置されていることを確認してください。
一時的なエラー
MongoDB は、トランザクション操作中に発生したエラーに特別なラベルを追加できます。これらは一時的なエラーを示している可能性があり、操作を再試行するだけで解消される可能性があります。そういった用途には Spring Retry [GitHub] (英語) を強くお勧めします。それでも、MongoDB リファレンスマニュアルに従って、MongoTransactionManager#doCommit(MongoTransactionObject)
をオーバーライドしてコミット操作を再試行します (英語) 動作を実装することはできます。
カウント
MongoDB count
は、トランザクション内の実際の状況を反映していない可能性がある収集統計に基づいて動作します。マルチドキュメントトランザクション内で count
コマンドを発行すると、サーバーはエラー 50851 で応答します。MongoTemplate
がアクティブなトランザクションを検出すると、公開されているすべての count()
メソッドが変換され、$match
および $count
演算子を使用して集約フレームワークに委譲され、collation
などの Query
設定が維持されます。
集計カウントヘルパー内で geo コマンドを使用する場合、制限が適用されます。次の演算子は使用できないため、別の演算子に置き換える必要があります。
$where
→$expr
$near
→$geoWithin
($center
あり)$nearSphere
→$geoWithin
($centerSphere
あり)
Criteria.near(…)
および Criteria.nearSphere(…)
を使用するクエリは、Criteria.within(…)
および Criteria.withinSphere(…)
にそれぞれ書き直す必要があります。within
に変更する必要があるリポジトリクエリメソッドの near
クエリキーワードにも同じことが当てはまります。詳細については、MongoDB JIRA チケット DRIVERS-518 (英語) も参照してください。
次のスニペットは、セッションバインドクロージャ内での count
の使用箇所を示しています。
session.startTransaction();
template.withSession(session)
.execute(action -> {
action.count(query(where("state").is("active")), Step.class)
...
上記のスニペットは、次のコマンドで具体化されます。
db.collection.aggregate(
[
{ $match: { state: "active" } },
{ $count: "totalEntityCount" }
]
)
代わりに:
db.collection.find( { state: "active" } ).count()