STOMP サポート
Spring Integration バージョン 4.2 は、STOMP(Simple Text Orientated Messaging Protocol)クライアントサポートを導入しました。Spring Framework のメッセージングモジュールである stomp パッケージのアーキテクチャ、インフラストラクチャ、API に基づいています。Spring Integration は、Spring STOMP コンポーネントの多く(StompSession
や StompClientSupport
など)を使用します。詳細については、Spring Framework リファレンスマニュアルの Spring Framework STOMP サポートの章を参照してください。
この依存関係をプロジェクトに含める必要があります。
サーバー側コンポーネントの場合、org.springframework:spring-websocket
および / または io.projectreactor.netty:reactor-netty
依存関係を追加する必要があります。
概要
STOMP を構成するには、STOMP クライアントオブジェクトから開始する必要があります。Spring Framework は、次の実装を提供します。
WebSocketStompClient
: SockJS クライアントでの HTTP ベースの WebSocket エミュレーションのために、標準の JSR-356 WebSocket、Jetty 9、SockJS をサポートする Spring WebSocket API 上に構築されています。ReactorNettyTcpStompClient
:reactor-netty
プロジェクトのReactorNettyTcpClient
上に構築。
他の StompClientSupport
実装を提供できます。これらのクラスの Javadoc を参照してください。
StompClientSupport
クラスは、提供された StompSessionHandler
の StompSession
を生成するファクトリとして設計され、残りのすべての作業は、その StompSessionHandler
および StompSession
抽象化へのコールバックを通じて行われます。Spring Integration アダプターの抽象化では、アプリケーションを一意のセッションを持つ STOMP クライアントとして表すために、管理された共有オブジェクトを提供する必要があります。このために、Spring Integration は StompSessionManager
抽象化を提供して、提供された StompSessionHandler
間の単一の StompSession
を管理します。これにより、特定の STOMP ブローカーに受信または送信チャネルアダプター(またはその両方)を使用できます。詳細については、StompSessionManager
(およびその実装)JavaDocs を参照してください。
STOMP 受信チャネルアダプター
StompInboundChannelAdapter
は、Spring Integration アプリケーションを提供された STOMP 宛先にサブスクライブし、それらからメッセージを受信するワンストップ MessageProducer
コンポーネントです(接続された StompSession
で提供された MessageConverter
を使用して STOMP フレームから変換されます)。StompInboundChannelAdapter
で適切な @ManagedOperation
アノテーションを使用することにより、実行時に宛先(したがって STOMP サブスクリプション)を変更できます。
その他の構成オプションについては、STOMP 名前空間のサポートおよび StompInboundChannelAdapter
Javadoc を参照してください。
STOMP 送信チャネルアダプター
StompMessageHandler
は <int-stomp:outbound-channel-adapter>
の MessageHandler
であり、発信 Message<?>
インスタンスを StompSession
(共有 StompSessionManager
によって提供される)を介して STOMP destination
(SpEL 式を使用して事前構成または決定)に送信するために使用されます。
その他の構成オプションについては、STOMP 名前空間のサポートおよび StompMessageHandler
Javadoc を参照してください。
STOMP ヘッダーマッピング
STOMP プロトコルは、フレームの一部としてヘッダーを提供します。STOMP フレームの構造全体の形式は次のとおりです。
....
COMMAND
header1:value1
header2:value2
Body^@
....
Spring Framework は、これらのヘッダーを表す StompHeaders
を提供します。詳細については、Javadoc を参照してください。STOMP フレームは Message<?>
インスタンスとの間で変換され、これらのヘッダーは MessageHeaders
インスタンスとの間でマッピングされます。Spring Integration は、STOMP アダプター用のデフォルト HeaderMapper
実装を提供します。実装は StompHeaderMapper
です。受信および送信のアダプターに対して、それぞれ fromHeaders()
および toHeaders()
操作を提供します。
他の多くの Spring Integration モジュールと同様に、標準の STOMP ヘッダーを MessageHeaders
にマッピングするために IntegrationStompHeaders
クラスが導入され、ヘッダー名のプレフィックスとして stomp_
が使用されています。さらに、そのプレフィックスを持つすべての MessageHeaders
インスタンスは、宛先に送信するときに StompHeaders
にマップされます。
詳細については、それらのクラスの Javadoc および STOMP 名前空間のサポートの mapped-headers
属性の説明を参照してください。
STOMP 統合イベント
多くの STOMP 操作は、エラー処理を含む非同期です。例: STOMP には RECEIPT
サーバーフレームがあり、クライアントフレームが RECEIPT
ヘッダーを追加してリクエストしたときに返される RECEIPT
サーバーフレームがあります。これらの非同期イベントへのアクセスを提供するために、Spring Integration は StompIntegrationEvent
インスタンスを発行します。これは、ApplicationListener
を実装するか、<int-event:inbound-channel-adapter>
を使用して取得できます(Spring アプリケーションイベントの受信を参照)。
具体的には、stompSessionListenableFuture
が STOMP ブローカーへの接続の失敗により onFailure()
を受信すると、StompExceptionEvent
が AbstractStompSessionManager
から発行されます。別の例は StompMessageHandler
です。ERROR
STOMP フレームを処理します。これは、この StompMessageHandler
によって送信された不適切な(受け入れられない)メッセージに対するサーバーのレスポンスです。
StompMessageHandler
は、StompSession
に送信されたメッセージに対する非同期応答内の StompSession.Receiptable
コールバックの一部として StompReceiptEvent
を発行します。StompReceiptEvent
は、StompClientSupport
インスタンスで構成できる receiptTimeLimit
期間内にサーバーから RECEIPT
フレームを受信したかどうかに応じて、正または負になります。デフォルトは 15 * 1000
(ミリ秒単位なので 15 秒) です。
StompSession.Receiptable コールバックは、送信するメッセージの RECEIPT STOMP ヘッダーが null でない場合にのみ追加されます。autoReceipt オプションを介して StompSession および StompSessionManager でそれぞれ RECEIPT ヘッダーの自動生成を有効にできます。 |
それらの ApplicationEvent
インスタンスを受け入れるように Spring Integration を構成する方法の詳細については、STOMP アダプターの Java 構成を参照してください。
STOMP アダプターの Java 構成
次の例は、STOMP アダプターの包括的な Java 構成を示しています。
@Configuration
@EnableIntegration
public class StompConfiguration {
@Bean
public ReactorNettyTcpStompClient stompClient() {
ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
stompClient.setTaskScheduler(taskScheduler);
stompClient.setReceiptTimeLimit(5000);
return stompClient;
}
@Bean
public StompSessionManager stompSessionManager() {
ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
stompSessionManager.setAutoReceipt(true);
return stompSessionManager;
}
@Bean
public PollableChannel stompInputChannel() {
return new QueueChannel();
}
@Bean
public StompInboundChannelAdapter stompInboundChannelAdapter() {
StompInboundChannelAdapter adapter =
new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
adapter.setOutputChannel(stompInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "stompOutputChannel")
public MessageHandler stompMessageHandler() {
StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
handler.setDestination("/topic/myTopic");
return handler;
}
@Bean
public PollableChannel stompEvents() {
return new QueueChannel();
}
@Bean
public ApplicationListener<ApplicationEvent> stompEventListener() {
ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
producer.setEventTypes(StompIntegrationEvent.class);
producer.setOutputChannel(stompEvents());
return producer;
}
}
STOMP 名前空間のサポート
Spring Integration STOMP 名前空間は、受信および送信のチャネルアダプターコンポーネントを実装します。構成に含めるには、アプリケーションコンテキスト構成ファイルで次の名前空間宣言を指定します。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/stomp
https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
...
</beans>
<int-stomp:outbound-channel-adapter>
要素を理解する
次のリストは、STOMP 送信チャネルアダプターで使用可能な属性を示しています。
<int-stomp:outbound-channel-adapter
id="" (1)
channel="" (2)
stomp-session-manager="" (3)
header-mapper="" (4)
mapped-headers="" (5)
destination="" (6)
destination-expression="" (7)
auto-startup="" (8)
phase=""/> (9)
1 | コンポーネント Bean 名。MessageHandler は、id と .handler の Bean エイリアスで登録されます。channel 属性を設定しない場合、DirectChannel が作成され、この id 属性の値が Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイントは Bean 名 id と .adapter で登録されます。 |
2 | id が存在する場合、このアダプターに接続されているチャネルを識別します。id を参照してください。オプション。 |
3 | 低レベル接続と StompSession 処理操作をカプセル化する StompSessionManager Bean への参照。必須。 |
4 | HeaderMapper<StompHeaders> を実装する Bean への参照。HeaderMapper<StompHeaders> は、Spring Integration MessageHeaders を STOMP フレームヘッダーとの間でマッピングします。mapped-headers と相互に排他的です。デフォルトは StompHeaderMapper です。 |
5 | STOMP フレームヘッダーにマッピングされる STOMP ヘッダーの名前のカンマ区切りリスト。header-mapper 参照が設定されていない場合にのみ提供できます。このリストの値は、ヘッダー名(myheader* や *myheader など)と照合する単純なパターンにすることもできます。特別なトークン(STOMP_OUTBOUND_HEADERS )は、すべての標準 STOMP ヘッダー(コンテンツ長、受信、ハートビートなど)を表します。これらはデフォルトで含まれています。独自のヘッダーを追加し、標準ヘッダーもマッピングする場合は、このトークンを含めるか、header-mapper を使用して独自の HeaderMapper 実装を提供する必要があります。 |
6 | STOMP メッセージの送信先の名前。destination-expression と相互に排他的です。 |
7 | ルートオブジェクトとして各 Spring Integration Message に対して実行時に評価される SpEL 式。destination と相互に排他的です。 |
8 | このエンドポイントが自動的に開始するかどうかを示すブール値。デフォルトは true です。 |
9 | このエンドポイントが開始および停止するライフサイクルフェーズ。値が低いほど、このエンドポイントは早く開始し、遅くなります。デフォルトは Integer.MIN_VALUE です。値は負の値にすることができます。SmartLifeCycle (Javadoc) を参照してください。 |
<int-stomp:inbound-channel-adapter>
要素を理解する
次のリストは、STOMP 受信チャネルアダプターで使用可能な属性を示しています。
<int-stomp:inbound-channel-adapter
id="" (1)
channel="" (2)
error-channel="" (3)
stomp-session-manager="" (4)
header-mapper="" (5)
mapped-headers="" (6)
destinations="" (7)
send-timeout="" (8)
payload-type="" (9)
auto-startup="" (10)
phase=""/> (11)
1 | コンポーネント Bean 名。channel 属性を設定しない場合、DirectChannel が作成され、この id 属性の値が Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイントは Bean 名 id と .adapter で登録されます。 |
2 | このアダプターに接続されているチャネルを識別します。 |
3 | ErrorMessage インスタンスの送信先となる MessageChannel Bean 参照。 |
4 | <int-stomp:outbound-channel-adapter> で同じオプションを参照してください。 |
5 | STOMP フレームヘッダーからマッピングされる STOMP ヘッダーの名前のカンマ区切りリスト。これは、header-mapper 参照が設定されていない場合にのみ提供できます。このリストの値は、ヘッダー名(たとえば、myheader* または *myheader )と照合する単純なパターンにすることもできます。特別なトークン(STOMP_INBOUND_HEADERS )は、すべての標準 STOMP ヘッダー(コンテンツ長、受信、ハートビートなど)を表します。これらはデフォルトで含まれています。独自のヘッダーを追加し、標準ヘッダーもマッピングする場合は、このトークンを含めるか、header-mapper を使用して独自の HeaderMapper 実装を提供する必要があります。 |
6 | <int-stomp:outbound-channel-adapter> で同じオプションを参照してください。 |
7 | サブスクライブする STOMP 宛先名のコンマ区切りリスト。宛先のリスト(したがってサブスクリプション)は、addDestination() および removeDestination() @ManagedOperation アノテーションを介して実行時に変更できます。 |
8 | チャネルがブロックできる場合にチャネルにメッセージを送信するときに待機する最大時間(ミリ秒)。例: QueueChannel は、最大容量に達した場合、スペースが使用可能になるまでブロックできます。 |
9 | 受信 STOMP フレームから変換するターゲット payload の Java 型の完全修飾名。デフォルトは String.class です。 |
10 | <int-stomp:outbound-channel-adapter> で同じオプションを参照してください。 |
11 | <int-stomp:outbound-channel-adapter> で同じオプションを参照してください。 |