RSocket サポート

RSocket Spring Integration モジュール(spring-integration-rsocket)は、RSocket アプリケーションプロトコル (英語) の実行を許可します。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-rsocket</artifactId>
    <version>5.5.8</version>
</dependency>
compile "org.springframework.integration:spring-integration-rsocket:5.5.8"

このモジュールは、バージョン 5.2 以降で利用可能であり、RSocketRequesterRSocketMessageHandlerRSocketStrategies などの RSocket コンポーネント実装を備えた Spring メッセージング基盤に基づいています。RSocket プロトコル、用語、コンポーネントの詳細については、Spring Framework RSocket サポートを参照してください。

チャネルアダプターを介して統合フロー処理を開始する前に、サーバーとクライアントの間に RSocket 接続を確立する必要があります。この目的のために、Spring Integration RSocket サポートは、AbstractRSocketConnector の ServerRSocketConnector および ClientRSocketConnector 実装を提供します。

ServerRSocketConnector は、クライアントからの接続を受け入れるために提供された io.rsocket.transport.ServerTransport に従って、ホストとポート上のリスナーを公開します。内部 RSocketServer インスタンスは、setServerConfigurer()、構成可能な他のオプションを使用してカスタマイズできます。ペイロードデータとヘッダーメタデータ用の RSocketStrategies と MimeType。クライアントリクエスターから setupRoute が提供されると(以下の ClientRSocketConnector を参照)、接続されたクライアントは、clientRSocketKeyStrategyBiFunction<Map<String, Object>, DataBuffer, Object> によって決定されたキーに RSocketRequester として格納されます。デフォルトでは、接続データは、UTF-8 文字セットを使用して文字列に変換された値としてキーに使用されます。このような RSocketRequester レジストリは、アプリケーションロジックで使用して、特定のクライアント接続を決定し、それと対話したり、接続されているすべてのクライアントに同じメッセージを公開したりできます。クライアントから接続が確立されると、RSocketConnectedEvent が ServerRSocketConnector から発行されます。これは、Spring メッセージングモジュールの @ConnectMapping アノテーションによって提供されるものと似ています。マッピングパターン * は、すべてのクライアントルートを受け入れることを意味します。RSocketConnectedEvent は、DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER ヘッダーを介してさまざまなルートを区別するために使用できます。

典型的なサーバー構成は次のようになります。

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ServerRSocketConnector serverRSocketConnector() {
    ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
    serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
    serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
    serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
    serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
                                    + headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
    return serverRSocketConnector;
}

@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
	...
}

RSocketStrategies Bean および RSocketConnectedEvent の @EventListener を含むすべてのオプションはオプションです。詳細については、ServerRSocketConnector JavaDocs を参照してください。

バージョン 5.2.1 から、ServerRSocketMessageHandler は、既存の RSocket サーバーとの可能な接続のために、パブリックな最上位クラスに抽出されます。ServerRSocketConnector に ServerRSocketMessageHandler の外部インスタンスが提供される場合、RSocket サーバーは内部で作成されず、提供されたインスタンスにすべての処理ロジックが委譲されます。さらに、ServerRSocketMessageHandler を messageMappingCompatible フラグで構成して、RSocket コントローラーの @MessageMapping も処理し、標準 RSocketMessageHandler が提供する機能を完全に置き換えることができます。これは、RSocket チャネルアダプターと同じアプリケーションに従来の @MessageMapping メソッドが存在し、アプリケーションに外部で構成された RSocket サーバーが存在する場合、混合構成で役立ちます。

ClientRSocketConnector は、提供された ClientTransport を介して接続された RSocket に基づいて、RSocketRequester のホルダーとして機能します。RSocketConnector は、提供されている RSocketConnectorConfigurer でカスタマイズできます。このコンポーネントでは、setupRoute (オプションのテンプレート変数を使用)とメタデータを使用した setupData も構成できます。

典型的なクライアント構成は次のようになります。

@Bean
public RSocketStrategies rsocketStrategies() {
    return RSocketStrategies.builder()
        .decoder(StringDecoder.textPlainOnly())
        .encoder(CharSequenceEncoder.allMimeTypes())
        .dataBufferFactory(new DefaultDataBufferFactory(true))
        .build();
}

@Bean
public ClientRSocketConnector clientRSocketConnector() {
    ClientRSocketConnector clientRSocketConnector =
            new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
    clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
    clientRSocketConnector.setSetupRoute("clientConnect/{user}");
    clientRSocketConnector.setSetupRouteVariables("myUser");
    return clientRSocketConnector;
}

これらのオプションのほとんど(RSocketStrategies Bean を含む)はオプションです。任意のポートでローカルに開始された RSocket サーバーに接続する方法に注意してください。setupData の使用例については、ServerRSocketConnector.clientRSocketKeyStrategy を参照してください。詳細については、ClientRSocketConnector およびその AbstractRSocketConnector スーパークラス JavaDocs も参照してください。

ClientRSocketConnector と ServerRSocketConnector は両方とも、受信 RSocket リクエストをルーティングするために、受信チャネルアダプターを path 構成にマッピングするロールを果たします。詳細については、次のセクションを参照してください。

RSocket 受信ゲートウェイ

RSocketInboundGateway は、RSocket リクエストを受信し、レスポンス(ある場合)を生成する責任があります。MVC リクエストマッピングまたは @MessageMapping セマンティクスに類似したパターンである可能性のある path マッピングの配列が必要です。さらに(バージョン 5.2.2 以降)、一連の相互作用モデル(RSocketInteractionModel を参照)を RSocketInboundGateway で構成して、特定のフレーム型によってこのエンドポイントへの RSocket リクエストを制限できます。デフォルトでは、すべての相互作用モデルがサポートされています。そのような Bean は、その IntegrationRSocketEndpoint 実装(ReactiveMessageHandler の拡張)に従って、受信リクエスト用の内部 IntegrationRSocketMessageHandler のルーティングロジックの ServerRSocketConnector または ClientRSocketConnector のいずれかによって自動的に検出されます。明示的なエンドポイント登録のために、AbstractRSocketConnector を RSocketInboundGateway に提供できます。これにより、AbstractRSocketConnector で自動検出オプションが無効になります。RSocketStrategies は RSocketInboundGateway に注入することもできますが、明示的な注入を無効にして提供された AbstractRSocketConnector から取得することもできます。これらの RSocketStrategies からデコーダーを使用して、提供された requestElementType に従ってリクエストペイロードをデコードします。Message の受信で RSocketPayloadReturnValueHandler.RESPONSE_HEADER ヘッダーが提供されない場合、RSocketInboundGateway はリクエストを fireAndForget RSocket 相互作用モデルとして扱います。この場合、RSocketInboundGateway は send 操作を outputChannel に実行します。それ以外の場合、RSocketPayloadReturnValueHandler.RESPONSE_HEADER ヘッダーからの MonoProcessor 値は、RSocket へのレスポンスの送信に使用されます。このために、RSocketInboundGateway は outputChannel で sendAndReceiveMessageReactive 操作を実行します。ダウンストリームに送信するメッセージの payload は、MessagingRSocket ロジックに従って常に Flux です。fireAndForget RSocket 相互作用モデルの場合、メッセージには単純に変換された payload が含まれます。レスポンス payload は、プレーンオブジェクトまたは Publisher である可能性があります。RSocketInboundGateway は、RSocketStrategies で提供されるエンコーダーに従って、それらの両方を RSocket レスポンスに適切に変換します。

バージョン 5.3 以降、decodeFluxAsUnit オプション(デフォルトは false)が RSocketInboundGateway に追加されています。デフォルトでは、受信 Flux は、その各イベントが個別にデコードされる方法で変換されます。これは、@MessageMapping セマンティクスで現在存在する正確な動作です。アプリケーションの要件に従って、以前の動作を復元するか、Flux 全体を単一のユニットとしてデコードするには、decodeFluxAsUnit を true に設定する必要があります。ただし、ターゲットのデコードロジックは、選択した Decoder によって異なります。StringDecoder では、バイトバッファーの終了を示すために、ストリームに新しい行セパレーター(デフォルト)が存在する必要があります。

RSocketInboundGateway エンドポイントを構成し、ダウンストリームのペイロードを処理する方法のサンプルについては、Java を使用した RSocket エンドポイントの構成を参照してください。

RSocket 送信ゲートウェイ

RSocketOutboundGateway は、RSocket へのリクエストを実行し、RSocket の返信(ある場合)に基づいて返信を生成する AbstractReplyProducingMessageHandler です。低レベルの RSocket プロトコルの相互作用は、提供された ClientRSocketConnector またはサーバー側のリクエストメッセージの RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER ヘッダーから解決された RSocketRequester に委譲されます。サーバー側のターゲット RSocketRequester は、RSocketConnectedEvent から、または ServerRSocketConnector.setClientRSocketKeyStrategy() を介した接続リクエストマッピング用に選択されたいくつかのビジネスキーに従って ServerRSocketConnector.getClientRSocketRequester() API を使用して解決できます。詳細については、ServerRSocketConnector JavaDocs を参照してください。

リクエストを送信する route は、明示的に(パス変数と一緒に)、またはリクエストメッセージに対して評価される SpEL 式を介して構成する必要があります。

RSocket 相互作用モデルは、RSocketInteractionModel オプションまたはそれぞれの式設定を介して提供できます。デフォルトでは、一般的なゲートウェイのユースケースに requestResponse が使用されます。

リクエストメッセージペイロードが Publisher の場合、ターゲット RSocketRequester で提供される RSocketStrategies に従って要素をエンコードするために publisherElementType オプションを提供できます。このオプションの式は、ParameterizedTypeReference に評価できます。データとその型の詳細については、RSocketRequester.RequestSpec.data() JavaDocs を参照してください。

RSocket リクエストは、metadata で拡張することもできます。このために、RSocketOutboundGateway でリクエストメッセージに対する metadataExpression を構成できます。そのような式は Map<Object, MimeType> に評価される必要があります。

interactionModel が fireAndForget でない場合、expectedResponseType を指定する必要があります。デフォルトでは String.class です。このオプションの式は、ParameterizedTypeReference に評価できます。応答データとその型の詳細については、RSocketRequester.RetrieveSpec.retrieveMono() および RSocketRequester.RetrieveSpec.retrieveFlux() JavaDocs を参照してください。

RSocketOutboundGateway からの返信 payload は Mono (fireAndForget 相互作用モデルの場合でも Mono<Void>)であり、常にこのコンポーネントを async として作成します。このような Mono は、通常のチャネル用に outputChannel に生成される前にサブスクライブされるか、FluxMessageChannel によってオンデマンドで処理されます。requestStream または requestChannel 相互作用モデルの Flux レスポンスもレスポンス Mono にラップされます。パススルーサービスアクティベーターを使用して、FluxMessageChannel によってダウンストリームでフラット化できます。

@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
    return payload;
}

または、ターゲットアプリケーションロジックで明示的にサブスクライブします。

予想されるレスポンス型は、このゲートウェイを送信チャネルアダプターとして扱う void に構成(または式を介して評価)することもできます。ただし、返された Mono へのサブスクリプションを開始するには、outputChannel を構成する必要があります(NullChannel だけの場合でも)。

RSocketOutboundGateway エンドポイントを構成してダウンストリームのペイロードを処理する方法のサンプルについては、Java を使用した RSocket エンドポイントの構成を参照してください。

RSocket 名前空間のサポート

Spring Integration は、rsocket 名前空間と対応するスキーマ定義を提供します。構成に含めるには、アプリケーションコンテキスト構成ファイルに次の名前空間宣言を追加します。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/rsocket
    https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
    ...
</beans>

受信

Spring Integration RSocket 受信チャネルアダプターを XML で構成するには、int-rsocket 名前空間の適切な inbound-gateway コンポーネントを使用する必要があります。次の例は、構成方法を示しています。

<int-rsocket:inbound-gateway id="inboundGateway"
                             path="testPath"
                             interaction-models="requestStream,requestChannel"
                             rsocket-connector="clientRSocketConnector"
                             request-channel="requestChannel"
                             rsocket-strategies="rsocketStrategies"
                             request-element-type="byte[]"/>

ClientRSocketConnector および ServerRSocketConnector は、汎用 <bean> 定義として構成する必要があります。

送信

<int-rsocket:outbound-gateway id="outboundGateway"
                              client-rsocket-connector="clientRSocketConnector"
                              auto-startup="false"
                              interaction-model="fireAndForget"
                              route-expression="'testRoute'"
                              request-channel="requestChannel"
                              publisher-element-type="byte[]"
                              expected-response-type="java.util.Date"
                              metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>

これらすべての XML 属性の説明については、spring-integration-rsocket.xsd を参照してください。

Java を使用した RSocket エンドポイントの構成

次の例は、Java で RSocket 受信エンドポイントを構成する方法を示しています。

@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
    RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
    rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
    return rsocketInboundGateway;
}

@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
    return payload.next().map(String::toUpperCase);
}

この構成では、ClientRSocketConnector または ServerRSocketConnector が想定され、「エコー」パス上のそのようなエンドポイントの自動検出を意味します。RSocket リクエストを完全にリアクティブに処理し、リアクティブな応答を生成する @Transformer シグネチャーに注意してください。

次の例は、Java DSL で RSocket 受信ゲートウェイを構成する方法を示しています。

@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
    return IntegrationFlows
        .from(RSockets.inboundGateway("/uppercase")
                   .interactionModels(RSocketInteractionModel.requestChannel))
        .<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
        .get();
}

この構成では、ClientRSocketConnector または ServerRSocketConnector が想定されており、「/uppercase”パス上のエンドポイントと、「リクエストチャネル」などの予想される相互作用モデルを自動検出することを意味します。

次の例は、Java で RSocket 送信ゲートウェイを構成する方法を示しています。

@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
    RSocketOutboundGateway rsocketOutboundGateway =
            new RSocketOutboundGateway(
                    new FunctionExpression<Message<?>>((m) ->
                        m.getHeaders().get("route_header")));
    rsocketOutboundGateway.setInteractionModelExpression(
            new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
    rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
    return rsocketOutboundGateway;
}

setClientRSocketConnector() は、クライアント側にのみ必要です。サーバー側では、RSocketRequester 値を持つ RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER ヘッダーをリクエストメッセージで提供する必要があります。

次の例は、Java DSL で RSocket 送信ゲートウェイを設定する方法を示しています。

@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
    return IntegrationFlows
        .from(Function.class)
        .handle(RSockets.outboundGateway("/uppercase")
            .interactionModel(RSocketInteractionModel.requestResponse)
            .expectedResponseType(String.class)
            .clientRSocketConnector(clientRSocketConnector))
        .get();
}

上記のフローの最初で、前述の Function インターフェースを使用する方法の詳細については、ゲートウェイとしての IntegrationFlow  を参照してください。