public class ZeroMqChannel extends AbstractMessageChannel implements SubscribableChannel
SubscribableChannel
実装。2 つのメッセージングモデルで機能します。- push-pull
。送信されたメッセージは、それぞれの ZeroMQ SocketType.PUSH
および SocketType.PULL
ソケット型ロジックに従ってラウンドロビン方式でサブスクライバーに配信されます。- pub-sub
、送信されたメッセージはすべてのサブスクライバーに配信されます。 このメッセージチャネルは、SocketType.PAIR
型の ZeroMQ ソケットのペアがスレッド間トランスポートバインディングを使用してパブリッシャー(送信操作)とサブスクライバーの間に接続されている場合、ローカルモードで機能します。
分散モードでは、このチャネルを外部で管理されている ZeroMQ プロキシに接続する必要があります。setConnectUrl(String)
は、標準の ZeroMQ 接続文字列である必要がありますが、コロンを超える追加のポートがあり、ZeroMQ プロキシのフロントエンドとバックエンドのソケットペアを表します。例: tcp://localhost:6001:6002
. 別のオプションは、同じアプリケーションで管理される ZeroMqProxy
インスタンスへの参照を提供することです。フロントエンドポートとバックエンドポートはこのプロキシから評価され、それぞれの接続文字列がそれらから構築されます。
このように、このチャネルでの送受信操作は、メッセージングブローカーを介したやり取りに似ています。
このメッセージチャネル実装の内部ロジックは、Mono
、Flux
、Scheduler
API を使用するプロジェクト Reactor に基づいており、同じアプリケーション内でのマルチパブリッシャー(サブスクライバー)通信の同時実行プリミティブを回避するために、より優れた thead モデルとフロー制御を実現します。
AbstractMessageChannel.ChannelInterceptorList
IntegrationManagement.ManagementOverrides
修飾子と型 | フィールドと説明 |
---|---|
static java.time.Duration | DEFAULT_CONSUME_DELAY |
interceptors, meters
EXPRESSION_PARSER, logger
INDEFINITE_TIMEOUT
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
コンストラクターと説明 |
---|
ZeroMqChannel(org.zeromq.ZContext context) 提供されているプッシュ / プル通信モデルを備えた ZContext に基づいてチャネルインスタンスを作成します。 |
ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub) 提供された ZContext と提供された通信モデルに基づいてチャネルインスタンスを作成します。 |
修飾子と型 | メソッドと説明 |
---|---|
void | destroy() |
protected boolean | doSend(Message<?> message, long timeout) サブクラスはこのメソッドを実装する必要があります。 |
protected void | onInit() サブクラスは、初期化ロジック用にこれを実装できます。 |
void | setConnectUrl(StringSE connectUrl) プロキシのフロントエンドおよびバックエンドソケットのコロンを介してポートのペアで ZeroMQ プロキシへの接続を構成します。 |
void | setConsumeDelay(java.time.Duration consumeDelay) データが受信されないときに消費を遅らせるには、 Duration を指定します。 |
void | setMessageMapper(BytesMessageMapper messageMapper) ソケットで送信または受信が発生したときにメッセージとの間で変換する BytesMessageMapper を提供します。 |
void | setSendSocketConfigurer(java.util.function.ConsumerSE<org.zeromq.ZMQ.Socket> sendSocketConfigurer) パブリッシングソケットを構成するための Consumer SE コールバック。 |
void | setSubscribeSocketConfigurer(java.util.function.ConsumerSE<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer) 消費ソケットを構成するための Consumer SE コールバック。 |
void | setZeroMqProxy(ZeroMqProxy zeroMqProxy) 同じアプリケーションで ZeroMqProxy インスタンスへの参照を指定して、そのポート構成に依存し、プロキシがいつ起動されるかを推測することなく、自然なライフサイクル依存関係を作成します。 |
boolean | subscribe(MessageHandler handler) |
boolean | unsubscribe(MessageHandler handler) |
addInterceptor, addInterceptor, getComponentType, getFullChannelName, getIChannelInterceptorList, getIntegrationPatternType, getInterceptors, getMetricsCaptor, getOverrides, isLoggingEnabled, registerMetricsCaptor, removeInterceptor, removeInterceptor, send, send, setDatatypes, setInterceptors, setLoggingEnabled, setMessageConverter, setShouldTrack
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
cloneSE, equalsSE, finalizeSE, getClassSE, hashCodeSE, notifySE, notifyAllSE, waitSE, waitSE, waitSE
send, send
getManagedName, getManagedType, getThisAs, setManagedName, setManagedType
getBeanName, getComponentName
public ZeroMqChannel(org.zeromq.ZContext context)
ZContext
に基づいてチャネルインスタンスを作成します。context
- 使用する ZContext
。public ZeroMqChannel(org.zeromq.ZContext context, boolean pubSub)
ZContext
と提供された通信モデルに基づいてチャネルインスタンスを作成します。context
- 使用する ZContext
。pubSub
- コミュニケーションモデル: プッシュ / プルまたは pub/ サブ。public void setConnectUrl(@Nullable StringSE connectUrl)
setZeroMqProxy(ZeroMqProxy)
と相互に排他的です。connectUrl
- PROTOCOL://HOST:FRONTEND_PORT:BACKEND_PORT
形式の接続文字列。tcp://localhost:6001:6002
public void setZeroMqProxy(@Nullable ZeroMqProxy zeroMqProxy)
ZeroMqProxy
インスタンスへの参照を指定して、そのポート構成に依存し、プロキシがいつ起動されるかを推測することなく、自然なライフサイクル依存関係を作成します。setConnectUrl(String)
と相互に排他的です。zeroMqProxy
- 使用する ZeroMqProxy
インスタンス public void setConsumeDelay(java.time.Duration consumeDelay)
Duration
を指定します。consumeDelay
- 空のときに消費を遅らせる Duration
。デフォルトは DEFAULT_CONSUME_DELAY
です。public void setMessageMapper(BytesMessageMapper messageMapper)
BytesMessageMapper
を提供します。messageMapper
- 使用する BytesMessageMapper
。デフォルトは EmbeddedJsonHeadersMessageMapper
です。public void setSendSocketConfigurer(java.util.function.ConsumerSE<org.zeromq.ZMQ.Socket> sendSocketConfigurer)
Consumer
SE コールバック。sendSocketConfigurer
- 使用する Consumer
SE。public void setSubscribeSocketConfigurer(java.util.function.ConsumerSE<org.zeromq.ZMQ.Socket> subscribeSocketConfigurer)
Consumer
SE コールバック。subscribeSocketConfigurer
- 使用する Consumer
SE。protected void onInit()
IntegrationObjectSupport
AbstractMessageChannel
の onInit
protected boolean doSend(Message<?> message, long timeout)
AbstractMessageChannel
AbstractMessageChannel
の doSend
message
- メッセージ。timeout
- タイムアウト。public boolean subscribe(MessageHandler handler)
SubscribableChannel
の subscribe
public boolean unsubscribe(MessageHandler handler)
SubscribableChannel
の unsubscribe
public void destroy()
DisposableBean
の destroy
IntegrationManagement
の destroy
AbstractMessageChannel
の destroy