調整可能なカーソル
デフォルトでは、クライアントがカーソルによって提供された結果をすべて使い切ると、MongoDB は自動的にカーソルを閉じます。カーソルが枯渇したときにカーソルを閉じると、ストリームが有限ストリームに変わります。上限付きコレクション (英語) の場合は、クライアントが最初に返されたデータをすべて消費した後も開いたままになる調整可能なカーソル (英語) を使用できます。
上限付きコレクションは MongoOperations.createCollection を使用して作成できます。これを行うには、必要な CollectionOptions.empty().capped()… を指定します。 |
テーラブルカーソルは、命令型とリアクティブの両方の MongoDB API で使用できます。リソースの消費が少ないため、リアクティブバリアントを使用することを強くお勧めします。ただし、リアクティブ API を使用できない場合でも、Spring エコシステムですでに普及しているメッセージングの概念を使用できます。
MessageListener
で調整可能なカーソル
同期ドライバーを使用して上限付きコレクションをリッスンすると、長時間実行されるブロックタスクが作成され、別のコンポーネントに委譲する必要があります。この場合、最初に MessageListenerContainer
を作成する必要があります。これは、特定の SubscriptionRequest
を実行するためのメインエントリポイントになります。Spring Data MongoDB には、MongoTemplate
で動作するデフォルト実装がすでに付属しており、TailableCursorRequest
の Task
インスタンスを作成して実行できます。
次の例は、MessageListener
インスタンスでテール可能カーソルを使用する方法を示しています。
MessageListener
インスタンスでの調整可能なカーソル MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start(); (1)
MessageListener<Document, User> listener = System.out::println; (2)
TailableCursorRequest request = TailableCursorRequest.builder()
.collection("orders") (3)
.filter(query(where("value").lt(100))) (4)
.publishTo(listener) (5)
.build();
container.register(request, User.class); (6)
// ...
container.stop(); (7)
1 | コンテナーを開始すると、リソースが初期化され、すでに登録されている SubscriptionRequest インスタンスの Task インスタンスが開始されます。起動後に追加されたリクエストはすぐに実行されます。 |
2 | Message を受信したときに呼び出されるリスナーを定義します。Message#getBody() は、リクエストされたドメイン型に変換されます。変換せずに生の結果を受け取るには、Document を使用します。 |
3 | 聴くコレクションを設定します。 |
4 | 受信するドキュメントにオプションのフィルターを提供します。 |
5 | 受信 Message を公開するメッセージリスナーを設定します。 |
6 | リクエストを登録します。返された Subscription を使用して、現在の Task 状態を確認し、それをキャンセルしてリソースを解放できます。 |
7 | コンテナーが不要になったことを確認したら、忘れずにコンテナーを停止してください。これを行うと、コンテナー内で実行中のすべての Task インスタンスが停止します。 |
リアクティブな調整可能なカーソル
リアクティブデータ型でテール可能カーソルを使用すると、無限ストリームを構築できます。テーラブルカーソルは、外部から閉じられるまで開いたままになります。新しいドキュメントが上限付きコレクションに到着すると、データが出力されます。
クエリが一致を返さなかった場合、またはカーソルがコレクションの「最後」にあるドキュメントを返し、その後アプリケーションがそのドキュメントを削除した場合、テーラブルカーソルはデッドまたは無効になる可能性があります。次の例は、無限ストリームクエリを作成して使用する方法を示しています。
Flux<Person> stream = template.tail(query(where("name").is("Joe")), Person.class);
Disposable subscription = stream.doOnNext(person -> System.out.println(person)).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();
Spring Data MongoDB リアクティブリポジトリは、クエリメソッドに @Tailable
アノテーションを付けることで無限ストリームをサポートします。これは、次の例に示すように、Flux
および複数の要素を発行できるその他のリアクティブ型を返すメソッドで機能します。
public interface PersonRepository extends ReactiveMongoRepository<Person, String> {
@Tailable
Flux<Person> findByFirstname(String firstname);
}
Flux<Person> stream = repository.findByFirstname("Joe");
Disposable subscription = stream.doOnNext(System.out::println).subscribe();
// …
// Later: Dispose the subscription to close the stream
subscription.dispose();