RSocket
このセクションでは、Spring Framework の RSocket プロトコルのサポートについて説明します。
概要
RSocket は、TCP、WebSocket、その他のバイトストリームトランスポートを介した多重化された二重通信用のアプリケーションプロトコルであり、次の相互作用モデルのいずれかを使用します。
Request-Response
— 1 つのメッセージを送信し、1 つを受信します。Request-Stream
— 1 つのメッセージを送信し、メッセージのストリームを受信します。Channel
— メッセージのストリームを両方向に送信します。Fire-and-Forget
— 一方向のメッセージを送信します。
最初の接続が確立されると、両側が対称になり、各側が上記の相互作用のいずれかを開始できるため、「クライアント」と「サーバー」の区別が失われます。これが、プロトコルで参加側を「リクエスター」および「レスポンダー」と呼び、上記の相互作用を「リクエストストリーム」または単に「リクエスト」と呼ぶ理由です。
RSocket プロトコルの主な機能と利点は次のとおりです。
Reactive Streams (英語) セマンティクスは、
Request-Stream
およびChannel
のようなストリーミングリクエストのためにネットワーク境界を横断し、バックプレッシャーシグナルは、リクエスタとレスポンダとの間を移動し、リクエスタがソースでレスポンダの速度を低下させることを可能にし、ネットワーク層輻輳制御への依存、およびネットワークレベルまたは任意のレベルでのバッファリングの必要性を低減します。調整をリクエストする — この機能は、
LEASE
フレームにちなんで「リース」という名前が付けられています。このフレームは、各エンドから送信して、特定の時間に他のエンドが許可するリクエストの総数を制限できます。リースは定期的に更新されます。セッション再開 — これは接続が失われるように設計されており、何らかの状態を維持する必要があります。状態管理はアプリケーションに対して透過的であり、可能な場合にプロデューサーを停止し、必要な状態の量を減らすことができるバックプレッシャーと組み合わせてうまく機能します。
大きなメッセージの断片化と再組み立て。
キープアライブ(ハートビート)。
RSocket は複数の言語で実装 [GitHub] (英語) されています。Java ライブラリ [GitHub] (英語) はプロジェクト Reactor (英語) および Reactor Netty [GitHub] (英語) 上に構築されており、トランスポート用です。つまり、アプリケーションの Reactive Streams パブリッシャーからのシグナルは、RSocket を介してネットワーク全体に透過的に伝播します。
プロトコル
RSocket の利点の 1 つはそれがワイヤの上でよく定義された振舞いと何らかのプロトコル拡張 [GitHub] (英語) に伴う読みやすい仕様 (英語) を持っているということです。言語の実装や高レベルのフレームワーク API に関係なく、仕様を読むことをお勧めします。このセクションでは、コンテキストを確立するための簡潔な概要を提供します。
接続
最初に、クライアントは TCP や WebSocket などの低レベルのストリーミングトランスポートを介してサーバーに接続し、SETUP
フレームをサーバーに送信して接続のパラメーターを設定します。
サーバーは SETUP
フレームを拒否する場合がありますが、通常、送信(クライアント用)および受信(サーバー用)した後、SETUP
がリースセマンティクスを使用してリクエストの数を制限しない限り、リクエストを開始できます。どちらの側も、リクエストを許可するために、もう一方の端からの LEASE
フレームを待つ必要があります。
リクエストをする
接続が確立されると、両側でフレーム REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
または REQUEST_FNF
のいずれかを介してリクエストを開始できます。これらの各フレームは、リクエスターからレスポンダーに 1 つのメッセージを運びます。
次に、レスポンダーはレスポンスメッセージとともに PAYLOAD
フレームを返します。REQUEST_CHANNEL
の場合、リクエスターはさらに多くのリクエストメッセージを含む PAYLOAD
フレームを送信します。
リクエストに Request-Stream
や Channel
などのメッセージのストリームが含まれる場合、レスポンダーはリクエスターからのリクエストシグナルを考慮する必要があります。需要はメッセージの数として表されます。初期需要は、REQUEST_STREAM
および REQUEST_CHANNEL
フレームで指定されます。後続のリクエストは、REQUEST_N
フレームを介して通知されます。
各側は、METADATA_PUSH
フレームを介して、個々のリクエストではなく、接続全体に関するメタデータ通知も送信できます。
メッセージフォーマット
RSocket メッセージにはデータとメタデータが含まれます。メタデータは、ルートやセキュリティトークンなどを送信するために使用できます。データとメタデータは異なる形式にすることができます。それぞれの MIME 型は SETUP
フレームで宣言され、特定の接続のすべてのリクエストに適用されます。
すべてのメッセージにメタデータを含めることができますが、通常、ルートなどのメタデータはリクエストごとであるため、リクエストの最初のメッセージ、つまりフレーム REQUEST_RESPONSE
、REQUEST_STREAM
、REQUEST_CHANNEL
または REQUEST_FNF
の 1 つにのみ含まれます。
プロトコル拡張機能は、アプリケーションで使用する一般的なメタデータ形式を定義します。
複合メタデータ [GitHub] (英語) -- 独立してフォーマットされた複数のメタデータエントリ。
ルーティング [GitHub] (英語) — リクエストのルート。
Java 実装
RSocket の Java 実装 [GitHub] (英語) はプロジェクト Reactor (英語) 上に構築されています。TCP および WebSocket のトランスポートは Reactor Netty [GitHub] (英語) に基づいています。Reactive Streams ライブラリとして、Reactor はプロトコルを実装する作業を簡素化します。アプリケーションでは、宣言演算子と透過的なバックプレッシャーサポートを備えた Flux
および Mono
を使用するのが自然です。
RSocket Java の API は意図的に最小限かつ基本的なものになっています。これはプロトコルの機能に焦点を当てており、アプリケーションプログラミングモデル (例: RPC codegen とその他) はより高いレベルの独立した関心事として残されています。
メイン契約 io.rsocket.RSocket [GitHub] (英語) は、単一のメッセージの約束を表す Mono
、メッセージのストリームを表す Flux
、バイトバッファーとしてデータとメタデータにアクセスする実際のメッセージを表す io.rsocket.Payload
を使用して、4 つのリクエスト対話型をモデル化します。RSocket
契約は対称的に使用されます。リクエストの場合、アプリケーションには、リクエストを実行する RSocket
が与えられます。応答のために、アプリケーションは RSocket
を実装してリクエストを処理します。
これは、徹底的な導入を意図したものではありません。ほとんどの場合、Spring アプリケーションはその API を直接使用する必要はありません。ただし、Spring に依存しない RSocket を確認または実験することが重要な場合があります。RSocket Java リポジトリには、API とプロトコルの機能を示す多数のサンプルアプリ [GitHub] (英語) が含まれています。
Spring サポート
spring-messaging
モジュールには次のものが含まれます。
RSocketRequester — データとメタデータのエンコード / デコードを使用して
io.rsocket.RSocket
を介してリクエストを行うための流れるような API。アノテーション付きレスポンダー —
@MessageMapping
および@RSocketExchange
は、応答するためのアノテーション付きハンドラーメソッドです。RSocket インターフェース — リクエスターまたはレスポンダーとして使用するための、
@RSocketExchange
メソッドを備えた Java インターフェースとしての RSocket サービス宣言。
spring-web
モジュールには、Jackson CBOR/JSON などの Encoder
および Decoder
実装、および RSocket アプリケーションが必要とする可能性が高い Protobuf が含まれています。また、効率的なルートマッチングのためにプラグインできる PathPatternParser
も含まれています。
Spring Boot 2.2 は、TCP または WebSocket を介した RSocket サーバーの立ち上げをサポートしています。これには、WebFlux サーバーで WebSocket を介して RSocket を公開するオプションも含まれます。RSocketRequester.Builder
および RSocketStrategies
のクライアントサポートと自動構成もあります。詳細については、Spring Boot リファレンスの RSocket セクションを参照してください。
Spring Security 5.2 は RSocket サポートを提供します。
Spring Integration 5.2 は、RSocket クライアントおよびサーバーと対話するための受信および送信ゲートウェイを提供します。詳細については、Spring Integration リファレンスマニュアルを参照してください。
Spring Cloud Gateway は RSocket 接続をサポートしています。
RSocketRequester
RSocketRequester
は、RSocket リクエストを実行するための流れるような API を提供し、低レベルのデータバッファーではなく、データとメタデータのオブジェクトを受け入れて返します。対称的に使用して、クライアントからリクエストを作成したり、サーバーからリクエストを作成したりできます。
クライアントリクエスター
クライアント側で RSocketRequester
を取得するには、サーバーに接続します。これには、接続設定を含む RSocket SETUP
フレームの送信が含まれます。RSocketRequester
は、SETUP
フレームの接続設定を含む io.rsocket.core.RSocketConnector
の準備に役立つビルダーを提供します。
これは、デフォルト設定で接続する最も基本的な方法です。
Java
Kotlin
RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);
URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)
URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)
上記はすぐには接続されません。リクエストが行われると、共有接続が透過的に確立されて使用されます。
接続設定
RSocketRequester.Builder
は、初期 SETUP
フレームをカスタマイズするために以下を提供します。
dataMimeType(MimeType)
— 接続上のデータの MIME 型を設定します。metadataMimeType(MimeType)
— 接続のメタデータの MIME 型を設定します。setupData(Object)
—SETUP
に含めるデータ。setupRoute(String, Object…)
—SETUP
に含めるメタデータのルート。setupMetadata(Object, MimeType)
—SETUP
に含める他のメタデータ。
データの場合、デフォルトの MIME 型は最初に構成された Decoder
から派生します。メタデータの場合、デフォルトの MIME 型は複合メタデータ [GitHub] (英語) であり、リクエストごとに複数のメタデータ値と MIME 型のペアを許可します。通常、両方を変更する必要はありません。
SETUP
フレームのデータとメタデータはオプションです。サーバー側では、@ConnectMapping メソッドを使用して、接続の開始と SETUP
フレームのコンテンツを処理できます。メタデータは、接続レベルのセキュリティに使用できます。
戦略
RSocketRequester.Builder
は、RSocketStrategies
を受け入れてリクエスターを構成します。これを使用して、データとメタデータ値の(de)-serialization 用のエンコーダーとデコーダーを提供する必要があります。デフォルトでは、String
、byte[]
、ByteBuffer
用の spring-core
の基本コーデックのみが登録されます。spring-web
を追加すると、次のように登録できるその他の機能にアクセスできます。
Java
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.build();
RSocketRequester requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.build()
val requester = RSocketRequester.builder()
.rsocketStrategies(strategies)
.tcp("localhost", 7000)
RSocketStrategies
は再利用のために設計されています。いくつかのシナリオでは、たとえば同じアプリケーションのクライアントとサーバーの場合、Spring 構成で宣言することをお勧めします。
クライアントレスポンダー
RSocketRequester.Builder
を使用して、サーバーからのリクエストに対するレスポンダーを構成できます。
サーバーで使用されているものと同じインフラストラクチャに基づいてクライアント側の応答にアノテーション付きハンドラーを使用できますが、次のようにプログラムで登録します。
Java
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.routeMatcher(new PathPatternRouteMatcher()) (1)
.build();
SocketAcceptor responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(responder)) (3)
.tcp("localhost", 7000);
1 | spring-web が存在する場合は、効率的なルートマッチングのために PathPatternRouteMatcher を使用します。 |
2 | @MessageMapping および / または @ConnectMapping メソッドを使用して、クラスからレスポンダを作成します。 |
3 | レスポンダを登録します。 |
val strategies = RSocketStrategies.builder()
.routeMatcher(PathPatternRouteMatcher()) (1)
.build()
val responder =
RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(responder) } (3)
.tcp("localhost", 7000)
1 | spring-web が存在する場合は、効率的なルートマッチングのために PathPatternRouteMatcher を使用します。 |
2 | @MessageMapping および / または @ConnectMapping メソッドを使用して、クラスからレスポンダを作成します。 |
3 | レスポンダを登録します。 |
上記は、クライアントレスポンダのプログラムによる登録用に設計されたショートカットにすぎないことに注意してください。クライアントレスポンダーが Spring 構成にある代替シナリオの場合、RSocketMessageHandler
を Spring Bean として宣言し、次のように適用できます。
Java
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> connector.acceptor(handler.responder()))
.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val requester = RSocketRequester.builder()
.rsocketConnector { it.acceptor(handler.responder()) }
.tcp("localhost", 7000)
上記の場合、RSocketMessageHandler
で setHandlerPredicate
を使用して、クライアントレスポンダーを検出するための別の戦略に切り替える必要がある場合もあります。@RSocketClientResponder
などのカスタムアノテーションとデフォルトの @Controller
に基づいています。これは、クライアントとサーバー、または同じアプリケーション内の複数のクライアントを使用するシナリオで必要です。
プログラミングモデルの詳細については、アノテーション付きレスポンダーも参照してください。
拡張
RSocketRequesterBuilder
は、基になる io.rsocket.core.RSocketConnector
を公開するためのコールバックを提供し、キープアライブインターバル、セッション再開、インターセプターなどの詳細設定オプションを提供します。次のように、そのレベルでオプションを構成できます。
Java
Kotlin
RSocketRequester requester = RSocketRequester.builder()
.rsocketConnector(connector -> {
// ...
})
.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
.rsocketConnector {
//...
}
.tcp("localhost", 7000)
サーバーリクエスター
サーバーから接続されたクライアントにリクエストを行うことは、サーバーから接続されたクライアントのリクエスターを取得することです。
アノテーション付きレスポンダーでは、@ConnectMapping
および @MessageMapping
メソッドは RSocketRequester
引数をサポートします。これを使用して、接続のリクエスターにアクセスします。@ConnectMapping
メソッドは本質的に SETUP
フレームのハンドラーであり、リクエストを開始する前に処理する必要があることに注意してください。そのため、最初のリクエストは処理から分離する必要があります。例:
Java
Kotlin
@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
requester.route("status").data("5")
.retrieveFlux(StatusReport.class)
.subscribe(bar -> { (1)
// ...
});
return ... (2)
}
1 | 処理とは無関係に、リクエストを非同期的に開始します。 |
2 | 処理を実行し、完了 Mono<Void> を返します。 |
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
GlobalScope.launch {
requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
// ...
}
}
/// ... (2)
}
1 | 処理とは無関係に、リクエストを非同期的に開始します。 |
2 | サスペンド機能で処理してください。 |
要求
Java
Kotlin
ViewBox viewBox = ... ;
Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlux(AirportLocation.class); (3)
1 | リクエストメッセージのメタデータに含めるルートを指定します。 |
2 | リクエストメッセージのデータを提供します。 |
3 | 予想されるレスポンスを宣言します。 |
val viewBox: ViewBox = ...
val locations = requester.route("locate.radars.within") (1)
.data(viewBox) (2)
.retrieveFlow<AirportLocation>() (3)
1 | リクエストメッセージのメタデータに含めるルートを指定します。 |
2 | リクエストメッセージのデータを提供します。 |
3 | 予想されるレスポンスを宣言します。 |
相互作用型は、入力と出力のカーディナリティから暗黙的に決定されます。上記の例は、1 つの値が送信され、値のストリームが受信されるため、Request-Stream
です。ほとんどの場合、入力と出力の選択が RSocket インタラクションの種類と、レスポンダーが期待する入力と出力の種類と一致する限り、これについて考える必要はありません。無効な組み合わせの唯一の例は、多対 1 です。
data(Object)
メソッドは、Flux
および Mono
を含む Reactive Streams Publisher
、および ReactiveAdapterRegistry
に登録されている値のその他のプロデューサーも受け入れます。同じ型の値を生成する Flux
などの複数値 Publisher
の場合、オーバーロードされた data
メソッドのいずれかを使用して、すべての要素の型チェックと Encoder
ルックアップを回避することを検討してください。
data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);
data(Object)
ステップはオプションです。データを送信しないリクエストの場合はスキップします。
Java
Kotlin
Mono<AirportLocation> location = requester.route("find.radar.EWR"))
.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait
val location = requester.route("find.radar.EWR")
.retrieveAndAwait<AirportLocation>()
複合メタデータ [GitHub] (英語) (デフォルト)を使用し、値が登録済み Encoder
でサポートされている場合、追加のメタデータ値を追加できます。例:
Java
Kotlin
String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");
Flux<AirportLocation> locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow
val requester: RSocketRequester = ...
val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")
val locations = requester.route("locate.radars.within")
.metadata(securityToken, mimeType)
.data(viewBox)
.retrieveFlow<AirportLocation>()
Fire-and-Forget
の場合、Mono<Void>
を返す send()
メソッドを使用します。Mono
は、メッセージが正常に送信されたことのみを示し、処理されたことを示すものではないことに注意してください。
Metadata-Push
の場合、Mono<Void>
戻り値を指定して sendMetadata()
メソッドを使用します。
アノテーション付きレスポンダー
RSocket レスポンダーは、@MessageMapping
および @ConnectMapping
メソッドとして実装できます。@MessageMapping
メソッドは個々のリクエストを処理し、@ConnectMapping
メソッドは接続レベルのイベント(セットアップとメタデータプッシュ)を処理します。アノテーション付きのレスポンダーは、サーバー側からの応答とクライアント側からの応答のために対称的にサポートされます。
サーバーレスポンダー
サーバー側でアノテーション付きのレスポンダーを使用するには、RSocketMessageHandler
を Spring 構成に追加して、@MessageMapping
および @ConnectMapping
メソッドで @Controller
Bean を検出します。
Java
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.routeMatcher(new PathPatternRouteMatcher());
return handler;
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
routeMatcher = PathPatternRouteMatcher()
}
}
次に、Java RSocket API を使用して RSocket サーバーを起動し、レスポンダーの RSocketMessageHandler
を次のように接続します。
Java
Kotlin
ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);
CloseableChannel server =
RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.block();
import org.springframework.beans.factory.getBean
val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()
val server = RSocketServer.create(handler.responder())
.bind(TcpServerTransport.create("localhost", 7000))
.awaitSingle()
RSocketMessageHandler
は、デフォルトで複合 [GitHub] (英語) メタデータとルーティング [GitHub] (英語) メタデータをサポートしています。別の MIME 型に切り替えるか、追加のメタデータ MIME 型を登録する必要がある場合は、MetadataExtractor を設定できます。
サポートするメタデータおよびデータ形式に必要な Encoder
および Decoder
インスタンスを設定する必要があります。コーデックの実装には、spring-web
モジュールが必要になる可能性があります。
デフォルトでは、SimpleRouteMatcher
は AntPathMatcher
を介したルートのマッチングに使用されます。効率的なルートマッチングのために、spring-web
から PathPatternRouteMatcher
を差し込むことをお勧めします。RSocket ルートは階層化できますが、URL パスではありません。両方のルートマッチャーが "." を使用するように構成されています。デフォルトではセパレータとして使用され、HTTP URL のような URL デコードはありません。
RSocketMessageHandler
は、同じプロセスでクライアントとサーバー間で構成を共有する必要がある場合に便利な RSocketStrategies
を介して構成できます。
Java
Kotlin
@Configuration
static class ServerConfig {
@Bean
public RSocketMessageHandler rsocketMessageHandler() {
RSocketMessageHandler handler = new RSocketMessageHandler();
handler.setRSocketStrategies(rsocketStrategies());
return handler;
}
@Bean
public RSocketStrategies rsocketStrategies() {
return RSocketStrategies.builder()
.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
.routeMatcher(new PathPatternRouteMatcher())
.build();
}
}
@Configuration
class ServerConfig {
@Bean
fun rsocketMessageHandler() = RSocketMessageHandler().apply {
rSocketStrategies = rsocketStrategies()
}
@Bean
fun rsocketStrategies() = RSocketStrategies.builder()
.encoders { it.add(Jackson2CborEncoder()) }
.decoders { it.add(Jackson2CborDecoder()) }
.routeMatcher(PathPatternRouteMatcher())
.build()
}
クライアントレスポンダー
クライアント側のアノテーション付き応答者は、RSocketRequester.Builder
で構成する必要があります。詳細については、クライアントレスポンダーを参照してください。
@MessageMapping
Java
Kotlin
@Controller
public class RadarsController {
@MessageMapping("locate.radars.within")
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
@Controller
class RadarsController {
@MessageMapping("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
上記の @MessageMapping
メソッドは、ルート "locate.radars.within" を持つ Request-Stream インタラクションに応答します。次のメソッド引数を使用するオプションを備えた柔軟なメソッドシグネチャーをサポートします。
メソッド引数 | 説明 |
---|---|
| リクエストのペイロード。これは、 注意 : アノテーションの使用はオプションです。単純型ではなく、サポートされている他の引数のいずれでもないメソッド引数は、予期されるペイロードと見なされます。 |
| リモートの終了をリクエストするためのリクエスター。 |
| マッピングパターンの変数に基づいてルートから抽出された値。 |
| MetadataExtractor で説明されている抽出のために登録されたメタデータ値。 |
| MetadataExtractor に従って、抽出のために登録されたすべてのメタデータ値。 |
戻り値は、レスポンスペイロードとして直列化される 1 つ以上のオブジェクトであると予想されます。これは、Mono
または Flux
のような非同期型、具体的な値、void
または Mono<Void>
などの値のない非同期型のいずれかです。
@MessageMapping
メソッドがサポートする RSocket インタラクション型は、入力(つまり、@Payload
引数)と出力のカーディナリティから決定されます。カーディナリティは次のことを意味します。
基数 | 説明 |
---|---|
1 | 明示的な値、または |
多 |
|
0 | 入力の場合、これはメソッドに 出力の場合、これは |
以下の表は、すべての入力および出力カーディナリティーの組み合わせと、対応する相互作用型を示しています。
入力カーディナリティ | 出力カーディナリティ | インタラクション型 |
---|---|---|
0, 1 | 0 | 一方向メッセージ、リクエストとレスポンス |
0, 1 | 1 | リクエストとレスポンス |
0, 1 | 多 | リクエストストリーム |
多 | 0, 1、多数 | リクエストチャネル |
@RSocketExchange
@MessageMapping
の代わりに、@RSocketExchange
メソッドを使用してリクエストを処理することもできます。このようなメソッドは RSocket インターフェースで宣言され、RSocketServiceProxyFactory
を介してリクエスターとして使用することも、レスポンダーによって実装することもできます。
例: レスポンダーとしてリクエストを処理するには:
Java
Kotlin
public interface RadarsService {
@RSocketExchange("locate.radars.within")
Flux<AirportLocation> radars(MapRequest request);
}
@Controller
public class RadarsController implements RadarsService {
public Flux<AirportLocation> radars(MapRequest request) {
// ...
}
}
interface RadarsService {
@RSocketExchange("locate.radars.within")
fun radars(request: MapRequest): Flow<AirportLocation>
}
@Controller
class RadarsController : RadarsService {
override fun radars(request: MapRequest): Flow<AirportLocation> {
// ...
}
}
@RSocketExhange
と @MessageMapping
の間にはいくつかの違いがあります。これは、@RSocketExhange
がリクエスターとレスポンダーの使用に適したものである必要があるためです。例: @MessageMapping
は任意の数のルートを処理するように宣言でき、各ルートはパターンにすることができますが、@RSocketExchange
は単一の具体的なルートで宣言する必要があります。メタデータに関連してサポートされるメソッドパラメーターにも小さな違いがあります。サポートされるパラメーターのリストについては、@MessageMapping および RSocket インターフェースを参照してください。
@RSocketExchange
を型 レベルで使用すると、特定の RSocket サービスインターフェースのすべてのルートに共通のプレフィックスを指定できます。
@ConnectMapping
@ConnectMapping
は RSocket 接続の開始時に SETUP
フレームを処理し、後続のメタデータは METADATA_PUSH
フレーム、つまり io.rsocket.RSocket
の metadataPush(Payload)
を介して通知をプッシュします。
@ConnectMapping
メソッドは、@MessageMapping と同じ引数をサポートしていますが、SETUP
および METADATA_PUSH
フレームからのメタデータとデータに基づいています。@ConnectMapping
には、メタデータにルートを持つ特定の接続に処理を絞り込むパターンを設定できます。パターンが宣言されていない場合は、すべての接続が一致します。
@ConnectMapping
メソッドはデータを返すことができず、戻り値として void
または Mono<Void>
を使用して宣言する必要があります。新しい接続に対して処理がエラーを返した場合、接続は拒否されます。接続のために RSocketRequester
にリクエストを行うために、処理を遅らせてはなりません。詳細については、サーバーリクエスターを参照してください。
MetadataExtractor
応答者はメタデータを解釈する必要があります。複合メタデータ [GitHub] (英語) では、それぞれ独自の MIME 型を使用して、個別にフォーマットされたメタデータ値(ルーティング、セキュリティ、トレースなど)を使用できます。アプリケーションには、サポートするメタデータ MIME 型を構成する方法と、抽出された値にアクセスする方法が必要です。
MetadataExtractor
は、直列化されたメタデータを取得し、デコードされた名前と値のペアを返す契約です。名前付きのヘッダーのように、たとえばアノテーション付きハンドラーメソッドの @Header
を介してアクセスできます。
DefaultMetadataExtractor
に Decoder
インスタンスを与えて、メタデータをデコードできます。すぐに使用できる「メッセージ /x.rsocket.routing.v0」 [GitHub] (英語) の組み込みサポートがあり、String
にデコードして「ルート」キーに保存します。その他の MIME 型の場合は、Decoder
を提供して、MIME 型を次のように登録する必要があります。
Java
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")
複合メタデータは、独立したメタデータ値を組み合わせるのに適しています。ただし、リクエスターは複合メタデータをサポートしていないか、使用しないことを選択する場合があります。このため、DefaultMetadataExtractor
は、デコードされた値を出力マップにマップするカスタムロジックを必要とする場合があります。JSON がメタデータに使用される例を次に示します。
Java
Kotlin
DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
MimeType.valueOf("application/vnd.myapp.metadata+json"),
new ParameterizedTypeReference<Map<String,String>>() {},
(jsonMap, outputMap) -> {
outputMap.putAll(jsonMap);
});
import org.springframework.messaging.rsocket.metadataToExtract
val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
outputMap.putAll(jsonMap)
}
MetadataExtractor
から RSocketStrategies
を構成する場合、RSocketStrategies.Builder
に構成済みのデコーダーを使用して抽出プログラムを作成させ、コールバックを使用して登録を次のようにカスタマイズできます。
Java
Kotlin
RSocketStrategies strategies = RSocketStrategies.builder()
.metadataExtractorRegistry(registry -> {
registry.metadataToExtract(fooMimeType, Foo.class, "foo");
// ...
})
.build();
import org.springframework.messaging.rsocket.metadataToExtract
val strategies = RSocketStrategies.builder()
.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
registry.metadataToExtract<Foo>(fooMimeType, "foo")
// ...
}
.build()
RSocket インターフェース
Spring Framework を使用すると、RSocket サービスを @RSocketExchange
メソッドを使用した Java インターフェースとして定義できます。このようなインターフェースを RSocketServiceProxyFactory
に渡して、RSocketRequester 経由でリクエストを実行するプロキシを作成できます。リクエストを処理するレスポンダーとしてインターフェースを実装することもできます。
まず、@RSocketExchange
メソッドを使用してインターフェースを作成します。
interface RadarService {
@RSocketExchange("radars")
Flux<AirportLocation> getRadars(@Payload MapRequest request);
// more RSocket exchange methods...
}
これで、メソッドが呼び出されたときにリクエストを実行するプロキシを作成できます。
RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();
RadarService service = factory.createClient(RadarService.class);
レスポンダーとしてリクエストを処理するインターフェースを実装することもできます。アノテーション付きレスポンダーを参照してください。
メソッドパラメーター
アノテーション付きの RSocket 交換メソッドは、次のメソッドパラメーターを使用して柔軟なメソッドシグネチャーをサポートします。
メソッド引数 | 説明 |
---|---|
| ルート内のテンプレートプレースホルダーを展開するために、 |
| リクエストの入力ペイロードを設定します。これは、具体的な値、または |
| 入力ペイロードのメタデータエントリの値。これは、次の引数がメタデータエントリ |
| メタデータエントリの |
戻り値
アノテーション付きの RSocket 交換メソッドは、具体的な値である戻り値、または ReactiveAdapterRegistry
を介して Reactive Streams Publisher
に適合できる値のプロデューサーをサポートします。
デフォルトでは、同期 (ブロッキング) メソッドシグネチャーを持つ RSocket サービスメソッドの動作は、基になる RSocket ClientTransport
のレスポンスタイムアウト設定および RSocket キープアライブ設定に依存します。RSocketServiceProxyFactory.Builder
は、レスポンスをブロックする最大時間を構成できる blockTimeout
オプションを公開していますが、より詳細に制御するには、RSocket レベルでタイムアウト値を構成することをお勧めします。