ZeroMQ サポート
Spring Integration は、アプリケーションで ZeroMQ (英語) 通信をサポートするためのコンポーネントを提供します。実装は、JeroMQ [GitHub] (英語) ライブラリの十分にサポートされている JavaAPI に基づいています。すべてのコンポーネントは ZeroMQ ソケットのライフサイクルをカプセル化し、それらのスレッドを内部的に管理して、これらのコンポーネントとの相互作用をロックフリーおよびスレッドセーフにします。
この依存関係をプロジェクトに含める必要があります。
ZeroMQ プロキシ
ZeroMqProxy
は、組み込みの ZMQ.proxy()
関数 (英語) の Spring 対応ラッパーです。ソケットのライフサイクルとスレッド管理をカプセル化します。このプロキシのクライアントは、引き続き標準の ZeroMQ ソケット接続および対話 API を使用できます。標準の ZContext
に加えて、よく知られている ZeroMQ プロキシモードの 1 つ(SUB/PUB、PULL/PUSH、ROUTER/DEALER)が必要です。このようにして、プロキシのフロントエンドとバックエンドに適切な ZeroMQ ソケット型のペアが使用されます。詳細については、ZeroMqProxy.Type
を参照してください。
ZeroMqProxy
は SmartLifecycle
を実装して、ソケットを作成、バインド、構成し、Executor
(存在する場合) から専用スレッドで ZMQ.proxy()
を開始します。フロントエンドソケットとバックエンドソケットのバインドは、tcp://
プロトコルを介して、提供されたポートを使用して利用可能なすべてのネットワークインターフェースに対して行われます。それ以外の場合は、後でそれぞれの getFrontendPort()
および getBackendPort()
API メソッドを介して取得できるランダムポートにバインドされます。
制御ソケットは、"inproc://" + beanName + ".control"
アドレスでのスレッド間トランスポートを備えた SocketType.PAIR
として公開されます。getControlAddress()
から入手できます。ZMQ.PROXY_TERMINATE
、ZMQ.PROXY_PAUSE
、/ または ZMQ.PROXY_RESUME
コマンドを送信するには、別の SocketType.PAIR
ソケットの同じアプリケーションで使用する必要があります。ZeroMqProxy
は、stop()
のライフサイクルが呼び出されたときに ZMQ.PROXY_TERMINATE
コマンドを実行して、ZMQ.proxy()
ループを終了し、バインドされたすべてのソケットを正常に閉じます。
setExposeCaptureSocket(boolean)
オプションを使用すると、このコンポーネントは追加のスレッド間ソケットを SocketType.PUB
にバインドして、ZMQ.proxy()
実装で述べられているように、フロントエンドソケットとバックエンドソケット間のすべての通信をキャプチャーして公開します。このソケットは "inproc://" + beanName + ".capture"
アドレスにバインドされており、フィルタリングのための特定のサブスクリプションを想定していません。
フロントエンドソケットとバックエンドソケットは、読み取り / 書き込みタイムアウトやセキュリティなどの追加のプロパティを使用してカスタマイズできます。このカスタマイズは、それぞれ setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)
および setBackendSocketConfigurer(Consumer<ZMQ.Socket>)
コールバックを介して利用できます。
ZeroMqProxy
は、次のような単純な Bean として提供できます。
@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
すべてのクライアントノードは、tcp://
を介してこのプロキシのホストに接続し、対象のそれぞれのポートを使用する必要があります。
ZeroMQ メッセージチャネル
ZeroMqChannel
は、ZeroMQ ソケットのペアを使用してパブリッシャーとサブスクライバーを接続し、メッセージングのやり取りを行う SubscribableChannel
です。PUB/SUB モード (デフォルトは PUSH/PULL) で動作します。ローカルのスレッド間チャネルとしても使用できます (PAIR
ソケットを使用)。この場合、connectUrl
は提供されません。分散モードでは、外部で管理される ZeroMQ プロキシに接続する必要があり、そこで同じプロキシに接続された他の同様のチャネルとメッセージを交換できます。接続 url オプションは、プロトコルとホスト、ZeroMQ プロキシのフロントエンドとバックエンドのソケット用のコロンで囲まれたポートのペアを含む標準の ZeroMQ 接続文字列です。便宜上、チャネルがプロキシと同じアプリケーションで構成されている場合は、接続文字列の代わりに ZeroMqProxy
インスタンスをチャネルに提供できます。
送信ソケットと受信ソケットの両方が独自の専用スレッドで管理されるため、このチャネルは同時実行に適しています。このようにして、同期せずに異なるスレッドから ZeroMqChannel
との間で公開および消費できます。
デフォルトでは、ZeroMqChannel
は EmbeddedJsonHeadersMessageMapper
を使用して、Jackson JSON プロセッサーを使用して byte[]
との間で Message
(ヘッダーを含む) を (逆) 直列化します。このロジックは、setMessageMapper(BytesMessageMapper)
を介して構成できます。
送信ソケットと受信ソケットは、それぞれの setSendSocketConfigurer(Consumer<ZMQ.Socket>)
および setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>)
コールバックを介して、任意のオプション(読み取り / 書き込みタイムアウト、セキュリティなど)に合わせてカスタマイズできます。
ZeroMqChannel
の内部ロジックは、Project Reactor Flux
および Mono
オペレーターを介したリアクティブストリームに基づいています。これにより、スレッド制御が容易になり、チャネルとの間でロックフリーの同時発行と消費が可能になります。ローカル PUB/SUB ロジックは Flux.publish()
オペレーターとして実装され、このチャネルのすべてのローカルサブスクライバーが、PUB
ソケットの分散サブスクライバーと同じパブリッシュ済みメッセージを受信できるようにします。
以下は、ZeroMqChannel
構成の簡単な例です。
@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
ZeroMQ 受信チャネルアダプター
ZeroMqMessageProducer
は、リアクティブセマンティクスを備えた MessageProducerSupport
実装です。ZeroMQ ソケットからノンブロッキング方式で常にデータを読み取り、出力チャネルがリアクティブでない場合は、FluxMessageChannel
によってサブスクライブされているか start()
メソッドで明示的にサブスクライブされている無限の Flux
にメッセージを公開します。ソケットでデータが受信されない場合、次の読み取り試行の前に consumeDelay
(デフォルトは 1 秒)が適用されます。
ZeroMqMessageProducer
では SocketType.PAIR
、SocketType.PULL
、SocketType.SUB
のみがサポートされています。このコンポーネントは、リモートソケットに接続するか、提供されたポートまたはランダムなポートを使用して TCP プロトコルにバインドできます。実際のポートは、このコンポーネントが開始され、ZeroMQ ソケットがバインドされた後、getBoundPort()
を介して取得できます。ソケットオプション(セキュリティや書き込みタイムアウトなど)は、setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
コールバックを介して構成できます。
receiveRaw
オプションが true
に設定されている場合、ソケットから消費された ZMsg
は、生成された Message
のペイロードとしてそのまま送信されます。ZMsg
を解析して変換するのはダウンストリームフロー次第です。それ以外の場合は、消費されたデータを Message
に変換するために InboundMessageMapper
が使用されます。受信した ZMsg
がマルチフレームの場合、最初のフレームは、この ZeroMQ メッセージが公開された ZeroMqHeaders.TOPIC
ヘッダーとして扱われます。
unwrapTopic
オプションが false
に設定されている場合、受信メッセージはトピックと ZeroMQ メッセージの 2 つのフレームで構成されていると見なされます。それ以外の場合、デフォルトでは、ZMsg
は 3 つのフレームで構成されていると見なされます。最初のフレームにはトピックが含まれ、最後のフレームにはメッセージが含まれ、中央に空のフレームがあります。
SocketType.SUB
を使用すると、ZeroMqMessageProducer
はサブスクリプションに提供された topics
オプションを使用します。デフォルトでは、すべてをサブスクライブします。サブスクリプションは、subscribeToTopics()
および unsubscribeFromTopics()
@ManagedOperation
を使用して実行時に調整できます。
ZeroMqMessageProducer
構成のサンプルを次に示します。
@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
ZeroMQ 送信チャネルアダプター
ZeroMqMessageHandler
は、ZeroMQ ソケットにパブリッシュメッセージを生成する ReactiveMessageHandler
実装です。サポートされるのは SocketType.PAIR
、SocketType.PUSH
、SocketType.PUB
のみです。このコンポーネントは、リモートソケットに接続したり、指定されたポートまたはランダムポートを使用して TCP プロトコルにバインドしたりできます。実際のポートは、このコンポーネントが起動され、ZeroMQ ソケットがバインドされた後に、getBoundPort()
を介して取得できます。
SocketType.PUB
が使用される場合、topicExpression
はリクエストメッセージに対して評価され、トピックフレームが null でない場合は ZeroMQ メッセージに挿入されます。サブスクライバー側 (SocketType.SUB
) は、実際のデータを解析する前に、まずトピックフレームを受信する必要があります。
wrapTopic
オプションが false
に設定されている場合、挿入されたトピックが存在する場合は、そのトピックの後に ZeroMQ メッセージフレームが送信されます。デフォルトでは、トピックとメッセージの間に追加の空のフレームが送信されます。
リクエストメッセージのペイロードが ZMsg
の場合、変換やトピックの抽出は実行されません。ZMsg
はそのままソケットに送信され、再利用の可能性を考慮して破棄されることはありません。それ以外の場合は、リクエストメッセージ (またはそのペイロードのみ) を ZeroMQ フレームに変換して公開するために OutboundMessageMapper<byte[]>
が使用されます。デフォルトでは、ConfigurableCompositeMessageConverter
とともに提供される ConvertingBytesMessageMapper
が使用されます。ソケットオプション (セキュリティや書き込みタイムアウトなど) は、setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer)
コールバックを介して構成できます。
以下はソケットに接続する ZeroMqMessageHandler
構成のサンプルです。
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
指定されたポートにバインドする ZeroMqMessageHandler
構成のサンプルを次に示します。
@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
ZeroMQ JavaDSL サポート
spring-integration-zeromq
は、ZeroMq
ファクトリおよび上記のコンポーネントの IntegrationComponentSpec
実装を介して便利な Java DSL fluentAPI を提供します。
これは ZeroMqChannel
用の JavaDSL のサンプルです。
.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ JavaDSL 用の受信チャネルアダプターは次のとおりです。
IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
ZeroMQ JavaDSL 用の送信チャネルアダプターは次のとおりです。
.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}