最新の安定バージョンについては、Spring Integration 6.5.3 を使用してください! |
R2DBC サポート
Spring Integration は、R2DBC (英語) ドライバーを介したデータベースへのリアクティブアクセスを使用してメッセージを送受信するためのチャネルアダプターを提供します。
この依存関係をプロジェクトに含める必要があります。
R2DBC 受信チャネルアダプター
R2dbcMessageSource は、R2dbcEntityOperations に基づくポーリング可能な MessageSource 実装であり、expectSingleResult オプションに従ってデータベースからフェッチされたデータのペイロードとして Flux または Mono を使用してメッセージを生成します。SELECT へのクエリは、静的に提供することも、receive() 呼び出しごとに評価される SpEL 式に基づいて行うこともできます。R2dbcMessageSource.SelectCreator は、評価コンテキストのルートオブジェクトとして存在し、StatementMapper.SelectSpec 流れるような API を使用できるようにします。デフォルトでは、このチャネルアダプターは、select からのレコードを LinkedCaseInsensitiveMap インスタンスにマップします。this.r2dbcEntityOperations.getConverter() に基づいて EntityRowMapper によって下で使用される payloadType オプションを提供してカスタマイズできます。updateSql はオプションであり、データベース内の読み取りレコードをマークして、後続のポーリングからスキップするために使用されます。UPDATE 操作を BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> と共に指定して、SELECT 結果のレコードに基づいて値を UPDATE にバインドできます。
このチャネルアダプターの一般的な構成は、次のようになります。
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
"SELECT * FROM person WHERE name='Name'");
r2dbcMessageSource.setPayloadType(Person.class);
r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
r2dbcMessageSource.setBindFunction(
(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
return r2dbcMessageSource;
}Java DSL の場合、このチャネルアダプターの構成は次のようになります。
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
return IntegrationFlow
.from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
(selectCreator) ->
selectCreator.createSelect("person")
.withProjection("*")
.withCriteria(Criteria.where("id").is(1)))
.expectSingleResult(true)
.payloadType(Person.class)
.updateSql("UPDATE Person SET id='2' where id = :id")
.bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
bindSpec.bind("id", o.getId())),
e -> e.poller(p -> p.fixedDelay(100)))
.handle((p, h) -> p)
.channel(MessageChannels.flux())
.get();
}R2DBC 送信チャネルアダプター
R2dbcMessageHandler は、提供された R2dbcEntityOperations を使用してデータベースで INSERT (デフォルト)、UPDATE、DELETE クエリを実行するための ReactiveMessageHandler 実装です。R2dbcMessageHandler.Type は、静的に構成することも、リクエストメッセージに対する SpEL 式を介して構成することもできます。実行するクエリは、tableName、values、criteria 式オプションに基づくか、(tableName が提供されていない場合)メッセージペイロード全体が SQL を実行する org.springframework.data.relational.core.mapping.Table エンティティとして扱われます。パッケージ org.springframework.data.relational.core.query は、UPDATE および DELETE クエリに使用される Criteria 流れるような API に直接アクセスするための SpEL 評価コンテキストへのインポートとして登録されます。valuesExpression は INSERT および UPDATE で使用され、リクエストメッセージに対してターゲット表の変更を実行するには、列と値のペアについて Map に対して評価する必要があります。
このチャネルアダプターの一般的な構成は、次のようになります。
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
messageHandler.setCriteriaExpression(
EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
return messageHandler;
}Java DSL の場合、このチャネルアダプターの構成は次のようになります。
.handleReactive(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
.queryType(R2dbcMessageHandler.Type.UPDATE)
.tableNameExpression("payload.class.simpleName")
.criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
.values("{age:36}"))