STOMP サポート

Spring Integration バージョン 4.2 は、STOMP(Simple Text Orientated Messaging Protocol)クライアントサポートを導入しました。Spring Framework のメッセージングモジュールである stomp パッケージのアーキテクチャ、インフラストラクチャ、API に基づいています。Spring Integration は、Spring STOMP コンポーネントの多く(StompSession や StompClientSupport など)を使用します。詳細については、Spring Framework リファレンスマニュアルの Spring Framework STOMP サポートの章を参照してください。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.5.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-stomp:6.5.1"

サーバー側コンポーネントの場合、org.springframework:spring-websocket および / または io.projectreactor.netty:reactor-netty 依存関係を追加する必要があります。

概要

STOMP を構成するには、STOMP クライアントオブジェクトから開始する必要があります。Spring Framework は、次の実装を提供します。

  • WebSocketStompClient: SockJS クライアントでの HTTP ベースの WebSocket エミュレーションのために、標準の JSR-356 WebSocket、Jetty 9、SockJS をサポートする Spring WebSocket API 上に構築されています。

  • ReactorNettyTcpStompClientreactor-netty プロジェクトの ReactorNettyTcpClient 上に構築。

他の StompClientSupport 実装を提供できます。これらのクラスの Javadoc を参照してください。

StompClientSupport クラスは、提供された StompSessionHandler の StompSession を生成するファクトリとして設計され、残りのすべての作業は、その StompSessionHandler および StompSession 抽象化へのコールバックを通じて行われます。Spring Integration アダプターの抽象化では、アプリケーションを一意のセッションを持つ STOMP クライアントとして表すために、管理された共有オブジェクトを提供する必要があります。このために、Spring Integration は StompSessionManager 抽象化を提供して、提供された StompSessionHandler 間の単一の  StompSession を管理します。これにより、特定の STOMP ブローカーに受信または送信チャネルアダプター(またはその両方)を使用できます。詳細については、StompSessionManager (およびその実装)JavaDocs を参照してください。

STOMP 受信チャネルアダプター

StompInboundChannelAdapter は、Spring Integration アプリケーションを提供された STOMP 宛先にサブスクライブし、それらからメッセージを受信するワンストップ MessageProducer コンポーネントです(接続された StompSession で提供された MessageConverter を使用して STOMP フレームから変換されます)。StompInboundChannelAdapter で適切な @ManagedOperation アノテーションを使用することにより、実行時に宛先(したがって STOMP サブスクリプション)を変更できます。

その他の構成オプションについては、STOMP 名前空間のサポートおよび StompInboundChannelAdapter Javadoc を参照してください。

STOMP 送信チャネルアダプター

StompMessageHandler は <int-stomp:outbound-channel-adapter> の MessageHandler であり、発信 Message<?> インスタンスを StompSession (共有 StompSessionManager によって提供される)を介して STOMP destination (SpEL 式を使用して事前構成または決定)に送信するために使用されます。

その他の構成オプションについては、STOMP 名前空間のサポートおよび StompMessageHandler Javadoc を参照してください。

STOMP ヘッダーマッピング

STOMP プロトコルは、フレームの一部としてヘッダーを提供します。STOMP フレームの構造全体の形式は次のとおりです。

....
COMMAND
header1:value1
header2:value2

Body^@
....

Spring Framework は、これらのヘッダーを表す StompHeaders を提供します。詳細については、Javadoc を参照してください。STOMP フレームは Message<?> インスタンスとの間で変換され、これらのヘッダーは MessageHeaders インスタンスとの間でマッピングされます。Spring Integration は、STOMP アダプター用のデフォルト HeaderMapper 実装を提供します。実装は StompHeaderMapper です。受信および送信のアダプターに対して、それぞれ fromHeaders() および toHeaders() 操作を提供します。

他の多くの Spring Integration モジュールと同様に、標準の STOMP ヘッダーを MessageHeaders にマッピングするために IntegrationStompHeaders クラスが導入され、ヘッダー名のプレフィックスとして stomp_ が使用されています。さらに、そのプレフィックスを持つすべての MessageHeaders インスタンスは、宛先に送信するときに StompHeaders にマップされます。

詳細については、それらのクラスの Javadoc および STOMP 名前空間のサポートの mapped-headers 属性の説明を参照してください。

STOMP 統合イベント

多くの STOMP 操作は、エラー処理を含む非同期です。例: STOMP には RECEIPT サーバーフレームがあり、クライアントフレームが RECEIPT ヘッダーを追加してリクエストしたときに返される RECEIPT サーバーフレームがあります。これらの非同期イベントへのアクセスを提供するために、Spring Integration は StompIntegrationEvent インスタンスを発行します。これは、ApplicationListener を実装するか、<int-event:inbound-channel-adapter> を使用して取得できます(Spring アプリケーションイベントの受信を参照)。

具体的には、stompSessionListenableFuture が STOMP ブローカーへの接続の失敗により onFailure() を受信すると、StompExceptionEvent が AbstractStompSessionManager から発行されます。別の例は StompMessageHandler です。ERROR STOMP フレームを処理します。これは、この StompMessageHandler によって送信された不適切な(受け入れられない)メッセージに対するサーバーのレスポンスです。

StompMessageHandler は、StompSession に送信されたメッセージに対する非同期応答内の StompSession.Receiptable コールバックの一部として StompReceiptEvent を発行します。StompReceiptEvent は、StompClientSupport インスタンスで構成できる receiptTimeLimit 期間内にサーバーから RECEIPT フレームを受信したかどうかに応じて、正または負になります。デフォルトは 15 * 1000 (ミリ秒単位なので 15 秒) です。

StompSession.Receiptable コールバックは、送信するメッセージの RECEIPT STOMP ヘッダーが null でない場合にのみ追加されます。autoReceipt オプションを介して StompSession および StompSessionManager でそれぞれ RECEIPT ヘッダーの自動生成を有効にできます。

それらの ApplicationEvent インスタンスを受け入れるように Spring Integration を構成する方法の詳細については、STOMP アダプターの Java 構成を参照してください。

STOMP アダプターの Java 構成

次の例は、STOMP アダプターの包括的な Java 構成を示しています。

@Configuration
@EnableIntegration
public class StompConfiguration {

    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }

    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }

    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }

    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }

    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }

    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }

}

STOMP 名前空間のサポート

Spring Integration STOMP 名前空間は、受信および送信のチャネルアダプターコンポーネントを実装します。構成に含めるには、アプリケーションコンテキスト構成ファイルで次の名前空間宣言を指定します。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>

<int-stomp:outbound-channel-adapter> 要素を理解する

次のリストは、STOMP 送信チャネルアダプターで使用可能な属性を示しています。

<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)
1 コンポーネント Bean 名。MessageHandler は、id と .handler の Bean エイリアスで登録されます。channel 属性を設定しない場合、DirectChannel が作成され、この id 属性の値が Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイントは Bean 名 id と .adapter で登録されます。
2id が存在する場合、このアダプターに接続されているチャネルを識別します。id を参照してください。オプション。
3 低レベル接続と StompSession 処理操作をカプセル化する StompSessionManager Bean への参照。必須。
4HeaderMapper<StompHeaders> を実装する Bean への参照。HeaderMapper<StompHeaders> は、Spring Integration MessageHeaders を STOMP フレームヘッダーとの間でマッピングします。mapped-headers と相互に排他的です。デフォルトは StompHeaderMapper です。
5STOMP フレームヘッダーにマッピングされる STOMP ヘッダーの名前のカンマ区切りリスト。header-mapper 参照が設定されていない場合にのみ提供できます。このリストの値は、ヘッダー名(myheader* や *myheader など)と照合する単純なパターンにすることもできます。特別なトークン(STOMP_OUTBOUND_HEADERS)は、すべての標準 STOMP ヘッダー(コンテンツ長、受信、ハートビートなど)を表します。これらはデフォルトで含まれています。独自のヘッダーを追加し、標準ヘッダーもマッピングする場合は、このトークンを含めるか、header-mapper を使用して独自の HeaderMapper 実装を提供する必要があります。
6STOMP メッセージの送信先の名前。destination-expression と相互に排他的です。
7 ルートオブジェクトとして各 Spring Integration Message に対して実行時に評価される SpEL 式。destination と相互に排他的です。
8 このエンドポイントが自動的に開始するかどうかを示すブール値。デフォルトは true です。
9 このエンドポイントが開始および停止するライフサイクルフェーズ。値が低いほど、このエンドポイントは早く開始し、遅くなります。デフォルトは Integer.MIN_VALUE です。値は負の値にすることができます。SmartLifeCycle (Javadoc) を参照してください。

<int-stomp:inbound-channel-adapter> 要素を理解する

次のリストは、STOMP 受信チャネルアダプターで使用可能な属性を示しています。

<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)
1 コンポーネント Bean 名。channel 属性を設定しない場合、DirectChannel が作成され、この id 属性の値が Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイントは Bean 名 id と .adapter で登録されます。
2 このアダプターに接続されているチャネルを識別します。
3ErrorMessage インスタンスの送信先となる MessageChannel Bean 参照。
4<int-stomp:outbound-channel-adapter> で同じオプションを参照してください。
5STOMP フレームヘッダーからマッピングされる STOMP ヘッダーの名前のカンマ区切りリスト。これは、header-mapper 参照が設定されていない場合にのみ提供できます。このリストの値は、ヘッダー名(たとえば、myheader* または *myheader)と照合する単純なパターンにすることもできます。特別なトークン(STOMP_INBOUND_HEADERS)は、すべての標準 STOMP ヘッダー(コンテンツ長、受信、ハートビートなど)を表します。これらはデフォルトで含まれています。独自のヘッダーを追加し、標準ヘッダーもマッピングする場合は、このトークンを含めるか、header-mapper を使用して独自の HeaderMapper 実装を提供する必要があります。
6<int-stomp:outbound-channel-adapter> で同じオプションを参照してください。
7 サブスクライブする STOMP 宛先名のコンマ区切りリスト。宛先のリスト(したがってサブスクリプション)は、addDestination() および removeDestination() @ManagedOperation アノテーションを介して実行時に変更できます。
8 チャネルがブロックできる場合にチャネルにメッセージを送信するときに待機する最大時間(ミリ秒)。例: QueueChannel は、最大容量に達した場合、スペースが使用可能になるまでブロックできます。
9 受信 STOMP フレームから変換するターゲット payload の Java 型の完全修飾名。デフォルトは String.class です。
10<int-stomp:outbound-channel-adapter> で同じオプションを参照してください。
11<int-stomp:outbound-channel-adapter> で同じオプションを参照してください。