R2DBC サポート
Spring Integration は、R2DBC (英語) ドライバーを介したデータベースへのリアクティブアクセスを使用してメッセージを送受信するためのチャネルアダプターを提供します。
この依存関係をプロジェクトに含める必要があります。
R2DBC 受信チャネルアダプター
R2dbcMessageSource
は、R2dbcEntityOperations
に基づくポーリング可能な MessageSource
実装であり、expectSingleResult
オプションに従ってデータベースからフェッチされたデータのペイロードとして Flux
または Mono
を使用してメッセージを生成します。SELECT
へのクエリは、静的に提供することも、receive()
呼び出しごとに評価される SpEL 式に基づいて行うこともできます。R2dbcMessageSource.SelectCreator
は、評価コンテキストのルートオブジェクトとして存在し、StatementMapper.SelectSpec
流れるような API を使用できるようにします。デフォルトでは、このチャネルアダプターは、select からのレコードを LinkedCaseInsensitiveMap
インスタンスにマップします。this.r2dbcEntityOperations.getConverter()
に基づいて EntityRowMapper
によって下で使用される payloadType
オプションを提供してカスタマイズできます。updateSql
はオプションであり、データベース内の読み取りレコードをマークして、後続のポーリングからスキップするために使用されます。UPDATE
操作を BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>
と共に指定して、SELECT
結果のレコードに基づいて値を UPDATE
にバインドできます。
このチャネルアダプターの一般的な構成は、次のようになります。
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
"SELECT * FROM person WHERE name='Name'");
r2dbcMessageSource.setPayloadType(Person.class);
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
r2dbcMessageSource.setBindFunction(
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
return r2dbcMessageSource;
}
Java DSL の場合、このチャネルアダプターの構成は次のようになります。
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlow
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
(selectCreator) ->
selectCreator.createSelect("person")
.withProjection("*")
.withCriteria(Criteria.where("id").is(1)))
.expectSingleResult(true)
.payloadType(Person.class)
.updateSql("UPDATE Person SET id='2' where id = :id")
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.handle((p, h) -> p)
.channel(MessageChannels.flux())
.get();
}
R2DBC 送信チャネルアダプター
R2dbcMessageHandler
は、提供された R2dbcEntityOperations
を使用してデータベースで INSERT
(デフォルト)、UPDATE
、DELETE
クエリを実行するための ReactiveMessageHandler
実装です。R2dbcMessageHandler.Type
は、静的に構成することも、リクエストメッセージに対する SpEL 式を介して構成することもできます。実行するクエリは、tableName
、values
、criteria
式オプションに基づくか、(tableName
が提供されていない場合)メッセージペイロード全体が SQL を実行する org.springframework.data.relational.core.mapping.Table
エンティティとして扱われます。パッケージ org.springframework.data.relational.core.query
は、UPDATE
および DELETE
クエリに使用される Criteria
流れるような API に直接アクセスするための SpEL 評価コンテキストへのインポートとして登録されます。valuesExpression
は INSERT
および UPDATE
で使用され、リクエストメッセージに対してターゲット表の変更を実行するには、列と値のペアについて Map
に対して評価する必要があります。
このチャネルアダプターの一般的な構成は、次のようになります。
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
messageHandler.setCriteriaExpression(
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
return messageHandler;
}
Java DSL の場合、このチャネルアダプターの構成は次のようになります。
.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))