最新の安定バージョンについては、Spring Integration 7.0.4 を使用してください! |
RSocket サポート
RSocket Spring Integration モジュール(spring-integration-rsocket)は、RSocket アプリケーションプロトコル (英語) の実行を許可します。
この依存関係をプロジェクトに含める必要があります。
このモジュールは、バージョン 5.2 以降で利用可能であり、RSocketRequester、RSocketMessageHandler、RSocketStrategies などの RSocket コンポーネント実装を備えた Spring メッセージング基盤に基づいています。RSocket プロトコル、用語、コンポーネントの詳細については、Spring Framework RSocket サポートを参照してください。
チャネルアダプターを介して統合フロー処理を開始する前に、サーバーとクライアントの間に RSocket 接続を確立する必要があります。この目的のために、Spring Integration RSocket サポートは、AbstractRSocketConnector の ServerRSocketConnector および ClientRSocketConnector 実装を提供します。
ServerRSocketConnector は、クライアントからの接続を受け入れるために提供された io.rsocket.transport.ServerTransport に従って、ホストとポートでリスナーを公開します。内部 RSocketServer インスタンスは、setServerConfigurer() でカスタマイズできます。また、構成可能なその他のオプションもあります。ペイロードデータおよびヘッダーメタデータ用の RSocketStrategies および MimeType。クライアントリクエスタから setupRoute が提供されると (以下の ClientRSocketConnector を参照)、接続されたクライアントは clientRSocketKeyStrategy BiFunction<Map<String, Object>, DataBuffer, Object> によって決定されるキーに RSocketRequester として保存されます。デフォルトでは、接続データは UTF-8 文字セットの文字列に変換された値としてキーに使用されます。このような RSocketRequester レジストリをアプリケーションロジックで使用して、特定のクライアント接続を決定し、それと対話したり、接続されているすべてのクライアントに同じメッセージを発行したりできます。クライアントからの接続が確立されると、RSocketConnectedEvent が ServerRSocketConnector から発行されます。これは、Spring メッセージングモジュールの @ConnectMapping アノテーションによって提供されるものと似ています。マッピングパターン * は、すべてのクライアントルートを受け入れることを意味します。RSocketConnectedEvent は、DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER ヘッダーを介してさまざまなルートを区別するために使用できます。
典型的なサーバー構成は次のようになります。
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ServerRSocketConnector serverRSocketConnector() {
ServerRSocketConnector serverRSocketConnector = new ServerRSocketConnector("localhost", 0);
serverRSocketConnector.setRSocketStrategies(rsocketStrategies());
serverRSocketConnector.setMetadataMimeType(new MimeType("message", "x.rsocket.routing.v0"));
serverRSocketConnector.setServerConfigurer((server) -> server.payloadDecoder(PayloadDecoder.ZERO_COPY));
serverRSocketConnector.setClientRSocketKeyStrategy((headers, data) -> ""
+ headers.get(DestinationPatternsMessageCondition.LOOKUP_DESTINATION_HEADER));
return serverRSocketConnector;
}
@EventListener
public void onApplicationEvent(RSocketConnectedEvent event) {
...
}RSocketStrategies Bean および RSocketConnectedEvent の @EventListener を含むすべてのオプションはオプションです。詳細については、ServerRSocketConnector JavaDocs を参照してください。
バージョン 5.2.1 以降では、既存の RSocket サーバーとの接続を可能にするために、ServerRSocketMessageHandler がパブリックの最上位クラスに抽出されます。ServerRSocketConnector に ServerRSocketMessageHandler の外部インスタンスが提供されると、内部で RSocket サーバーが作成されず、すべての処理ロジックが提供されたインスタンスに委譲されます。さらに、ServerRSocketMessageHandler を messageMappingCompatible フラグで構成して、RSocket コントローラーの @MessageMapping も処理し、標準の RSocketMessageHandler によって提供される機能を完全に置き換えることができます。これは、従来の @MessageMapping メソッドが RSocket チャネルアダプターと共に同じアプリケーションに存在し、外部で構成された RSocket サーバーがアプリケーションに存在する場合、混合構成で役立ちます。
ClientRSocketConnector は、提供された ClientTransport を介して接続された RSocket に基づいて、RSocketRequester のホルダーとして機能します。RSocketConnector は、提供されている RSocketConnectorConfigurer でカスタマイズできます。このコンポーネントでは、setupRoute (オプションのテンプレート変数を使用)とメタデータを使用した setupData も構成できます。
典型的なクライアント構成は次のようになります。
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.decoder(StringDecoder.textPlainOnly())
.encoder(CharSequenceEncoder.allMimeTypes())
.dataBufferFactory(new DefaultDataBufferFactory(true))
.build();
}
@Bean
public ClientRSocketConnector clientRSocketConnector() {
ClientRSocketConnector clientRSocketConnector =
new ClientRSocketConnector("localhost", serverRSocketConnector().getBoundPort().block());
clientRSocketConnector.setRSocketStrategies(rsocketStrategies());
clientRSocketConnector.setSetupRoute("clientConnect/{user}");
clientRSocketConnector.setSetupRouteVariables("myUser");
return clientRSocketConnector;
} これらのオプションのほとんど(RSocketStrategies Bean を含む)はオプションです。任意のポートでローカルに開始された RSocket サーバーに接続する方法に注意してください。setupData の使用例については、ServerRSocketConnector.clientRSocketKeyStrategy を参照してください。詳細については、ClientRSocketConnector およびその AbstractRSocketConnector スーパークラス JavaDocs も参照してください。
ClientRSocketConnector と ServerRSocketConnector は両方とも、受信 RSocket リクエストをルーティングするために、受信チャネルアダプターを path 構成にマッピングするロールを果たします。詳細については、次のセクションを参照してください。
RSocket 受信ゲートウェイ
RSocketInboundGateway は、RSocket リクエストを受信し、レスポンス (存在する場合) を生成するロールを果たします。MVC リクエストマッピングまたは @MessageMapping セマンティクスと同様のパターンである可能性のある path マッピングの配列が必要です。さらに、(バージョン 5.2.2 以降)、一連の対話モデル ( RSocketInteractionModel を参照) を RSocketInboundGateway で構成して、特定のフレーム型によってこのエンドポイントへの RSocket リクエストを制限できます。デフォルトでは、すべての対話モデルがサポートされています。そのような Bean は、その IntegrationRSocketEndpoint 実装 ( ReactiveMessageHandler の拡張) に従って、受信リクエストの内部 IntegrationRSocketMessageHandler のルーティングロジックの ServerRSocketConnector または ClientRSocketConnector によって自動検出されます。明示的なエンドポイント登録のために、AbstractRSocketConnector を RSocketInboundGateway に提供できます。このようにして、その AbstractRSocketConnector で自動検出オプションが無効になります。RSocketStrategies は、RSocketInboundGateway に注入することもできます。または、明示的な注入をオーバーライドして、提供された AbstractRSocketConnector から取得します。これらの RSocketStrategies からデコーダーを使用して、提供された requestElementType に従ってリクエストペイロードをデコードします。受信する Message で RSocketPayloadReturnValueHandler.RESPONSE_HEADER ヘッダーが提供されない場合、RSocketInboundGateway はリクエストを fireAndForget RSocket 対話モデルとして扱います。この場合、RSocketInboundGateway は outputChannel に対して単純な send 操作を実行します。それ以外の場合、RSocketPayloadReturnValueHandler.RESPONSE_HEADER ヘッダーの MonoProcessor 値は、RSocket へのレスポンスの送信に使用されます。この目的のために、RSocketInboundGateway は outputChannel で sendAndReceiveMessageReactive 操作を実行します。ダウンストリームに送信するメッセージの payload は、MessagingRSocket ロジックに従って常に Flux です。fireAndForget RSocket インタラクションモデルの場合、メッセージには変換されたプレーンな payload が含まれます。レスポンス payload は、プレーンオブジェクトまたは Publisher である可能性があります。RSocketInboundGateway は、RSocketStrategies で提供されるエンコーダーに従って、両方を適切に RSocket レスポンスに変換します。
バージョン 5.3 以降、decodeFluxAsUnit オプション (デフォルトは false) が RSocketInboundGateway に追加されています。デフォルトでは、受信 Flux は、その各イベントが個別にデコードされる方法で変換されます。これは、現在 @MessageMapping セマンティクスに存在する正確な動作です。以前の動作を復元するか、アプリケーション要件に従って Flux 全体を単一ユニットとしてデコードするには、decodeFluxAsUnit を true に設定する必要があります。ただし、ターゲットのデコードロジックは、選択した Decoder によって異なります。StringDecoder では、バイトバッファーの終わりを示すためにストリーム内に改行セパレーター (デフォルト) が存在する必要があります。
RSocketInboundGateway エンドポイントを構成し、ダウンストリームのペイロードを処理する方法のサンプルについては、Java を使用した RSocket エンドポイントの構成を参照してください。
RSocket 送信ゲートウェイ
RSocketOutboundGateway は、RSocket へのリクエストを実行し、RSocket の返信(ある場合)に基づいて返信を生成する AbstractReplyProducingMessageHandler です。低レベルの RSocket プロトコルの相互作用は、提供された ClientRSocketConnector またはサーバー側のリクエストメッセージの RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER ヘッダーから解決された RSocketRequester に委譲されます。サーバー側のターゲット RSocketRequester は、RSocketConnectedEvent から、または ServerRSocketConnector.setClientRSocketKeyStrategy() を介した接続リクエストマッピング用に選択されたいくつかのビジネスキーに従って ServerRSocketConnector.getClientRSocketRequester() API を使用して解決できます。詳細については、ServerRSocketConnector JavaDocs を参照してください。
リクエストを送信する route は、明示的に(パス変数と一緒に)、またはリクエストメッセージに対して評価される SpEL 式を介して構成する必要があります。
RSocket 対話モデルは、RSocketInteractionModel オプションまたはそれぞれの式設定を介して提供できます。デフォルトでは、一般的なゲートウェイのユースケースに requestResponse が使用されます。
リクエストメッセージペイロードが Publisher の場合、ターゲット RSocketRequester で提供される RSocketStrategies に従って要素をエンコードするために publisherElementType オプションを提供できます。このオプションの式は、ParameterizedTypeReference に評価できます。データとその型の詳細については、RSocketRequester.RequestSpec.data() JavaDocs を参照してください。
RSocket リクエストは、metadata で拡張することもできます。このために、RSocketOutboundGateway でリクエストメッセージに対する metadataExpression を構成できます。そのような式は Map<Object, MimeType> に評価される必要があります。
interactionModel が fireAndForget でない場合、expectedResponseType を指定する必要があります。デフォルトでは String.class です。このオプションの式は、ParameterizedTypeReference に評価できます。応答データとその型の詳細については、RSocketRequester.RetrieveSpec.retrieveMono() および RSocketRequester.RetrieveSpec.retrieveFlux() JavaDocs を参照してください。
RSocketOutboundGateway からの返信 payload は Mono (fireAndForget 相互作用モデルの場合でも Mono<Void>)であり、常にこのコンポーネントを async として作成します。このような Mono は、通常のチャネル用に outputChannel に生成される前にサブスクライブされるか、FluxMessageChannel によってオンデマンドで処理されます。requestStream または requestChannel 相互作用モデルの Flux レスポンスもレスポンス Mono にラップされます。パススルーサービスアクティベーターを使用して、FluxMessageChannel によってダウンストリームでフラット化できます。
@ServiceActivator(inputChannel = "rsocketReplyChannel", outputChannel ="fluxMessageChannel")
public Flux<?> flattenRSocketResponse(Flux<?> payload) {
return payload;
}または、ターゲットアプリケーションロジックで明示的にサブスクライブします。
このゲートウェイを送信チャネルアダプターとして扱う void に対して、予期されるレスポンス型を構成 (または式を介して評価) することもできます。ただし、outputChannel は、返された Mono へのサブスクリプションを開始するために (たとえ NullChannel であっても) 構成する必要があります。
RSocketOutboundGateway エンドポイントを構成してダウンストリームのペイロードを処理する方法のサンプルについては、Java を使用した RSocket エンドポイントの構成を参照してください。
RSocket 名前空間のサポート
Spring Integration は、rsocket 名前空間と対応するスキーマ定義を提供します。構成に含めるには、アプリケーションコンテキスト構成ファイルに次の名前空間宣言を追加します。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-rsocket="http://www.springframework.org/schema/integration/rsocket"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/rsocket
https://www.springframework.org/schema/integration/rsocket/spring-integration-rsocket.xsd">
...
</beans>受信
Spring Integration RSocket 受信チャネルアダプターを XML で構成するには、int-rsocket 名前空間の適切な inbound-gateway コンポーネントを使用する必要があります。次の例は、構成方法を示しています。
<int-rsocket:inbound-gateway id="inboundGateway"
path="testPath"
interaction-models="requestStream,requestChannel"
rsocket-connector="clientRSocketConnector"
request-channel="requestChannel"
rsocket-strategies="rsocketStrategies"
request-element-type="byte[]"/>ClientRSocketConnector および ServerRSocketConnector は、汎用 <bean> 定義として構成する必要があります。
送信
<int-rsocket:outbound-gateway id="outboundGateway"
client-rsocket-connector="clientRSocketConnector"
auto-startup="false"
interaction-model="fireAndForget"
route-expression="'testRoute'"
request-channel="requestChannel"
publisher-element-type="byte[]"
expected-response-type="java.util.Date"
metadata-expression="{'metadata': new org.springframework.util.MimeType('*')}"/> これらすべての XML 属性の説明については、spring-integration-rsocket.xsd を参照してください。
Java を使用した RSocket エンドポイントの構成
次の例は、Java で RSocket 受信エンドポイントを構成する方法を示しています。
@Bean
public RSocketInboundGateway rsocketInboundGatewayRequestReply() {
RSocketInboundGateway rsocketInboundGateway = new RSocketInboundGateway("echo");
rsocketInboundGateway.setRequestChannelName("requestReplyChannel");
return rsocketInboundGateway;
}
@Transformer(inputChannel = "requestReplyChannel")
public Mono<String> echoTransformation(Flux<String> payload) {
return payload.next().map(String::toUpperCase);
} この構成では、ClientRSocketConnector または ServerRSocketConnector が想定され、「エコー」パス上のそのようなエンドポイントの自動検出を意味します。RSocket リクエストを完全にリアクティブに処理し、リアクティブな応答を生成する @Transformer シグネチャーに注意してください。
次の例は、Java DSL で RSocket 受信ゲートウェイを構成する方法を示しています。
@Bean
public IntegrationFlow rsocketUpperCaseFlow() {
return IntegrationFlow
.from(RSockets.inboundGateway("/uppercase")
.interactionModels(RSocketInteractionModel.requestChannel))
.<Flux<String>, Mono<String>>transform((flux) -> flux.next().map(String::toUpperCase))
.get();
} この構成では、ClientRSocketConnector または ServerRSocketConnector が想定されており、「/uppercase”パス上のエンドポイントと、「リクエストチャネル」などの予想される相互作用モデルを自動検出することを意味します。
次の例は、Java で RSocket 送信ゲートウェイを構成する方法を示しています。
@Bean
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "replyChannel")
public RSocketOutboundGateway rsocketOutboundGateway() {
RSocketOutboundGateway rsocketOutboundGateway =
new RSocketOutboundGateway(
new FunctionExpression<Message<?>>((m) ->
m.getHeaders().get("route_header")));
rsocketOutboundGateway.setInteractionModelExpression(
new FunctionExpression<Message<?>>((m) -> m.getHeaders().get("rsocket_interaction_model")));
rsocketOutboundGateway.setClientRSocketConnector(clientRSocketConnector());
return rsocketOutboundGateway;
}setClientRSocketConnector() は、クライアント側にのみ必要です。サーバー側では、RSocketRequester 値を持つ RSocketRequesterMethodArgumentResolver.RSOCKET_REQUESTER_HEADER ヘッダーをリクエストメッセージで提供する必要があります。
次の例は、Java DSL で RSocket 送信ゲートウェイを設定する方法を示しています。
@Bean
public IntegrationFlow rsocketUpperCaseRequestFlow(ClientRSocketConnector clientRSocketConnector) {
return IntegrationFlow
.from(Function.class)
.handle(RSockets.outboundGateway("/uppercase")
.interactionModel(RSocketInteractionModel.requestResponse)
.expectedResponseType(String.class)
.clientRSocketConnector(clientRSocketConnector))
.get();
} 上記のフローの最初で、前述の Function インターフェースを使用する方法の詳細については、ゲートウェイとしての IntegrationFlow を参照してください。