RSocket

このセクションでは、Spring Framework の RSocket プロトコルのサポートについて説明します。

概要

RSocket は、TCP、WebSocket、その他のバイトストリームトランスポートを介した多重化された二重通信用のアプリケーションプロトコルであり、次の相互作用モデルのいずれかを使用します。

  • Request-Response — 1 つのメッセージを送信し、1 つを受信します。

  • Request-Stream — 1 つのメッセージを送信し、メッセージのストリームを受信します。

  • Channel — メッセージのストリームを両方向に送信します。

  • Fire-and-Forget — 一方向のメッセージを送信します。

最初の接続が確立されると、両側が対称になり、各側が上記の相互作用のいずれかを開始できるため、「クライアント」と「サーバー」の区別が失われます。これが、プロトコルで参加側を「リクエスター」および「レスポンダー」と呼び、上記の相互作用を「リクエストストリーム」または単に「リクエスト」と呼ぶ理由です。

RSocket プロトコルの主な機能と利点は次のとおりです。

  • Reactive Streams (英語) セマンティクスは、Request-Stream および Channel のようなストリーミングリクエストのためにネットワーク境界を横断し、バックプレッシャーシグナルは、リクエスタとレスポンダとの間を移動し、リクエスタがソースでレスポンダの速度を低下させることを可能にし、ネットワーク層輻輳制御への依存、およびネットワークレベルまたは任意のレベルでのバッファリングの必要性を低減します。

  • 調整をリクエストする — この機能は、LEASE フレームにちなんで「リース」という名前が付けられています。このフレームは、各エンドから送信して、特定の時間に他のエンドが許可するリクエストの総数を制限できます。リースは定期的に更新されます。

  • セッション再開 — これは接続が失われるように設計されており、何らかの状態を維持する必要があります。状態管理はアプリケーションに対して透過的であり、可能な場合にプロデューサーを停止し、必要な状態の量を減らすことができるバックプレッシャーと組み合わせてうまく機能します。

  • 大きなメッセージの断片化と再組み立て。

  • キープアライブ(ハートビート)。

RSocket は複数の言語で実装 [GitHub] (英語) されています。Java ライブラリ [GitHub] (英語) プロジェクト Reactor (英語) および Reactor Netty [GitHub] (英語) 上に構築されており、トランスポート用です。つまり、アプリケーションの Reactive Streams パブリッシャーからのシグナルは、RSocket を介してネットワーク全体に透過的に伝播します。

プロトコル

RSocket の利点の 1 つはそれがワイヤの上でよく定義された振舞いと何らかのプロトコル拡張 [GitHub] (英語) に伴う読みやすい仕様 (英語) を持っているということです。言語の実装や高レベルのフレームワーク API に関係なく、仕様を読むことをお勧めします。このセクションでは、コンテキストを確立するための簡潔な概要を提供します。

接続

最初に、クライアントは TCP や WebSocket などの低レベルのストリーミングトランスポートを介してサーバーに接続し、SETUP フレームをサーバーに送信して接続のパラメーターを設定します。

サーバーは SETUP フレームを拒否する場合がありますが、通常、送信(クライアント用)および受信(サーバー用)した後、SETUP がリースセマンティクスを使用してリクエストの数を制限しない限り、リクエストを開始できます。どちらの側も、リクエストを許可するために、もう一方の端からの LEASE フレームを待つ必要があります。

リクエストをする

接続が確立されると、両側でフレーム REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNEL または REQUEST_FNF のいずれかを介してリクエストを開始できます。これらの各フレームは、リクエスターからレスポンダーに 1 つのメッセージを運びます。

次に、レスポンダーはレスポンスメッセージとともに PAYLOAD フレームを返します。REQUEST_CHANNEL の場合、リクエスターはさらに多くのリクエストメッセージを含む PAYLOAD フレームを送信します。

リクエストに Request-Stream や Channel などのメッセージのストリームが含まれる場合、レスポンダーはリクエスターからのリクエストシグナルを考慮する必要があります。需要はメッセージの数として表されます。初期需要は、REQUEST_STREAM および REQUEST_CHANNEL フレームで指定されます。後続のリクエストは、REQUEST_N フレームを介して通知されます。

各側は、METADATA_PUSH フレームを介して、個々のリクエストではなく、接続全体に関するメタデータ通知も送信できます。

メッセージフォーマット

RSocket メッセージにはデータとメタデータが含まれます。メタデータは、ルートやセキュリティトークンなどを送信するために使用できます。データとメタデータは異なる形式にすることができます。それぞれの MIME 型は SETUP フレームで宣言され、特定の接続のすべてのリクエストに適用されます。

すべてのメッセージにメタデータを含めることができますが、通常、ルートなどのメタデータはリクエストごとであるため、リクエストの最初のメッセージ、つまりフレーム REQUEST_RESPONSEREQUEST_STREAMREQUEST_CHANNEL または REQUEST_FNF の 1 つにのみ含まれます。

プロトコル拡張機能は、アプリケーションで使用する一般的なメタデータ形式を定義します。

Java 実装

RSocket の Java 実装 [GitHub] (英語) プロジェクト Reactor (英語) 上に構築されています。TCP および WebSocket のトランスポートは Reactor Netty [GitHub] (英語) に基づいています。Reactive Streams ライブラリとして、Reactor はプロトコルを実装する作業を簡素化します。アプリケーションでは、宣言演算子と透過的なバックプレッシャーサポートを備えた Flux および Mono を使用するのが自然です。

RSocket Java の API は意図的に最小限かつ基本的なものになっています。これはプロトコルの機能に焦点を当てており、アプリケーションプログラミングモデル (例: RPC codegen とその他) はより高いレベルの独立した関心事として残されています。

メイン契約 io.rsocket.RSocket [GitHub] (英語) は、単一のメッセージの約束を表す Mono、メッセージのストリームを表す Flux、バイトバッファーとしてデータとメタデータにアクセスする実際のメッセージを表す io.rsocket.Payload を使用して、4 つのリクエスト対話型をモデル化します。RSocket 契約は対称的に使用されます。リクエストの場合、アプリケーションには、リクエストを実行する RSocket が与えられます。応答のために、アプリケーションは RSocket を実装してリクエストを処理します。

これは、徹底的な導入を意図したものではありません。ほとんどの場合、Spring アプリケーションはその API を直接使用する必要はありません。ただし、Spring に依存しない RSocket を確認または実験することが重要な場合があります。RSocket Java リポジトリには、API とプロトコルの機能を示す多数のサンプルアプリ [GitHub] (英語) が含まれています。

Spring サポート

spring-messaging モジュールには次のものが含まれます。

  • RSocketRequester — データとメタデータのエンコード / デコードを使用して io.rsocket.RSocket を介してリクエストを行うための流れるような API。

  • アノテーション付きレスポンダー  — @MessageMapping および @RSocketExchange は、応答するためのアノテーション付きハンドラーメソッドです。

  • RSocket インターフェース  — リクエスターまたはレスポンダーとして使用するための、@RSocketExchange メソッドを備えた Java インターフェースとしての RSocket サービス宣言。

spring-web モジュールには、Jackson CBOR/JSON などの Encoder および Decoder 実装、および RSocket アプリケーションが必要とする可能性が高い Protobuf が含まれています。また、効率的なルートマッチングのためにプラグインできる PathPatternParser も含まれています。

Spring Boot 2.2 は、TCP または WebSocket を介した RSocket サーバーの立ち上げをサポートしています。これには、WebFlux サーバーで WebSocket を介して RSocket を公開するオプションも含まれます。RSocketRequester.Builder および RSocketStrategies のクライアントサポートと自動構成もあります。詳細については、Spring Boot リファレンスの RSocket セクションを参照してください。

Spring Security 5.2 は RSocket サポートを提供します。

Spring Integration 5.2 は、RSocket クライアントおよびサーバーと対話するための受信および送信ゲートウェイを提供します。詳細については、Spring Integration リファレンスマニュアルを参照してください。

Spring Cloud Gateway は RSocket 接続をサポートしています。

RSocketRequester

RSocketRequester は、RSocket リクエストを実行するための流れるような API を提供し、低レベルのデータバッファーではなく、データとメタデータのオブジェクトを受け入れて返します。対称的に使用して、クライアントからリクエストを作成したり、サーバーからリクエストを作成したりできます。

クライアントリクエスター

クライアント側で RSocketRequester を取得するには、サーバーに接続します。これには、接続設定を含む RSocket SETUP フレームの送信が含まれます。RSocketRequester は、SETUP フレームの接続設定を含む io.rsocket.core.RSocketConnector の準備に役立つビルダーを提供します。

これは、デフォルト設定で接続する最も基本的な方法です。

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder().tcp("localhost", 7000);

URI url = URI.create("https://example.org:8080/rsocket");
RSocketRequester requester = RSocketRequester.builder().webSocket(url);
val requester = RSocketRequester.builder().tcp("localhost", 7000)

URI url = URI.create("https://example.org:8080/rsocket");
val requester = RSocketRequester.builder().webSocket(url)

上記はすぐには接続されません。リクエストが行われると、共有接続が透過的に確立されて使用されます。

接続設定

RSocketRequester.Builder は、初期 SETUP フレームをカスタマイズするために以下を提供します。

  • dataMimeType(MimeType) — 接続上のデータの MIME 型を設定します。

  • metadataMimeType(MimeType) — 接続のメタデータの MIME 型を設定します。

  • setupData(Object) —  SETUP に含めるデータ。

  • setupRoute(String, Object…​) —  SETUP に含めるメタデータのルート。

  • setupMetadata(Object, MimeType) —  SETUP に含める他のメタデータ。

データの場合、デフォルトの MIME 型は最初に構成された Decoder から派生します。メタデータの場合、デフォルトの MIME 型は複合メタデータ [GitHub] (英語) であり、リクエストごとに複数のメタデータ値と MIME 型のペアを許可します。通常、両方を変更する必要はありません。

SETUP フレームのデータとメタデータはオプションです。サーバー側では、@ConnectMapping メソッドを使用して、接続の開始と SETUP フレームのコンテンツを処理できます。メタデータは、接続レベルのセキュリティに使用できます。

戦略

RSocketRequester.Builder は、RSocketStrategies を受け入れてリクエスターを構成します。これを使用して、データとメタデータ値の(de)-serialization 用のエンコーダーとデコーダーを提供する必要があります。デフォルトでは、Stringbyte[]ByteBuffer 用の spring-core の基本コーデックのみが登録されます。spring-web を追加すると、次のように登録できるその他の機能にアクセスできます。

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
	.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
	.build();

RSocketRequester requester = RSocketRequester.builder()
	.rsocketStrategies(strategies)
	.tcp("localhost", 7000);
val strategies = RSocketStrategies.builder()
		.encoders { it.add(Jackson2CborEncoder()) }
		.decoders { it.add(Jackson2CborDecoder()) }
		.build()

val requester = RSocketRequester.builder()
		.rsocketStrategies(strategies)
		.tcp("localhost", 7000)

RSocketStrategies は再利用のために設計されています。いくつかのシナリオでは、たとえば同じアプリケーションのクライアントとサーバーの場合、Spring 構成で宣言することをお勧めします。

クライアントレスポンダー

RSocketRequester.Builder を使用して、サーバーからのリクエストに対するレスポンダーを構成できます。

サーバーで使用されているものと同じインフラストラクチャに基づいてクライアント側の応答にアノテーション付きハンドラーを使用できますが、次のようにプログラムで登録します。

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.routeMatcher(new PathPatternRouteMatcher())  (1)
	.build();

SocketAcceptor responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(responder)) (3)
	.tcp("localhost", 7000);
1spring-web が存在する場合は、効率的なルートマッチングのために PathPatternRouteMatcher を使用します。
2@MessageMapping および / または @ConnectMapping メソッドを使用して、クラスからレスポンダを作成します。
3 レスポンダを登録します。
val strategies = RSocketStrategies.builder()
		.routeMatcher(PathPatternRouteMatcher())  (1)
		.build()

val responder =
	RSocketMessageHandler.responder(strategies, new ClientHandler()); (2)

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(responder) } (3)
		.tcp("localhost", 7000)
1spring-web が存在する場合は、効率的なルートマッチングのために PathPatternRouteMatcher を使用します。
2@MessageMapping および / または @ConnectMapping メソッドを使用して、クラスからレスポンダを作成します。
3 レスポンダを登録します。

上記は、クライアントレスポンダのプログラムによる登録用に設計されたショートカットにすぎないことに注意してください。クライアントレスポンダーが Spring 構成にある代替シナリオの場合、RSocketMessageHandler を Spring Bean として宣言し、次のように適用できます。

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> connector.acceptor(handler.responder()))
	.tcp("localhost", 7000);
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val requester = RSocketRequester.builder()
		.rsocketConnector { it.acceptor(handler.responder()) }
		.tcp("localhost", 7000)

上記の場合、RSocketMessageHandler で setHandlerPredicate を使用して、クライアントレスポンダーを検出するための別の戦略に切り替える必要がある場合もあります。@RSocketClientResponder などのカスタムアノテーションとデフォルトの @Controller に基づいています。これは、クライアントとサーバー、または同じアプリケーション内の複数のクライアントを使用するシナリオで必要です。

プログラミングモデルの詳細については、アノテーション付きレスポンダーも参照してください。

拡張

RSocketRequesterBuilder は、基になる io.rsocket.core.RSocketConnector を公開するためのコールバックを提供し、キープアライブインターバル、セッション再開、インターセプターなどの詳細設定オプションを提供します。次のように、そのレベルでオプションを構成できます。

  • Java

  • Kotlin

RSocketRequester requester = RSocketRequester.builder()
	.rsocketConnector(connector -> {
		// ...
	})
	.tcp("localhost", 7000);
val requester = RSocketRequester.builder()
		.rsocketConnector {
			//...
		}
		.tcp("localhost", 7000)

サーバーリクエスター

サーバーから接続されたクライアントにリクエストを行うことは、サーバーから接続されたクライアントのリクエスターを取得することです。

アノテーション付きレスポンダーでは、@ConnectMapping および @MessageMapping メソッドは RSocketRequester 引数をサポートします。これを使用して、接続のリクエスターにアクセスします。@ConnectMapping メソッドは本質的に SETUP フレームのハンドラーであり、リクエストを開始する前に処理する必要があることに注意してください。そのため、最初のリクエストは処理から分離する必要があります。例:

  • Java

  • Kotlin

@ConnectMapping
Mono<Void> handle(RSocketRequester requester) {
	requester.route("status").data("5")
		.retrieveFlux(StatusReport.class)
		.subscribe(bar -> { (1)
			// ...
		});
	return ... (2)
}
1 処理とは無関係に、リクエストを非同期的に開始します。
2 処理を実行し、完了 Mono<Void> を返します。
@ConnectMapping
suspend fun handle(requester: RSocketRequester) {
	GlobalScope.launch {
		requester.route("status").data("5").retrieveFlow<StatusReport>().collect { (1)
			// ...
		}
	}
	/// ... (2)
}
1 処理とは無関係に、リクエストを非同期的に開始します。
2 サスペンド機能で処理してください。

要求

クライアントまたはサーバーのリクエスターを取得したら、次のようにリクエストを作成できます。

  • Java

  • Kotlin

ViewBox viewBox = ... ;

Flux<AirportLocation> locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlux(AirportLocation.class); (3)
1 リクエストメッセージのメタデータに含めるルートを指定します。
2 リクエストメッセージのデータを提供します。
3 予想されるレスポンスを宣言します。
val viewBox: ViewBox = ...

val locations = requester.route("locate.radars.within") (1)
		.data(viewBox) (2)
		.retrieveFlow<AirportLocation>() (3)
1 リクエストメッセージのメタデータに含めるルートを指定します。
2 リクエストメッセージのデータを提供します。
3 予想されるレスポンスを宣言します。

相互作用型は、入力と出力のカーディナリティから暗黙的に決定されます。上記の例は、1 つの値が送信され、値のストリームが受信されるため、Request-Stream です。ほとんどの場合、入力と出力の選択が RSocket インタラクションの種類と、レスポンダーが期待する入力と出力の種類と一致する限り、これについて考える必要はありません。無効な組み合わせの唯一の例は、多対 1 です。

data(Object) メソッドは、Flux および Mono を含む Reactive Streams Publisher、および ReactiveAdapterRegistry に登録されている値のその他のプロデューサーも受け入れます。同じ型の値を生成する Flux などの複数値 Publisher の場合、オーバーロードされた data メソッドのいずれかを使用して、すべての要素の型チェックと Encoder ルックアップを回避することを検討してください。

data(Object producer, Class<?> elementClass);
data(Object producer, ParameterizedTypeReference<?> elementTypeRef);

data(Object) ステップはオプションです。データを送信しないリクエストの場合はスキップします。

  • Java

  • Kotlin

Mono<AirportLocation> location = requester.route("find.radar.EWR"))
	.retrieveMono(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveAndAwait

val location = requester.route("find.radar.EWR")
	.retrieveAndAwait<AirportLocation>()

複合メタデータ [GitHub] (英語) (デフォルト)を使用し、値が登録済み Encoder でサポートされている場合、追加のメタデータ値を追加できます。例:

  • Java

  • Kotlin

String securityToken = ... ;
ViewBox viewBox = ... ;
MimeType mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0");

Flux<AirportLocation> locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlux(AirportLocation.class);
import org.springframework.messaging.rsocket.retrieveFlow

val requester: RSocketRequester = ...

val securityToken: String = ...
val viewBox: ViewBox = ...
val mimeType = MimeType.valueOf("message/x.rsocket.authentication.bearer.v0")

val locations = requester.route("locate.radars.within")
		.metadata(securityToken, mimeType)
		.data(viewBox)
		.retrieveFlow<AirportLocation>()

Fire-and-Forget の場合、Mono<Void> を返す send() メソッドを使用します。Mono は、メッセージが正常に送信されたことのみを示し、処理されたことを示すものではないことに注意してください。

Metadata-Push の場合、Mono<Void> 戻り値を指定して sendMetadata() メソッドを使用します。

アノテーション付きレスポンダー

RSocket レスポンダーは、@MessageMapping および @ConnectMapping メソッドとして実装できます。@MessageMapping メソッドは個々のリクエストを処理し、@ConnectMapping メソッドは接続レベルのイベント(セットアップとメタデータプッシュ)を処理します。アノテーション付きのレスポンダーは、サーバー側からの応答とクライアント側からの応答のために対称的にサポートされます。

サーバーレスポンダー

サーバー側でアノテーション付きのレスポンダーを使用するには、RSocketMessageHandler を Spring 構成に追加して、@MessageMapping および @ConnectMapping メソッドで @Controller Bean を検出します。

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.routeMatcher(new PathPatternRouteMatcher());
		return handler;
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		routeMatcher = PathPatternRouteMatcher()
	}
}

次に、Java RSocket API を使用して RSocket サーバーを起動し、レスポンダーの RSocketMessageHandler を次のように接続します。

  • Java

  • Kotlin

ApplicationContext context = ... ;
RSocketMessageHandler handler = context.getBean(RSocketMessageHandler.class);

CloseableChannel server =
	RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.block();
import org.springframework.beans.factory.getBean

val context: ApplicationContext = ...
val handler = context.getBean<RSocketMessageHandler>()

val server = RSocketServer.create(handler.responder())
		.bind(TcpServerTransport.create("localhost", 7000))
		.awaitSingle()

RSocketMessageHandler は、デフォルトで複合 [GitHub] (英語) メタデータとルーティング [GitHub] (英語) メタデータをサポートしています。別の MIME 型に切り替えるか、追加のメタデータ MIME 型を登録する必要がある場合は、MetadataExtractor を設定できます。

サポートするメタデータおよびデータ形式に必要な Encoder および Decoder インスタンスを設定する必要があります。コーデックの実装には、spring-web モジュールが必要になる可能性があります。

デフォルトでは、SimpleRouteMatcher は AntPathMatcher を介したルートのマッチングに使用されます。効率的なルートマッチングのために、spring-web から PathPatternRouteMatcher を差し込むことをお勧めします。RSocket ルートは階層化できますが、URL パスではありません。両方のルートマッチャーが "." を使用するように構成されています。デフォルトではセパレータとして使用され、HTTP URL のような URL デコードはありません。

RSocketMessageHandler は、同じプロセスでクライアントとサーバー間で構成を共有する必要がある場合に便利な RSocketStrategies を介して構成できます。

  • Java

  • Kotlin

@Configuration
static class ServerConfig {

	@Bean
	public RSocketMessageHandler rsocketMessageHandler() {
		RSocketMessageHandler handler = new RSocketMessageHandler();
		handler.setRSocketStrategies(rsocketStrategies());
		return handler;
	}

	@Bean
	public RSocketStrategies rsocketStrategies() {
		return RSocketStrategies.builder()
			.encoders(encoders -> encoders.add(new Jackson2CborEncoder()))
			.decoders(decoders -> decoders.add(new Jackson2CborDecoder()))
			.routeMatcher(new PathPatternRouteMatcher())
			.build();
	}
}
@Configuration
class ServerConfig {

	@Bean
	fun rsocketMessageHandler() = RSocketMessageHandler().apply {
		rSocketStrategies = rsocketStrategies()
	}

	@Bean
	fun rsocketStrategies() = RSocketStrategies.builder()
			.encoders { it.add(Jackson2CborEncoder()) }
			.decoders { it.add(Jackson2CborDecoder()) }
			.routeMatcher(PathPatternRouteMatcher())
			.build()
}

クライアントレスポンダー

クライアント側のアノテーション付き応答者は、RSocketRequester.Builder で構成する必要があります。詳細については、クライアントレスポンダーを参照してください。

@MessageMapping

サーバーまたはクライアントレスポンダーの設定が完了したら、@MessageMapping メソッドを次のように使用できます。

  • Java

  • Kotlin

@Controller
public class RadarsController {

	@MessageMapping("locate.radars.within")
	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
@Controller
class RadarsController {

	@MessageMapping("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

上記の @MessageMapping メソッドは、ルート "locate.radars.within" を持つ Request-Stream インタラクションに応答します。次のメソッド引数を使用するオプションを備えた柔軟なメソッドシグネチャーをサポートします。

メソッド引数 説明

@Payload

リクエストのペイロード。これは、Mono や Flux などの非同期型の具体的な値になります。

注意 : アノテーションの使用はオプションです。単純型ではなく、サポートされている他の引数のいずれでもないメソッド引数は、予期されるペイロードと見なされます。

RSocketRequester

リモートの終了をリクエストするためのリクエスター。

@DestinationVariable

マッピングパターンの変数に基づいてルートから抽出された値。@MessageMapping("find.radar.{id}")

@Header

MetadataExtractor で説明されている抽出のために登録されたメタデータ値。

@Headers Map<String, Object>

MetadataExtractor に従って、抽出のために登録されたすべてのメタデータ値。

戻り値は、レスポンスペイロードとして直列化される 1 つ以上のオブジェクトであると予想されます。これは、Mono または Flux のような非同期型、具体的な値、void または Mono<Void> などの値のない非同期型のいずれかです。

@MessageMapping メソッドがサポートする RSocket インタラクション型は、入力(つまり、@Payload 引数)と出力のカーディナリティから決定されます。カーディナリティは次のことを意味します。

基数 説明

1

明示的な値、または Mono<T> などの単一値の非同期型のいずれか。

Flux<T> などの複数値非同期型。

0

入力の場合、これはメソッドに @Payload 引数がないことを意味します。

出力の場合、これは void または Mono<Void> などの値のない非同期型です。

以下の表は、すべての入力および出力カーディナリティーの組み合わせと、対応する相互作用型を示しています。

入力カーディナリティ 出力カーディナリティ インタラクション型

0, 1

0

一方向メッセージ、リクエストとレスポンス

0, 1

1

リクエストとレスポンス

0, 1

リクエストストリーム

0, 1、多数

リクエストチャネル

@RSocketExchange

@MessageMapping の代わりに、@RSocketExchange メソッドを使用してリクエストを処理することもできます。このようなメソッドは RSocket インターフェースで宣言され、RSocketServiceProxyFactory を介してリクエスターとして使用することも、レスポンダーによって実装することもできます。

例: レスポンダーとしてリクエストを処理するには:

  • Java

  • Kotlin

public interface RadarsService {

	@RSocketExchange("locate.radars.within")
	Flux<AirportLocation> radars(MapRequest request);
}

@Controller
public class RadarsController implements RadarsService {

	public Flux<AirportLocation> radars(MapRequest request) {
		// ...
	}
}
interface RadarsService {

	@RSocketExchange("locate.radars.within")
	fun radars(request: MapRequest): Flow<AirportLocation>
}

@Controller
class RadarsController : RadarsService {

	override fun radars(request: MapRequest): Flow<AirportLocation> {
		// ...
	}
}

@RSocketExhange と @MessageMapping の間にはいくつかの違いがあります。これは、@RSocketExhange がリクエスターとレスポンダーの使用に適したものである必要があるためです。例: @MessageMapping は任意の数のルートを処理するように宣言でき、各ルートはパターンにすることができますが、@RSocketExchange は単一の具体的なルートで宣言する必要があります。メタデータに関連してサポートされるメソッドパラメーターにも小さな違いがあります。サポートされるパラメーターのリストについては、@MessageMapping および RSocket インターフェースを参照してください。

@RSocketExchange を型 レベルで使用すると、特定の RSocket サービスインターフェースのすべてのルートに共通のプレフィックスを指定できます。

@ConnectMapping

@ConnectMapping は RSocket 接続の開始時に SETUP フレームを処理し、後続のメタデータは METADATA_PUSH フレーム、つまり io.rsocket.RSocket の metadataPush(Payload) を介して通知をプッシュします。

@ConnectMapping メソッドは、@MessageMapping と同じ引数をサポートしていますが、SETUP および METADATA_PUSH フレームからのメタデータとデータに基づいています。@ConnectMapping には、メタデータにルートを持つ特定の接続に処理を絞り込むパターンを設定できます。パターンが宣言されていない場合は、すべての接続が一致します。

@ConnectMapping メソッドはデータを返すことができず、戻り値として void または Mono<Void> を使用して宣言する必要があります。新しい接続に対して処理がエラーを返した場合、接続は拒否されます。接続のために RSocketRequester にリクエストを行うために、処理を遅らせてはなりません。詳細については、サーバーリクエスターを参照してください。

MetadataExtractor

応答者はメタデータを解釈する必要があります。複合メタデータ [GitHub] (英語) では、それぞれ独自の MIME 型を使用して、個別にフォーマットされたメタデータ値(ルーティング、セキュリティ、トレースなど)を使用できます。アプリケーションには、サポートするメタデータ MIME 型を構成する方法と、抽出された値にアクセスする方法が必要です。

MetadataExtractor は、直列化されたメタデータを取得し、デコードされた名前と値のペアを返す契約です。名前付きのヘッダーのように、たとえばアノテーション付きハンドラーメソッドの @Header を介してアクセスできます。

DefaultMetadataExtractor に Decoder インスタンスを与えて、メタデータをデコードできます。すぐに使用できる「メッセージ /x.rsocket.routing.v0」 [GitHub] (英語) の組み込みサポートがあり、String にデコードして「ルート」キーに保存します。その他の MIME 型の場合は、Decoder を提供して、MIME 型を次のように登録する必要があります。

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(fooMimeType, Foo.class, "foo");
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Foo>(fooMimeType, "foo")

複合メタデータは、独立したメタデータ値を組み合わせるのに適しています。ただし、リクエスターは複合メタデータをサポートしていないか、使用しないことを選択する場合があります。このため、DefaultMetadataExtractor は、デコードされた値を出力マップにマップするカスタムロジックを必要とする場合があります。JSON がメタデータに使用される例を次に示します。

  • Java

  • Kotlin

DefaultMetadataExtractor extractor = new DefaultMetadataExtractor(metadataDecoders);
extractor.metadataToExtract(
	MimeType.valueOf("application/vnd.myapp.metadata+json"),
	new ParameterizedTypeReference<Map<String,String>>() {},
	(jsonMap, outputMap) -> {
		outputMap.putAll(jsonMap);
	});
import org.springframework.messaging.rsocket.metadataToExtract

val extractor = DefaultMetadataExtractor(metadataDecoders)
extractor.metadataToExtract<Map<String, String>>(MimeType.valueOf("application/vnd.myapp.metadata+json")) { jsonMap, outputMap ->
	outputMap.putAll(jsonMap)
}

MetadataExtractor から RSocketStrategies を構成する場合、RSocketStrategies.Builder に構成済みのデコーダーを使用して抽出プログラムを作成させ、コールバックを使用して登録を次のようにカスタマイズできます。

  • Java

  • Kotlin

RSocketStrategies strategies = RSocketStrategies.builder()
	.metadataExtractorRegistry(registry -> {
		registry.metadataToExtract(fooMimeType, Foo.class, "foo");
		// ...
	})
	.build();
import org.springframework.messaging.rsocket.metadataToExtract

val strategies = RSocketStrategies.builder()
		.metadataExtractorRegistry { registry: MetadataExtractorRegistry ->
			registry.metadataToExtract<Foo>(fooMimeType, "foo")
			// ...
		}
		.build()

RSocket インターフェース

Spring Framework を使用すると、RSocket サービスを @RSocketExchange メソッドを使用した Java インターフェースとして定義できます。このようなインターフェースを RSocketServiceProxyFactory に渡して、RSocketRequester 経由でリクエストを実行するプロキシを作成できます。リクエストを処理するレスポンダーとしてインターフェースを実装することもできます。

まず、@RSocketExchange メソッドを使用してインターフェースを作成します。

interface RadarService {

	@RSocketExchange("radars")
	Flux<AirportLocation> getRadars(@Payload MapRequest request);

	// more RSocket exchange methods...

}

これで、メソッドが呼び出されたときにリクエストを実行するプロキシを作成できます。

RSocketRequester requester = ... ;
RSocketServiceProxyFactory factory = RSocketServiceProxyFactory.builder(requester).build();

RadarService service = factory.createClient(RadarService.class);

レスポンダーとしてリクエストを処理するインターフェースを実装することもできます。アノテーション付きレスポンダーを参照してください。

メソッドパラメーター

アノテーション付きの RSocket 交換メソッドは、次のメソッドパラメーターを使用して柔軟なメソッドシグネチャーをサポートします。

メソッド引数 説明

@DestinationVariable

ルート内のテンプレートプレースホルダーを展開するために、@RSocketExchange アノテーションからのルートと共に RSocketRequester に渡すルート変数を追加します。この変数は文字列または任意のオブジェクトにすることができ、toString() でフォーマットされます。

@Payload

リクエストの入力ペイロードを設定します。これは、具体的な値、または ReactiveAdapterRegistry を介して Reactive Streams Publisher に適応できる値の任意のプロデューサーにすることができます

Object, if followed by MimeType

入力ペイロードのメタデータエントリの値。これは、次の引数がメタデータエントリ MimeType である限り、任意の Object にすることができます。値は、具体的な値、または ReactiveAdapterRegistry を介して Reactive Streams Publisher に適合できる単一の値の任意のプロデューサーにすることができます。

MimeType

メタデータエントリの MimeType。前のメソッド引数は、メタデータ値であると想定されています。

戻り値

アノテーション付きの RSocket 交換メソッドは、具体的な値である戻り値、または ReactiveAdapterRegistry を介して Reactive Streams Publisher に適合できる値のプロデューサーをサポートします。

デフォルトでは、同期 (ブロッキング) メソッドシグネチャーを持つ RSocket サービスメソッドの動作は、基になる RSocket ClientTransport のレスポンスタイムアウト設定および RSocket キープアライブ設定に依存します。RSocketServiceProxyFactory.Builder は、レスポンスをブロックする最大時間を構成できる blockTimeout オプションを公開していますが、より詳細に制御するには、RSocket レベルでタイムアウト値を構成することをお勧めします。