MongoDb サポート
バージョン 2.1 は MongoDB (英語) のサポートを導入しました: 「高性能、オープンソース、ドキュメント指向データベース」。
この依存関係をプロジェクトに含める必要があります。
MongoDB をダウンロード、インストール、実行するには、MongoDB ドキュメント (英語) を参照してください。
MongoDb への接続
ブロッキングまたは反応?
5.3 バージョン以降、Spring Integration は、MongoDB へのアクセス時にノンブロッキング I/O を有効にするために、リアクティブ MongoDB ドライバーをサポートしています。リアクティブサポートを有効にするには、MongoDB リアクティブストリームドライバーを依存関係に追加します。
Maven
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-reactivestreams</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-reactivestreams"
通常の同期クライアントの場合、依存関係にそれぞれのドライバーを追加する必要があります。
Maven
Gradle
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
</dependency>
compile "org.mongodb:mongodb-driver-sync"
どちらも optional
であり、エンドユーザーの選択サポートを強化するためのフレームワークです。
MongoDB との対話を開始するには、まずそれに接続する必要があります。Spring Integration は、別の Spring プロジェクト Spring Data MongoDB によって提供されるサポートに基づいて構築されています。これは、MongoDatabaseFactory
および ReactiveMongoDatabaseFactory
と呼ばれるファクトリクラスを提供し、MongoDB クライアント API との統合を簡素化します。
Spring Data はデフォルトでブロッキング MongoDB ドライバーを提供しますが、上記の依存関係を含めることでリアクティブな使用を選択できます。 |
MongoDatabaseFactory
を使用する
MongoDB に接続するには、MongoDatabaseFactory
インターフェースの実装を使用できます。
次の例は、SimpleMongoClientDatabaseFactory
の使用方法を示しています。
Java
XML
MongoDatabaseFactory mongoDbFactory =
new SimpleMongoClientDatabaseFactory(com.mongodb.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleMongoClientDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
SimpleMongoClientDatabaseFactory
は 2 つの引数を取ります: MongoClient
インスタンスとデータベースの名前を指定する String
。host
、port
などのプロパティを構成する必要がある場合は、基になる MongoClients
クラスによって提供されるコンストラクターの 1 つを使用して渡すことができます。MongoDB の構成方法の詳細については、Spring-Data-MongoDB リファレンスを参照してください。
ReactiveMongoDatabaseFactory
を使用する
リアクティブドライバーを使用して MongoDB に接続するには、ReactiveMongoDatabaseFactory
インターフェースの実装を使用できます。
次の例は、SimpleReactiveMongoDatabaseFactory
の使用方法を示しています。
Java
XML
ReactiveMongoDatabaseFactory mongoDbFactory =
new SimpleReactiveMongoDatabaseFactory(com.mongodb.reactivestreams.client.MongoClients.create(), "test");
<bean id="mongoDbFactory" class="o.s.data.mongodb.core.SimpleReactiveMongoDatabaseFactory">
<constructor-arg>
<bean class="com.mongodb.reactivestreams.client.MongoClients" factory-method="create"/>
</constructor-arg>
<constructor-arg value="test"/>
</bean>
MongoDB メッセージストア
エンタープライズ統合パターン(EIP)ブックに従って、メッセージストア (英語) を使用するとメッセージを保持できます。信頼性が懸念される場合、メッセージをバッファリングする機能(QueueChannel
、aggregator
、resequencer
など)を備えたコンポーネントを扱う場合、これは便利です。Spring Integration では、MessageStore
戦略はクレームチェック (英語) パターンの基盤も提供します。これは EIP でも説明されています。
Spring Integration の MongoDB モジュールは、MongoDbMessageStore
を提供します。これは、MessageStore
戦略(主にクレームチェックパターンで使用)と MessageGroupStore
戦略(主にアグリゲーターパターンおよびリシーケンサパターンで使用)の両方の実装です。
次の例では、MongoDbMessageStore
が QueueChannel
と aggregator
を使用するように構成しています。
<bean id="mongoDbMessageStore" class="o.s.i.mongodb.store.MongoDbMessageStore">
<constructor-arg ref="mongoDbFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="mongoDbMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="mongoDbMessageStore"/>
上記の例は単純な Bean 構成であり、コンストラクター引数として MongoDbFactory
を想定しています。
MongoDbMessageStore
は、Spring Data Mongo マッピングメカニズムを使用して、ネストされたすべてのプロパティを含む Mongo ドキュメントとして Message
を拡張します。監査または分析のために payload
または headers
にアクセスする必要がある場合(たとえば、保存されているメッセージに対して)に役立ちます。
MongoDbMessageStore は、カスタム MappingMongoConverter 実装を使用して Message インスタンスを MongoDB ドキュメントとして格納します。Message のプロパティ(payload および header 値)にはいくつかの制限があります。 |
バージョン 5.1.6 以降、MongoDbMessageStore
は、内部 MappingMongoConverter
実装に伝搬されるカスタムコンバーターで構成できます。詳細については、MongoDbMessageStore.setCustomConverters(Object… customConverters)
JavaDocs を参照してください。
Spring Integration 3.0 は ConfigurableMongoDbMessageStore
を導入しました。MessageStore
と MessageGroupStore
の両方のインターフェースを実装しています。このクラスは、コンストラクター引数として MongoTemplate
を受け取ることができます。これを使用して、たとえば、カスタム WriteConcern
を構成できます。別のコンストラクターには MappingMongoConverter
と MongoDbFactory
が必要です。これにより、Message
インスタンスとそのプロパティにカスタム変換を提供できます。デフォルトでは、ConfigurableMongoDbMessageStore
は標準の Java シリアライゼーションを使用して、Message
インスタンスを MongoDB に読み書きし(MongoDbMessageBytesConverter
を参照)、MongoTemplate
の他のプロパティのデフォルト値に依存することに注意してください。提供されている MongoDbFactory
および MappingMongoConverter
から MongoTemplate
を構築します。ConfigurableMongoDbMessageStore
によって格納されるコレクションのデフォルト名は configurableStoreMessages
です。メッセージに複雑なデータ型が含まれている場合、この実装を使用して堅牢で柔軟なソリューションを作成することをお勧めします。
バージョン 6.0.8 以降、AbstractConfigurableMongoDbMessageStore
には、自動インデックス作成を無効にするために使用できる setCreateIndexes(boolean)
(デフォルトは true
) オプションが用意されています。次の例は、Bean を宣言し、自動インデックス作成を無効にする方法を示しています。
@Bean
public MongoDbChannelMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory) {
MongoDbChannelMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndexes(false);
return mongoDbChannelMessageStore;
}
MongoDB チャネルメッセージストア
バージョン 4.0 は、新しい MongoDbChannelMessageStore
を導入しました。QueueChannel
インスタンスで使用するために最適化された MessageGroupStore
です。priorityEnabled = true
を使用すると、<int:priority-queue>
インスタンスでそれを使用して、持続メッセージの優先順位ポーリングを実現できます。優先度 MongoDB ドキュメントフィールドは、IntegrationMessageHeaderAccessor.PRIORITY
(priority
)メッセージヘッダーから入力されます。
さらに、すべての MongoDB MessageStore
インスタンスには、MessageGroup
ドキュメント用の sequence
フィールドがあります。sequence
値は、同じコレクションからの単純な sequence
ドキュメントに対する $inc
操作の結果であり、要求に応じて作成されます。sequence
フィールドは poll
操作で使用され、メッセージが同じミリ秒以内に保存される場合、先入れ先出し(FIFO)メッセージの順序(設定されている場合は優先順位内)を提供します。
priorityEnabled オプションはストア全体に適用されるため、優先度と非優先度に同じ MongoDbChannelMessageStore Bean を使用することはお勧めしません。ただし、ストアからのメッセージポーリングはソートされ、インデックスを使用するため、同じ collection を両方の MongoDbChannelMessageStore 型に使用できます。そのシナリオを構成するには、次の例に示すように、一方のメッセージストア Bean を他方から拡張できます。 |
<bean id="channelStore" class="o.s.i.mongodb.store.MongoDbChannelMessageStore">
<constructor-arg name="mongoDbFactory" ref="mongoDbFactory"/>
</bean>
<int:channel id="queueChannel">
<int:queue message-store="store"/>
</int:channel>
<bean id="priorityStore" parent="channelStore">
<property name="priorityEnabled" value="true"/>
</bean>
<int:channel id="priorityChannel">
<int:priority-queue message-store="priorityStore"/>
</int:channel>
自動インデックス作成を無効にして AbstractConfigurableMongoDbMessageStore を使用する
バージョン 6.0.8 以降、AbstractConfigurableMongoDbMessageStore
は、自動インデックス作成を無効または有効 (デフォルト) にするために使用できる setCreateIndex(boolean)
を実装します。次の例は、Bean を宣言し、自動インデックス作成を無効にする方法を示しています。
@Bean
public AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore(MongoDatabaseFactory databaseFactory)
{
AbstractConfigurableMongoDbMessageStore mongoDbChannelMessageStore = new MongoDbChannelMessageStore(databaseFactory);
mongoDbChannelMessageStore.setCreateIndex(false);
return mongoDbChannelMessageStore;
}
MongoDB メタデータストア
Spring Integration 4.2 は、MongoDB ベースの新しい MetadataStore
(メタデータストアを参照)実装を導入しました。MongoDbMetadataStore
を使用して、アプリケーションの再起動後もメタデータの状態を維持できます。この新しい MetadataStore
実装は、次のようなアダプターで使用できます。
これらのアダプターに新しい MongoDbMetadataStore
を使用するように指示するには、Bean 名が metadataStore
の Spring Bean を宣言します。フィード受信チャネルアダプターは、宣言された MongoDbMetadataStore
を自動的に取得して使用します。次の例は、metadataStore
という名前の Bean を宣言する方法を示しています。
@Bean
public MetadataStore metadataStore(MongoDbFactory factory) {
return new MongoDbMetadataStore(factory, "integrationMetadataStore");
}
MongoDbMetadataStore
は ConcurrentMetadataStore
も実装しており、キーの値を保存または変更できるインスタンスは 1 つだけであるため、複数のアプリケーションインスタンス間で確実に共有できます。MongoDB の保証により、これらの操作はすべてアトミックです。
MongoDB 受信チャネルアダプター
MongoDB 受信チャネルアダプターは、MongoDB からデータを読み取り、Message
ペイロードとして送信するポーリングコンシューマーです。次の例は、MongoDB 受信チャネルアダプターを構成する方法を示しています。
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query="{'name' : 'Bob'}"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="100"/>
</int-mongodb:inbound-channel-adapter>
上記の構成が示すように、inbound-channel-adapter
要素を使用し、次のようなさまざまな属性の値を提供することにより、MongoDb 受信チャネルアダプターを構成します。
query
: JSON クエリ (MongoDB クエリ (英語) を参照してください)query-expression
: JSON クエリ文字列(上記のquery
属性として)またはo.s.data.mongodb.core.query.Query
のインスタンスに評価される SpEL 式。query
属性と相互に排他的。entity-class
: ペイロードオブジェクトの型。指定しない場合、com.mongodb.DBObject
が返されます。collection-name
またはcollection-name-expression
: 使用する MongoDB コレクションの名前を識別します。mongodb-factory
:o.s.data.mongodb.MongoDbFactory
のインスタンスへの参照mongo-template
:o.s.data.mongodb.core.MongoTemplate
のインスタンスへの参照他のすべての受信アダプターに共通するその他の属性 (「チャネル」など)。
mongo-template と mongodb-factory の両方を設定することはできません。 |
上記の例は、query
のリテラル値を持ち、collection
のデフォルト名を使用するため、比較的単純で静的です。条件によっては、実行時にこれらの値を変更する必要がある場合があります。そのためには、-expression
の同等物(query-expression
および collection-name-expression
)を使用します。ここで、提供された式は有効な SpEL 式です。
また、MongoDB から読み取られ、正常に処理されたデータに対して後処理を行うこともできます。たとえば ; 処理後にドキュメントを移動または削除することができます。これを行うには、次の例に示すように、追加されたトランザクション同期機能 Spring Integration 2.2 を使用します。
<int-mongodb:inbound-channel-adapter id="mongoInboundAdapter"
channel="replyChannel"
query-expression="new BasicQuery('{''name'' : ''Bob''}').limit(100)"
entity-class="java.lang.Object"
auto-startup="false">
<int:poller fixed-rate="200" max-messages-per-poll="1">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-mongodb:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit
expression="@documentCleaner.remove(#mongoTemplate, payload, headers.mongo_collectionName)"
channel="someChannel"/>
</int:transaction-synchronization-factory>
<bean id="documentCleaner" class="thing1.thing2.DocumentCleaner"/>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
次の例は、前述の例で参照されている DocumentCleaner
を示しています。
public class DocumentCleaner {
public void remove(MongoOperations mongoOperations, Object target, String collectionName) {
if (target instanceof List<?> documents){
for (Object document : documents) {
mongoOperations.remove(new BasicQuery(JSON.serialize(document)), collectionName);
}
}
}
}
transactional
要素を使用して、ポーラーをトランザクション対応として宣言できます。この要素は、実際のトランザクションマネージャーを参照できます(たとえば、フローの他の部分が JDBC を呼び出す場合)。「実際の」トランザクションがない場合は、o.s.i.transaction.PseudoTransactionManager
のインスタンスを使用できます。これは、Spring の PlatformTransactionManager
の実装であり、実際のトランザクションがないときに Mongo アダプターのトランザクション同期機能を使用できるようにします。
これを行っても、MongoDB 自体はトランザクションになりません。成功(コミット)または失敗(ロールバック)の前後にアクションの同期を取ることができます。 |
ポーラーがトランザクション対応になると、o.s.i.transaction.TransactionSynchronizationFactory
のインスタンスを transactional
要素に設定できます。TransactionSynchronizationFactory
は TransactionSynchronization
のインスタンスを作成します。便宜上、デフォルトの SpEL ベースの TransactionSynchronizationFactory
を公開しました。これにより、SpEL 式を構成し、その実行をトランザクションと調整(同期)できます。コミット前、コミット後、ロールバック後のイベントの式と、評価結果(存在する場合)が送信される各イベントのチャネルがサポートされています。子要素ごとに、expression
および channel
属性を指定できます。channel
属性のみが存在する場合、受信したメッセージは特定の同期シナリオの一部としてそこに送信されます。expression
属性のみが存在し、式の結果が null 以外の値である場合、ペイロードとしての結果を含むメッセージが生成され、デフォルトチャネル(NullChannel
)に送信され、ログに表示されます(DEBUG
レベル)。評価結果を特定のチャネルに移動する場合は、channel
属性を追加します。式の結果が null または void の場合、メッセージは生成されません。
トランザクション同期の詳細については、トランザクションの同期を参照してください。
バージョン 5.5 以降、MongoDbMessageSource
は updateExpression
で構成できます。これは、MongoDb update
構文を使用する String
または org.springframework.data.mongodb.core.query.Update
インスタンスに評価される必要があります。上記の後処理手順の代替として使用でき、コレクションからフェッチされたエンティティを変更するため、次のポーリングサイクルで再びコレクションからプルされることはありません (更新によって何らかの値が変更されると仮定します)。クエリで使用されます)。同じコレクションの MongoDbMessageSource
の複数のインスタンスがクラスターで使用されている場合は、実行の分離とデータの一貫性を実現するためにトランザクションを使用することをお勧めします。
MongoDB 変更ストリーム受信チャネルアダプター
バージョン 5.3 以降、spring-integration-mongodb
モジュールは MongoDbChangeStreamMessageProducer
(Spring Data ReactiveMongoOperations.changeStream(String, ChangeStreamOptions, Class)
API のリアクティブ MessageProducerSupport
実装)を導入します。このコンポーネントは、デフォルトでペイロードとして ChangeStreamEvent
の body
を含むメッセージの Flux
と、いくつかの変更ストリーム関連ヘッダーを生成します(MongoHeaders
を参照)。この MongoDbChangeStreamMessageProducer
を FluxMessageChannel
と組み合わせて、オンデマンドサブスクリプションとダウンストリームでのイベント消費のための outputChannel
として使用することをお勧めします。
このチャネルアダプターの Java DSL 構成は次のようになります。
@Bean
IntegrationFlow changeStreamFlow(ReactiveMongoOperations mongoTemplate) {
return IntegrationFlow.from(
MongoDb.changeStreamInboundChannelAdapter(mongoTemplate)
.domainType(Person.class)
.collection("person")
.extractBody(false))
.channel(MessageChannels.flux())
.get();
}
MongoDbChangeStreamMessageProducer
が停止するか、サブスクリプションがダウンストリームでキャンセルされるか、MongoDb 変更ストリームが OperationType.INVALIDATE
を生成すると、Publisher
が完了します。チャネルアダプターを再度起動すると、ソースデータの新しい Publisher
が作成され、MessageProducerSupport.subscribeToPublisher(Publisher<? extends Message<?>>)
で自動的にサブスクライブされます。他の場所からの変更ストリームイベントを消費する必要がある場合は、このチャネルアダプターを再起動して新しいオプションを開始することができます。
変更ストリームサポートの詳細については、Spring Data MongoDb のドキュメントを参照してください。
MongoDB 送信チャネルアダプター
MongoDB 送信チャネルアダプターを使用すると、次の例に示すように、メッセージペイロードを MongoDB ドキュメントストアに書き込むことができます。
<int-mongodb:outbound-channel-adapter id="fullConfigWithCollectionExpression"
collection-name="myCollection"
mongo-converter="mongoConverter"
mongodb-factory="mongoDbFactory" />
上記の構成が示すように、outbound-channel-adapter
要素を使用して MongoDB 送信チャネルアダプターを構成し、次のようなさまざまな属性の値を提供できます。
collection-name
またはcollection-name-expression
: 使用する MongoDb コレクションの名前を識別します。mongo-converter
: 生の Java オブジェクトを JSON ドキュメント表現に変換するのを支援するo.s.data.mongodb.core.convert.MongoConverter
のインスタンスへの参照。mongodb-factory
:o.s.data.mongodb.MongoDbFactory
のインスタンスへの参照。mongo-template
:o.s.data.mongodb.core.MongoTemplate
のインスタンスへの参照。注意: mongo-template と mongodb-factory の両方を設定することはできません。すべての受信 アダプターに共通するその他の属性 (「チャネル」など)。
前の例は、collection-name
のリテラル値を持っているため、比較的単純で静的です。状況によっては、実行時にこの値を変更する必要がある場合があります。これを行うには、collection-name-expression
を使用します。指定された式は有効な SpEL 式です。
MongoDB 送信ゲートウェイ
バージョン 5.0 は、MongoDB 送信ゲートウェイを導入しました。リクエストチャネルにメッセージを送信することにより、データベースを照会できます。次に、ゲートウェイはレスポンスをレスポンスチャネルに送信します。次の例に示すように、メッセージペイロードとヘッダーを使用して、クエリとコレクション名を指定できます。
Java DSL
Kotlin DSL
Java
XML
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory;
@Autowired
private MongoConverter;
@Bean
public IntegrationFlow gatewaySingleQueryFlow() {
return f -> f
.handle(queryOutboundGateway())
.channel(c -> c.queue("retrieveResults"));
}
private MongoDbOutboundGatewaySpec queryOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction(m -> m.getHeaders().get("collection"))
.expectSingleResult(true)
.entityClass(Person.class);
}
}
class MongoDbKotlinApplication {
fun main(args: Array<String>) = runApplication<MongoDbKotlinApplication>(*args)
@Autowired
lateinit var mongoDbFactory: MongoDatabaseFactory
@Autowired
lateinit var mongoConverter: MongoConverter
@Bean
fun gatewaySingleQueryFlow() =
integrationFlow {
handle(queryOutboundGateway())
channel { queue("retrieveResults") }
}
private fun queryOutboundGateway(): MongoDbOutboundGatewaySpec {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.query("{name : 'Bob'}")
.collectionNameFunction<Any> { m -> m.headers["collection"] as String }
.expectSingleResult(true)
.entityClass(Person::class.java)
}
}
@SpringBootApplication
public class MongoDbJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MongoDbJavaApplication.class)
.web(false)
.run(args);
}
@Autowired
private MongoDbFactory mongoDbFactory;
@Bean
@ServiceActivator(inputChannel = "requestChannel")
public MessageHandler mongoDbOutboundGateway() {
MongoDbOutboundGateway gateway = new MongoDbOutboundGateway(this.mongoDbFactory);
gateway.setCollectionNameExpressionString("'myCollection'");
gateway.setQueryExpressionString("'{''name'' : ''Bob''}'");
gateway.setExpectSingleResult(true);
gateway.setEntityClass(Person.class);
gateway.setOutputChannelName("replyChannel");
return gateway;
}
@Bean
@ServiceActivator(inputChannel = "replyChannel")
public MessageHandler handler() {
return message -> System.out.println(message.getPayload());
}
}
<int-mongodb:outbound-gateway id="gatewayQuery"
mongodb-factory="mongoDbFactory"
mongo-converter="mongoConverter"
query="{firstName: 'Bob'}"
collection-name="myCollection"
request-channel="in"
reply-channel="out"
entity-class="org.springframework.integration.mongodb.test.entity$Person"/>
MongoDB 送信ゲートウェイで次の属性を使用できます。
collection-name
またはcollection-name-expression
: 使用する MongoDB コレクションの名前を識別します。mongo-converter
: 生の Java オブジェクトを JSON ドキュメント表現に変換するのを支援するo.s.data.mongodb.core.convert.MongoConverter
のインスタンスへの参照。mongodb-factory
:o.s.data.mongodb.MongoDbFactory
のインスタンスへの参照。mongo-template
:o.s.data.mongodb.core.MongoTemplate
のインスタンスへの参照。注意:mongo-template
とmongodb-factory
の両方を設定することはできません。entity-class
: MongoTemplate のfind(..)
およびfindOne(..)
メソッドに渡されるエンティティクラスの完全修飾名。この属性が提供されない場合、デフォルト値はorg.bson.Document
です。query
またはquery-expression
: MongoDB クエリを指定します。その他のクエリサンプルについては、MongoDB ドキュメント (英語) を参照してください。collection-callback
:org.springframework.data.mongodb.core.CollectionCallback
のインスタンスへの参照。5.0.11 がリクエストメッセージコンテキストであるため、o.s.i.mongodb.outbound.MessageCollectionCallback
のインスタンスが好ましい。詳細については、Javadocs を参照してください。注意:collection-callback
とクエリ属性の両方を持つことはできません。
query
および query-expression
プロパティの代替として、MessageCollectionCallback
関数インターフェース実装への参照として collectionCallback
プロパティを使用することにより、他のデータベース操作を指定できます。次の例では、カウント操作を指定しています。
private MongoDbOutboundGatewaySpec collectionCallbackOutboundGateway() {
return MongoDb.outboundGateway(this.mongoDbFactory, this.mongoConverter)
.collectionCallback((collection, requestMessage) -> collection.count())
.collectionName("myCollection");
}
MongoDB Reactive Channel Adapters
バージョン 5.3 以降、ReactiveMongoDbStoringMessageHandler
および ReactiveMongoDbMessageSource
実装が提供されています。それらは Spring Data の ReactiveMongoOperations
に基づいており、org.mongodb:mongodb-driver-reactivestreams
依存関係が必要です。
ReactiveMongoDbStoringMessageHandler
は ReactiveMessageHandler
の実装であり、反応フローの構成が統合フローの定義に含まれている場合に、フレームワークでネイティブにサポートされます。詳細については、ReactiveMessageHandler を参照してください。
構成の観点からは、他の多くの標準チャネルアダプターとの違いはありません。たとえば、Java DSL では、このようなチャネルアダプターを次のように使用できます。
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return f -> f
.channel(MessageChannels.flux())
.handle(MongoDb.reactiveOutboundChannelAdapter(mongoDbFactory));
}
このサンプルでは、提供された ReactiveMongoDatabaseFactory
を介して MongoDb に接続し、リクエストメッセージからのデータを data
名でデフォルトのコレクションに保存します。実際の操作は、内部で作成された ReactiveStreamsConsumer
の反応ストリーム構成からオンデマンドで実行されます。
ReactiveMongoDbMessageSource
は、提供された ReactiveMongoDatabaseFactory
または ReactiveMongoOperations
および MongoDb クエリ (または式) に基づく AbstractMessageSource
実装であり、予期される entityClass
型で expectSingleResult
オプションに従って find()
または findOne()
操作を呼び出し、クエリ結果を変換します。生成されたメッセージのペイロードの Publisher
(expectSingleResult
オプションに応じて Flux
または Mono
) がサブスクライブされると、クエリの実行と結果の評価がオンデマンドで実行されます。スプリッターと FluxMessageChannel
がダウンストリームで使用されている場合、フレームワークはそのようなペイロードを自動的にサブスクライブできます (基本的に flatMap
)。それ以外の場合、ダウンストリームエンドポイントでポーリングされたパブリッシャーにサブスクライブするのは、ターゲットアプリケーションの責任です。
Java DSL では、このようなチャネルアダプターは次のように構成できます。
@Bean
public IntegrationFlow reactiveMongoDbFlow(ReactiveMongoDatabaseFactory mongoDbFactory) {
return IntegrationFlow
.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory, "{'name' : 'Name'}")
.entityClass(Person.class),
c -> c.poller(Pollers.fixedDelay(1000)))
.split()
.channel(c -> c.flux("output"))
.get();
}
バージョン 5.5 以降、ReactiveMongoDbMessageSource
は updateExpression
で構成できます。ブロッキング MongoDbMessageSource
と同じ機能があります。詳細については、MongoDB 受信チャネルアダプターおよび AbstractMongoDbMessageSourceSpec
JavaDocs を参照してください。