このバージョンはまだ開発中であり、まだ安定しているとは見なされていません。最新の安定バージョンについては、Spring Integration 6.5.3 を使用してください! |
WebFlux サポート
WebFlux Spring Integration モジュール(spring-integration-webflux)は、HTTP リクエストの実行と、受信 HTTP リクエストのリアクティブ処理を可能にします。
この依存関係はプロジェクトに必要です:
サーブレットベース以外のサーバー構成の場合は、io.projectreactor.netty:reactor-netty 依存関係を含める必要があります。
WebFlux サポートは、次のゲートウェイ実装で構成されています: WebFluxInboundEndpoint および WebFluxRequestExecutingMessageHandler。このサポートは、Spring WebFlux およびプロジェクト Reactor (英語) の基盤に完全に基づいています。多くのオプションがリアクティブ HTTP コンポーネントと通常の HTTP コンポーネント間で共有されるため、詳細については HTTP サポートを参照してください。
WebFlux 名前空間のサポート
Spring Integration は、webflux 名前空間と対応するスキーマ定義を提供します。構成に含めるには、アプリケーションコンテキスト構成ファイルに次の名前空間宣言を追加します。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-webflux="http://www.springframework.org/schema/integration/webflux"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/webflux
https://www.springframework.org/schema/integration/webflux/spring-integration-webflux.xsd">
...
</beans>WebFlux 受信コンポーネント
バージョン 5.0 から、WebHandler の WebFluxInboundEndpoint 実装が提供されます。このコンポーネントは MVC ベースの HttpRequestHandlingEndpointSupport に似ており、新しく抽出された BaseHttpInboundEndpoint を通じていくつかの共通オプションを共有します。Spring WebFlux リアクティブ環境(MVC の代わり)で使用されます。次の例は、WebFlux エンドポイントの単純な実装を示しています。
Java DSL
Kotlin DSL
Java
XML
@Bean
public IntegrationFlow inboundChannelAdapterFlow() {
return IntegrationFlow
.from(WebFlux.inboundChannelAdapter("/reactivePost")
.requestMapping(m -> m.methods(HttpMethod.POST))
.requestPayloadType(ResolvableType.forClassWithGenerics(Flux.class, String.class))
.statusCodeFunction(m -> HttpStatus.ACCEPTED))
.channel(c -> c.queue("storeChannel"))
.get();
}
@Bean
fun inboundChannelAdapterFlow() =
integrationFlow(
WebFlux.inboundChannelAdapter("/reactivePost")
.apply {
requestMapping { m -> m.methods(HttpMethod.POST) }
requestPayloadType(ResolvableType.forClassWithGenerics(Flux::class.java, String::class.java))
statusCodeFunction { m -> HttpStatus.ACCEPTED }
})
{
channel { queue("storeChannel") }
}
@Configuration
@EnableWebFlux
@EnableIntegration
public class ReactiveHttpConfiguration {
@Bean
public WebFluxInboundEndpoint simpleInboundEndpoint() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/test");
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("serviceChannel");
return endpoint;
}
@ServiceActivator(inputChannel = "serviceChannel")
String service() {
return "It works!";
}
}
<int-webflux:inbound-gateway request-channel="requests" path="/sse">
<int-webflux:request-mapping produces="text/event-stream"/>
</int-webflux:inbound-gateway>
この構成は、@EnableWebFlux を使用して WebFlux インフラストラクチャを統合アプリケーションに追加することを除いて、HttpRequestHandlingEndpointSupport (例の前に記載)と似ています。また、WebFluxInboundEndpoint は、リアクティブ HTTP サーバー実装によって提供されるバックプレッシャー、オンデマンドベースの機能を使用して、ダウンストリームフローに対して sendAndReceive 操作を実行します。
応答部分もノンブロッキングであり、オンデマンド解決のために応答 Mono にフラットマッピングされる内部 FutureReplyChannel に基づいています。 |
カスタム ServerCodecConfigurer、RequestedContentTypeResolver、さらには ReactiveAdapterRegistry を使用して WebFluxInboundEndpoint を構成できます。後者は、応答型として応答を返すために使用できるメカニズムを提供します: Reactor Flux、RxJava Observable、Flowable など。このようにして、次の例に示すように、Spring Integration コンポーネントを使用してサーバー送信イベント [Wikipedia] (英語) シナリオを実装できます。
Java DSL
Kotlin DSL
Java
XML
@Bean
public IntegrationFlow sseFlow() {
return IntegrationFlow
.from(WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
.handle((p, h) -> Flux.just("foo", "bar", "baz"))
.get();
}
@Bean
fun sseFlow() =
integrationFlow(
WebFlux.inboundGateway("/sse")
.requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
{
handle { (p, h) -> Flux.just("foo", "bar", "baz") }
}
@Bean
public WebFluxInboundEndpoint webfluxInboundGateway() {
WebFluxInboundEndpoint endpoint = new WebFluxInboundEndpoint();
RequestMapping requestMapping = new RequestMapping();
requestMapping.setPathPatterns("/sse");
requestMapping.setProduces(MediaType.TEXT_EVENT_STREAM_VALUE);
endpoint.setRequestMapping(requestMapping);
endpoint.setRequestChannelName("requests");
return endpoint;
}
<int-webflux:inbound-channel-adapter id="reactiveFullConfig" channel="requests"
path="test1"
auto-startup="false"
phase="101"
request-payload-type="byte[]"
error-channel="errorChannel"
payload-expression="payload"
supported-methods="PUT"
status-code-expression="'202'"
header-mapper="headerMapper"
codec-configurer="codecConfigurer"
reactive-adapter-registry="reactiveAdapterRegistry"
requested-content-type-resolver="requestedContentTypeResolver">
<int-webflux:request-mapping headers="foo"/>
<int-webflux:cross-origin origin="foo" method="PUT"/>
<int-webflux:header name="foo" expression="'foo'"/>
</int-webflux:inbound-channel-adapter>
可能な構成オプションの詳細については、リクエストマッピングのサポートおよびクロスオリジンリソースシェアリング(CORS)サポートを参照してください。
リクエスト本文が空の場合、または payloadExpression が null を返す場合、リクエストパラメーター(MultiValueMap<String, String>)は、処理するターゲットメッセージの payload に使用されます。
ペイロード検証
Starting with version 5.2, the WebFluxInboundEndpoint can be configured with a Validator. Unlike the MVC validation in the HTTP サポート , it is used to validate elements in the Publisher to which a request has been converted by the HttpMessageReader, before performing a fallback and payloadExpression functions. The Framework can’t assume how complex the Publisher object can be after building the final payload. If there are requirements to restrict validation visibility for exactly final payload (or its Publisher elements), the validation should go downstream instead of WebFlux endpoint. See more information in the Spring WebFlux documentation. An invalid payload is rejected with an IntegrationWebExchangeBindException (a WebExchangeBindException extension), containing all the validation Errors. See more in Spring Framework 参考マニュアル about validation.
WebFlux 送信コンポーネント
WebFluxRequestExecutingMessageHandler (バージョン 5.0 以降)の実装は HttpRequestExecutingMessageHandler に似ています。Spring Framework WebFlux モジュールの WebClient を使用します。構成するには、次のような Bean を定義します。
Java DSL
Kotlin DSL
Java
XML
@Bean
public IntegrationFlow outboundReactive() {
return f -> f
.handle(WebFlux.<MultiValueMap<String, String>>outboundGateway(m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri())
.httpMethod(HttpMethod.GET)
.expectedResponseType(String.class));
}
@Bean
fun outboundReactive() =
integrationFlow {
handle(
WebFlux.outboundGateway<MultiValueMap<String, String>>({ m ->
UriComponentsBuilder.fromUriString("http://localhost:8080/foo")
.queryParams(m.getPayload())
.build()
.toUri()
})
.httpMethod(HttpMethod.GET)
.expectedResponseType(String::class.java)
)
}
@ServiceActivator(inputChannel = "reactiveHttpOutRequest")
@Bean
public WebFluxRequestExecutingMessageHandler reactiveOutbound(WebClient client) {
WebFluxRequestExecutingMessageHandler handler =
new WebFluxRequestExecutingMessageHandler("http://localhost:8080/foo", client);
handler.setHttpMethod(HttpMethod.POST);
handler.setExpectedResponseType(String.class);
return handler;
}
<int-webflux:outbound-gateway id="reactiveExample1"
request-channel="requests"
url="http://localhost/test"
http-method-expression="headers.httpMethod"
extract-request-payload="false"
expected-response-type-expression="payload"
charset="UTF-8"
reply-timeout="1234"
reply-channel="replies"/>
<int-webflux:outbound-channel-adapter id="reactiveExample2"
url="http://localhost/example"
http-method="GET"
channel="requests"
charset="UTF-8"
extract-payload="false"
expected-response-type="java.lang.String"
order="3"
auto-startup="false"/>
WebClient exchange() 操作は Mono<ClientResponse> を返します。これは (いくつかの Mono.map() ステップを使用して) WebFluxRequestExecutingMessageHandler からの出力として AbstractIntegrationMessageBuilder にマップされます。outputChannel としての ReactiveChannel と合わせて、Mono<ClientResponse> の評価は、ダウンストリームのサブスクリプションが行われるまで延期されます。それ以外の場合は、async モードとして扱われ、Mono レスポンスは、WebFluxRequestExecutingMessageHandler からの非同期レスポンスの SettableListenableFuture に適応されます。出力メッセージのターゲットペイロードは、WebFluxRequestExecutingMessageHandler 構成によって異なります。setExpectedResponseType(Class<?>) または setExpectedResponseTypeExpression(Expression) は、レスポンス本文要素変換のターゲット型を識別します。replyPayloadToFlux が true に設定されている場合、レスポンス本文は各要素に指定された expectedResponseType を使用して Flux に変換され、この Flux がペイロードとしてダウンストリームに送信されます。その後、スプリッターを使用して、この Flux をリアクティブな方法で反復できます。
さらに、expectedResponseType および replyPayloadToFlux プロパティの代わりに、BodyExtractor<?, ClientHttpResponse> を WebFluxRequestExecutingMessageHandler に注入できます。これは、ClientHttpResponse への低レベルアクセスに使用でき、本文と HTTP ヘッダーの変換をより詳細に制御できます。Spring Integration は、ClientHttpResponse 全体とその他の可能なカスタムロジックを (ダウンストリームで) 生成するための ID 関数として ClientHttpResponseBodyExtractor を提供します。
バージョン 5.2 以降、WebFluxRequestExecutingMessageHandler は、リクエストメッセージのペイロードとしてリアクティブ Publisher、Resource、MultiValueMap 型をサポートします。それぞれの BodyInserter は、WebClient.RequestBodySpec に入力されるために内部的に使用されます。ペイロードがリアクティブ Publisher の場合、構成済みの publisherElementType または publisherElementTypeExpression を使用して、パブリッシャーの要素型の型を判別できます。式は、ターゲット Class<?> または ParameterizedTypeReference に解決される Class<?>、String に解決される必要があります。
バージョン 5.5 以降、WebFluxRequestExecutingMessageHandler は extractResponseBody フラグ(デフォルトでは true)を公開して、レスポンス本文のみを返すか、提供された expectedResponseType または replyPayloadToFlux とは関係なく、ResponseEntity 全体をレスポンスメッセージペイロードとして返します。ResponseEntity に本体が存在しない場合、このフラグは無視され、ResponseEntity 全体が返されます。
その他の可能な構成オプションについては、HTTP 送信コンポーネントを参照してください。
WebFlux ヘッダーマッピング
WebFlux コンポーネントは HTTP プロトコルに完全に基づいているため、HTTP ヘッダーマッピングに違いはありません。ヘッダーのマッピングに使用できるその他の可能なオプションとコンポーネントについては、HTTP ヘッダーマッピングを参照してください。
WebFlux リクエスト属性
バージョン 6.0 以降、WebFluxRequestExecutingMessageHandler は setAttributeVariablesExpression() を介してリクエスト属性を評価するように設定できます。この SpEL 式は Map で評価する必要があります。このマッピングは、WebClient.RequestBodySpec.attributes(Consumer<Map<String, Object>> attributesConsumer) HTTP リクエスト設定コールバックに伝播されます。これは、キーバリューオブジェクトの形式で Message からリクエストに情報を渡す必要があり、下流のフィルターがこれらの属性にアクセスしてさらに処理する必要がある場合に役立ちます。