R2DBC サポート

Spring Integration は、R2DBC (英語) ドライバーを介したデータベースへのリアクティブアクセスを使用してメッセージを送受信するためのチャネルアダプターを提供します。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>5.5.8</version>
</dependency>
compile "org.springframework.integration:spring-integration-r2dbc:5.5.8"

R2DBC 受信チャネルアダプター

R2dbcMessageSource は、R2dbcEntityOperations に基づくポーリング可能な MessageSource 実装であり、expectSingleResult オプションに従ってデータベースからフェッチされたデータのペイロードとして Flux または Mono を含むメッセージを生成します。SELECT へのクエリは、静的に提供することも、すべての receive() 呼び出しで評価される SpEL 式に基づいて提供することもできます。R2dbcMessageSource.SelectCreator は、StatementMapper.SelectSpec 流れるような API を使用できるようにするための評価コンテキストのルートオブジェクトとして存在します。デフォルトでは、このチャネルアダプターは select からのレコードを LinkedCaseInsensitiveMap インスタンスにマップします。this.r2dbcEntityOperations.getConverter() に基づいて EntityRowMapper によって下で使用される payloadType オプションを提供してカスタマイズできます。updateSql はオプションであり、後続のポーリングからスキップするためにデータベース内の読み取りレコードをマークするために使用されます。UPDATE 操作には BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec> を指定して、SELECT 結果のレコードに基づいて値を UPDATE にバインドできます。

このチャネルアダプターの一般的な構成は、次のようになります。

@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
				(DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));}
    return r2dbcMessageSource;
}

Java DSL の場合、このチャネルアダプターの構成は次のようになります。

@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlows
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .<Mono<?>>handle((p, h) -> p, e -> e.async(true))
        .channel(MessageChannels.flux())
        .get();
}

R2DBC 送信チャネルアダプター

R2dbcMessageHandler は、提供された R2dbcEntityOperations を使用してデータベースで INSERT (デフォルト)、UPDATEDELETE クエリを実行するための ReactiveMessageHandler 実装です。R2dbcMessageHandler.Type は、静的に構成することも、リクエストメッセージに対する SpEL 式を介して構成することもできます。実行するクエリは、tableNamevaluescriteria 式オプションに基づくか、(tableName が提供されていない場合)メッセージペイロード全体が SQL を実行する org.springframework.data.relational.core.mapping.Table エンティティとして扱われます。パッケージ org.springframework.data.relational.core.query は、UPDATE および DELETE クエリに使用される Criteria 流れるような API に直接アクセスするための SpEL 評価コンテキストへのインポートとして登録されます。valuesExpression は INSERT および UPDATE で使用され、リクエストメッセージに対してターゲット表の変更を実行するには、列と値のペアについて Map に対して評価する必要があります。

このチャネルアダプターの一般的な構成は、次のようになります。

@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate)
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId)));
    return messageHandler;
}

Java DSL の場合、このチャネルアダプターの構成は次のようになります。

.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))