クラス ZeroMqChannel

実装されたすべてのインターフェース:
AwareBeanFactoryAwareBeanNameAwareDisposableBeanInitializingBeanApplicationContextAwareExpressionCapableIntegrationPatternNamedComponentIntegrationManagementTrackableComponentMessageChannelSubscribableChannelInterceptableChannel

public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel
ZeroMQ ソケットを介した SubscribableChannel の実装。これは、次の 2 つのメッセージングモデルで機能します。- push-pull。送信されたメッセージは、それぞれの ZeroMQ SocketType.PUSH および SocketType.PULL ソケット型ロジックに従ってラウンドロビン方式でサブスクライバーに配信されます。- pub-sub。送信されたメッセージはすべてのサブスクライバーに配信されます。

このメッセージチャネルは、SocketType.PAIR 型の ZeroMQ ソケットのペアがスレッド間トランスポートバインディングを使用してパブリッシャー(送信操作)とサブスクライバーの間に接続されている場合、ローカルモードで機能します。

分散モードでは、このチャネルを外部で管理されている ZeroMQ プロキシに接続する必要があります。setConnectUrl(String) は、標準の ZeroMQ 接続文字列である必要がありますが、コロンを超える追加のポートがあり、ZeroMQ プロキシのフロントエンドとバックエンドのソケットペアを表します。例: tcp://localhost:6001:6002. 別のオプションは、同じアプリケーションで管理される ZeroMqProxy インスタンスへの参照を提供することです。フロントエンドポートとバックエンドポートはこのプロキシから評価され、それぞれの接続文字列がそれらから構築されます。

このように、このチャネルでの送受信操作は、メッセージングブローカーを介したやり取りに似ています。

このメッセージチャネル実装の内部ロジックは、MonoFluxScheduler API を使用するプロジェクト Reactor に基づいており、スレッドモデルとフロー制御を改善して、同じアプリケーション内でのマルチパブリッシャー(サブスクライバー)通信の同時実行プリミティブを回避します。

導入:
5.4
作成者:
Artem Bilan
  • フィールドの詳細

    • DEFAULT_CONSUME_DELAY

      public static final DurationSE DEFAULT_CONSUME_DELAY
  • コンストラクターの詳細

    • ZeroMqChannel

      public ZeroMqChannel(org.zeromq.ZContext context)
      提供されているプッシュ / プル通信モデルを備えた ZContext に基づいてチャネルインスタンスを作成します。
      パラメーター:
      context - 使用する ZContext
    • ZeroMqChannel

      public ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)
      提供された ZContext と提供された通信モデルに基づいてチャネルインスタンスを作成します。
      パラメーター:
      context - 使用する ZContext
      pubSub - コミュニケーションモデル: プッシュ / プルまたは pub/ サブ。
  • メソッドの詳細

    • setConnectUrl

      public void setConnectUrl(@Nullable StringSE connectUrl)
      プロキシのフロントエンドおよびバックエンドソケットのコロンを介してポートのペアで ZeroMQ プロキシへの接続を構成します。setZeroMqProxy(ZeroMqProxy) と相互に排他的です。
      パラメーター:
      connectUrl - PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT 形式の接続文字列。tcp://localhost:6001:6002
    • setZeroMqProxy

      public void setZeroMqProxy(@Nullable ZeroMqProxy zeroMqProxy)
      同じアプリケーションで ZeroMqProxy インスタンスへの参照を指定して、そのポート構成に依存し、プロキシがいつ起動されるかを推測することなく、自然なライフサイクル依存関係を作成します。setConnectUrl(String) と相互に排他的です。
      パラメーター:
      zeroMqProxy - 使用する ZeroMqProxy インスタンス
    • setConsumeDelay

      public void setConsumeDelay(DurationSE consumeDelay)
      データが受信されないときに消費を遅らせるには、DurationSE を指定します。
      パラメーター:
      consumeDelay - 空のときに消費を遅らせる DurationSE。デフォルトは DEFAULT_CONSUME_DELAY です。
    • setMessageMapper

      public void setMessageMapper(BytesMessageMapper messageMapper)
      ソケットで送信または受信が発生したときにメッセージとの間で変換する BytesMessageMapper を提供します。
      パラメーター:
      messageMapper - 使用する BytesMessageMapper。デフォルトは EmbeddedJsonHeadersMessageMapper です。
    • setSendSocketConfigurer

      public void setSendSocketConfigurer(ConsumerSE<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
      パブリッシングソケットを構成するための ConsumerSE コールバック。送信ソケットは、ZeroMQ プロキシのフロントエンドソケット(存在する場合)に接続されます。
      パラメーター:
      sendSocketConfigurer - 使用する ConsumerSE
    • setSubscribeSocketConfigurer

      public void setSubscribeSocketConfigurer(ConsumerSE<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)
      消費ソケットを構成するための ConsumerSE コールバック。サブスクライブソケットは、ZeroMQ プロキシのバックエンドソケット(存在する場合)に接続されています。
      パラメーター:
      subscribeSocketConfigurer - 使用する ConsumerSE
    • onInit

      protected void onInit()
      クラスからコピーされた説明: IntegrationObjectSupport
      サブクラスは、初期化ロジック用にこれを実装できます。
      オーバーライド:
      クラス AbstractMessageChannelonInit 
    • doSend

      protected boolean doSend(Message<?> message, long timeout)
      クラスからコピーされた説明: AbstractMessageChannel
      サブクラスはこのメソッドを実装する必要があります。負でないタイムアウトは、チャネルが容量に達した場合に待機する時間を示します(値が 0 の場合、成功の有無にかかわらずすぐに戻る必要があります)。負のタイムアウト値は、メッセージが受け入れられるか、ブロッキングスレッドが中断されるまでメソッドがブロックされることを示します。
      次で指定:
      クラス AbstractMessageChanneldoSend 
      パラメーター:
      message - メッセージ。
      timeout - タイムアウト。
      戻り値:
      send が成功した場合は true。
    • subscribe

      public boolean subscribe(MessageHandler handler)
      次で指定:
      インターフェース SubscribableChannelsubscribe 
    • unsubscribe

      public boolean unsubscribe(MessageHandler handler)
      次で指定:
      インターフェース SubscribableChannelunsubscribe 
    • destroy

      public void destroy()
      次で指定:
      インターフェース DisposableBeandestroy 
      次で指定:
      インターフェース IntegrationManagementdestroy 
      オーバーライド:
      クラス AbstractMessageChanneldestroy