メッセージチャンネル
Message
はデータをカプセル化するという重要なロールを果たしますが、メッセージプロデューサーをメッセージコンシューマーから切り離すのは MessageChannel
です。
MessageChannel インターフェース
Spring Integration のトップレベル MessageChannel
インターフェースは、次のように定義されています。
public interface MessageChannel {
boolean send(Message message);
boolean send(Message message, long timeout);
}
メッセージを送信する場合、メッセージが正常に送信されると、戻り値は true
になります。送信呼び出しがタイムアウトまたは中断された場合、false
を返します。
PollableChannel
メッセージチャネルはメッセージをバッファリングする場合としない場合があるため(Spring Integration の概要で説明)、2 つのサブインターフェースがバッファリング(ポーリング可能)および非バッファリング(サブスクライブ可能)チャネルの動作を定義します。以下のリストは、PollableChannel
インターフェースの定義を示しています。
public interface PollableChannel extends MessageChannel {
Message<?> receive();
Message<?> receive(long timeout);
}
送信メソッドと同様に、メッセージを受信すると、タイムアウトまたは割り込みの場合、戻り値は null になります。
SubscribableChannel
SubscribableChannel
ベースインターフェースは、サブスクライブされた MessageHandler
インスタンスにメッセージを直接送信するチャネルによって実装されます。ポーリング用の受信メソッドは提供されません。代わりに、これらのサブスクライバを管理するためのメソッドを定義します。次のリストは、SubscribableChannel
インターフェースの定義を示しています。
public interface SubscribableChannel extends MessageChannel {
boolean subscribe(MessageHandler handler);
boolean unsubscribe(MessageHandler handler);
}
メッセージチャネルの実装
Spring Integration は、いくつかの異なるメッセージチャネルの実装を提供します。次のセクションでは、それぞれについて簡単に説明します。
PublishSubscribeChannel
PublishSubscribeChannel
実装は、送信された Message
をそのサブスクライブされたすべてのハンドラーにブロードキャストします。これは、主に通知がイベントメッセージの送信に最もよく使用されます(通常、単一のハンドラーによって処理されることを意図したドキュメントメッセージとは対照的です)。PublishSubscribeChannel
は送信専用であることに注意してください。send(Message)
メソッドが呼び出されると、サブスクライバーに直接ブロードキャストするため、コンシューマーはメッセージをポーリングできません(PollableChannel
を実装していないため、receive()
メソッドはありません)。代わりに、サブスクライバー自体が MessageHandler
でなければならず、サブスクライバーの handleMessage(Message)
メソッドが順番に呼び出されます。
バージョン 3.0 より前は、サブスクライバーがいない PublishSubscribeChannel
で send
メソッドを呼び出すと false
が返されました。MessagingTemplate
と組み合わせて使用すると、MessageDeliveryException
がスローされました。バージョン 3.0 から、少なくとも最小サブスクライバーが存在する(およびメッセージを正常に処理する)場合に send
が常に成功したと見なされるように、動作が変更されました。この動作を変更するには、minSubscribers
プロパティを設定します。デフォルトは 0
です。
TaskExecutor を使用する場合、メッセージの実際の処理は非同期に実行されるため、この決定には正しい数のサブスクライバーの存在のみが使用されます。 |
QueueChannel
QueueChannel
実装はキューをラップします。PublishSubscribeChannel
とは異なり、QueueChannel
にはポイントツーポイントのセマンティクスがあります。つまり、チャンネルに複数のコンシューマーがある場合でも、そのチャンネルに送信された Message
を受信できるのはそのうちの 1 つだけです。次のように、デフォルトの引数なしのコンストラクター(本質的に Integer.MAX_VALUE
の無制限の容量を提供)と、キュー容量を受け入れるコンストラクターを提供します。
public QueueChannel(int capacity)
容量制限に達していないチャネルは、内部キューにメッセージを格納し、メッセージを処理する準備ができている受信者がいない場合でも、send(Message<?>)
メソッドはすぐに戻ります。キューが容量に達すると、送信者はキューに空きができるまでブロックします。または、追加のタイムアウトパラメーターを持つ send メソッドを使用する場合、キューは、空きができるかタイムアウト期間が経過するかのどちらか早い方までブロックされます。同様に、receive()
呼び出しは、メッセージがキューで利用可能な場合はすぐに戻りますが、キューが空の場合、メッセージが利用可能になるか、タイムアウトが提供される場合はタイムアウトになるまで受信呼び出しはブロックされます。どちらの場合でも、タイムアウト値 0 を渡すことで、キューの状態に関係なくすぐに強制的に戻ることができます。ただし、timeout
パラメーターブロックを使用せずに send()
および receive()
のバージョンを呼び出すことに注意してください。
PriorityChannel
QueueChannel
は先入れ先出し(FIFO)の順序付けを実行しますが、PriorityChannel
は優先度に基づいてチャネル内でメッセージを順序付けできる代替実装です。デフォルトでは、優先度は各メッセージ内の priority
ヘッダーによって決定されます。ただし、カスタム優先順位決定ロジックの場合、Comparator<Message<?>>
型のコンパレーターを PriorityChannel
コンストラクターに提供できます。
RendezvousChannel
RendezvousChannel
は、「直接ハンドオフ」シナリオを可能にします。このシナリオでは、送信者は、別のパーティがチャネルの receive()
メソッドを呼び出すまでブロックします。相手は、送信者がメッセージを送信するまでブロックします。内部的には、この実装は SynchronousQueue
(BlockingQueue
のゼロ容量実装)を使用することを除いて、QueueChannel
と非常に似ています。これは、送信者と受信者が異なるスレッドで動作する状況ではうまく機能しますが、メッセージを非同期的にキューにドロップすることは適切ではありません。つまり、RendezvousChannel
では、送信者は一部の受信者がメッセージを受け入れたことを認識しますが、QueueChannel
では、メッセージは内部キューに格納され、受信されない可能性があります。
これらのキューベースのチャネルはすべて、デフォルトでのみメッセージをメモリに保存していることに注意してください。永続性が必要な場合は、'queue' 要素内で 'message-store' 属性を提供して永続的な MessageStore 実装を参照するか、ローカルチャネルを JMS で裏付けられたチャネルやチャネルアダプターなどの永続的なブローカによってバッキングされたものに置き換えることができます。後者のオプションを使用すると、JMS サポートで説明したように、JMS プロバイダーのメッセージ永続性の実装を利用できます。ただし、キューにバッファリングする必要がない場合、最も簡単なアプローチは、次のセクションで説明する DirectChannel に依存することです。 |
RendezvousChannel
は、リクエスト / 応答操作の実装にも役立ちます。送信者は、RendezvousChannel
の一時的な匿名インスタンスを作成し、Message
を構築するときに "replyChannel" ヘッダーとして設定します。その Message
を送信した後、送信者はすぐに receive
(オプションでタイムアウト値を提供)を呼び出して、応答 Message
を待っている間にブロックできます。これは、Spring Integration のリクエスト / 応答コンポーネントの多くで内部的に使用される実装に非常に似ています。
DirectChannel
DirectChannel
にはポイントツーポイントセマンティクスがありますが、それ以外の点では、前述のキューベースのチャネル実装のいずれよりも PublishSubscribeChannel
に類似しています。PollableChannel
インターフェースの代わりに SubscribableChannel
インターフェースを実装するため、サブスクライバーに直接メッセージをディスパッチします。ただし、ポイントツーポイントチャネルとして、各 Message
を単一のサブスクライブされた MessageHandler
に送信するという点で、PublishSubscribeChannel
とは異なります。
最もシンプルなポイントツーポイントチャネルオプションであることに加えて、最も重要な機能の 1 つは、単一スレッドがチャネルの「両側」で操作を実行できるようにすることです。例: ハンドラーが DirectChannel
をサブスクライブする場合、Message
をそのチャネルに送信すると、send()
メソッドの呼び出しが戻る前に、そのハンドラーの handleMessage(Message)
メソッドの呼び出しが送信者のスレッドで直接トリガーされます。
この動作をチャネル実装に提供する主な動機は、チャネルが提供する抽象化と疎結合の恩恵を受けながら、チャネル全体に広がるトランザクションをサポートすることです。送信呼び出しがトランザクションのスコープ内で呼び出される場合、ハンドラーの呼び出しの結果(データベースレコードの更新など)は、そのトランザクションの最終結果(コミットまたはロールバック)を決定するロールを果たします。
DirectChannel は最も単純なオプションであり、ポーラーのスレッドのスケジューリングと管理に必要な追加のオーバーヘッドを追加しないため、Spring Integration 内のデフォルトのチャネル型です。一般的な考え方は、アプリケーションのチャネルを定義し、バッファリングを提供する必要があるか、入力を調整する必要があるかを検討し、キューベースの PollableChannels に変更することです。同様に、チャネルがメッセージをブロードキャストする必要がある場合、DirectChannel ではなく PublishSubscribeChannel である必要があります。後で、これらの各チャネルの構成方法を示します。 |
DirectChannel
は、サブスクライブされたメッセージハンドラーを呼び出すために、メッセージディスパッチャーに内部的に委譲します。そのディスパッチャーは、load-balancer
または load-balancer-ref
属性(相互に排他的)によって公開される負荷分散戦略を持つことができます。負荷分散戦略は、複数のメッセージハンドラーが同じチャネルにサブスクライブするときに、メッセージがメッセージハンドラー間でどのように分散されるかを決定するためにメッセージディスパッチャーによって使用されます。便宜上、load-balancer
属性は、LoadBalancingStrategy
の既存の実装を指す値の列挙を公開します。使用可能な値は、round-robin
(ローテーション中のハンドラー間の負荷分散)と none
(負荷分散を明示的に無効にする場合)のみです。他の戦略の実装は、将来のバージョンで追加される可能性があります。ただし、バージョン 3.0 以降、LoadBalancingStrategy
の独自の実装を提供し、次の例に示すように、LoadBalancingStrategy
を実装する Bean を指す load-balancer-ref
属性を使用してそれを挿入できます。
FixedSubscriberChannel
は、サブスクライブ解除できない単一の MessageHandler
サブスクライバーのみをサポートする SubscribableChannel
です。これは、他のサブスクライバーが関与せず、チャネルインターセプターが必要ない、高スループットパフォーマンスのユースケースに役立ちます。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
load-balancer
属性と load-balancer-ref
属性は相互に排他的であることに注意してください。
ロードバランシングは、ブール failover
プロパティと連携して機能します。failover
値が true(デフォルト)の場合、前のハンドラーが例外をスローすると、ディスパッチャーは(必要に応じて)後続のハンドラーにフォールバックします。順序は、ハンドラー自体で定義されたオプションの順序値、またはそのような値が存在しない場合、ハンドラーがサブスクライブする順序によって決定されます。
特定の状況で、ディスパッチャーが常に最初のハンドラーを呼び出して、エラーが発生するたびに同じ固定順序でフォールバックすることを必要とする場合、負荷分散戦略は提供されません。つまり、ロードバランシングが有効になっていない場合でも、ディスパッチャーは failover
ブールプロパティを引き続きサポートします。ただし、負荷分散を行わない場合、ハンドラーの呼び出しは、順序に従って常に最初から開始されます。例: プライマリ、セカンダリ、ターシャリなどの明確な定義がある場合、このアプローチはうまく機能します。名前空間のサポートを使用する場合、エンドポイントの order
属性が順序を決定します。
負荷分散と failover は、チャネルに複数のサブスクライブされたメッセージハンドラーがある場合にのみ適用されることに注意してください。ネームスペースサポートを使用する場合、これは、複数のエンドポイントが input-channel 属性で定義された同じチャネル参照を共有することを意味します。 |
バージョン 5.2 以降、failover
が true の場合、現在のハンドラーの失敗と失敗したメッセージは、それぞれ構成されている場合、debug
または info
に記録されます。
ExecutorChannel
ExecutorChannel
は、DirectChannel
(負荷分散戦略および failover
ブールプロパティ)と同じディスパッチャー構成をサポートするポイントツーポイントチャネルです。これら 2 つのディスパッチチャネル型の主な違いは、ExecutorChannel
が TaskExecutor
のインスタンスに委譲してディスパッチを実行することです。これは、通常、send メソッドがブロックしないことを意味しますが、ハンドラーの呼び出しが送信者のスレッドで発生しない可能性があることも意味します。送信側と受信側のハンドラーにまたがるトランザクションはサポートしていません。
送信者がブロックすることがあります。例: TaskExecutor を使用して、クライアントを調整する拒否ポリシー(ThreadPoolExecutor.CallerRunsPolicy など)を使用すると、送信者のスレッドは、スレッドプールが最大容量になり、エグゼキューターの作業キューがいっぱいになるといつでもメソッドを実行できます。そのような状況は予測不可能な方法でのみ発生するため、トランザクションに依存するべきではありません。 |
FluxMessageChannel
FluxMessageChannel
は、ダウンストリームのリアクティブサブスクライバーによるオンデマンド消費のために、"sinking"
が内部 reactor.core.publisher.Flux
にメッセージを送信するための org.reactivestreams.Publisher
実装です。このチャネル実装は SubscribableChannel
でも PollableChannel
でもないため、org.reactivestreams.Subscriber
インスタンスのみを使用して、このチャネルからリアクティブストリームのバックプレッシャー特性を尊重することができます。一方、FluxMessageChannel
は subscribeTo(Publisher<Message<?>>)
契約で ReactiveStreamsSubscribableChannel
を実装し、リアクティブソースパブリッシャーからのイベントの受信を可能にし、リアクティブストリームを統合フローにブリッジします。統合フロー全体に対して完全にリアクティブな動作を実現するには、フロー内のすべてのエンドポイント間にそのようなチャネルを配置する必要があります。
Reactive Streams との相互作用の詳細については、Reactive Streams サポートを参照してください。
スコープチャネル
Spring Integration 1.0 は ThreadLocalChannel
実装を提供していましたが、2.0 から削除されました。同じ要件を処理するより一般的な方法は、チャネルに scope
属性を追加することです。属性の値には、コンテキスト内で使用可能なスコープの名前を指定できます。例: Web 環境では、特定のスコープを使用でき、カスタムスコープの実装をコンテキストに登録できます。次の例は、スコープ自体の登録を含め、スレッドローカルスコープがチャネルに適用されていることを示しています。
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
前の例で定義したチャネルも内部的にキューに委譲しますが、チャネルは現在のスレッドにバインドされているため、キューの内容も同様にバインドされます。そうすれば、チャネルに送信するスレッドは後で同じメッセージを受信できますが、他のスレッドはそれらにアクセスできません。スレッドスコープのチャネルが必要になることはめったにありませんが、DirectChannel
インスタンスを使用して単一のスレッドオペレーションを実施しているが、応答メッセージは「ターミナル」チャネルに送信する必要がある状況で役立ちます。そのターミナルチャネルがスレッドスコープの場合、元の送信スレッドはターミナルチャネルから応答を収集できます。
これで、任意のチャネルのスコープを設定できるため、スレッドローカルに加えて独自のスコープを定義できます。
チャネルインターセプター
メッセージングアーキテクチャの利点の 1 つは、一般的な動作を提供し、非侵襲的な方法でシステムを通過するメッセージに関する意味のある情報を取得できることです。Message
インスタンスは MessageChannel
インスタンスとの間で送受信されるため、これらのチャネルは送受信操作をインターセプトする機会を提供します。次のリストに示す ChannelInterceptor
戦略インターフェースは、これらの各操作のメソッドを提供します。
public interface ChannelInterceptor {
Message<?> preSend(Message<?> message, MessageChannel channel);
void postSend(Message<?> message, MessageChannel channel, boolean sent);
void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);
boolean preReceive(MessageChannel channel);
Message<?> postReceive(Message<?> message, MessageChannel channel);
void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}
インターフェースを実装した後、インターセプターをチャネルに登録するには、次の呼び出しを行うだけです。
channel.addInterceptor(someChannelInterceptor);
Message
インスタンスを返すメソッドは、Message
の変換に使用するか、"null" を返してさらなる処理を防ぐことができます(もちろん、どのメソッドも RuntimeException
をスローできます)。また、preReceive
メソッドは false
を返して、受信操作が進行しないようにすることができます。
receive() 呼び出しは PollableChannels にのみ関連することに注意してください。実際、SubscribableChannel インターフェースは receive() メソッドを定義していません。これは、Message が SubscribableChannel に送信されると、チャネルの型に応じて、0 個以上のサブスクライバーに直接送信されるためです(たとえば、PublishSubscribeChannel はすべてのサブスクライバーに送信されます)。preReceive(…) 、postReceive(…) 、afterReceiveCompletion(…) インターセプターメソッドは、インターセプターが PollableChannel に適用された場合にのみ呼び出されます。 |
Spring Integration は、ワイヤータップ (英語) パターンの実装も提供します。これは、既存のフローを変更せずに Message
を別のチャネルに送信する単純なインターセプターです。デバッグと監視に非常に役立ちます。ワイヤータップに例を示します。
すべてのインターセプターメソッドを実装する必要があることはめったにないため、インターフェースは no-op メソッドを提供します(void
メソッドを返すメソッドにはコードがなく、Message
を返すメソッドは Message
をそのまま返し、boolean
メソッドは true
を返します)。
インターセプターメソッドの呼び出し順序は、チャネルの型によって異なります。前に説明したように、キューベースのチャネルは、最初に受信メソッドがインターセプトされる唯一のチャネルです。さらに、送信インターセプトと受信インターセプトの関連は、個別の送信者スレッドと受信者スレッドのタイミングによって異なります。例: メッセージの待機中に受信者がすでにブロックされている場合、順序は次のようになります: preSend 、preReceive 、postReceive 、postSend 。ただし、送信者がチャネルにメッセージを送信してすでに戻ってきた後に受信者がポーリングする場合、順序は次のようになります: preSend 、postSend (しばらく経過)、preReceive 、postReceive 。このような場合に経過する時間は、いくつかの要因に依存するため、一般的に予測できません(実際、受信が発生することはありません)。キューの型もロールを果たします(たとえば、ランデブーと優先度)。つまり、preSend が postSend に先行し、preReceive が postReceive に先行するという事実を超えて、順序を信頼することはできません。 |
Spring Framework 4.1 および Spring Integration 4.1 以降、ChannelInterceptor
は新しいメソッド afterSendCompletion()
および afterReceiveCompletion()
を提供します。発生した例外に関係なく、send()' and 'receive()
呼び出しの後に呼び出され、リソースのクリーンアップが可能になります。チャネルは、最初の preSend()
および preReceive()
呼び出しの逆の順序で ChannelInterceptor
リストのこれらのメソッドを呼び出すことに注意してください。
バージョン 5.1 以降、グローバルチャネルインターセプターは、Java DSL を使用するときに beanFactory.initializeBean()
または IntegrationFlowContext
を使用して初期化される Bean を介するなど、動的に登録されたチャネルに適用されるようになりました。以前は、アプリケーションコンテキストのリフレッシュ後に Bean が作成されたときにインターセプターは適用されませんでした。
また、バージョン 5.1 以降、メッセージが受信されないときに ChannelInterceptor.postReceive()
が呼び出されなくなりました。null
Message<?>
をチェックする必要はなくなりました。以前は、メソッドが呼び出されていました。以前の動作に依存するインターセプターがある場合は、代わりに afterReceiveCompleted()
を実装してください。これは、メッセージが受信されたかどうかに関係なく、そのメソッドが呼び出されるためです。
バージョン 5.2 から、ChannelInterceptorAware は Spring メッセージングモジュールの InterceptableChannel に代わって非推奨になりました。これは下位互換性のために拡張されています。 |
MessagingTemplate
エンドポイントとそのさまざまな構成オプションが導入されると、Spring Integration はメッセージングコンポーネントからの基盤を提供し、メッセージングシステムからアプリケーションコードを非侵襲的に呼び出すことができます。ただし、アプリケーションコードからメッセージングシステムを呼び出すことが必要な場合があります。このようなユースケースを実装する際の便宜上、Spring Integration は、リクエストおよび応答シナリオを含む、メッセージチャネル全体のさまざまな操作をサポートする MessagingTemplate
を提供します。例: 次のように、リクエストを送信して応答を待つことができます。
MessagingTemplate template = new MessagingTemplate();
Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));
上記の例では、一時的な匿名チャンネルがテンプレートによって内部的に作成されます。"sendTimeout" および "receiveTimeout" プロパティもテンプレートで設定でき、他の交換型もサポートされます。次のリストは、そのようなメソッドのシグネチャーを示しています。
public boolean send(final MessageChannel channel, final Message<?> message) { ...
}
public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}
public Message<?> receive(final PollableChannel<?> channel) { ...
}
Message インスタンスの代わりにペイロードまたはヘッダー値を使用して単純なインターフェースを呼び出すことができる、より侵襲性の低いアプローチについては、GatewayProxyFactoryBean を入力してくださいで説明しています。 |
メッセージチャネルの構成
メッセージチャネルインスタンスを作成するには、次のように、xml の場合は <channel/>
要素を、Java 構成の場合は DirectChannel
インスタンスを使用できます。
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
サブエレメントなしで <channel/>
エレメントを使用すると、DirectChannel
インスタンス(SubscribableChannel
)が作成されます。
パブリッシュ / サブスクライブチャネルを作成するには、次のように <publish-subscribe-channel/>
要素(Java では PublishSubscribeChannel
)を使用します。
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
あるいは、さまざまな <queue/>
サブ要素を提供して、ポーリング可能なチャネル型のいずれかを作成することもできます(メッセージチャネルの実装で説明されています)。次のセクションでは、各チャネル型の例を示します。
DirectChannel
設定
前述のように、DirectChannel
はデフォルトの型です。次のリストは、誰を定義するかを示しています。
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
デフォルトのチャネルにはラウンドロビンロードバランサーがあり、フェイルオーバーも有効になっています(詳細については DirectChannel
を参照してください)。これらの一方または両方を無効にするには、<dispatcher/>
サブ要素(DirectChannel
の LoadBalancingStrategy
コンストラクター)を追加し、次のように属性を構成します。
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
データ型チャネルの構成
コンシューマーは特定の型のペイロードのみを処理できる場合があり、入力メッセージのペイロード型を確認する必要があります。最初に思い浮かぶのは、メッセージフィルターを使用することです。ただし、メッセージフィルターでできることは、コンシューマーの要件に準拠していないメッセージを除外することだけです。別の方法は、コンテンツベースのルーターを使用し、非準拠のデータ型のメッセージを特定のトランスフォーマーにルーティングして、必要なデータ型への変換と変換を実施することです。これは機能しますが、同じことを達成するためのより簡単な方法は、データ型チャネル (英語) パターンを適用することです。特定のペイロードデータ型ごとに個別のデータ型チャネルを使用できます。
特定のペイロード型を含むメッセージのみを受け入れるデータ型チャネルを作成するには、次の例に示すように、チャネル要素の datatype
属性にデータ型の完全修飾クラス名を指定します。
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
型チェックは、チャネルのデータ型に割り当て可能なすべての型に合格することに注意してください。つまり、前述の例の numberChannel
は、ペイロードが java.lang.Integer
または java.lang.Double
であるメッセージを受け入れます。次の例に示すように、複数の型をコンマ区切りリストとして提供できます。
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
前述の例の "numberChannel" は、java.lang.Number
のデータ型のメッセージのみを受け入れます。しかし、メッセージのペイロードが必要な型ではない場合はどうなるでしょうか? Spring の変換サービスのインスタンスである integrationConversionService
という名前の Bean を定義したかどうかによります。そうでない場合は、Exception
がすぐにスローされます。ただし、integrationConversionService
Bean を定義している場合は、メッセージのペイロードを許容可能な型に変換する試みで使用されます。
カスタムコンバーターを登録することもできます。例: String
ペイロードを含むメッセージを、上記で設定した "numberChannel" に送信するとします。次のようにメッセージを処理できます。
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常、これは完全に正当的な操作です。ただし、Datatype Channel を使用しているため、このような操作の結果は次のような例外を生成します。
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
ペイロード型を Number
にする必要があるため、例外が発生しますが、String
を送信しました。そのため、String
を Number
に変換するものが必要です。そのために、次の例のようなコンバーターを実装できます。
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
次に、次の例に示すように、Integration Conversion Service でコンバーターとして登録できます。
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
または、StringToIntegerConverter
クラスで、自動スキャン用に @Component
アノテーションが付けられている場合。
'converter' 要素が解析されると、integrationConversionService
Bean が作成されます(まだ定義されていない場合)。データ型チャネルはそのコンバーターを使用して String
ペイロードを Integer
に変換するため、そのコンバーターが適切に配置されていれば、send
操作は成功します。
ペイロード型の変換に関する詳細については、ペイロード型変換を参照してください。
バージョン 4.0 以降、integrationConversionService
は DefaultDatatypeChannelMessageConverter
によって呼び出され、DefaultDatatypeChannelMessageConverter
はアプリケーションコンテキストで変換サービスを検索します。別の変換手法を使用するには、チャネルで message-converter
属性を指定できます。これは、MessageConverter
実装への参照でなければなりません。fromMessage
メソッドのみが使用されます。コンバーターにメッセージヘッダーへのアクセスを提供します(content-type
などのヘッダーからの情報が必要な場合に備えて)。メソッドは、変換されたペイロードまたは完全な Message
オブジェクトのみを返すことができます。後者の場合、コンバーターは受信メッセージからすべてのヘッダーをコピーするように注意する必要があります。
あるいは、型 MessageConverter
の <bean/>
を ID datatypeChannelMessageConverter
で宣言でき、そのコンバーターは datatype
のすべてのチャネルで使用されます。
QueueChannel
設定
QueueChannel
を作成するには、<queue/>
サブ要素を使用します。次のようにチャンネルの容量を指定できます。
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
この <queue/> サブエレメントの 'capacity' 属性に値を指定しない場合、結果のキューは無制限になります。メモリ不足などの課題を回避するために、制限キューに明示的な値を設定することを強くお勧めします。 |
永続的な QueueChannel
設定
QueueChannel
はメッセージをバッファリングする機能を提供しますが、デフォルトではメモリ内でのみ行うため、システム障害が発生した場合にメッセージが失われる機能があります。このリスクを軽減するために、QueueChannel
は、MessageGroupStore
戦略インターフェースの永続的な実装によって支援される場合があります。MessageGroupStore
および MessageStore
の詳細については、メッセージストアを参照してください。
message-store 属性が使用されている場合、capacity 属性は許可されません。 |
QueueChannel
は Message
を受信すると、メッセージをメッセージストアに追加します。Message
が QueueChannel
からポーリングされると、メッセージストアから削除されます。
デフォルトでは、QueueChannel
はメッセージをインメモリキューに格納します。これにより、前述のメッセージの損失シナリオが発生する可能性があります。ただし、Spring Integration は JdbcChannelMessageStore
などの永続ストアを提供します。
次の例に示すように、message-store
属性を追加することにより、任意の QueueChannel
のメッセージストアを設定できます。
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(Java/Kotlin 構成オプションについては、以下のサンプルを参照してください)
Spring Integration JDBC モジュールは、多くの一般的なデータベース用のスキーマデータ定義言語 (DDL) も提供します。これらのスキーマは、そのモジュール (spring-integration-jdbc
) の org.springframework.integration.jdbc.store.channel パッケージにあります。
重要な機能の 1 つは、トランザクション永続ストア(JdbcChannelMessageStore など)で、ポーラーにトランザクションが構成されている限り、ストアから削除されたメッセージは、トランザクションが正常に完了した場合にのみ永久に削除できることです。それ以外の場合、トランザクションはロールバックされ、Message は失われません。 |
"NoSQL" データストアに関連する Spring プロジェクトの数が増え、これらのストアの基礎となるサポートが提供されるようになったため、メッセージストアの他の多くの実装が利用可能になりました。特定のニーズを満たすものが見つからない場合は、MessageGroupStore
インターフェースの独自の実装を提供することもできます。
バージョン 4.0 以降、可能であれば ChannelMessageStore
を使用するように QueueChannel
インスタンスを構成することをお勧めします。これらは、一般的なメッセージストアと比較して、一般的にこの用途に最適化されています。ChannelMessageStore
が ChannelPriorityMessageStore
の場合、メッセージは優先順位内で FIFO で受信されます。優先順位の概念は、メッセージストアの実装によって決まります。例: 次の例は、MongoDB チャネルメッセージストアの Java 構成を示しています。
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlows.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
MessageGroupQueue クラスに注意してください。これは、MessageGroupStore 操作を使用する BlockingQueue 実装です。 |
QueueChannel
環境をカスタマイズする別のオプションは、<int:queue>
サブエレメントまたはその特定のコンストラクターの ref
属性によって提供されます。この属性は、java.util.Queue
実装への参照を提供します。例: Hazelcast 分散 IQueue
(英語) は、次のように構成できます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel
設定
PublishSubscribeChannel
を作成するには、<publish-subscribe-channel/> 要素を使用します。この要素を使用する場合、次のように、メッセージの公開に使用する task-executor
を指定することもできます(指定しない場合、送信者のスレッドで公開します)。
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
PublishSubscribeChannel
の下流にリシーケンサーまたはアグリゲーターを提供する場合、チャネルの "apply-sequence" プロパティを true
に設定できます。そうすることは、メッセージを渡す前に、チャネルが sequence-size
および sequence-number
メッセージヘッダーと相関 ID を設定する必要があることを示します。例: サブスクライバーが 5 人の場合、sequence-size
は 5
に設定され、メッセージは 1
から 5
の範囲の sequence-number
ヘッダー値を持ちます。
Executor
とともに、ErrorHandler
を構成することもできます。デフォルトでは、PublishSubscribeChannel
は MessagePublishingErrorHandler
実装を使用して、errorChannel
ヘッダーから MessageChannel
またはグローバル errorChannel
インスタンスにエラーを送信します。Executor
が構成されていない場合、ErrorHandler
は無視され、呼び出し元のスレッドに例外が直接スローされます。
PublishSubscribeChannel
から Resequencer
または Aggregator
ダウンストリームを提供する場合、チャネルの 'apply-sequence' プロパティを true
に設定できます。そうすることは、チャネルがメッセージを渡す前にシーケンス ID とシーケンス番号のメッセージヘッダーと相関 ID を設定する必要があることを示します。例: サブスクライバーが 5 人の場合、sequence-size は 5
に設定され、メッセージは 1
から 5
の範囲のシーケンス番号ヘッダー値を持ちます。
次の例は、apply-sequence
ヘッダーを true
に設定する方法を示しています。
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(false);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
パブリッシュ / サブスクライブチャネルがまったく同じメッセージインスタンスを複数の送信チャネルに送信できるように、apply-sequence 値はデフォルトで false です。Spring Integration はペイロードとヘッダー参照の不変性を強制するため、フラグが true に設定されると、チャネルは同じペイロード参照で異なるヘッダー値を持つ新しい Message インスタンスを作成します。 |
バージョン 5.4.3 以降、PublishSubscribeChannel
は、BroadcastingDispatcher
の requireSubscribers
オプションを使用して構成することもでき、サブスクライバーがない場合にこのチャネルがメッセージをサイレントに無視しないことを示します。サブスクライバーがなく、このオプションが true
に設定されている場合、Dispatcher has no subscribers
メッセージを含む MessageDispatchingException
がスローされます。
ExecutorChannel
ExecutorChannel
を作成するには、task-executor
属性を持つ <dispatcher>
サブ要素を追加します。属性の値は、コンテキスト内の任意の TaskExecutor
を参照できます。例: これにより、サブスクライブされたハンドラーにメッセージをディスパッチするためのスレッドプールの構成が可能になります。前述のように、これにより、送信者と受信者の間のシングルスレッド実行コンテキストが中断され、アクティブなトランザクションコンテキストがハンドラーの呼び出しで共有されなくなります(つまり、ハンドラーは Exception
をスローしますが、send
呼び出しはすでに返されます)正常に)。次の例は、dispatcher
要素を使用して、task-executor
属性でエグゼキュータを指定する方法を示しています。
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
PriorityChannel
設定
PriorityChannel
を作成するには、次の例に示すように、<priority-queue/>
サブ要素を使用します。
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
デフォルトでは、チャネルはメッセージの priority
ヘッダーを調べます。ただし、代わりにカスタム Comparator
参照を提供できます。また、PriorityChannel
は(他の型と同様に) datatype
属性をサポートしていることに注意してください。QueueChannel
と同様に、capacity
属性もサポートします。次の例は、これらすべてを示しています。
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
バージョン 4.0 以降、priority-channel
子エレメントは message-store
オプションをサポートします(その場合、comparator
および capacity
は許可されません)。メッセージストアは PriorityCapableChannelMessageStore
である必要があります。PriorityCapableChannelMessageStore
の実装は、現在 Redis
、JDBC
、MongoDB
用に提供されています。詳細については、QueueChannel
の構成およびメッセージストアを参照してください。サンプル構成はバッキングメッセージチャネルにあります。
RendezvousChannel
設定
キューのサブ要素が <rendezvous-queue>
の場合、RendezvousChannel
が作成されます。前述のオプションには追加の設定オプションはありません。また、キューは容量ゼロの直接ハンドオフキューであるため、容量値を受け入れません。次の例は、RendezvousChannel
を宣言する方法を示しています。
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
スコープチャネル構成
次の例に示すように、scope
属性を使用して任意のチャネルを構成できます。
<int:channel id="threadLocalChannel" scope="thread"/>
チャネルインターセプターの構成
チャネルインターセプターに従って、メッセージチャネルにはインターセプターもあります。<interceptors/>
サブ要素は、<channel/>
(またはより具体的な要素型)に追加できます。次の例に示すように、ref
属性を指定して、ChannelInterceptor
インターフェースを実装する Spring 管理対象オブジェクトを参照できます。
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
通常、インターセプター実装は通常、複数のチャネルで再利用できる共通の動作を提供するため、別の場所でインターセプター実装を定義することをお勧めします。
グローバルチャネルインターセプターの構成
チャネルインターセプターは、個々のチャネルごとに横断的動作を適用するためのクリーンで簡潔な方法を提供します。同じ動作を複数のチャネルに適用する必要がある場合、各チャネルに同じインターセプターのセットを構成することは最も効率的な方法ではありません。インターセプターを複数のチャネルに適用できるようにする一方で、構成の繰り返しを避けるために、Spring Integration はグローバルインターセプターを提供します。次のペアの例を検討してください。
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
各 <channel-interceptor/>
要素を使用すると、pattern
属性で定義されたパターンに一致するすべてのチャネルに適用されるグローバルインターセプターを定義できます。上記の場合、グローバルインターセプターは、"thing1" チャネルと "thing2" または "input" で始まる他のすべてのチャネルに適用されますが、"thing3" で始まるチャネルには適用されません(バージョン 5.0 以降)。
この構文をパターンに追加すると、1 つの可能性のある(おそらくありそうもない)問題が発生します。!thing1 という名前の Bean があり、チャンネルインターセプターの pattern パターンに !thing1 のパターンを含めた場合、一致しなくなります。パターンは thing1 という名前ではないすべての Bean に一致するようになりました。この場合、\ を使用して、パターン内の ! をエスケープできます。パターン \!thing1 は、!thing1 という名前の Bean と一致します。 |
order 属性を使用すると、特定のチャネルに複数のインターセプターがある場合に、このインターセプターが挿入される場所を管理できます。例: 次の例に示すように、チャネル "inputChannel" には個別にインターセプターをローカルに構成できます(以下を参照)。
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
合理的な質問は、「グローバルインターセプターは、他のインターセプターに関連してローカルまたは他のグローバルインターセプター定義を介してどのように注入されますか?」です。現在の実装では、インターセプターの実行順序を定義する簡単なメカニズムを提供します。order
属性に正の数を指定すると、既存のインターセプターの後にインターセプターが挿入され、負の数を指定すると既存のインターセプターの前にインターセプターが挿入されます。これは、前の例では、グローバルインターセプターが(その order
が 0
よりも大きいため)ローカルに構成された「ワイヤータップ」インターセプターの後に挿入されることを意味します。pattern
に一致する別のグローバルインターセプターが存在する場合、その順序は、両方のインターセプターの order
属性の値を比較することにより決定されます。既存のインターセプターの前にグローバルインターセプターを挿入するには、order
属性に負の値を使用します。
order 属性と pattern 属性の両方がオプションであることに注意してください。order のデフォルト値は 0 で、pattern のデフォルト値は "*" です(すべてのチャネルに一致するため)。 |
ワイヤータップ
前述のように、Spring Integration はシンプルなワイヤータップインターセプターを提供します。<interceptors/>
要素内の任意のチャネルでワイヤタップを構成できます。これはデバッグに特に役立ち、次のように Spring Integration のロギングチャネルアダプターと組み合わせて使用できます。
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
"logging-channel-adapter" も 'expression' 属性を受け入れるため、"payload" および "headers" 変数に対して SpEL 式を評価できます。または、メッセージ toString() の結果全体をログに記録するには、'log-full-message' 属性に値 true を指定します。デフォルトでは、false であるため、ペイロードのみがログに記録されます。true に設定すると、ペイロードに加えてすべてのヘッダーのログが有効になります。「式」オプションは、最も柔軟性が高くなります(例: expression="payload.user.name" )。 |
ワイヤタップおよびその他の類似のコンポーネント(メッセージ公開構成)に関する一般的な誤解の 1 つは、それらが本質的に自動的に非同期であるということです。デフォルトでは、コンポーネントとしてのワイヤタップは非同期で呼び出されません。代わりに、Spring Integration は、非同期動作を構成するための単一の統一アプローチであるメッセージチャネルに焦点を当てています。メッセージフローの特定の部分を同期または非同期にするのは、そのフロー内で構成されたメッセージチャネルの型です。これは、メッセージチャネルの抽象化の主な利点の 1 つです。フレームワークの開始から、フレームワークの第一級オブジェクトとしてのメッセージチャネルの必要性と価値を常に強調してきました。これは、EIP パターンの内部的な暗黙的な実現だけではありません。構成可能なコンポーネントとしてエンドユーザーに完全に公開されます。そのため、ワイヤータップコンポーネントは、次のタスクの実行のみを担当します。
チャネルをタップしてメッセージフローをインターセプトする (たとえば、
channelA
)各メッセージを取得する
別のチャネルにメッセージを送信します (たとえば、
channelB
)
これは基本的にブリッジパターンのバリエーションですが、チャネル定義内にカプセル化されます(したがって、フローを中断せずに有効化および無効化するのが簡単です)。また、ブリッジとは異なり、基本的に別のメッセージフローを分岐します。そのフローは同期ですか、非同期ですか? 答えは、"channelB" であるメッセージチャネルの型によって異なります。次のオプションがあります: ダイレクトチャンネル、ポーリング可能チャンネル、エグゼキューターチャンネル。最後の 2 つはスレッド境界を破り、そのようなチャネルを介した通信を非同期にします。これは、そのチャネルからサブスクライブされたハンドラーへのメッセージのディスパッチが、そのチャネルへのメッセージ送信に使用したスレッドとは異なるスレッドで発生するためです。それが、ワイヤタップフローを同期または非同期にするものです。フレームワーク内の他のコンポーネント(メッセージパブリッシャーなど)と一貫性があり、特定のコードを次のように実装する必要があるかどうかを事前に心配すること(スレッドセーフコードを書くこと以外)を回避することで、一定レベルの一貫性とシンプルさを追加します。同期または非同期。メッセージチャネルを介した 2 つのコード(たとえば、コンポーネント A とコンポーネント B)の実際の接続が、コラボレーションを同期または非同期にします。将来的には同期から非同期に変更することもできます。メッセージチャネルを使用すると、コードに触れることなく、迅速に変更できます。
ワイヤタップに関する最後のポイントは、デフォルトでは非同期ではないという上記の根拠にもかかわらず、通常はできるだけ早くメッセージをハンドオフすることが望ましいことを念頭に置いてください。ワイヤタップの送信チャネルとして非同期チャネルオプションを使用することは非常に一般的です。ただし、デフォルトでは非同期動作を強制しません。トランザクション境界を壊したくない場合を含めて、壊した場合に壊されるユースケースがいくつかあります。おそらく、監査目的でワイヤータップパターンを使用し、元のトランザクション内で監査メッセージを送信する必要があります。例として、ワイヤタップを JMS 発信チャネルアダプターに接続できます。これにより、両方の長所を活用できます。1)JMS メッセージの送信はトランザクション内で行うことができますが、2)まだ「ファイアアンドフォーゲット」アクションなので、メインメッセージフローの顕著な遅延を防ぎます。
バージョン 4.0 以降、インターセプター(WireTap クラス (英語) など)がチャネルを参照する場合、循環参照を回避することが重要です。現在のインターセプターによってインターセプトされているチャネルからこのようなチャネルを除外する必要があります。これは、適切なパターンで、またはプログラムで実行できます。channel を参照するカスタム ChannelInterceptor がある場合は、VetoCapableInterceptor の実装を検討してください。そのようにして、フレームワークはインターセプターに、提供されたパターンに基づいて、候補である各チャネルをインターセプトしてもよいかどうかを確認します。また、インターセプターメソッドにランタイム保護を追加して、インターセプターによって参照されるチャネルではないことを確認することもできます。WireTap はこれらの手法の両方を使用します。 |
バージョン 4.3 以降、WireTap
には、MessageChannel
インスタンスの代わりに channelName
を使用する追加のコンストラクターがあります。これは、Java 構成およびチャネル自動作成ロジックが使用されている場合に便利です。ターゲット MessageChannel
Bean は、後でインターセプターとの最初の対話で、提供された channelName
から解決されます。
チャネル解決には BeanFactory が必要であるため、ワイヤータップインスタンスは Spring 管理の Bean でなければなりません。 |
このレイトバインディングアプローチにより、次の例に示すように、Java DSL 構成を使用して一般的な盗聴パターンを簡素化することもできます。
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
条件付きワイヤタップ
selector
または selector-expression
属性を使用して、ワイヤタップを条件付きにすることができます。selector
は MessageSelector
Bean を参照します。MessageSelector
は、実行時にメッセージをタップチャネルに送信するかどうかを決定できます。同様に、selector-expression
は同じ目的を実行するブール SpEL 式です。式が true
に評価される場合、メッセージはタップチャネルに送信されます。
グローバルワイヤタップ構成
グローバルチャネルインターセプターの構成の特殊なケースとしてグローバルワイヤタップを設定することが可能です。そのためには、最上位の wire-tap
要素を構成します。現在、通常の wire-tap
名前空間のサポートに加えて、pattern
および order
属性がサポートされており、channel-interceptor
の場合とまったく同じように機能します。次の例は、グローバルワイヤタップを設定する方法を示しています。
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
グローバルワイヤタップは、既存のチャネル構成を変更せずに、外部でシングルチャネルワイヤタップを構成する便利な方法を提供します。これを行うには、pattern 属性をターゲットチャネル名に設定します。例: この手法を使用して、チャネル上のメッセージを検証するテストケースを構成できます。 |
特別チャンネル
デフォルトでは、errorChannel
と nullChannel
の 2 つの特別なチャネルがアプリケーションコンテキスト内で定義されています。'nullChannel' (NullChannel
のインスタンス)は /dev/null
のように機能し、DEBUG
レベルで送信されたメッセージをログに記録し、すぐに戻ります。送信されたメッセージの org.reactivestreams.Publisher
ペイロードには特別な処理が適用されます。データは破棄されますが、リアクティブストリーム処理を開始するために、このチャネルですぐにサブスクライブされます。リアクティブストリーム処理(Subscriber.onError(Throwable)
を参照)からスローされたエラーは、調査の可能性があるため、警告レベルでログに記録されます。このようなエラーで何かを行う必要がある場合は、Mono.doOnError()
をカスタマイズした ReactiveRequestHandlerAdvice
をメッセージハンドラーに適用して、この nullChannel
に Mono
応答を生成できます。気にしない応答でチャネル解決エラーが発生した場合はいつでも、影響を受けるコンポーネントの output-channel
属性を "nullChannel" に設定できます(名前 "nullChannel" はアプリケーションコンテキスト内で予約されています)。
'errorChannel' は、エラーメッセージを送信するために内部的に使用され、カスタム構成でオーバーライドされる場合があります。これについては、エラー処理で詳しく説明されています。
メッセージチャネルとインターセプターの詳細については、Java DSL の章のメッセージチャンネルも参照してください。