WebFlux サポート
WebFlux Spring Integration モジュール(spring-integration-webflux
)は、HTTP リクエストの実行と、受信 HTTP リクエストのリアクティブ処理を可能にします。
この依存関係をプロジェクトに含める必要があります。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>5.5.8</version>
</dependency>
compile "org.springframework.integration:spring-integration-webflux:5.5.8"
サーブレットベース以外のサーバー構成の場合は、io.projectreactor.netty:reactor-netty
依存関係を含める必要があります。
WebFlux サポートは、次のゲートウェイ実装で構成されています: WebFluxInboundEndpoint
および WebFluxRequestExecutingMessageHandler
。このサポートは、Spring WebFlux およびプロジェクト Reactor (英語) の基盤に完全に基づいています。多くのオプションがリアクティブ HTTP コンポーネントと通常の HTTP コンポーネント間で共有されるため、詳細については HTTP サポートを参照してください。
WebFlux 名前空間のサポート
Spring Integration は、webflux
名前空間と対応するスキーマ定義を提供します。構成に含めるには、アプリケーションコンテキスト構成ファイルに次の名前空間宣言を追加します。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>
WebFlux 受信コンポーネント
バージョン 5.0 から、WebHandler
の WebFluxInboundEndpoint
実装が提供されます。このコンポーネントは MVC ベースの HttpRequestHandlingEndpointSupport
に似ており、新しく抽出された BaseHttpInboundEndpoint
を通じていくつかの共通オプションを共有します。Spring WebFlux リアクティブ環境(MVC の代わり)で使用されます。次の例は、WebFlux エンドポイントの単純な実装を示しています。
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlows
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
この構成は、@EnableWebFlux
を使用して WebFlux インフラストラクチャを統合アプリケーションに追加することを除いて、HttpRequestHandlingEndpointSupport
(例の前に記載)と似ています。また、WebFluxInboundEndpoint
は、リアクティブ HTTP サーバー実装によって提供されるバックプレッシャー、オンデマンドベースの機能を使用して、ダウンストリームフローに対して sendAndReceive
操作を実行します。
応答部分もノンブロッキングであり、オンデマンド解決のために応答 Mono にフラットマッピングされる内部 FutureReplyChannel に基づいています。 |
カスタム ServerCodecConfigurer
、RequestedContentTypeResolver
、さらには ReactiveAdapterRegistry
を使用して WebFluxInboundEndpoint
を構成できます。後者は、応答型として応答を返すために使用できるメカニズムを提供します: Reactor Flux
、RxJava Observable
、Flowable
など。このようにして、次の例に示すように、Spring Integration コンポーネントを使用してサーバー送信イベント [Wikipedia] (英語) シナリオを実装できます。
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlows
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
可能な構成オプションの詳細については、リクエストマッピングのサポートおよびクロスオリジンリソースシェアリング(CORS)サポートを参照してください。
リクエスト本文が空の場合、または payloadExpression
が null
を返す場合、リクエストパラメーター(MultiValueMap<String, String>
)は、処理するターゲットメッセージの payload
に使用されます。
ペイロード検証
バージョン 5.2 以降、WebFluxInboundEndpoint
は Validator
で構成できます。HTTP サポートの MVC 検証とは異なり、フォールバックおよび payloadExpression
機能を実行する前に、HttpMessageReader
によってリクエストが変換された Publisher
の要素を検証するために使用されます。フレームワークは、最終ペイロードの構築後に Publisher
オブジェクトがどれほど複雑になるかを想定できません。正確に最終的なペイロード(またはその Publisher
要素)の検証の可視性を制限する要件がある場合、検証は WebFlux エンドポイントの代わりにダウンストリームに行く必要があります。詳細については、Spring WebFlux のドキュメントを参照してください。無効なペイロードは、すべての検証 Errors
を含む IntegrationWebExchangeBindException
(WebExchangeBindException
拡張)で拒否されます。検証については、Spring Framework 参考マニュアルを参照してください。
WebFlux 送信コンポーネント
WebFluxRequestExecutingMessageHandler
(バージョン 5.0 以降)の実装は HttpRequestExecutingMessageHandler
に似ています。Spring Framework WebFlux モジュールの WebClient
を使用します。構成するには、次のような Bean を定義します。
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="http://localhost/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="http://localhost/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient
exchange()
操作は Mono<ClientResponse>
を返します。これは、WebFluxRequestExecutingMessageHandler
からの出力として AbstractIntegrationMessageBuilder
に(いくつかの Mono.map()
ステップを使用して)マップされます。outputChannel
としての ReactiveChannel
とともに、Mono<ClientResponse>
評価は、ダウンストリームサブスクリプションが作成されるまで延期されます。それ以外の場合は、async
モードとして扱われ、Mono
レスポンスは WebFluxRequestExecutingMessageHandler
からの非同期レスポンス用に SettableListenableFuture
に適合されます。出力メッセージのターゲットペイロードは、WebFluxRequestExecutingMessageHandler
構成によって異なります。setExpectedResponseType(Class<?>)
または setExpectedResponseTypeExpression(Expression)
は、レスポンス本体要素変換のターゲット型を識別します。replyPayloadToFlux
が true
に設定されている場合、レスポンス本体は各要素に提供された expectedResponseType
を使用して Flux
に変換され、この Flux
はペイロードとしてダウンストリームに送信されます。その後、スプリッターを使用して、この Flux
をリアクティブに反復処理できます。
さらに、expectedResponseType
および replyPayloadToFlux
プロパティの代わりに、BodyExtractor<?, ClientHttpResponse>
を WebFluxRequestExecutingMessageHandler
に挿入できます。ClientHttpResponse
への低レベルのアクセスに使用でき、ボディおよび HTTP ヘッダーの変換をさらに制御できます。Spring Integration は、ClientHttpResponse
全体およびその他の可能なカスタムロジックを生成(ダウンストリーム)するための識別関数として ClientHttpResponseBodyExtractor
を提供します。
バージョン 5.2 以降、WebFluxRequestExecutingMessageHandler
は、リクエストメッセージのペイロードとしてリアクティブ Publisher
、Resource
、MultiValueMap
型をサポートします。それぞれの BodyInserter
は、WebClient.RequestBodySpec
に入力されるために内部的に使用されます。ペイロードがリアクティブ Publisher
の場合、構成済みの publisherElementType
または publisherElementTypeExpression
を使用して、パブリッシャーの要素型の型を判別できます。式は、ターゲット Class<?>
または ParameterizedTypeReference
に解決される Class<?>
、String
に解決される必要があります。
バージョン 5.5 以降、WebFluxRequestExecutingMessageHandler
は extractResponseBody
フラグ(デフォルトでは true
)を公開して、レスポンス本文のみを返すか、提供された expectedResponseType
または replyPayloadToFlux
とは関係なく、ResponseEntity
全体をレスポンスメッセージペイロードとして返します。ResponseEntity
に本体が存在しない場合、このフラグは無視され、ResponseEntity
全体が返されます。
その他の可能な構成オプションについては、HTTP 送信コンポーネントを参照してください。
WebFlux ヘッダーマッピング
WebFlux コンポーネントは HTTP プロトコルに完全に基づいているため、HTTP ヘッダーマッピングに違いはありません。ヘッダーのマッピングに使用できるその他の可能なオプションとコンポーネントについては、HTTP ヘッダーマッピングを参照してください。