MongoDb サポート

バージョン 2.1 は MongoDB (英語) のサポートを導入しました: 「高性能、オープンソース、ドキュメント指向データベース」。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mongodb</artifactId>
    <version>6.4.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-mongodb:6.4.1"

MongoDB をダウンロード、インストール、実行するには、MongoDB ドキュメント (英語) を参照してください。

MongoDb への接続

ブロッキングまたは反応?

5.3 バージョン以降、Spring Integration は、MongoDB へのアクセス時にノンブロッキング I/O を有効にするために、リアクティブ MongoDB ドライバーをサポートしています。リアクティブサポートを有効にするには、MongoDB リアクティブストリームドライバーを依存関係に追加します。

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"

通常の同期クライアントの場合、依存関係にそれぞれのドライバーを追加する必要があります。

  • Maven

  • Gradle

<dependency>
    <groupId>org.mongodb</groupId>
    <artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"

どちらも optional であり、エンドユーザーの選択サポートを強化するためのフレームワークです。

MongoDB との対話を開始するには、まずそれに接続する必要があります。Spring Integration は、別の Spring プロジェクト Spring Data MongoDB によって提供されるサポートに基づいて構築されています。これは、MongoDatabaseFactory および ReactiveMongoDatabaseFactory と呼ばれるファクトリクラスを提供し、MongoDB クライアント API との統合を簡素化します。

Spring Data はデフォルトでブロッキング MongoDB ドライバーを提供しますが、上記の依存関係を含めることでリアクティブな使用を選択できます。

MongoDatabaseFactory を使用する

MongoDB に接続するには、MongoDatabaseFactory インターフェースの実装を使用できます。

次の例は、SimpleMongoClientDatabaseFactory の使用方法を示しています。

  • Java

  • XML

MongoDatabaseFactory mongoDbFactory =
        new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

SimpleMongoClientDatabaseFactory は 2 つの引数を取ります: MongoClient インスタンスとデータベースの名前を指定する Stringhostport などのプロパティを構成する必要がある場合は、基になる MongoClients クラスによって提供されるコンストラクターの 1 つを使用して渡すことができます。MongoDB の構成方法の詳細については、Spring-Data-MongoDB リファレンスを参照してください。

ReactiveMongoDatabaseFactory を使用する

リアクティブドライバーを使用して MongoDB に接続するには、ReactiveMongoDatabaseFactory インターフェースの実装を使用できます。

次の例は、SimpleReactiveMongoDatabaseFactory の使用方法を示しています。

  • Java

  • XML

ReactiveMongoDatabaseFactory mongoDbFactory =
        new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
    <constructor-arg>
        <bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
    </constructor-arg>
    <constructor-arg value="test"/>
</bean>

MongoDB メッセージストア

エンタープライズ統合パターン(EIP)ブックに従って、メッセージストア (英語) を使用するとメッセージを保持できます。信頼性が懸念される場合、メッセージをバッファリングする機能(QueueChannelaggregatorresequencer など)を備えたコンポーネントを扱う場合、これは便利です。Spring Integration では、MessageStore 戦略はクレームチェック (英語) パターンの基盤も提供します。これは EIP でも説明されています。

Spring Integration の MongoDB モジュールは、MongoDbMessageStore を提供します。これは、MessageStore 戦略(主にクレームチェックパターンで使用)と MessageGroupStore 戦略(主にアグリゲーターパターンおよびリシーケンサパターンで使用)の両方の実装です。

次の例では、MongoDbMessageStore が QueueChannel と aggregator を使用するように構成しています。

<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
    <constructor-arg ref="mongoDbFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="mongoDbMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="mongoDbMessageStore"/>

上記の例は単純な Bean 構成であり、コンストラクター引数として MongoDbFactory を想定しています。

MongoDbMessageStore は、Spring Data Mongo マッピングメカニズムを使用して、ネストされたすべてのプロパティを含む Mongo ドキュメントとして Message を拡張します。監査または分析のために payload または headers にアクセスする必要がある場合(たとえば、保存されているメッセージに対して)に役立ちます。

MongoDbMessageStore は、カスタム MappingMongoConverter 実装を使用して Message インスタンスを MongoDB ドキュメントとして格納します。Message のプロパティ(payload および header 値)にはいくつかの制限があります。

バージョン 5.1.6 以降、MongoDbMessageStore は、内部 MappingMongoConverter 実装に伝搬されるカスタムコンバーターで構成できます。詳細については、MongoDbMessageStore.setCustomConverters(Object…​ customConverters) JavaDocs を参照してください。

Spring Integration 3.0 は ConfigurableMongoDbMessageStore を導入しました。MessageStore と MessageGroupStore の両方のインターフェースを実装しています。このクラスは、コンストラクター引数として MongoTemplate を受け取ることができます。これを使用して、たとえば、カスタム WriteConcern を構成できます。別のコンストラクターには MappingMongoConverter と MongoDbFactory が必要です。これにより、Message インスタンスとそのプロパティにカスタム変換を提供できます。デフォルトでは、ConfigurableMongoDbMessageStore は標準の Java シリアライゼーションを使用して、Message インスタンスを MongoDB に読み書きし(MongoDbMessageBytesConverter を参照)、MongoTemplate の他のプロパティのデフォルト値に依存することに注意してください。提供されている MongoDbFactory および MappingMongoConverter から MongoTemplate を構築します。ConfigurableMongoDbMessageStore によって格納されるコレクションのデフォルト名は configurableStoreMessages です。メッセージに複雑なデータ型が含まれている場合、この実装を使用して堅牢で柔軟なソリューションを作成することをお勧めします。

バージョン 6.0.8 以降、AbstractConfigurableMongoDbMessageStore には、自動インデックス作成を無効にするために使用できる setCreateIndexes(boolean) (デフォルトは true) オプションが用意されています。次の例は、Bean を宣言し、自動インデックス作成を無効にする方法を示しています。

@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
    MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndexes(false);
    return mongoDbChannelMessageStore;
}

MongoDB チャネルメッセージストア

バージョン 4.0 は、新しい MongoDbChannelMessageStore を導入しました。QueueChannel インスタンスで使用するために最適化された MessageGroupStore です。priorityEnabled = true を使用すると、<int:priority-queue> インスタンスでそれを使用して、持続メッセージの優先順位ポーリングを実現できます。優先度 MongoDB ドキュメントフィールドは、IntegrationMessageHeaderAccessor.PRIORITY (priority)メッセージヘッダーから入力されます。

さらに、すべての MongoDB MessageStore インスタンスには、MessageGroup ドキュメント用の sequence フィールドがあります。sequence 値は、同じコレクションからの単純な sequence ドキュメントに対する $inc 操作の結果であり、要求に応じて作成されます。sequence フィールドは poll 操作で使用され、メッセージが同じミリ秒以内に保存される場合、先入れ先出し(FIFO)メッセージの順序(設定されている場合は優先順位内)を提供します。

priorityEnabled オプションはストア全体に適用されるため、優先度と非優先度に同じ MongoDbChannelMessageStore Bean を使用することはお勧めしません。ただし、ストアからのメッセージポーリングはソートされ、インデックスを使用するため、同じ collection を両方の MongoDbChannelMessageStore 型に使用できます。そのシナリオを構成するには、次の例に示すように、一方のメッセージストア Bean を他方から拡張できます。
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
    <constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>

<int:channel id="queueChannel">
    <int:queue message-store="store"/>
</int:channel>

<bean id="priorityStore" parent="channelStore">
    <property name="priorityEnabled" value="true"/>
</bean>

<int:channel id="priorityChannel">
    <int:priority-queue message-store="priorityStore"/>
</int:channel>

自動インデックス作成を無効にして AbstractConfigurableMongoDbMessageStore を使用する

バージョン 6.0.8 以降、AbstractConfigurableMongoDbMessageStore は、自動インデックス作成を無効または有効 (デフォルト) にするために使用できる setCreateIndex(boolean) を実装します。次の例は、Bean を宣言し、自動インデックス作成を無効にする方法を示しています。

@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
    AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
    mongoDbChannelMessageStore.setCreateIndex(false);

    return mongoDbChannelMessageStore;
}

MongoDB メタデータストア

Spring Integration 4.2 は、MongoDB ベースの新しい MetadataStore (メタデータストアを参照)実装を導入しました。MongoDbMetadataStore を使用して、アプリケーションの再起動後もメタデータの状態を維持できます。この新しい MetadataStore 実装は、次のようなアダプターで使用できます。

これらのアダプターに新しい MongoDbMetadataStore を使用するように指示するには、Bean 名が metadataStore の Spring Bean を宣言します。フィード受信チャネルアダプターは、宣言された MongoDbMetadataStore を自動的に取得して使用します。次の例は、metadataStore という名前の Bean を宣言する方法を示しています。

@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
    return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}

MongoDbMetadataStore は ConcurrentMetadataStore も実装しており、キーの値を保存または変更できるインスタンスは 1 つだけであるため、複数のアプリケーションインスタンス間で確実に共有できます。MongoDB の保証により、これらの操作はすべてアトミックです。

MongoDB 受信チャネルアダプター

MongoDB 受信チャネルアダプターは、MongoDB からデータを読み取り、Message ペイロードとして送信するポーリングコンシューマーです。次の例は、MongoDB 受信チャネルアダプターを構成する方法を示しています。

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
       channel="replyChannel"
       query="{'name' : 'Bob'}"
       entity-class="java.lang.Object"
       auto-startup="false">
		<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>

上記の構成が示すように、inbound-channel-adapter 要素を使用し、次のようなさまざまな属性の値を提供することにより、MongoDb 受信チャネルアダプターを構成します。

  • query: JSON クエリ (MongoDB クエリ (英語) を参照してください)

  • query-expression: JSON クエリ文字列(上記の query 属性として)または o.s.data.mongodb.core.query.Query のインスタンスに評価される SpEL 式。query 属性と相互に排他的。

  • entity-class: ペイロードオブジェクトの型。指定しない場合、com.mongodb.DBObject が返されます。

  • collection-name または collection-name-expression: 使用する MongoDB コレクションの名前を識別します。

  • mongodb-factoryo.s.data.mongodb.MongoDbFactory のインスタンスへの参照

  • mongo-templateo.s.data.mongodb.core.MongoTemplate のインスタンスへの参照

  • 他のすべての受信アダプターに共通するその他の属性 (「チャネル」など)。

mongo-template と mongodb-factory の両方を設定することはできません。

上記の例は、query のリテラル値を持ち、collection のデフォルト名を使用するため、比較的単純で静的です。条件によっては、実行時にこれらの値を変更する必要がある場合があります。そのためには、-expression の同等物(query-expression および collection-name-expression)を使用します。ここで、提供された式は有効な SpEL 式です。

また、MongoDB から読み取られ、正常に処理されたデータに対して後処理を行うこともできます。たとえば ; 処理後にドキュメントを移動または削除することができます。これを行うには、次の例に示すように、追加されたトランザクション同期機能 Spring Integration 2.2 を使用します。

<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
    channel="replyChannel"
    query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
    entity-class="java.lang.Object"
    auto-startup="false">
        <int:poller fixed-rate="200" max-messages-per-poll="1">
            <int:transactional synchronization-factory="syncFactory"/>
        </int:poller>
</int-mongodb:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit
        expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
        channel="someChannel"/>
</int:transaction-synchronization-factory>

<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

次の例は、前述の例で参照されている DocumentCleaner を示しています。

public class DocumentCleaner {
    public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
        if (target instanceof List<?> documents){
            for (Object document : documents) {
                mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
            }
        }
    }
}

transactional 要素を使用して、ポーラーをトランザクション対応として宣言できます。この要素は、実際のトランザクションマネージャーを参照できます(たとえば、フローの他の部分が JDBC を呼び出す場合)。「実際の」トランザクションがない場合は、o.s.i.transaction.PseudoTransactionManager のインスタンスを使用できます。これは、Spring の PlatformTransactionManager の実装であり、実際のトランザクションがないときに Mongo アダプターのトランザクション同期機能を使用できるようにします。

これを行っても、MongoDB 自体はトランザクションになりません。成功(コミット)または失敗(ロールバック)の前後にアクションの同期を取ることができます。

ポーラーがトランザクション対応になると、o.s.i.transaction.TransactionSynchronizationFactory のインスタンスを transactional 要素に設定できます。TransactionSynchronizationFactory は TransactionSynchronization のインスタンスを作成します。便宜上、デフォルトの SpEL ベースの TransactionSynchronizationFactory を公開しました。これにより、SpEL 式を構成し、その実行をトランザクションと調整(同期)できます。コミット前、コミット後、ロールバック後のイベントの式と、評価結果(存在する場合)が送信される各イベントのチャネルがサポートされています。子要素ごとに、expression および channel 属性を指定できます。channel 属性のみが存在する場合、受信したメッセージは特定の同期シナリオの一部としてそこに送信されます。expression 属性のみが存在し、式の結果が null 以外の値である場合、ペイロードとしての結果を含むメッセージが生成され、デフォルトチャネル(NullChannel)に送信され、ログに表示されます(DEBUG レベル)。評価結果を特定のチャネルに移動する場合は、channel 属性を追加します。式の結果が null または void の場合、メッセージは生成されません。

トランザクション同期の詳細については、トランザクションの同期を参照してください。

バージョン 5.5 以降、MongoDbMessageSource は updateExpression で構成できます。これは、MongoDb update 構文を使用する String または org.springframework.data.mongodb.core.query.Update インスタンスに評価される必要があります。上記の後処理手順の代替として使用でき、コレクションからフェッチされたエンティティを変更するため、次のポーリングサイクルで再びコレクションからプルされることはありません (更新によって何らかの値が変更されると仮定します)。クエリで使用されます)。同じコレクションの MongoDbMessageSource の複数のインスタンスがクラスターで使用されている場合は、実行の分離とデータの一貫性を実現するためにトランザクションを使用することをお勧めします。

MongoDB 変更ストリーム受信チャネルアダプター

バージョン 5.3 以降、spring-integration-mongodb モジュールは MongoDbChangeStreamMessageProducer (Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class) API のリアクティブ MessageProducerSupport 実装)を導入します。このコンポーネントは、デフォルトでペイロードとして ChangeStreamEvent の body を含むメッセージの Flux と、いくつかの変更ストリーム関連ヘッダーを生成します(MongoHeaders を参照)。この MongoDbChangeStreamMessageProducer を FluxMessageChannel と組み合わせて、オンデマンドサブスクリプションとダウンストリームでのイベント消費のための outputChannel として使用することをお勧めします。

このチャネルアダプターの Java DSL 構成は次のようになります。

@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
    return IntegrationFlow.from(
            MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
                    .domainType(Person.class)
                    .collection("person")
                    .extractBody(false))
            .channel(MessageChannels.flux())
            .get();
}

MongoDbChangeStreamMessageProducer が停止するか、サブスクリプションがダウンストリームでキャンセルされるか、MongoDb 変更ストリームが OperationType.INVALIDATE を生成すると、Publisher が完了します。チャネルアダプターを再度起動すると、ソースデータの新しい Publisher が作成され、MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>) で自動的にサブスクライブされます。他の場所からの変更ストリームイベントを消費する必要がある場合は、このチャネルアダプターを再起動して新しいオプションを開始することができます。

変更ストリームサポートの詳細については、Spring Data MongoDb のドキュメントを参照してください。

MongoDB 送信チャネルアダプター

MongoDB 送信チャネルアダプターを使用すると、次の例に示すように、メッセージペイロードを MongoDB ドキュメントストアに書き込むことができます。

<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
	collection-name="myCollection"
	mongo-converter="mongoConverter"
	mongodb-factory="mongoDbFactory" />

上記の構成が示すように、outbound-channel-adapter 要素を使用して MongoDB 送信チャネルアダプターを構成し、次のようなさまざまな属性の値を提供できます。

  • collection-name または collection-name-expression: 使用する MongoDb コレクションの名前を識別します。

  • mongo-converter: 生の Java オブジェクトを JSON ドキュメント表現に変換するのを支援する o.s.data.mongodb.core.convert.MongoConverter のインスタンスへの参照。

  • mongodb-factoryo.s.data.mongodb.MongoDbFactory のインスタンスへの参照。

  • mongo-templateo.s.data.mongodb.core.MongoTemplate のインスタンスへの参照。注意: mongo-template と mongodb-factory の両方を設定することはできません。

  • すべての受信 アダプターに共通するその他の属性 (「チャネル」など)。

前の例は、collection-name のリテラル値を持っているため、比較的単純で静的です。状況によっては、実行時にこの値を変更する必要がある場合があります。これを行うには、collection-name-expression を使用します。指定された式は有効な SpEL 式です。

MongoDB 送信ゲートウェイ

バージョン 5.0 は、MongoDB 送信ゲートウェイを導入しました。リクエストチャネルにメッセージを送信することにより、データベースを照会できます。次に、ゲートウェイはレスポンスをレスポンスチャネルに送信します。次の例に示すように、メッセージペイロードとヘッダーを使用して、クエリとコレクション名を指定できます。

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory;

    @Autowired
    private MongoConverter;


    @Bean
    public IntegrationFlow gatewaySingleQueryFlow() {
        return f -> f
                .handle(queryOutboundGateway())
                .channel(c -> c.queue("retrieveResults"));
    }

    private MongoDbOutboundGatewaySpec queryOutboundGateway() {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
                .query("{name : 'Bob'}")
                .collectionNameFunction(m -> m.getHeaders().get("collection"))
                .expectSingleResult(true)
                .entityClass(Person.class);
    }

}
class MongoDbKotlinApplication {

    fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)

    @Autowired
    lateinit var mongoDbFactory: MongoDatabaseFactory

    @Autowired
    lateinit var mongoConverter: MongoConverter

    @Bean
    fun gatewaySingleQueryFlow() =
    integrationFlow {
        handle(queryOutboundGateway())
        channel { queue("retrieveResults") }
    }

    private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
        return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .query("{name : 'Bob'}")
            .collectionNameFunction<Any> { m -> m.headers["collection"] as String }
            .expectSingleResult(true)
            .entityClass(Person::class.java)
    }

}
@SpringBootApplication
public class MongoDbJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MongoDbJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Autowired
    private MongoDbFactory mongoDbFactory;

    @Bean
    @ServiceActivator(inputChannel = "requestChannel")
    public MessageHandler mongoDbOutboundGateway() {
        MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
        gateway.setCollectionNameExpressionString("'myCollection'");
        gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
        gateway.setExpectSingleResult(true);
        gateway.setEntityClass(Person.class);
        gateway.setOutputChannelName("replyChannel");
        return gateway;
    }

    @Bean
    @ServiceActivator(inputChannel = "replyChannel")
    public MessageHandler handler() {
        return message -> System.out.println(message.getPayload());
    }
}
<int-mongodb:outbound-gateway id="gatewayQuery"
    mongodb-factory="mongoDbFactory"
    mongo-converter="mongoConverter"
    query="{firstName: 'Bob'}"
    collection-name="myCollection"
    request-channel="in"
    reply-channel="out"
    entity-class="org.springframework.integration.mongodb.test.entity$Person"/>

MongoDB 送信ゲートウェイで次の属性を使用できます。

  • collection-name または collection-name-expression: 使用する MongoDB コレクションの名前を識別します。

  • mongo-converter: 生の Java オブジェクトを JSON ドキュメント表現に変換するのを支援する o.s.data.mongodb.core.convert.MongoConverter のインスタンスへの参照。

  • mongodb-factoryo.s.data.mongodb.MongoDbFactory のインスタンスへの参照。

  • mongo-templateo.s.data.mongodb.core.MongoTemplate のインスタンスへの参照。注意: mongo-template と mongodb-factory の両方を設定することはできません。

  • entity-class: MongoTemplate の find(..) および findOne(..) メソッドに渡されるエンティティクラスの完全修飾名。この属性が提供されない場合、デフォルト値は org.bson.Document です。

  • query または query-expression: MongoDB クエリを指定します。その他のクエリサンプルについては、MongoDB ドキュメント (英語) を参照してください。

  • collection-callbackorg.springframework.data.mongodb.core.CollectionCallback のインスタンスへの参照。5.0.11 がリクエストメッセージコンテキストであるため、o.s.i.mongodb.outbound.MessageCollectionCallback のインスタンスが好ましい。詳細については、Javadocs を参照してください。注意: collection-callback とクエリ属性の両方を持つことはできません。

query および query-expression プロパティの代替として、MessageCollectionCallback 関数インターフェース実装への参照として collectionCallback プロパティを使用することにより、他のデータベース操作を指定できます。次の例では、カウント操作を指定しています。

private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
    return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
            .collectionCallback((collection, requestMessage) -> collection.count())
            .collectionName("myCollection");
}

MongoDB Reactive Channel Adapters

バージョン 5.3 以降、ReactiveMongoDbStoringMessageHandler および ReactiveMongoDbMessageSource 実装が提供されています。それらは Spring Data の ReactiveMongoOperations に基づいており、org.mongodb:mongodb-driver-reactivestreams 依存関係が必要です。

ReactiveMongoDbStoringMessageHandler は ReactiveMessageHandler の実装であり、反応フローの構成が統合フローの定義に含まれている場合に、フレームワークでネイティブにサポートされます。詳細については、ReactiveMessageHandler を参照してください。

構成の観点からは、他の多くの標準チャネルアダプターとの違いはありません。たとえば、Java DSL では、このようなチャネルアダプターを次のように使用できます。

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return f -> f
            .channel(MessageChannels.flux())
            .handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}

このサンプルでは、提供された ReactiveMongoDatabaseFactory を介して MongoDb に接続し、リクエストメッセージからのデータを data 名でデフォルトのコレクションに保存します。実際の操作は、内部で作成された ReactiveStreamsConsumer の反応ストリーム構成からオンデマンドで実行されます。

ReactiveMongoDbMessageSource は、提供された ReactiveMongoDatabaseFactory または ReactiveMongoOperations および MongoDb クエリ (または式) に基づく AbstractMessageSource 実装であり、予期される entityClass 型で expectSingleResult オプションに従って find() または findOne() 操作を呼び出し、クエリ結果を変換します。生成されたメッセージのペイロードの Publisher (expectSingleResult オプションに応じて Flux または Mono ) がサブスクライブされると、クエリの実行と結果の評価がオンデマンドで実行されます。スプリッターと FluxMessageChannel がダウンストリームで使用されている場合、フレームワークはそのようなペイロードを自動的にサブスクライブできます (基本的に flatMap)。それ以外の場合、ダウンストリームエンドポイントでポーリングされたパブリッシャーにサブスクライブするのは、ターゲットアプリケーションの責任です。

Java DSL では、このようなチャネルアダプターは次のように構成できます。

@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
    return IntegrationFlow
            .from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
                            .entityClass(Person.class),
                    c -> c.poller(Pollers.fixedDelay(1000)))
            .split()
            .channel(c -> c.flux("output"))
            .get();
}

バージョン 5.5 以降、ReactiveMongoDbMessageSource は updateExpression で構成できます。ブロッキング MongoDbMessageSource と同じ機能があります。詳細については、MongoDB 受信チャネルアダプターおよび AbstractMongoDbMessageSourceSpec JavaDocs を参照してください。