ストリームの変更

MongoDB 3.6 以降、ストリームの変更 (英語) を使用すると、アプリケーションは oplog を追跡することなく、変更について通知を受け取ることができます。

変更ストリームのサポートは、レプリカセットまたはシャードクラスターでのみ可能です。

変更ストリームは、命令型とリアクティブの両方の MongoDB Java ドライバーで使用できます。リソースの消費が少ないため、リアクティブバリアントを使用することを強くお勧めします。ただし、リアクティブ API を使用できない場合でも、Spring エコシステムですでに普及しているメッセージングの概念を使用して変更イベントを取得できます。

コレクションレベルとデータベースレベルの両方で監視することが可能ですが、データベースレベルのバリアントでは、データベース内のすべてのコレクションからの変更が公開されます。データベース変更ストリームをサブスクライブする場合は、エンティティ型が異なると変換が正しく適用されない可能性があるため、イベント型に適切な型を使用するようにしてください。疑わしい場合は、Document を使用してください。

MessageListener でストリームを変更する

同期ドライバーを使用してストリームを変更する (英語) をリッスンすると、長時間実行されるブロッキングタスクが作成され、別のコンポーネントに委譲する必要があります。この場合、まず、特定の SubscriptionRequest タスクを実行するためのメインエントリポイントとなる MessageListenerContainer (Javadoc) を作成する必要があります。Spring Data MongoDB には、MongoTemplate で動作し、ChangeStreamRequest (Javadoc) の Task インスタンスを作成して実行できるデフォルトの実装がすでに付属しています。

次の例は、MessageListener インスタンスで変更ストリームを使用する方法を示しています。

例 1: MessageListener インスタンスでストリームを変更する
MessageListenerContainer container = new DefaultMessageListenerContainer(template);
container.start();                                                                                              (1)

MessageListener<ChangeStreamDocument<Document>, User> listener = System.out::println;                           (2)
ChangeStreamRequestOptions options = new ChangeStreamRequestOptions("db", "user", ChangeStreamOptions.empty()); (3)

Subscription subscription = container.register(new ChangeStreamRequest<>(listener, options), User.class);       (4)

// ...

container.stop();                                                                                               (5)
1 コンテナーを開始すると、リソースが初期化され、すでに登録されている SubscriptionRequest インスタンスの Task インスタンスが開始されます。起動後に追加されたリクエストはすぐに実行されます。
2Message を受信したときに呼び出されるリスナーを定義します。Message#getBody() は、リクエストされたドメイン型に変換されます。変換せずに生の結果を受け取るには、Document を使用します。
3 リッスンするコレクションを設定し、ChangeStreamOptions を通じて追加のオプションを提供します。
4 リクエストを登録します。返された Subscription を使用して、現在の Task 状態を確認し、それをキャンセルしてリソースを解放できます。
5 コンテナーが不要になったことを確認したら、忘れずにコンテナーを停止してください。これを行うと、コンテナー内で実行中のすべての Task インスタンスが停止します。

処理中のエラーは org.springframework.util.ErrorHandler に渡されます。特に明記されていない場合、ErrorHandler を追加するログがデフォルトで適用されます。
追加機能を提供するには、register(request, body, errorHandler) を使用してください。

リアクティブな変更ストリーム

リアクティブ API を使用して変更ストリームをサブスクライブすることは、ストリームを操作するためのより自然なアプローチです。それでも、ChangeStreamOptions などの重要な構成要素は変わりません。次の例は、ChangeStreamEvent を発行する変更ストリームの使用メソッドを示しています。

例 2: ChangeStreamEvent を発行するストリームの変更
Flux<ChangeStreamEvent<User>> flux = reactiveTemplate.changeStream(User.class) (1)
    .watchCollection("people")
    .filter(where("age").gte(38))                                              (2)
    .listen();                                                                 (3)
1 基になるドキュメントの変換先となるイベントターゲット型。変換せずに生の結果を受け取るには、これを省略しておきます。
2 イベントをフィルターするには、集約パイプラインまたはクエリ Criteria だけを使用します。
3 変更ストリームイベントの Flux を取得します。ChangeStreamEvent#getBody() は (2) でリクエストされたドメイン型に変換されます。

変更ストリームの再開

変更ストリームは再開でき、中断したところからイベントの発行を再開できます。ストリームを再開するには、再開トークンまたは最後の既知のサーバー時間 (UTC) のいずれかを指定する必要があります。ChangeStreamOptions (Javadoc) を使用して、それに応じて値を設定します。

次の例は、サーバー時間を使用して再開オフセットを設定する方法を示しています。

例 3: 変更ストリームを再開する
Flux<ChangeStreamEvent<User>> resumed = template.changeStream(User.class)
    .watchCollection("people")
    .resumeAt(Instant.now().minusSeconds(1)) (1)
    .listen();
1getTimestamp メソッドを通じて ChangeStreamEvent のサーバー時刻を取得することも、getResumeToken を通じて公開される resumeToken を使用することもできます。
場合によっては、Instant は、変更ストリームを再開するときに十分正確な測定値ではない可能性があります。この目的には、MongoDB ネイティブ BsonTimestamp (英語) を使用します。