WebFlux サポート

WebFlux Spring Integration モジュール(spring-integration-webflux)は、HTTP リクエストの実行と、受信 HTTP リクエストのリアクティブ処理を可能にします。

この依存関係をプロジェクトに含める必要があります。

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-webflux</artifactId>
    <version>5.5.8</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-webflux:5.5.8"

サーブレットベース以外のサーバー構成の場合は、io.projectreactor.netty:reactor-netty 依存関係を含める必要があります。

WebFlux サポートは、次のゲートウェイ実装で構成されています: WebFluxInboundEndpoint および WebFluxRequestExecutingMessageHandler。このサポートは、Spring WebFlux およびプロジェクト Reactor (英語) の基盤に完全に基づいています。多くのオプションがリアクティブ HTTP コンポーネントと通常の HTTP コンポーネント間で共有されるため、詳細については HTTP サポートを参照してください。

WebFlux 名前空間のサポート

Spring Integration は、webflux 名前空間と対応するスキーマ定義を提供します。構成に含めるには、アプリケーションコンテキスト構成ファイルに次の名前空間宣言を追加します。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/webflux
    https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
    ...
</beans>

WebFlux 受信コンポーネント

バージョン 5.0 から、WebHandler の WebFluxInboundEndpoint 実装が提供されます。このコンポーネントは MVC ベースの HttpRequestHandlingEndpointSupport に似ており、新しく抽出された BaseHttpInboundEndpoint を通じていくつかの共通オプションを共有します。Spring WebFlux リアクティブ環境(MVC の代わり)で使用されます。次の例は、WebFlux エンドポイントの単純な実装を示しています。

Java DSL
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
    return IntegrationFlows
        .from(WebFlux.inboundChannelAdapter("/reactivePost")
            .requestMapping(m -> m.methods(HttpMethod.POST))
            .requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
            .statusCodeFunction(m -> HttpStatus.ACCEPTED))
        .channel(c -> c.queue("storeChannel"))
        .get();
}
Kotlin DSL
@Bean
fun inboundChannelAdapterFlow() =
    integrationFlow(
        WebFlux.inboundChannelAdapter("/reactivePost")
            .apply {
                requestMapping { m -> m.methods(HttpMethod.POST) }
                requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
                statusCodeFunction { m -> HttpStatus.ACCEPTED }
            })
    {
        channel { queue("storeChannel") }
    }
Java
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {

    @Bean
    public WebFluxInboundEndpoint simpleInboundEndpoint() {
        WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
        RequestMapping requestMapping = new RequestMapping();
        requestMapping.setPathPatterns("/test");
        endpoint.setRequestMapping(requestMapping);
        endpoint.setRequestChannelName("serviceChannel");
        return endpoint;
    }

    @ServiceActivator(inputChannel = "serviceChannel")
    String service() {
        return "It works!";
    }

}
XML
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
    <int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>

この構成は、@EnableWebFlux を使用して WebFlux インフラストラクチャを統合アプリケーションに追加することを除いて、HttpRequestHandlingEndpointSupport (例の前に記載)と似ています。また、WebFluxInboundEndpoint は、リアクティブ HTTP サーバー実装によって提供されるバックプレッシャー、オンデマンドベースの機能を使用して、ダウンストリームフローに対して sendAndReceive 操作を実行します。

応答部分もノンブロッキングであり、オンデマンド解決のために応答 Mono にフラットマッピングされる内部 FutureReplyChannel に基づいています。

カスタム ServerCodecConfigurerRequestedContentTypeResolver、さらには ReactiveAdapterRegistry を使用して WebFluxInboundEndpoint を構成できます。後者は、応答型として応答を返すために使用できるメカニズムを提供します: Reactor Flux、RxJava ObservableFlowable など。このようにして、次の例に示すように、Spring Integration コンポーネントを使用してサーバー送信イベント [Wikipedia] (英語) シナリオを実装できます。

Java DSL
@Bean
public IntegrationFlow sseFlow() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/sse")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> Flux.just("foo", "bar", "baz"))
            .get();
}
Kotlin DSL
@Bean
fun sseFlow() =
     integrationFlow(
            WebFlux.inboundGateway("/sse")
                       .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            {
                 handle { (p, h) -> Flux.just("foo", "bar", "baz") }
            }
Java
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
    WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
    RequestMapping requestMapping = new RequestMapping();
    requestMapping.setPathPatterns("/sse");
    requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
    endpoint.setRequestMapping(requestMapping);
    endpoint.setRequestChannelName("requests");
    return endpoint;
}
XML
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
                               path="test1"
                               auto-startup="false"
                               phase="101"
                               request-payload-type="byte[]"
                               error-channel="errorChannel"
                               payload-expression="payload"
                               supported-methods="PUT"
                               status-code-expression="'202'"
                               header-mapper="headerMapper"
                               codec-configurer="codecConfigurer"
                               reactive-adapter-registry="reactiveAdapterRegistry"
                               requested-content-type-resolver="requestedContentTypeResolver">
            <int-webflux:request-mapping headers="foo"/>
            <int-webflux:cross-origin origin="foo" method="PUT"/>
            <int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>

可能な構成オプションの詳細については、リクエストマッピングのサポートおよびクロスオリジンリソースシェアリング(CORS)サポートを参照してください。

リクエスト本文が空の場合、または payloadExpression が null を返す場合、リクエストパラメーター(MultiValueMap<String, String>)は、処理するターゲットメッセージの payload に使用されます。

ペイロード検証

バージョン 5.2 以降、WebFluxInboundEndpoint は Validator で構成できます。HTTP サポートの MVC 検証とは異なり、フォールバックおよび payloadExpression 機能を実行する前に、HttpMessageReader によってリクエストが変換された Publisher の要素を検証するために使用されます。フレームワークは、最終ペイロードの構築後に Publisher オブジェクトがどれほど複雑になるかを想定できません。正確に最終的なペイロード(またはその Publisher 要素)の検証の可視性を制限する要件がある場合、検証は WebFlux エンドポイントの代わりにダウンストリームに行く必要があります。詳細については、Spring WebFlux のドキュメントを参照してください。無効なペイロードは、すべての検証 Errors を含む IntegrationWebExchangeBindException (WebExchangeBindException 拡張)で拒否されます。検証については、Spring Framework 参考マニュアルを参照してください。

WebFlux 送信コンポーネント

WebFluxRequestExecutingMessageHandler (バージョン 5.0 以降)の実装は HttpRequestExecutingMessageHandler に似ています。Spring Framework WebFlux モジュールの WebClient を使用します。構成するには、次のような Bean を定義します。

Java DSL
@Bean
public IntegrationFlow outboundReactive() {
    return f -> f
        .handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                        .queryParams(m.getPayload())
                        .build()
                        .toUri())
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String.class));
}
Kotlin DSL
@Bean
fun outboundReactive() =
    integrationFlow {
        handle(
            WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
                UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
                    .queryParams(m.getPayload())
                    .build()
                    .toUri()
            })
                .httpMethod(HttpMethod.GET)
                .expectedResponseType(String::class.java)
        )
    }
Java
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
    WebFluxRequestExecutingMessageHandler handler =
        new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
    handler.setHttpMethod(HttpMethod.POST);
    handler.setExpectedResponseType(String.class);
    return handler;
}
XML
<int-webflux:outbound-gateway id="reactiveExample1"
    request-channel="requests"
    url="http://localhost/test"
    http-method-expression="headers.httpMethod"
    extract-request-payload="false"
    expected-response-type-expression="payload"
    charset="UTF-8"
    reply-timeout="1234"
    reply-channel="replies"/>

<int-webflux:outbound-channel-adapter id="reactiveExample2"
    url="http://localhost/example"
    http-method="GET"
    channel="requests"
    charset="UTF-8"
    extract-payload="false"
    expected-response-type="java.lang.String"
    order="3"
    auto-startup="false"/>

WebClientexchange() 操作は Mono<ClientResponse> を返します。これは、WebFluxRequestExecutingMessageHandler からの出力として AbstractIntegrationMessageBuilder に(いくつかの Mono.map() ステップを使用して)マップされます。outputChannel としての ReactiveChannel とともに、Mono<ClientResponse> 評価は、ダウンストリームサブスクリプションが作成されるまで延期されます。それ以外の場合は、async モードとして扱われ、Mono レスポンスは WebFluxRequestExecutingMessageHandler からの非同期レスポンス用に SettableListenableFuture に適合されます。出力メッセージのターゲットペイロードは、WebFluxRequestExecutingMessageHandler 構成によって異なります。setExpectedResponseType(Class<?>) または setExpectedResponseTypeExpression(Expression) は、レスポンス本体要素変換のターゲット型を識別します。replyPayloadToFlux が true に設定されている場合、レスポンス本体は各要素に提供された expectedResponseType を使用して Flux に変換され、この Flux はペイロードとしてダウンストリームに送信されます。その後、スプリッターを使用して、この Flux をリアクティブに反復処理できます。

さらに、expectedResponseType および replyPayloadToFlux プロパティの代わりに、BodyExtractor<?, ClientHttpResponse> を WebFluxRequestExecutingMessageHandler に挿入できます。ClientHttpResponse への低レベルのアクセスに使用でき、ボディおよび HTTP ヘッダーの変換をさらに制御できます。Spring Integration は、ClientHttpResponse 全体およびその他の可能なカスタムロジックを生成(ダウンストリーム)するための識別関数として ClientHttpResponseBodyExtractor を提供します。

バージョン 5.2 以降、WebFluxRequestExecutingMessageHandler は、リクエストメッセージのペイロードとしてリアクティブ PublisherResourceMultiValueMap 型をサポートします。それぞれの BodyInserter は、WebClient.RequestBodySpec に入力されるために内部的に使用されます。ペイロードがリアクティブ Publisher の場合、構成済みの publisherElementType または publisherElementTypeExpression を使用して、パブリッシャーの要素型の型を判別できます。式は、ターゲット Class<?> または ParameterizedTypeReference に解決される Class<?>String に解決される必要があります。

バージョン 5.5 以降、WebFluxRequestExecutingMessageHandler は extractResponseBody フラグ(デフォルトでは true)を公開して、レスポンス本文のみを返すか、提供された expectedResponseType または replyPayloadToFlux とは関係なく、ResponseEntity 全体をレスポンスメッセージペイロードとして返します。ResponseEntity に本体が存在しない場合、このフラグは無視され、ResponseEntity 全体が返されます。

その他の可能な構成オプションについては、HTTP 送信コンポーネントを参照してください。

WebFlux ヘッダーマッピング

WebFlux コンポーネントは HTTP プロトコルに完全に基づいているため、HTTP ヘッダーマッピングに違いはありません。ヘッダーのマッピングに使用できるその他の可能なオプションとコンポーネントについては、HTTP ヘッダーマッピングを参照してください。