ZeroMQ サポート

Spring Integration は、アプリケーションで ZeroMQ (英語) 通信をサポートするためのコンポーネントを提供します。実装は、JeroMQ [GitHub] (英語) ライブラリの十分にサポートされている JavaAPI に基づいています。すべてのコンポーネントは ZeroMQ ソケットのライフサイクルをカプセル化し、それらのスレッドを内部的に管理して、これらのコンポーネントとの相互作用をロックフリーおよびスレッドセーフにします。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-zeromq</artifactId>
    <version>6.3.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-zeromq:6.3.1"

ZeroMQ プロキシ

ZeroMqProxy は、組み込みの ZMQ.proxy()  関数 (英語) の Spring 対応ラッパーです。ソケットのライフサイクルとスレッド管理をカプセル化します。このプロキシのクライアントは、引き続き標準の ZeroMQ ソケット接続および対話 API を使用できます。標準の ZContext に加えて、よく知られている ZeroMQ プロキシモードの 1 つ(SUB/PUB、PULL/PUSH、ROUTER/DEALER)が必要です。このようにして、プロキシのフロントエンドとバックエンドに適切な ZeroMQ ソケット型のペアが使用されます。詳細については、ZeroMqProxy.Type を参照してください。

ZeroMqProxy は SmartLifecycle を実装して、ソケットを作成、バインド、構成し、Executor (存在する場合) から専用スレッドで ZMQ.proxy() を開始します。フロントエンドソケットとバックエンドソケットのバインドは、tcp:// プロトコルを介して、提供されたポートを使用して利用可能なすべてのネットワークインターフェースに対して行われます。それ以外の場合は、後でそれぞれの getFrontendPort() および getBackendPort() API メソッドを介して取得できるランダムポートにバインドされます。

制御ソケットは、"inproc://" + beanName + ".control" アドレスでのスレッド間トランスポートを備えた SocketType.PAIR として公開されます。getControlAddress() から入手できます。ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE、/ または ZMQ.PROXY_RESUME コマンドを送信するには、別の SocketType.PAIR ソケットの同じアプリケーションで使用する必要があります。ZeroMqProxy は、stop() のライフサイクルが呼び出されたときに ZMQ.PROXY_TERMINATE コマンドを実行して、ZMQ.proxy() ループを終了し、バインドされたすべてのソケットを正常に閉じます。

setExposeCaptureSocket(boolean) オプションを使用すると、このコンポーネントは追加のスレッド間ソケットを SocketType.PUB にバインドして、ZMQ.proxy() 実装で述べられているように、フロントエンドソケットとバックエンドソケット間のすべての通信をキャプチャーして公開します。このソケットは "inproc://" + beanName + ".capture" アドレスにバインドされており、フィルタリングのための特定のサブスクリプションを想定していません。

フロントエンドソケットとバックエンドソケットは、読み取り / 書き込みタイムアウトやセキュリティなどの追加のプロパティを使用してカスタマイズできます。このカスタマイズは、それぞれ setFrontendSocketConfigurer(Consumer<ZMQ.Socket>) および setBackendSocketConfigurer(Consumer<ZMQ.Socket>) コールバックを介して利用できます。

ZeroMqProxy は、次のような単純な Bean として提供できます。

@Bean
ZeroMqProxy zeroMqProxy() {
    ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
    proxy.setExposeCaptureSocket(true);
    proxy.setFrontendPort(6001);
    proxy.setBackendPort(6002);
    return proxy;
}

すべてのクライアントノードは、tcp:// を介してこのプロキシのホストに接続し、対象のそれぞれのポートを使用する必要があります。

ZeroMQ メッセージチャネル

ZeroMqChannel は、ZeroMQ ソケットのペアを使用してパブリッシャーとサブスクライバーを接続し、メッセージングのやり取りを行う SubscribableChannel です。PUB/SUB モード (デフォルトは PUSH/PULL) で動作します。ローカルのスレッド間チャネルとしても使用できます (PAIR ソケットを使用)。この場合、connectUrl は提供されません。分散モードでは、外部で管理される ZeroMQ プロキシに接続する必要があり、そこで同じプロキシに接続された他の同様のチャネルとメッセージを交換できます。接続 url オプションは、プロトコルとホスト、ZeroMQ プロキシのフロントエンドとバックエンドのソケット用のコロンで囲まれたポートのペアを含む標準の ZeroMQ 接続文字列です。便宜上、チャネルがプロキシと同じアプリケーションで構成されている場合は、接続文字列の代わりに ZeroMqProxy インスタンスをチャネルに提供できます。

送信ソケットと受信ソケットの両方が独自の専用スレッドで管理されるため、このチャネルは同時実行に適しています。このようにして、同期せずに異なるスレッドから ZeroMqChannel との間で公開および消費できます。

デフォルトでは、ZeroMqChannel は EmbeddedJsonHeadersMessageMapper を使用して、Jackson JSON プロセッサーを使用して byte[] との間で Message (ヘッダーを含む) を (逆) 直列化します。このロジックは、setMessageMapper(BytesMessageMapper) を介して構成できます。

送信ソケットと受信ソケットは、それぞれの setSendSocketConfigurer(Consumer<ZMQ.Socket>) および setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) コールバックを介して、任意のオプション(読み取り / 書き込みタイムアウト、セキュリティなど)に合わせてカスタマイズできます。

ZeroMqChannel の内部ロジックは、Project Reactor Flux および Mono オペレーターを介したリアクティブストリームに基づいています。これにより、スレッド制御が容易になり、チャネルとの間でロックフリーの同時発行と消費が可能になります。ローカル PUB/SUB ロジックは Flux.publish() オペレーターとして実装され、このチャネルのすべてのローカルサブスクライバーが、PUB ソケットの分散サブスクライバーと同じパブリッシュ済みメッセージを受信できるようにします。

以下は、ZeroMqChannel 構成の簡単な例です。

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
    ZeroMqChannel channel = new ZeroMqChannel(context, true);
    channel.setConnectUrl("tcp://localhost:6001:6002");
    channel.setConsumeDelay(Duration.ofMillis(100));
    return channel;
}

ZeroMQ 受信チャネルアダプター

ZeroMqMessageProducer は、リアクティブセマンティクスを備えた MessageProducerSupport 実装です。ZeroMQ ソケットからノンブロッキング方式で常にデータを読み取り、出力チャネルがリアクティブでない場合は、FluxMessageChannel によってサブスクライブされているか start() メソッドで明示的にサブスクライブされている無限の Flux にメッセージを公開します。ソケットでデータが受信されない場合、次の読み取り試行の前に consumeDelay (デフォルトは 1 秒)が適用されます。

ZeroMqMessageProducer では SocketType.PAIRSocketType.PULLSocketType.SUB のみがサポートされています。このコンポーネントは、リモートソケットに接続するか、提供されたポートまたはランダムなポートを使用して TCP プロトコルにバインドできます。実際のポートは、このコンポーネントが開始され、ZeroMQ ソケットがバインドされた後、getBoundPort() を介して取得できます。ソケットオプション(セキュリティや書き込みタイムアウトなど)は、setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) コールバックを介して構成できます。

receiveRaw オプションが true に設定されている場合、ソケットから消費された ZMsg は、生成された Message のペイロードとしてそのまま送信されます。ZMsg を解析して変換するのはダウンストリームフロー次第です。それ以外の場合は、消費されたデータを Message に変換するために InboundMessageMapper が使用されます。受信した ZMsg がマルチフレームの場合、最初のフレームは、この ZeroMQ メッセージが公開された ZeroMqHeaders.TOPIC ヘッダーとして扱われます。

unwrapTopic オプションが false に設定されている場合、受信メッセージはトピックと ZeroMQ メッセージの 2 つのフレームで構成されていると見なされます。それ以外の場合、デフォルトでは、ZMsg は 3 つのフレームで構成されていると見なされます。最初のフレームにはトピックが含まれ、最後のフレームにはメッセージが含まれ、中央に空のフレームがあります。

SocketType.SUB を使用すると、ZeroMqMessageProducer はサブスクリプションに提供された topics オプションを使用します。デフォルトでは、すべてをサブスクライブします。サブスクリプションは、subscribeToTopics() および unsubscribeFromTopics()@ManagedOperation を使用して実行時に調整できます。

ZeroMqMessageProducer 構成のサンプルを次に示します。

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
    ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
    messageProducer.setOutputChannel(outputChannel);
    messageProducer.setTopics("some");
    messageProducer.setReceiveRaw(true);
    messageProducer.setBindPort(7070);
    messageProducer.setConsumeDelay(Duration.ofMillis(100));
    return messageProducer;
}

ZeroMQ 送信チャネルアダプター

ZeroMqMessageHandler は、ZeroMQ ソケットにパブリッシュメッセージを生成する ReactiveMessageHandler 実装です。サポートされているのは SocketType.PAIRSocketType.PUSHSocketType.PUB のみです。ZeroMqMessageHandler は ZeroMQ ソケットへの接続のみをサポートし、バインディングはサポートされていません。SocketType.PUB を使用すると、topicExpression がリクエストメッセージに対して評価され、トピックフレームが null でない場合は ZeroMQ メッセージに挿入されます。サブスクライバー側 (SocketType.SUB) は、実際のデータを解析する前に、まずトピックフレームを受信する必要があります。

wrapTopic オプションが false に設定されている場合、挿入されたトピックが存在する場合は、そのトピックの後に ZeroMQ メッセージフレームが送信されます。デフォルトでは、トピックとメッセージの間に追加の空のフレームが送信されます。

リクエストメッセージのペイロードが ZMsg の場合、変換やトピックの抽出は実行されません。ZMsg はそのままソケットに送信され、再利用の可能性を考慮して破棄されることはありません。それ以外の場合は、リクエストメッセージ (またはそのペイロードのみ) を ZeroMQ フレームに変換して公開するために OutboundMessageMapper<byte[]> が使用されます。デフォルトでは、ConfigurableCompositeMessageConverter とともに提供される ConvertingBytesMessageMapper が使用されます。ソケットオプション (セキュリティや書き込みタイムアウトなど) は、setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) コールバックを介して構成できます。

ZeroMqMessageHandler 構成のサンプルを次に示します。

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
    ZeroMqMessageHandler messageHandler =
                  new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
    messageHandler.setTopicExpression(
                  new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
    messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}

ZeroMQ JavaDSL サポート

spring-integration-zeromq は、ZeroMq ファクトリおよび上記のコンポーネントの IntegrationComponentSpec 実装を介して便利な Java DSL fluentAPI を提供します。

これは ZeroMqChannel 用の JavaDSL のサンプルです。

.channel(ZeroMq.zeroMqChannel(this.context)
            .connectUrl("tcp://localhost:6001:6002")
            .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ JavaDSL 用の受信チャネルアダプターは次のとおりです。

IntegrationFlow.from(
            ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
                        .connectUrl("tcp://localhost:9000")
                        .topics("someTopic")
                        .receiveRaw(true)
                        .consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ JavaDSL 用の送信チャネルアダプターは次のとおりです。

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
                  .topicFunction(message -> message.getHeaders().get("myTopic")))
}