WebSockets サポート

バージョン 4.1 以降、Spring Integration は WebSocket をサポートしています。Spring Framework の web-socket モジュールのアーキテクチャ、インフラストラクチャ、API に基づいています。Spring WebSocket のコンポーネント(SubProtocolHandler や WebSocketClient など)および構成オプション(@EnableWebSocketMessageBroker など)の多くは、Spring Integration 内で再利用できます。詳細については、Spring Framework リファレンスマニュアルの Spring Framework WebSocket のサポートの章を参照してください。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-websocket</artifactId>
    <version>6.3.4</version>
</dependency>
compile "org.springframework.integration:spring-integration-websocket:6.3.4"

サーバー側では、org.springframework:spring-webmvc 依存関係を明示的に含める必要があります。

Spring Framework WebSocket インフラストラクチャは、Spring メッセージング基盤に基づいており、Spring Integration が使用するのと同じ MessageChannel 実装および MessageHandler 実装(およびいくつかの POJO メソッドアノテーションマッピング)に基づく基本的なメッセージングフレームワークを提供します。その結果、Spring Integration は、WebSocket アダプターがなくても、WebSocket フローに直接関与できます。この目的のために、次の例に示すように、適切なアノテーションを使用して Spring Integration @MessagingGateway を構成できます。

@MessagingGateway
@Controller
public interface WebSocketGateway {

    @MessageMapping("/greeting")
    @SendToUser("/queue/answer")
    @Gateway(requestChannel = "greetingChannel")
    String greeting(String payload);

}

概要

WebSocket プロトコルは定義上ストリーミングであり、WebSocket との間で同時にメッセージを送受信できるため、クライアントまたはサーバー側に関係なく、適切な WebSocketSession を処理できます。接続管理と WebSocketSession レジストリをカプセル化するために、IntegrationWebSocketContainer には ClientWebSocketContainer と ServerWebSocketContainer の実装が用意されています。WebSocket API (英語) と Spring Framework でのその実装 (多くの拡張機能付き) のおかげで、同じクラスがサーバー側とクライアント側で使用されます (もちろん、Java の観点から)。ほとんどの接続および WebSocketSession レジストリオプションは両側で同じです。これにより、多くの構成アイテムとインフラストラクチャフックを再利用して、サーバー側とクライアント側で WebSocket アプリケーションを構築できます。次の例は、コンポーネントが両方の目的を果たす方法を示しています。

//Client side
@Bean
public WebSocketClient webSocketClient() {
    return new SockJsClient(Collections.singletonList(new WebSocketTransport(new JettyWebSocketClient())));
}

@Bean
public IntegrationWebSocketContainer clientWebSocketContainer() {
    return new ClientWebSocketContainer(webSocketClient(), "ws://my.server.com/endpoint");
}

//Server side
@Bean
public IntegrationWebSocketContainer serverWebSocketContainer() {
    return new ServerWebSocketContainer("/endpoint").withSockJs();
}

IntegrationWebSocketContainer は、双方向メッセージングを実現するように設計されており、受信および送信チャネルアダプター間で共有でき(以下を参照)、一方向(送信または受信)WebSocket メッセージングを使用する場合、そのうちの 1 つからのみ参照できます。チャネルアダプターなしでも使用できますが、この場合、IntegrationWebSocketContainer は WebSocketSession レジストリとしてのみ機能します。

ServerWebSocketContainer は WebSocketConfigurer を実装して、内部 IntegrationWebSocketContainer.IntegrationWebSocketHandler を Endpoint として登録します。ターゲットベンダー WebSocket コンテナーの ServletWebSocketHandlerRegistry 内の提供された paths およびその他のサーバー WebSocket オプション(HandshakeHandler または SockJS fallback など)でそうします。この登録は、@EnableWebSocket アノテーションと同じことを行うインフラストラクチャー WebSocketIntegrationConfigurationInitializer コンポーネントで実現されます。これは、Spring Integration インフラストラクチャがすべての WebSocket エンドポイントを検出するため、@EnableIntegration (またはアプリケーションコンテキストの任意の Spring Integration 名前空間)を使用して、@EnableWebSocket 宣言を省略することができることを意味します。

バージョン 6.1 以降では、uriTemplate と uriVariables の組み合わせの代わりに、提供された URI を使用して ClientWebSocketContainer を構成できます。これは、URI の一部にカスタムエンコードが必要な場合に便利です。便宜上、UriComponentsBuilder API を参照してください。

WebSocket 受信チャネルアダプター

WebSocketInboundChannelAdapter は、WebSocketSession 相互作用の受信部分を実装します。IntegrationWebSocketContainer を指定する必要があり、アダプターは自身を WebSocketListener として登録して、受信メッセージと WebSocketSession イベントを処理します。

IntegrationWebSocketContainer に登録できる WebSocketListener は 1 つだけです。

WebSocket サブプロトコルの場合、WebSocketInboundChannelAdapter は、2 番目のコンストラクター引数として SubProtocolHandlerRegistry で構成できます。アダプターは、SubProtocolHandlerRegistry に委譲して、受け入れられた WebSocketSession に適した SubProtocolHandler を決定し、サブプロトコルの実装に従って WebSocketMessage を Message に変換します。

デフォルトでは、WebSocketInboundChannelAdapter は WebSocketMessage を Message に変換する未加工の PassThruSubProtocolHandler 実装のみに依存します。

WebSocketInboundChannelAdapter は、SimpMessageType.MESSAGE または空の simpMessageType ヘッダーを持つ Message インスタンスのみを受け入れ、基礎となる統合フローに送信します。他のすべての Message 型は、SubProtocolHandler 実装(StompSubProtocolHandler など)から発行された ApplicationEvent インスタンスを介して処理されます。

サーバー側では、@EnableWebSocketMessageBroker 構成が存在する場合、useBroker = true オプションを使用して WebSocketInboundChannelAdapter を構成できます。この場合、すべての non-MESSAGEMessage 型は提供された AbstractBrokerMessageHandler に委譲されます。さらに、ブローカーリレーが宛先プレフィックスで構成されている場合、ブローカーの宛先に一致するメッセージは、WebSocketInboundChannelAdapter の outputChannel ではなく AbstractBrokerMessageHandler にルーティングされます。

useBroker = false および受信メッセージが SimpMessageType.CONNECT 型の場合、WebSocketInboundChannelAdapter は SimpMessageType.CONNECT_ACK メッセージを WebSocketSession に送信しますが、チャネルには送信しません。

Spring の WebSocket サポートでは、ブローカーリレーを 1 つだけ設定できます。AbstractBrokerMessageHandler 参照は必要ありません。アプリケーションコンテキストで検出されます。

その他の構成オプションについては、WebSockets 名前空間のサポートを参照してください。

WebSocket 送信チャネルアダプター

WebSocketOutboundChannelAdapter:

  1. MessageChannel から Spring Integration メッセージを受け入れます

  2. MessageHeaders から WebSocketSessionid を決定します

  3. 提供された IntegrationWebSocketContainer から WebSocketSession を取得します

  4. WebSocketMessage 作業の変換と送信を、提供された SubProtocolHandlerRegistry から適切な SubProtocolHandler に委譲します。

クライアント側では、ClientWebSocketContainer は単一の接続とその WebSocketSession のみをそれぞれ処理するため、WebSocketSessionid メッセージヘッダーは必要ありません。

STOMP サブプロトコルを使用するには、このアダプターを StompSubProtocolHandler で構成する必要があります。その後、StompHeaderAccessor.create(StompCommand…​) および MessageBuilder を使用して、または単に HeaderEnricher を使用して、任意の STOMP メッセージ型をこのアダプターに送信できます(ヘッダーエンリッチャーを参照)。

この章の残りの部分では、主に追加の構成オプションについて説明します。

WebSockets 名前空間のサポート

Spring Integration WebSocket 名前空間には、この章の残りの部分で説明するいくつかのコンポーネントが含まれています。構成に含めるには、アプリケーションコンテキスト構成ファイルで次の名前空間宣言を使用します。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-websocket="http://www.springframework.org/schema/integration/websocket"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/websocket
    https://www.springframework.org/schema/integration/websocket/spring-integration-websocket.xsd">
    ...
</beans>

<int-websocket:client-container> 属性

以下のリストは、<int-websocket:client-container> エレメントで使用可能な属性を示しています。

<int-websocket:client-container
                  id=""                             (1)
                  client=""                         (2)
                  uri=""                            (3)
                  uri-variables=""                  (4)
                  origin=""                         (5)
                  send-time-limit=""                (6)
                  send-buffer-size-limit=""         (7)
                  send-buffer-overflow-strategy=""  (8)
                  auto-startup=""                   (9)
                  phase="">                        (10)
                <int-websocket:http-headers>
                  <entry key="" value=""/>
                </int-websocket:http-headers>      (11)
</int-websocket:client-container>
1 コンポーネント Bean 名。
2WebSocketClient Bean リファレンス。
3 ターゲット WebSocket サービスへの uri または uriTemplate。URI 変数プレースホルダーを使用して uriTemplate として使用する場合、uri-variables 属性が必要です。
4uri 属性値内の URI 変数プレースホルダーのカンマ区切り値。値は、uri での順序に従ってプレースホルダーに置き換えられます。UriComponents.expand(Object…​uriVariableValues) (Javadoc) を参照してください。
5Origin ハンドシェイク HTTP ヘッダー値。
6WebSocket セッションの「送信」タイムアウト制限。デフォルトは 10000 です。
7WebSocket セッションの「送信」メッセージサイズ制限。デフォルトは 524288 です。
8WebSocket セッションの送信バッファオーバーフロー戦略は、セッションの送信メッセージバッファが send-buffer-size-limit に到達したときの動作を決定します。可能な値と詳細については、" ConcurrentWebSocketSessionDecorator.OverflowStrategy " を参照してください。
9 このエンドポイントが自動的に開始するかどうかを示すブール値。このコンテナーが WebSocket 受信アダプターから開始されると仮定すると、デフォルトは false になります。
10 このエンドポイントが開始および停止するライフサイクルフェーズ。値が低いほど、このエンドポイントは早く開始し、遅くなります。デフォルトは Integer.MAX_VALUE です。値は負の値にすることができます。SmartLifeCycle (Javadoc) を参照してください。
11 ハンドシェイクリクエストで使用される HttpHeaders の Map

<int-websocket:server-container> 属性

以下のリストは、<int-websocket:server-container> エレメントで使用可能な属性を示しています。

<int-websocket:server-container
          id=""                             (1)
          path=""                           (2)
          handshake-handler=""              (3)
          handshake-interceptors=""         (4)
          decorator-factories=""            (5)
          send-time-limit=""                (6)
          send-buffer-size-limit=""         (7)
          send-buffer-overflow-strategy=""  (8)
          allowed-origins="">               (9)
          <int-websocket:sockjs
            client-library-url=""          (10)
            stream-bytes-limit=""          (11)
            session-cookie-needed=""       (12)
            heartbeat-time=""              (13)
            disconnect-delay=""            (14)
            message-cache-size=""          (15)
            websocket-enabled=""           (16)
            scheduler=""                   (17)
            message-codec=""               (18)
            transport-handlers=""          (19)
            suppress-cors="true" />        (20)
</int-websocket:server-container>
1 コンポーネント Bean 名。
2 特定のリクエストを WebSocketHandler にマップするパス(またはコンマ区切りのパス)。正確なパスマッピング URI(/myPath など)および ant スタイルのパスパターン(/myPath/** など)をサポートします。
3HandshakeHandler Bean リファレンス。デフォルトは DefaultHandshakeHandler です。
4HandshakeInterceptor Bean 参照のリスト。
5WebSocket メッセージの処理に使用されるハンドラーを修飾する 1 つ以上のファクトリ(WebSocketHandlerDecoratorFactory)のリスト。これは、いくつかの高度なユースケースに役立ちます(たとえば、対応する HTTP セッションの有効期限が切れたときに Spring Security が WebSocket セッションを強制的に閉じることを許可する場合)。詳細については、Spring Session プロジェクトを参照してください。
6<int-websocket:client-container> で同じオプションを参照してください。
7<int-websocket:client-container> で同じオプションを参照してください。
8WebSocket セッションの送信バッファオーバーフロー戦略は、セッションの送信メッセージバッファが send-buffer-size-limit に到達したときの動作を決定します。可能な値と詳細については、" ConcurrentWebSocketSessionDecorator.OverflowStrategy " を参照してください。
9 許可される起点ヘッダー値。複数の起点をコンマ区切りのリストとして指定できます。このチェックは主にブラウザークライアント向けに設計されています。他の型のクライアントがオリジンヘッダー値を変更することを妨げるものは何もありません。SockJS が有効で、許可されたオリジンが制限されている場合、クロスオリジンリクエスト(jsonp-pollingiframe-xhr-pollingiframe-eventsourceiframe-htmlfile)にオリジンヘッダーを使用しないトランスポート型は無効になります。結果として、IE6 と IE7 はサポートされておらず、IE8 と IE9 は Cookie なしでのみサポートされています。デフォルトでは、すべてのオリジンが許可されます。
10 ネイティブのクロスドメイン通信を持たないトランスポート(eventsource や htmlfile など)は、iframe のコードをローカルのドメインから SockJS サーバーに実行できるように、非表示の iframe の「外部」ドメインから簡単なページを取得する必要があります。iframe は SockJS javascript クライアントライブラリをロードする必要があるため、このプロパティを使用して、ロード元の場所を指定できます。デフォルトでは、d1fxtkz8shb9d2.cloudfront.net/sockjs-0.3.4.min.js (英語) を指します。ただし、アプリケーションが提供する URL を指すように設定することもできます。相対 URL を指定できることに注意してください。この場合、URL は iframe URL に対して相対的でなければなりません。例: SockJS エンドポイントが /sockjs にマッピングされ、結果の iframe URL が /sockjs/iframe.html であると想定すると、相対 URL は "../../" で始まり、SockJS マッピングの上の位置まで移動する必要があります。プレフィックスベースのサーブレットマッピングの場合、もう 1 つのトラバースが必要になる場合があります。
11 閉じられる前に単一の HTTP ストリーミングリクエストで送信できる最小バイト数。デフォルトは 128K (つまり、128*1024 または 131072 バイト)です。
12SockJs /info エンドポイントからのレスポンスの cookie_needed 値。このプロパティは、アプリケーションが正しく機能するために JSESSIONID Cookie が必要かどうかを示します(たとえば、負荷分散のため、HTTP セッションを使用するための Java サーブレットコンテナー内)。
13 サーバーがメッセージを送信しなかった後、接続が切断されないようにするためにサーバーがクライアントにハートビートフレームを送信するまでの時間(ミリ秒)。デフォルト値は 25,000 (25 秒)です。
14 受信接続(つまり、サーバーがクライアントにデータを送信できるアクティブな接続)がなくなった後、クライアントが切断されたと見なされるまでの時間(ミリ秒単位)。デフォルト値は 5000 です。
15 クライアントからの次の HTTP ポーリングリクエストを待っている間にセッションがキャッシュできるサーバーからクライアントへのメッセージの数。デフォルトのサイズは 100 です。
16 一部のロードバランサーは WebSockets をサポートしていません。このオプションを false に設定して、サーバー側で WebSocket トランスポートを無効にします。デフォルト値は true です。
17TaskScheduler Bean リファレンス。値が指定されていない場合、新しい ThreadPoolTaskScheduler インスタンスが作成されます。このスケジューラインスタンスは、ハートビートメッセージのスケジュールに使用されます。
18SockJS メッセージのエンコードおよびデコードに使用する SockJsMessageCodec Bean 参照。デフォルトでは、Jackson2SockJsMessageCodec が使用されます。これには、Jackson ライブラリがクラスパスに存在する必要があります。
19TransportHandler Bean 参照のリスト。
20SockJS リクエストの CORS ヘッダーの自動追加を無効にするかどうか。デフォルト値は false です。

<int-websocket:outbound-channel-adapter> 属性

以下のリストは、<int-websocket:outbound-channel-adapter> エレメントで使用可能な属性を示しています。

<int-websocket:outbound-channel-adapter
                          id=""                             (1)
                          channel=""                        (2)
                          container=""                      (3)
                          default-protocol-handler=""       (4)
                          protocol-handlers=""              (5)
                          message-converters=""             (6)
                          merge-with-default-converters=""  (7)
                          auto-startup=""                   (8)
                          phase=""/>                        (9)
1 コンポーネント Bean 名。channel 属性を指定しない場合、DirectChannel が作成され、この id 属性を Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイントは Bean 名 id と .adapter で登録されます。そして、MessageHandler は Bean エイリアス id plus .handler で登録されます。
2 このアダプターに接続されているチャネルを識別します。
3 低レベル接続と WebSocketSession 処理操作をカプセル化する IntegrationWebSocketContainer Bean への参照。必須。
4SubProtocolHandler インスタンスへのオプションの参照。クライアントがサブプロトコルをリクエストしなかった場合、または単一のプロトコルハンドラーである場合に使用されます。この参照または protocol-handlers リストが提供されない場合、デフォルトで PassThruSubProtocolHandler が使用されます。
5 このチャネルアダプターの SubProtocolHandler Bean 参照のリスト。単一の Bean 参照のみを提供し、default-protocol-handler を提供しない場合、その単一の SubProtocolHandler が default-protocol-handler として使用されます。この属性または default-protocol-handler を設定しない場合、デフォルトで PassThruSubProtocolHandler が使用されます。
6 このチャネルアダプターの MessageConverter Bean 参照のリスト。
7 デフォルトのコンバーターをカスタムコンバーターの後に登録する必要があるかどうかを示すブール値。このフラグは、message-converters が提供されている場合にのみ使用されます。それ以外の場合は、すべてのデフォルトコンバーターが登録されます。デフォルトは false です。デフォルトのコンバーターは(順番に): StringMessageConverterByteArrayMessageConverterMappingJackson2MessageConverter (Jackson ライブラリがクラスパスに存在する場合)。
8 このエンドポイントが自動的に開始するかどうかを示すブール値。デフォルトは true です。
9 このエンドポイントが開始および停止するライフサイクルフェーズ。値が低いほど、このエンドポイントは早く開始し、遅くなります。デフォルトは Integer.MIN_VALUE です。値は負の値にすることができます。SmartLifeCycle (Javadoc) を参照してください。

<int-websocket:inbound-channel-adapter> 属性

以下のリストは、<int-websocket:outbound-channel-adapter> エレメントで使用可能な属性を示しています。

<int-websocket:inbound-channel-adapter
                            id=""  (1)
                            channel=""  (2)
                            error-channel=""  (3)
                            container=""  (4)
                            default-protocol-handler=""  (5)
                            protocol-handlers=""  (6)
                            message-converters=""  (7)
                            merge-with-default-converters=""  (8)
                            send-timeout=""  (9)
                            payload-type=""  (10)
                            use-broker=""  (11)
                            auto-startup=""  (12)
                            phase=""/>  (13)
1 コンポーネント Bean 名。channel 属性を設定しない場合、DirectChannel が作成され、この id 属性を Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイントは Bean 名 id と .adapter で登録されます。
2 このアダプターに接続されているチャネルを識別します。
3ErrorMessage インスタンスの送信先となる MessageChannel Bean 参照。
4<int-websocket:outbound-channel-adapter> で同じオプションを参照してください。
5<int-websocket:outbound-channel-adapter> で同じオプションを参照してください。
6<int-websocket:outbound-channel-adapter> で同じオプションを参照してください。
7<int-websocket:outbound-channel-adapter> で同じオプションを参照してください。
8<int-websocket:outbound-channel-adapter> で同じオプションを参照してください。
9 チャネルがブロックできる場合にチャネルにメッセージを送信するときに待機する最大時間(ミリ秒)。例: QueueChannel は、最大容量に達した場合、スペースが使用可能になるまでブロックできます。
10 受信 WebSocketMessage から変換するターゲット payload の Java 型の完全修飾名。デフォルトは java.lang.String です。
11 このアダプターが non-MESSAGEWebSocketMessage インスタンスおよびブローカー宛先を持つメッセージをアプリケーションコンテキストから AbstractBrokerMessageHandler に送信するかどうかを示します。この属性が true の場合、Broker Relay 構成が必要です。この属性はサーバー側でのみ使用されます。クライアント側では、無視されます。デフォルトは false です。
12<int-websocket:outbound-channel-adapter> で同じオプションを参照してください。
13<int-websocket:outbound-channel-adapter> で同じオプションを参照してください。

ClientStompEncoder を使用する

バージョン 4.3.13 以降、Spring Integration は、WebSocket チャネルアダプターのクライアント側で使用するための ClientStompEncoder (標準 StompEncoder の拡張として)を提供します。クライアント側のメッセージを適切に準備するには、ClientStompEncoder のインスタンスを StompSubProtocolHandler に挿入する必要があります。デフォルトの StompSubProtocolHandler の問題の 1 つは、サーバー側用に設計されているため、SENDstompCommand ヘッダーを MESSAGE に更新することです(サーバー側の STOMP プロトコルで必要とされます)。クライアントが適切な SEND Web ソケットフレームでメッセージを送信しない場合、一部の STOMP ブローカーはメッセージを受け入れません。この場合、ClientStompEncoder の目的は、メッセージを byte[] にエンコードする前に、stompCommand ヘッダーをオーバーライドし、それを SEND 値に設定することです。

動的 WebSocket エンドポイント登録

バージョン 5.5 以降、WebSocket サーバーエンドポイント(ServerWebSocketContainer に基づくチャネルアダプター)を実行時に登録(および削除)できるようになりました。paths は ServerWebSocketContainer がマップされ、HandlerMapping を介して DispatcherServlet に公開され、WebSocket クライアントがアクセスできます。動的およびランタイム統合フローサポートは、これらのエンドポイントを透過的に登録できます。

@Autowired
IntegrationFlowContext integrationFlowContext;

@Autowired
HandshakeHandler handshakeHandler;
...
ServerWebSocketContainer serverWebSocketContainer =
       new ServerWebSocketContainer("/dynamic")
               .setHandshakeHandler(this.handshakeHandler);

WebSocketInboundChannelAdapter webSocketInboundChannelAdapter =
       new WebSocketInboundChannelAdapter(serverWebSocketContainer);

QueueChannel dynamicRequestsChannel = new QueueChannel();

IntegrationFlow serverFlow =
       IntegrationFlow.from(webSocketInboundChannelAdapter)
               .channel(dynamicRequestsChannel)
               .get();

IntegrationFlowContext.IntegrationFlowRegistration dynamicServerFlow =
       this.integrationFlowContext.registration(serverFlow)
               .addBean(serverWebSocketContainer)
               .register();
...
dynamicServerFlow.destroy();
動的フロー登録で .addBean(serverWebSocketContainer) を呼び出して、エンドポイント登録のために ServerWebSocketContainer のインスタンスを ApplicationContext に追加することが重要です。動的フロー登録が破棄されると、関連する ServerWebSocketContainer インスタンスも破棄され、URL パスマッピングを含むそれぞれのエンドポイント登録も破棄されます。
動的 Websocket エンドポイントは、Spring Integration メカニズムを介してのみ登録できます。通常の Spring @EnableWebsocket が使用されている場合、Spring Integration 構成はバックオフし、動的エンドポイントのインフラストラクチャは登録されません。