RSocket サポート
RSocket Spring Integration モジュール(spring-integration-rsocket
)は、RSocket アプリケーションプロトコル (英語) の実行を許可します。
この依存関係をプロジェクトに含める必要があります。
このモジュールは、バージョン 5.2 以降で利用可能であり、RSocketRequester
、RSocketMessageHandler
、RSocketStrategies
などの RSocket コンポーネント実装を備えた Spring メッセージング基盤に基づいています。RSocket プロトコル、用語、コンポーネントの詳細については、Spring Framework RSocket サポートを参照してください。
チャネルアダプターを介して統合フロー処理を開始する前に、サーバーとクライアントの間に RSocket 接続を確立する必要があります。この目的のために、Spring Integration RSocket サポートは、AbstractRSocketConnector
の ServerRSocketConnector
および ClientRSocketConnector
実装を提供します。
ServerRSocketConnector
は、クライアントからの接続を受け入れるために提供された io.rsocket.transport.ServerTransport
に従って、ホストとポートでリスナーを公開します。内部 RSocketServer
インスタンスは、setServerConfigurer()
でカスタマイズできます。また、構成可能なその他のオプションもあります。ペイロードデータおよびヘッダーメタデータ用の RSocketStrategies
および MimeType
。クライアントリクエスタから setupRoute
が提供されると (以下の ClientRSocketConnector
を参照)、接続されたクライアントは clientRSocketKeyStrategy
BiFunction<Map<String, Object>, DataBuffer, Object>
によって決定されるキーに RSocketRequester
として保存されます。デフォルトでは、接続データは UTF-8 文字セットの文字列に変換された値としてキーに使用されます。このような RSocketRequester
レジストリをアプリケーションロジックで使用して、特定のクライアント接続を決定し、それと対話したり、接続されているすべてのクライアントに同じメッセージを発行したりできます。クライアントからの接続が確立されると、RSocketConnectedEvent
が ServerRSocketConnector
から発行されます。これは、Spring メッセージングモジュールの @ConnectMapping
アノテーションによって提供されるものと似ています。マッピングパターン *
は、すべてのクライアントルートを受け入れることを意味します。RSocketConnectedEvent
は、DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER
ヘッダーを介してさまざまなルートを区別するために使用できます。
典型的なサーバー構成は次のようになります。
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}
RSocketStrategies
Bean および RSocketConnectedEvent
の @EventListener
を含むすべてのオプションはオプションです。詳細については、ServerRSocketConnector
JavaDocs を参照してください。
バージョン 5.2.1 以降では、既存の RSocket サーバーとの接続を可能にするために、ServerRSocketMessageHandler
がパブリックの最上位クラスに抽出されます。ServerRSocketConnector
に ServerRSocketMessageHandler
の外部インスタンスが提供されると、内部で RSocket サーバーが作成されず、すべての処理ロジックが提供されたインスタンスに委譲されます。さらに、ServerRSocketMessageHandler
を messageMappingCompatible
フラグで構成して、RSocket コントローラーの @MessageMapping
も処理し、標準の RSocketMessageHandler
によって提供される機能を完全に置き換えることができます。これは、従来の @MessageMapping
メソッドが RSocket チャネルアダプターと共に同じアプリケーションに存在し、外部で構成された RSocket サーバーがアプリケーションに存在する場合、混合構成で役立ちます。
ClientRSocketConnector
は、提供された ClientTransport
を介して接続された RSocket
に基づいて、RSocketRequester
のホルダーとして機能します。RSocketConnector
は、提供されている RSocketConnectorConfigurer
でカスタマイズできます。このコンポーネントでは、setupRoute
(オプションのテンプレート変数を使用)とメタデータを使用した setupData
も構成できます。
典型的なクライアント構成は次のようになります。
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
}
これらのオプションのほとんど(RSocketStrategies
Bean を含む)はオプションです。任意のポートでローカルに開始された RSocket サーバーに接続する方法に注意してください。setupData
の使用例については、ServerRSocketConnector.clientRSocketKeyStrategy
を参照してください。詳細については、ClientRSocketConnector
およびその AbstractRSocketConnector
スーパークラス JavaDocs も参照してください。
ClientRSocketConnector
と ServerRSocketConnector
は両方とも、受信 RSocket リクエストをルーティングするために、受信チャネルアダプターを path
構成にマッピングするロールを果たします。詳細については、次のセクションを参照してください。
RSocket 受信ゲートウェイ
RSocketInboundGateway
は、RSocket リクエストを受信し、レスポンス (存在する場合) を生成するロールを果たします。MVC リクエストマッピングまたは @MessageMapping
セマンティクスと同様のパターンである可能性のある path
マッピングの配列が必要です。さらに、(バージョン 5.2.2 以降)、一連の対話モデル ( RSocketInteractionModel
を参照) を RSocketInboundGateway
で構成して、特定のフレーム型によってこのエンドポイントへの RSocket リクエストを制限できます。デフォルトでは、すべての対話モデルがサポートされています。そのような Bean は、その IntegrationRSocketEndpoint
実装 ( ReactiveMessageHandler
の拡張) に従って、受信リクエストの内部 IntegrationRSocketMessageHandler
のルーティングロジックの ServerRSocketConnector
または ClientRSocketConnector
によって自動検出されます。明示的なエンドポイント登録のために、AbstractRSocketConnector
を RSocketInboundGateway
に提供できます。このようにして、その AbstractRSocketConnector
で自動検出オプションが無効になります。RSocketStrategies
は、RSocketInboundGateway
に注入することもできます。または、明示的な注入をオーバーライドして、提供された AbstractRSocketConnector
から取得します。これらの RSocketStrategies
からデコーダーを使用して、提供された requestElementType
に従ってリクエストペイロードをデコードします。受信する Message
で RSocketPayloadReturnValueHandler.RESPONSE_HEADER
ヘッダーが提供されない場合、RSocketInboundGateway
はリクエストを fireAndForget
RSocket 対話モデルとして扱います。この場合、RSocketInboundGateway
は outputChannel
に対して単純な send
操作を実行します。それ以外の場合、RSocketPayloadReturnValueHandler.RESPONSE_HEADER
ヘッダーの MonoProcessor
値は、RSocket へのレスポンスの送信に使用されます。この目的のために、RSocketInboundGateway
は outputChannel
で sendAndReceiveMessageReactive
操作を実行します。ダウンストリームに送信するメッセージの payload
は、MessagingRSocket
ロジックに従って常に Flux
です。fireAndForget
RSocket インタラクションモデルの場合、メッセージには変換されたプレーンな payload
が含まれます。レスポンス payload
は、プレーンオブジェクトまたは Publisher
である可能性があります。RSocketInboundGateway
は、RSocketStrategies
で提供されるエンコーダーに従って、両方を適切に RSocket レスポンスに変換します。
バージョン 5.3 以降、decodeFluxAsUnit
オプション (デフォルトは false
) が RSocketInboundGateway
に追加されています。デフォルトでは、受信 Flux
は、その各イベントが個別にデコードされる方法で変換されます。これは、現在 @MessageMapping
セマンティクスに存在する正確な動作です。以前の動作を復元するか、アプリケーション要件に従って Flux
全体を単一ユニットとしてデコードするには、decodeFluxAsUnit
を true
に設定する必要があります。ただし、ターゲットのデコードロジックは、選択した Decoder
によって異なります。StringDecoder
では、バイトバッファーの終わりを示すためにストリーム内に改行セパレーター (デフォルト) が存在する必要があります。
RSocketInboundGateway
エンドポイントを構成し、ダウンストリームのペイロードを処理する方法のサンプルについては、Java を使用した RSocket エンドポイントの構成を参照してください。
RSocket 送信ゲートウェイ
RSocketOutboundGateway
は、RSocket へのリクエストを実行し、RSocket の返信(ある場合)に基づいて返信を生成する AbstractReplyProducingMessageHandler
です。低レベルの RSocket プロトコルの相互作用は、提供された ClientRSocketConnector
またはサーバー側のリクエストメッセージの RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
ヘッダーから解決された RSocketRequester
に委譲されます。サーバー側のターゲット RSocketRequester
は、RSocketConnectedEvent
から、または ServerRSocketConnector.setClientRSocketKeyStrategy()
を介した接続リクエストマッピング用に選択されたいくつかのビジネスキーに従って ServerRSocketConnector.getClientRSocketRequester()
API を使用して解決できます。詳細については、ServerRSocketConnector
JavaDocs を参照してください。
リクエストを送信する route
は、明示的に(パス変数と一緒に)、またはリクエストメッセージに対して評価される SpEL 式を介して構成する必要があります。
RSocket 対話モデルは、RSocketInteractionModel
オプションまたはそれぞれの式設定を介して提供できます。デフォルトでは、一般的なゲートウェイのユースケースに requestResponse
が使用されます。
リクエストメッセージペイロードが Publisher
の場合、ターゲット RSocketRequester
で提供される RSocketStrategies
に従って要素をエンコードするために publisherElementType
オプションを提供できます。このオプションの式は、ParameterizedTypeReference
に評価できます。データとその型の詳細については、RSocketRequester.RequestSpec.data()
JavaDocs を参照してください。
RSocket リクエストは、metadata
で拡張することもできます。このために、RSocketOutboundGateway
でリクエストメッセージに対する metadataExpression
を構成できます。そのような式は Map<Object, MimeType>
に評価される必要があります。
interactionModel
が fireAndForget
でない場合、expectedResponseType
を指定する必要があります。デフォルトでは String.class
です。このオプションの式は、ParameterizedTypeReference
に評価できます。応答データとその型の詳細については、RSocketRequester.RetrieveSpec.retrieveMono()
および RSocketRequester.RetrieveSpec.retrieveFlux()
JavaDocs を参照してください。
RSocketOutboundGateway
からの返信 payload
は Mono
(fireAndForget
相互作用モデルの場合でも Mono<Void>
)であり、常にこのコンポーネントを async
として作成します。このような Mono
は、通常のチャネル用に outputChannel
に生成される前にサブスクライブされるか、FluxMessageChannel
によってオンデマンドで処理されます。requestStream
または requestChannel
相互作用モデルの Flux
レスポンスもレスポンス Mono
にラップされます。パススルーサービスアクティベーターを使用して、FluxMessageChannel
によってダウンストリームでフラット化できます。
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}
または、ターゲットアプリケーションロジックで明示的にサブスクライブします。
このゲートウェイを送信チャネルアダプターとして扱う void
に対して、予期されるレスポンス型を構成 (または式を介して評価) することもできます。ただし、outputChannel
は、返された Mono
へのサブスクリプションを開始するために (たとえ NullChannel
であっても) 構成する必要があります。
RSocketOutboundGateway
エンドポイントを構成してダウンストリームのペイロードを処理する方法のサンプルについては、Java を使用した RSocket エンドポイントの構成を参照してください。
RSocket 名前空間のサポート
Spring Integration は、rsocket
名前空間と対応するスキーマ定義を提供します。構成に含めるには、アプリケーションコンテキスト構成ファイルに次の名前空間宣言を追加します。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>
受信
Spring Integration RSocket 受信チャネルアダプターを XML で構成するには、int-rsocket
名前空間の適切な inbound-gateway
コンポーネントを使用する必要があります。次の例は、構成方法を示しています。
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>
ClientRSocketConnector
および ServerRSocketConnector
は、汎用 <bean>
定義として構成する必要があります。
送信
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/>
これらすべての XML 属性の説明については、spring-integration-rsocket.xsd
を参照してください。
Java を使用した RSocket エンドポイントの構成
次の例は、Java で RSocket 受信エンドポイントを構成する方法を示しています。
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
}
この構成では、ClientRSocketConnector
または ServerRSocketConnector
が想定され、「エコー」パス上のそのようなエンドポイントの自動検出を意味します。RSocket リクエストを完全にリアクティブに処理し、リアクティブな応答を生成する @Transformer
シグネチャーに注意してください。
次の例は、Java DSL で RSocket 受信ゲートウェイを構成する方法を示しています。
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
}
この構成では、ClientRSocketConnector
または ServerRSocketConnector
が想定されており、「/uppercase”パス上のエンドポイントと、「リクエストチャネル」などの予想される相互作用モデルを自動検出することを意味します。
次の例は、Java で RSocket 送信ゲートウェイを構成する方法を示しています。
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}
setClientRSocketConnector()
は、クライアント側にのみ必要です。サーバー側では、RSocketRequester
値を持つ RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER
ヘッダーをリクエストメッセージで提供する必要があります。
次の例は、Java DSL で RSocket 送信ゲートウェイを設定する方法を示しています。
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
}
上記のフローの最初で、前述の Function
インターフェースを使用する方法の詳細については、ゲートウェイとしての IntegrationFlow
を参照してください。