WebSocket

リファレンスドキュメントのこのパートでは、リアクティブスタック WebSocket メッセージングのサポートについて説明します。

WebSocket の概要

WebSocket プロトコルである RFC 6455 [IETF] (英語) は、単一の TCP 接続を介してクライアントとサーバー間に全二重双方向通信チャネルを確立する標準化された方法を提供します。これは HTTP とは異なる TCP プロトコルですが、ポート 80 および 443 を使用し、既存のファイアウォールルールの再利用を可能にする HTTP 上で動作するように設計されています。

WebSocket の対話は、HTTP Upgrade ヘッダーを使用してアップグレードするか、この場合は WebSocket プロトコルに切り替える HTTP リクエストで始まります。次の例は、このような相互作用を示しています。

GET /spring-websocket-portfolio/portfolio HTTP/1.1
Host: localhost:8080
Upgrade: websocket (1)
Connection: Upgrade (2)
Sec-WebSocket-Key: Uc9l9TMkWGbHFD2qnFHltg==
Sec-WebSocket-Protocol: v10.stomp, v11.stomp
Sec-WebSocket-Version: 13
Origin: http://localhost:8080
1Upgrade ヘッダー。
2Upgrade 接続を使用します。

通常の 200 ステータスコードの代わりに、WebSocket をサポートするサーバーは、次のような出力を返します。

HTTP/1.1 101 Switching Protocols (1)
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: 1qVdfYHU9hPOl4JYYNXF623Gzn0=
Sec-WebSocket-Protocol: v10.stomp
1 プロトコルスイッチ

ハンドシェイクに成功した後、HTTP アップグレードリクエストの基になる TCP ソケットは、クライアントとサーバーの両方に対して開いたままになり、メッセージの送受信を継続します。

WebSockets の動作方法の完全な導入は、このドキュメントの範囲外です。RFC 6455、HTML5 の WebSocket の章、Web に関する多くの導入とチュートリアルのいずれかを参照してください。

WebSocket サーバーが Web サーバー(nginx など)の背後で実行されている場合は、WebSocket アップグレードリクエストを WebSocket サーバーに渡すように構成する必要がある可能性が高いことに注意してください。同様に、アプリケーションがクラウド環境で実行されている場合は、WebSocket サポートに関連するクラウドプロバイダーの指示を確認してください。

HTTP 対 WebSocket

WebSocket は HTTP 互換であるように設計されており、HTTP リクエストで始まりますが、2 つのプロトコルが非常に異なるアーキテクチャとアプリケーションプログラミングモデルにつながることを理解することが重要です。

HTTP および REST では、アプリケーションは多くの URL としてモデル化されます。アプリケーションと対話するために、クライアントはこれらの URL にアクセスし、リクエスト / レスポンススタイルを使用します。サーバーは、HTTP URL、メソッド、ヘッダーに基づいてリクエストを適切なハンドラーにルーティングします。

対照的に、WebSockets では、通常、最初の接続用の URL は 1 つだけです。その後、すべてのアプリケーションメッセージは同じ TCP 接続で流れます。これは、まったく異なる非同期のイベント駆動型のメッセージングアーキテクチャを指します。

WebSocket は低レベルのトランスポートプロトコルでもあり、HTTP とは異なり、メッセージのコンテンツにセマンティクスを規定していません。つまり、クライアントとサーバーがメッセージのセマンティクスに同意しない限り、メッセージをルーティングまたは処理する方法はありません。

WebSocket クライアントおよびサーバーは、HTTP ハンドシェイクリクエストの Sec-WebSocket-Protocol ヘッダーを介して、より高いレベルのメッセージングプロトコル(たとえば、STOMP)の使用をネゴシエートできます。それがなければ、彼らは彼ら自身の規約を考え出す必要があります。

WebSockets を使用する場合

WebSockets は、Web ページを動的かつインタラクティブにすることができます。ただし、多くの場合、AJAX と HTTP ストリーミングまたはロングポーリングを組み合わせることで、シンプルで効果的なソリューションを提供できます。

例: ニュース、メール、ソーシャルフィードは動的に更新する必要がありますが、数分ごとに更新しても問題ありません。一方、コラボレーション、ゲーム、金融アプリは、リアルタイムにさらに近づける必要があります。

遅延だけが決定要因ではありません。メッセージの量が比較的少ない場合(ネットワーク障害の監視など)、HTTP ストリーミングまたはポーリングは効果的なソリューションを提供できます。WebSocket の使用に最適なのは、低レイテンシー、高周波数、高ボリュームの組み合わせです。

また、インターネット上では、Upgrade ヘッダーを渡すように構成されていないか、アイドル状態であると思われる長寿命の接続を閉じるため、コントロール外の制限プロキシが WebSocket 相互作用を妨げる可能性があることに注意してください。これは、ファイアウォール内の内部アプリケーションに WebSocket を使用することは、一般公開アプリケーションよりも簡単な決定であることを意味します。

WebSocket API

Spring Framework は、WebSocket メッセージを処理するクライアント側およびサーバー側のアプリケーションを作成するために使用できる WebSocket API を提供します。

サーバー

WebSocket サーバーを作成するには、最初に WebSocketHandler を作成できます。次の例は、その方法を示しています。

  • Java

  • Kotlin

import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketSession;

public class MyWebSocketHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		// ...
	}
}
import org.springframework.web.reactive.socket.WebSocketHandler
import org.springframework.web.reactive.socket.WebSocketSession

class MyWebSocketHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		// ...
	}
}

次に、それを URL にマップできます。

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public HandlerMapping handlerMapping() {
		Map<String, WebSocketHandler> map = new HashMap<>();
		map.put("/path", new MyWebSocketHandler());
		int order = -1; // before annotated controllers

		return new SimpleUrlHandlerMapping(map, order);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerMapping(): HandlerMapping {
		val map = mapOf("/path" to MyWebSocketHandler())
		val order = -1 // before annotated controllers

		return SimpleUrlHandlerMapping(map, order)
	}
}

WebFlux 構成を使用している場合は、これ以上何もする必要はありません。そうでない場合は、WebFlux 構成を使用していない場合は、以下に示すように WebSocketHandlerAdapter を宣言する必要があります。

  • Java

  • Kotlin

@Configuration
class WebConfig {

	// ...

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter();
	}
}
@Configuration
class WebConfig {

	// ...

	@Bean
	fun handlerAdapter() =  WebSocketHandlerAdapter()
}

WebSocketHandler

WebSocketHandler の handle メソッドは、WebSocketSession を取得して Mono<Void> を返し、セッションのアプリケーション処理が完了したことを示します。セッションは、受信メッセージ用と送信メッセージ用の 2 つのストリームを介して処理されます。次の表に、ストリームを処理する 2 つのメソッドを示します。

WebSocketSession メソッド 説明

Flux<WebSocketMessage> receive()

受信メッセージストリームへのアクセスを提供し、接続が閉じられると完了します。

Mono<Void> send(Publisher<WebSocketMessage>)

送信メッセージのソースを取得し、メッセージを書き込み、ソースが完了して書き込みが完了すると完了する Mono<Void> を返します。

WebSocketHandler は、受信ストリームと送信ストリームを統合されたフローに構成し、そのフローの補完を反映する Mono<Void> を返す必要があります。アプリケーション要件に応じて、統合フローは次の場合に完了します。

  • 受信または送信のメッセージストリームが完了します。

  • 受信ストリームは完了します(つまり、接続が閉じられます)が、送信ストリームは無限です。

  • WebSocketSession の close メソッドを介して選択したポイント。

受信と送信のメッセージストリームが一緒に構成される場合、Reactive Streams は終了アクティビティを通知するため、接続が開いているかどうかを確認する必要はありません。受信ストリームは完了またはエラー信号を受信し、送信ストリームはキャンセル信号を受信します。

ハンドラーの最も基本的な実装は、受信ストリームを処理するものです。次の例は、このような実装を示しています。

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {
		return session.receive()			(1)
				.doOnNext(message -> {
					// ...					(2)
				})
				.concatMap(message -> {
					// ...					(3)
				})
				.then();					(4)
	}
}
1 受信メッセージのストリームにアクセスします。
2 各メッセージで何かをします。
3 メッセージコンテンツを使用するネストされた非同期操作を実行します。
4 受信が完了したときに完了する Mono<Void> を返します。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {
		return session.receive()            (1)
				.doOnNext {
					// ...					(2)
				}
				.concatMap {
					// ...					(3)
				}
				.then()                     (4)
	}
}
1 受信メッセージのストリームにアクセスします。
2 各メッセージで何かをします。
3 メッセージコンテンツを使用するネストされた非同期操作を実行します。
4 受信が完了したときに完了する Mono<Void> を返します。
ネストされた非同期操作の場合、プールされたデータバッファーを使用する基になるサーバー(たとえば、Netty)で message.retain() を呼び出す必要があります。そうしないと、データを読み取る前にデータバッファが解放される可能性があります。背景については、データバッファとコーデックを参照してください。

次の実装では、受信ストリームと送信ストリームを組み合わせます。

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Flux<WebSocketMessage> output = session.receive()				(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.map(value -> session.textMessage("Echo " + value));	(2)

		return session.send(output);									(3)
	}
}
1 受信メッセージストリームを処理します。
2 送信メッセージを作成し、結合されたフローを作成します。
3 受信を続けている間に完了しない Mono<Void> を返します。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val output = session.receive()                      (1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.map { session.textMessage("Echo $it") }    (2)

		return session.send(output)                         (3)
	}
}
1 受信メッセージストリームを処理します。
2 送信メッセージを作成し、結合されたフローを作成します。
3 受信を続けている間に完了しない Mono<Void> を返します。

次の例に示すように、受信ストリームと送信ストリームは独立しており、完了のためにのみ結合できます。

  • Java

  • Kotlin

class ExampleHandler implements WebSocketHandler {

	@Override
	public Mono<Void> handle(WebSocketSession session) {

		Mono<Void> input = session.receive()								(1)
				.doOnNext(message -> {
					// ...
				})
				.concatMap(message -> {
					// ...
				})
				.then();

		Flux<String> source = ... ;
		Mono<Void> output = session.send(source.map(session::textMessage));	(2)

		return Mono.zip(input, output).then();								(3)
	}
}
1 受信メッセージストリームを処理します。
2 送信メッセージを送信します。
3 ストリームに参加し、いずれかのストリームが終了すると完了する Mono<Void> を返します。
class ExampleHandler : WebSocketHandler {

	override fun handle(session: WebSocketSession): Mono<Void> {

		val input = session.receive()									(1)
				.doOnNext {
					// ...
				}
				.concatMap {
					// ...
				}
				.then()

		val source: Flux<String> = ...
		val output = session.send(source.map(session::textMessage))		(2)

		return Mono.zip(input, output).then()							(3)
	}
}
1 受信メッセージストリームを処理します。
2 送信メッセージを送信します。
3 ストリームに参加し、いずれかのストリームが終了すると完了する Mono<Void> を返します。

DataBuffer

DataBuffer は、WebFlux のバイトバッファーの表現です。参照の Spring コア部分は、データバッファとコーデックのセクションでさらに詳しく説明しています。理解すべき重要な点は、Netty のような一部のサーバーでは、バイトバッファーがプールされ、参照カウントされ、メモリリークを避けるために消費されたときに解放する必要があるということです。

Netty で実行する場合、アプリケーションが解放されないように入力データバッファーを保持する場合は、DataBufferUtils.retain(dataBuffer) を使用し、その後バッファーが消費されるときに DataBufferUtils.release(dataBuffer) を使用する必要があります。

ハンドシェーク

WebSocketHandlerAdapter は WebSocketService に委譲します。デフォルトでは、これは HandshakeWebSocketService のインスタンスであり、WebSocket リクエストに対して基本的なチェックを実行してから、使用中のサーバーに RequestUpgradeStrategy を使用します。現在、Reactor Netty、Tomcat、Jetty、Undertow のサポートが組み込まれています。

HandshakeWebSocketService は Predicate<String> を設定して WebSession から属性を抽出し、WebSocketSession の属性に挿入できる sessionAttributePredicate プロパティを公開します。

サーバー構成

各サーバーの RequestUpgradeStrategy は、基盤となる WebSocket サーバーエンジンに固有の構成を公開します。WebFlux Java 構成を使用する場合は、WebFlux 構成の対応するセクションに示されているようなプロパティをカスタマイズできます。そうでない場合は、WebFlux 構成を使用しない場合は、以下を使用します。

  • Java

  • Kotlin

@Configuration
class WebConfig {

	@Bean
	public WebSocketHandlerAdapter handlerAdapter() {
		return new WebSocketHandlerAdapter(webSocketService());
	}

	@Bean
	public WebSocketService webSocketService() {
		TomcatRequestUpgradeStrategy strategy = new TomcatRequestUpgradeStrategy();
		strategy.setMaxSessionIdleTimeout(0L);
		return new HandshakeWebSocketService(strategy);
	}
}
@Configuration
class WebConfig {

	@Bean
	fun handlerAdapter() =
			WebSocketHandlerAdapter(webSocketService())

	@Bean
	fun webSocketService(): WebSocketService {
		val strategy = TomcatRequestUpgradeStrategy().apply {
			setMaxSessionIdleTimeout(0L)
		}
		return HandshakeWebSocketService(strategy)
	}
}

サーバーのアップグレード戦略を確認して、使用可能なオプションを確認してください。現在、Tomcat と Jetty のみがこのようなオプションを公開しています。

CORS

CORS を設定し、WebSocket エンドポイントへのアクセスを制限する最も簡単な方法は、WebSocketHandler に CorsConfigurationSource を実装させ、許可された発信元、ヘッダー、その他の詳細を含む CorsConfiguration を返すことです。それができない場合は、SimpleUrlHandler の corsConfigurations プロパティを設定して、URL パターンで CORS 設定を指定することもできます。両方が指定されている場合は、CorsConfiguration で combine メソッドを使用して結合されます。

クライアント

Spring WebFlux は、Reactor Netty、Tomcat、Jetty、Undertow、標準 Java(つまり、JSR-356)の実装を備えた WebSocketClient 抽象化を提供します。

Tomcat クライアントは事実上、WebSocketSession 処理にいくつかの追加機能を備えた標準 Java の拡張であり、Tomcat 固有の API を利用してバックプレッシャーのメッセージの受信を一時停止します。

WebSocket セッションを開始するには、クライアントのインスタンスを作成し、その execute メソッドを使用できます。

  • Java

  • Kotlin

WebSocketClient client = new ReactorNettyWebSocketClient();

URI url = new URI("ws://localhost:8080/path");
client.execute(url, session ->
		session.receive()
				.doOnNext(System.out::println)
				.then());
val client = ReactorNettyWebSocketClient()

		val url = URI("ws://localhost:8080/path")
		client.execute(url) { session ->
			session.receive()
					.doOnNext(::println)
			.then()
		}

Jetty などの一部のクライアントは Lifecycle を実装しており、使用する前に停止および開始する必要があります。すべてのクライアントには、基になる WebSocket クライアントの構成に関連するコンストラクターオプションがあります。