メッセージチャンネル

Message はデータをカプセル化するという重要なロールを果たしますが、メッセージプロデューサーをメッセージコンシューマーから切り離すのは MessageChannel です。

MessageChannel インターフェース

Spring Integration のトップレベル MessageChannel インターフェースは、次のように定義されています。

public interface MessageChannel {

    boolean send(Message message);

    boolean send(Message message, long timeout);
}

メッセージを送信する場合、メッセージが正常に送信されると、戻り値は true になります。送信呼び出しがタイムアウトまたは中断された場合、false を返します。

PollableChannel

メッセージチャネルはメッセージをバッファリングする場合としない場合があるため(Spring Integration の概要で説明)、2 つのサブインターフェースがバッファリング(ポーリング可能)および非バッファリング(サブスクライブ可能)チャネルの動作を定義します。以下のリストは、PollableChannel インターフェースの定義を示しています。

public interface PollableChannel extends MessageChannel {

    Message<?> receive();

    Message<?> receive(long timeout);

}

送信メソッドと同様に、メッセージを受信すると、タイムアウトまたは割り込みの場合、戻り値は null になります。

SubscribableChannel

SubscribableChannel ベースインターフェースは、サブスクライブされた MessageHandler インスタンスにメッセージを直接送信するチャネルによって実装されます。ポーリング用の受信メソッドは提供されません。代わりに、これらのサブスクライバを管理するためのメソッドを定義します。次のリストは、SubscribableChannel インターフェースの定義を示しています。

public interface SubscribableChannel extends MessageChannel {

    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);

}

メッセージチャネルの実装

Spring Integration は、いくつかの異なるメッセージチャネルの実装を提供します。次のセクションでは、それぞれについて簡単に説明します。

PublishSubscribeChannel

PublishSubscribeChannel 実装は、送信された Message をそのサブスクライブされたすべてのハンドラーにブロードキャストします。これは、主に通知がイベントメッセージの送信に最もよく使用されます(通常、単一のハンドラーによって処理されることを意図したドキュメントメッセージとは対照的です)。PublishSubscribeChannel は送信専用であることに注意してください。send(Message) メソッドが呼び出されると、サブスクライバーに直接ブロードキャストするため、コンシューマーはメッセージをポーリングできません(PollableChannel を実装していないため、receive() メソッドはありません)。代わりに、サブスクライバー自体が MessageHandler でなければならず、サブスクライバーの handleMessage(Message) メソッドが順番に呼び出されます。

バージョン 3.0 より前は、サブスクライバーがいない PublishSubscribeChannel で send メソッドを呼び出すと false が返されました。MessagingTemplate と組み合わせて使用すると、MessageDeliveryException がスローされました。バージョン 3.0 から、少なくとも最小サブスクライバーが存在する(およびメッセージを正常に処理する)場合に send が常に成功したと見なされるように、動作が変更されました。この動作を変更するには、minSubscribers プロパティを設定します。デフォルトは 0 です。

TaskExecutor を使用する場合、メッセージの実際の処理は非同期に実行されるため、この決定には正しい数のサブスクライバーの存在のみが使用されます。
QueueChannel

QueueChannel 実装はキューをラップします。PublishSubscribeChannel とは異なり、QueueChannel にはポイントツーポイントのセマンティクスがあります。つまり、チャンネルに複数のコンシューマーがある場合でも、そのチャンネルに送信された Message を受信できるのはそのうちの 1 つだけです。次のように、デフォルトの引数なしのコンストラクター(本質的に Integer.MAX_VALUE の無制限の容量を提供)と、キュー容量を受け入れるコンストラクターを提供します。

public QueueChannel(int capacity)

容量制限に達していないチャネルは、内部キューにメッセージを格納し、メッセージを処理する準備ができている受信者がいない場合でも、send(Message<?>) メソッドはすぐに戻ります。キューが容量に達すると、送信者はキューに空きができるまでブロックします。または、追加のタイムアウトパラメーターを持つ send メソッドを使用する場合、キューは、空きができるかタイムアウト期間が経過するかのどちらか早い方までブロックされます。同様に、receive() 呼び出しは、メッセージがキューで利用可能な場合はすぐに戻りますが、キューが空の場合、メッセージが利用可能になるか、タイムアウトが提供される場合はタイムアウトになるまで受信呼び出しはブロックされます。どちらの場合でも、タイムアウト値 0 を渡すことで、キューの状態に関係なくすぐに強制的に戻ることができます。ただし、timeout パラメーターブロックを使用せずに send() および receive() のバージョンを呼び出すことに注意してください。

PriorityChannel

QueueChannel は先入れ先出し(FIFO)の順序付けを実行しますが、PriorityChannel は優先度に基づいてチャネル内でメッセージを順序付けできる代替実装です。デフォルトでは、優先度は各メッセージ内の priority ヘッダーによって決定されます。ただし、カスタム優先順位決定ロジックの場合、Comparator<Message<?>> 型のコンパレーターを PriorityChannel コンストラクターに提供できます。

RendezvousChannel

RendezvousChannel は、「直接ハンドオフ」シナリオを可能にします。このシナリオでは、送信者は、別のパーティがチャネルの receive() メソッドを呼び出すまでブロックします。相手は、送信者がメッセージを送信するまでブロックします。内部的には、この実装は SynchronousQueue (BlockingQueue のゼロ容量実装)を使用することを除いて、QueueChannel と非常に似ています。これは、送信者と受信者が異なるスレッドで動作する状況ではうまく機能しますが、メッセージを非同期的にキューにドロップすることは適切ではありません。つまり、RendezvousChannel では、送信者は一部の受信者がメッセージを受け入れたことを認識しますが、QueueChannel では、メッセージは内部キューに格納され、受信されない可能性があります。

これらのキューベースのチャネルはすべて、デフォルトでのみメッセージをメモリに保存していることに注意してください。永続性が必要な場合は、'queue' 要素内で 'message-store' 属性を提供して永続的な MessageStore 実装を参照するか、ローカルチャネルを JMS で裏付けられたチャネルやチャネルアダプターなどの永続的なブローカによってバッキングされたものに置き換えることができます。後者のオプションを使用すると、JMS サポートで説明したように、JMS プロバイダーのメッセージ永続性の実装を利用できます。ただし、キューにバッファリングする必要がない場合、最も簡単なアプローチは、次のセクションで説明する DirectChannel に依存することです。

RendezvousChannel は、リクエスト / 応答操作の実装にも役立ちます。送信者は、RendezvousChannel の一時的な匿名インスタンスを作成し、Message を構築するときに "replyChannel" ヘッダーとして設定します。その Message を送信した後、送信者はすぐに receive (オプションでタイムアウト値を提供)を呼び出して、応答 Message を待っている間にブロックできます。これは、Spring Integration のリクエスト / 応答コンポーネントの多くで内部的に使用される実装に非常に似ています。

DirectChannel

DirectChannel にはポイントツーポイントセマンティクスがありますが、それ以外の点では、前述のキューベースのチャネル実装のいずれよりも PublishSubscribeChannel に類似しています。PollableChannel インターフェースの代わりに SubscribableChannel インターフェースを実装するため、サブスクライバーに直接メッセージをディスパッチします。ただし、ポイントツーポイントチャネルとして、各 Message を単一のサブスクライブされた MessageHandler に送信するという点で、PublishSubscribeChannel とは異なります。

最もシンプルなポイントツーポイントチャネルオプションであることに加えて、最も重要な機能の 1 つは、単一スレッドがチャネルの「両側」で操作を実行できるようにすることです。例: ハンドラーが DirectChannel をサブスクライブする場合、Message をそのチャネルに送信すると、send() メソッドの呼び出しが戻る前に、そのハンドラーの handleMessage(Message) メソッドの呼び出しが送信者のスレッドで直接トリガーされます。

この動作をチャネル実装に提供する主な動機は、チャネルが提供する抽象化と疎結合の恩恵を受けながら、チャネル全体に広がるトランザクションをサポートすることです。送信呼び出しがトランザクションのスコープ内で呼び出される場合、ハンドラーの呼び出しの結果(データベースレコードの更新など)は、そのトランザクションの最終結果(コミットまたはロールバック)を決定するロールを果たします。

DirectChannel は最も単純なオプションであり、ポーラーのスレッドのスケジューリングと管理に必要な追加のオーバーヘッドを追加しないため、Spring Integration 内のデフォルトのチャネル型です。一般的な考え方は、アプリケーションのチャネルを定義し、バッファリングを提供する必要があるか、入力を調整する必要があるかを検討し、キューベースの PollableChannels に変更することです。同様に、チャネルがメッセージをブロードキャストする必要がある場合、DirectChannel ではなく PublishSubscribeChannel である必要があります。後で、これらの各チャネルの構成方法を示します。

DirectChannel は、サブスクライブされたメッセージハンドラーを呼び出すために、メッセージディスパッチャーに内部的に委譲します。そのディスパッチャーは、load-balancer または load-balancer-ref 属性(相互に排他的)によって公開される負荷分散戦略を持つことができます。負荷分散戦略は、複数のメッセージハンドラーが同じチャネルにサブスクライブするときに、メッセージがメッセージハンドラー間でどのように分散されるかを決定するためにメッセージディスパッチャーによって使用されます。便宜上、load-balancer 属性は、LoadBalancingStrategy の既存の実装を指す値の列挙を公開します。使用可能な値は、round-robin (ローテーション中のハンドラー間の負荷分散)と none (負荷分散を明示的に無効にする場合)のみです。他の戦略の実装は、将来のバージョンで追加される可能性があります。ただし、バージョン 3.0 以降、LoadBalancingStrategy の独自の実装を提供し、次の例に示すように、LoadBalancingStrategy を実装する Bean を指す load-balancer-ref 属性を使用してそれを挿入できます。

FixedSubscriberChannel は、サブスクライブ解除できない単一の MessageHandler サブスクライバーのみをサポートする SubscribableChannel です。これは、他のサブスクライバーが関与せず、チャネルインターセプターが必要ない、高スループットパフォーマンスのユースケースに役立ちます。

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

load-balancer 属性と load-balancer-ref 属性は相互に排他的であることに注意してください。

ロードバランシングは、ブール failover プロパティと連携して機能します。failover 値が true(デフォルト)の場合、前のハンドラーが例外をスローすると、ディスパッチャーは(必要に応じて)後続のハンドラーにフォールバックします。順序は、ハンドラー自体で定義されたオプションの順序値、またはそのような値が存在しない場合、ハンドラーがサブスクライブする順序によって決定されます。

特定の状況で、ディスパッチャーが常に最初のハンドラーを呼び出して、エラーが発生するたびに同じ固定順序でフォールバックすることを必要とする場合、負荷分散戦略は提供されません。つまり、ロードバランシングが有効になっていない場合でも、ディスパッチャーは failover ブールプロパティを引き続きサポートします。ただし、負荷分散を行わない場合、ハンドラーの呼び出しは、順序に従って常に最初から開始されます。例: プライマリ、セカンダリ、ターシャリなどの明確な定義がある場合、このアプローチはうまく機能します。名前空間のサポートを使用する場合、エンドポイントの order 属性が順序を決定します。

負荷分散と failover は、チャネルに複数のサブスクライブされたメッセージハンドラーがある場合にのみ適用されることに注意してください。ネームスペースサポートを使用する場合、これは、複数のエンドポイントが input-channel 属性で定義された同じチャネル参照を共有することを意味します。

バージョン 5.2 以降、failover が true の場合、現在のハンドラーの失敗と失敗したメッセージは、それぞれ構成されている場合、debug または info に記録されます。

ExecutorChannel

ExecutorChannel は、DirectChannel (負荷分散戦略および failover ブールプロパティ)と同じディスパッチャー構成をサポートするポイントツーポイントチャネルです。これら 2 つのディスパッチチャネル型の主な違いは、ExecutorChannel が TaskExecutor のインスタンスに委譲してディスパッチを実行することです。これは、通常、send メソッドがブロックしないことを意味しますが、ハンドラーの呼び出しが送信者のスレッドで発生しない可能性があることも意味します。送信側と受信側のハンドラーにまたがるトランザクションはサポートしていません。

送信者がブロックすることがあります。例: TaskExecutor を使用して、クライアントを調整する拒否ポリシー(ThreadPoolExecutor.CallerRunsPolicy など)を使用すると、送信者のスレッドは、スレッドプールが最大容量になり、エグゼキューターの作業キューがいっぱいになるといつでもメソッドを実行できます。そのような状況は予測不可能な方法でのみ発生するため、トランザクションに依存するべきではありません。
FluxMessageChannel

FluxMessageChannel は、ダウンストリームのリアクティブサブスクライバーによるオンデマンド消費のために、"sinking" が内部 reactor.core.publisher.Flux にメッセージを送信するための org.reactivestreams.Publisher 実装です。このチャネル実装は SubscribableChannel でも PollableChannel でもないため、org.reactivestreams.Subscriber インスタンスのみを使用して、このチャネルからリアクティブストリームのバックプレッシャー特性を尊重することができます。一方、FluxMessageChannel は subscribeTo(Publisher<Message<?>>) 契約で ReactiveStreamsSubscribableChannel を実装し、リアクティブソースパブリッシャーからのイベントの受信を可能にし、リアクティブストリームを統合フローにブリッジします。統合フロー全体に対して完全にリアクティブな動作を実現するには、フロー内のすべてのエンドポイント間にそのようなチャネルを配置する必要があります。

Reactive Streams との相互作用の詳細については、Reactive Streams サポートを参照してください。

スコープチャネル

Spring Integration 1.0 は ThreadLocalChannel 実装を提供していましたが、2.0 から削除されました。同じ要件を処理するより一般的な方法は、チャネルに scope 属性を追加することです。属性の値には、コンテキスト内で使用可能なスコープの名前を指定できます。例: Web 環境では、特定のスコープを使用でき、カスタムスコープの実装をコンテキストに登録できます。次の例は、スコープ自体の登録を含め、スレッドローカルスコープがチャネルに適用されていることを示しています。

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

前の例で定義したチャネルも内部的にキューに委譲しますが、チャネルは現在のスレッドにバインドされているため、キューの内容も同様にバインドされます。そうすれば、チャネルに送信するスレッドは後で同じメッセージを受信できますが、他のスレッドはそれらにアクセスできません。スレッドスコープのチャネルが必要になることはめったにありませんが、DirectChannel インスタンスを使用して単一のスレッドオペレーションを実施しているが、応答メッセージは「ターミナル」チャネルに送信する必要がある状況で役立ちます。そのターミナルチャネルがスレッドスコープの場合、元の送信スレッドはターミナルチャネルから応答を収集できます。

これで、任意のチャネルのスコープを設定できるため、スレッドローカルに加えて独自のスコープを定義できます。

チャネルインターセプター

メッセージングアーキテクチャの利点の 1 つは、一般的な動作を提供し、非侵襲的な方法でシステムを通過するメッセージに関する意味のある情報を取得できることです。Message インスタンスは MessageChannel インスタンスとの間で送受信されるため、これらのチャネルは送受信操作をインターセプトする機会を提供します。次のリストに示す ChannelInterceptor 戦略インターフェースは、これらの各操作のメソッドを提供します。

public interface ChannelInterceptor {

    Message<?> preSend(Message<?> message, MessageChannel channel);

    void postSend(Message<?> message, MessageChannel channel, boolean sent);

    void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, Exception ex);

    boolean preReceive(MessageChannel channel);

    Message<?> postReceive(Message<?> message, MessageChannel channel);

    void afterReceiveCompletion(Message<?> message, MessageChannel channel, Exception ex);
}

インターフェースを実装した後、インターセプターをチャネルに登録するには、次の呼び出しを行うだけです。

channel.addInterceptor(someChannelInterceptor);

Message インスタンスを返すメソッドは、Message の変換に使用するか、"null" を返してさらなる処理を防ぐことができます(もちろん、どのメソッドも RuntimeException をスローできます)。また、preReceive メソッドは false を返して、受信操作が進行しないようにすることができます。

receive() 呼び出しは PollableChannels にのみ関連することに注意してください。実際、SubscribableChannel インターフェースは receive() メソッドを定義していません。これは、Message が SubscribableChannel に送信されると、チャネルの型に応じて、0 個以上のサブスクライバーに直接送信されるためです(たとえば、PublishSubscribeChannel はすべてのサブスクライバーに送信されます)。preReceive(…​)postReceive(…​)afterReceiveCompletion(…​) インターセプターメソッドは、インターセプターが PollableChannel に適用された場合にのみ呼び出されます。

Spring Integration は、ワイヤータップ (英語) パターンの実装も提供します。これは、既存のフローを変更せずに Message を別のチャネルに送信する単純なインターセプターです。デバッグと監視に非常に役立ちます。ワイヤータップに例を示します。

すべてのインターセプターメソッドを実装する必要があることはめったにないため、インターフェースは no-op メソッドを提供します(void メソッドを返すメソッドにはコードがなく、Message を返すメソッドは Message をそのまま返し、boolean メソッドは true を返します)。

インターセプターメソッドの呼び出し順序は、チャネルの型によって異なります。前に説明したように、キューベースのチャネルは、最初に受信メソッドがインターセプトされる唯一のチャネルです。さらに、送信インターセプトと受信インターセプトの関連は、個別の送信者スレッドと受信者スレッドのタイミングによって異なります。例: メッセージの待機中に受信者がすでにブロックされている場合、順序は次のようになります: preSendpreReceivepostReceivepostSend。ただし、送信者がチャネルにメッセージを送信してすでに戻ってきた後に受信者がポーリングする場合、順序は次のようになります: preSendpostSend (しばらく経過)、preReceivepostReceive。このような場合に経過する時間は、いくつかの要因に依存するため、一般的に予測できません(実際、受信が発生することはありません)。キューの型もロールを果たします(たとえば、ランデブーと優先度)。つまり、preSend が postSend に先行し、preReceive が postReceive に先行するという事実を超えて、順序を信頼することはできません。

Spring Framework 4.1 および Spring Integration 4.1 以降、ChannelInterceptor は新しいメソッド afterSendCompletion() および afterReceiveCompletion() を提供します。発生した例外に関係なく、send()' and 'receive() 呼び出しの後に呼び出され、リソースのクリーンアップが可能になります。チャネルは、最初の preSend() および preReceive() 呼び出しの逆の順序で ChannelInterceptor リストのこれらのメソッドを呼び出すことに注意してください。

バージョン 5.1 以降、グローバルチャネルインターセプターは、Java DSL を使用するときに beanFactory.initializeBean() または IntegrationFlowContext を使用して初期化される Bean を介するなど、動的に登録されたチャネルに適用されるようになりました。以前は、アプリケーションコンテキストのリフレッシュ後に Bean が作成されたときにインターセプターは適用されませんでした。

また、バージョン 5.1 以降、メッセージが受信されないときに ChannelInterceptor.postReceive() が呼び出されなくなりました。nullMessage<?> をチェックする必要はなくなりました。以前は、メソッドが呼び出されていました。以前の動作に依存するインターセプターがある場合は、代わりに afterReceiveCompleted() を実装してください。これは、メッセージが受信されたかどうかに関係なく、そのメソッドが呼び出されるためです。

バージョン 5.2 から、ChannelInterceptorAware は Spring メッセージングモジュールの InterceptableChannel に代わって非推奨になりました。これは下位互換性のために拡張されています。

MessagingTemplate

エンドポイントとそのさまざまな構成オプションが導入されると、Spring Integration はメッセージングコンポーネントからの基盤を提供し、メッセージングシステムからアプリケーションコードを非侵襲的に呼び出すことができます。ただし、アプリケーションコードからメッセージングシステムを呼び出すことが必要な場合があります。このようなユースケースを実装する際の便宜上、Spring Integration は、リクエストおよび応答シナリオを含む、メッセージチャネル全体のさまざまな操作をサポートする MessagingTemplate を提供します。例: 次のように、リクエストを送信して応答を待つことができます。

MessagingTemplate template = new MessagingTemplate();

Message reply = template.sendAndReceive(someChannel, new GenericMessage("test"));

上記の例では、一時的な匿名チャンネルがテンプレートによって内部的に作成されます。"sendTimeout" および "receiveTimeout" プロパティもテンプレートで設定でき、他の交換型もサポートされます。次のリストは、そのようなメソッドのシグネチャーを示しています。

public boolean send(final MessageChannel channel, final Message<?> message) { ...
}

public Message<?> sendAndReceive(final MessageChannel channel, final Message<?> request) { ...
}

public Message<?> receive(final PollableChannel<?> channel) { ...
}
Message インスタンスの代わりにペイロードまたはヘッダー値を使用して単純なインターフェースを呼び出すことができる、より侵襲性の低いアプローチについては、GatewayProxyFactoryBean を入力してくださいで説明しています。

メッセージチャネルの構成

メッセージチャネルインスタンスを作成するには、次のように、xml の場合は <channel/> 要素を、Java 構成の場合は DirectChannel インスタンスを使用できます。

Java
@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
XML
<int:channel id="exampleChannel"/>

サブエレメントなしで <channel/> エレメントを使用すると、DirectChannel インスタンス(SubscribableChannel)が作成されます。

パブリッシュ / サブスクライブチャネルを作成するには、次のように <publish-subscribe-channel/> 要素(Java では PublishSubscribeChannel)を使用します。

Java
@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
XML
<int:publish-subscribe-channel id="exampleChannel"/>

あるいは、さまざまな <queue/> サブ要素を提供して、ポーリング可能なチャネル型のいずれかを作成することもできます(メッセージチャネルの実装で説明されています)。次のセクションでは、各チャネル型の例を示します。

DirectChannel 設定

前述のように、DirectChannel はデフォルトの型です。次のリストは、誰を定義するかを示しています。

Java
@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
XML
<int:channel id="directChannel"/>

デフォルトのチャネルにはラウンドロビンロードバランサーがあり、フェイルオーバーも有効になっています(詳細については DirectChannel を参照してください)。これらの一方または両方を無効にするには、<dispatcher/> サブ要素(DirectChannel の LoadBalancingStrategy コンストラクター)を追加し、次のように属性を構成します。

Java
@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
XML
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>
データ型チャネルの構成

コンシューマーは特定の型のペイロードのみを処理できる場合があり、入力メッセージのペイロード型を確認する必要があります。最初に思い浮かぶのは、メッセージフィルターを使用することです。ただし、メッセージフィルターでできることは、コンシューマーの要件に準拠していないメッセージを除外することだけです。別の方法は、コンテンツベースのルーターを使用し、非準拠のデータ型のメッセージを特定のトランスフォーマーにルーティングして、必要なデータ型への変換と変換を実施することです。これは機能しますが、同じことを達成するためのより簡単な方法は、データ型チャネル (英語) パターンを適用することです。特定のペイロードデータ型ごとに個別のデータ型チャネルを使用できます。

特定のペイロード型を含むメッセージのみを受け入れるデータ型チャネルを作成するには、次の例に示すように、チャネル要素の datatype 属性にデータ型の完全修飾クラス名を指定します。

Java
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
XML
<int:channel id="numberChannel" datatype="java.lang.Number"/>

型チェックは、チャネルのデータ型に割り当て可能なすべての型に合格することに注意してください。つまり、前述の例の numberChannel は、ペイロードが java.lang.Integer または java.lang.Double であるメッセージを受け入れます。次の例に示すように、複数の型をコンマ区切りリストとして提供できます。

Java
@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
XML
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

前述の例の "numberChannel" は、java.lang.Number のデータ型のメッセージのみを受け入れます。しかし、メッセージのペイロードが必要な型ではない場合はどうなるでしょうか? Spring の変換サービスのインスタンスである integrationConversionService という名前の Bean を定義したかどうかによります。そうでない場合は、Exception がすぐにスローされます。ただし、integrationConversionService Bean を定義している場合は、メッセージのペイロードを許容可能な型に変換する試みで使用されます。

カスタムコンバーターを登録することもできます。例: String ペイロードを含むメッセージを、上記で設定した "numberChannel" に送信するとします。次のようにメッセージを処理できます。

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常、これは完全に正当的な操作です。ただし、Datatype Channel を使用しているため、このような操作の結果は次のような例外を生成します。

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

ペイロード型を Number にする必要があるため、例外が発生しますが、String を送信しました。そのため、String を Number に変換するものが必要です。そのために、次の例のようなコンバーターを実装できます。

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

次に、次の例に示すように、Integration Conversion Service でコンバーターとして登録できます。

Java
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
XML
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

または、StringToIntegerConverter クラスで、自動スキャン用に @Component アノテーションが付けられている場合。

'converter' 要素が解析されると、integrationConversionService Bean が作成されます(まだ定義されていない場合)。データ型チャネルはそのコンバーターを使用して String ペイロードを Integer に変換するため、そのコンバーターが適切に配置されていれば、send 操作は成功します。

ペイロード型の変換に関する詳細については、ペイロード型変換を参照してください。

バージョン 4.0 以降、integrationConversionService は DefaultDatatypeChannelMessageConverter によって呼び出され、DefaultDatatypeChannelMessageConverter はアプリケーションコンテキストで変換サービスを検索します。別の変換手法を使用するには、チャネルで message-converter 属性を指定できます。これは、MessageConverter 実装への参照でなければなりません。fromMessage メソッドのみが使用されます。コンバーターにメッセージヘッダーへのアクセスを提供します(content-type などのヘッダーからの情報が必要な場合に備えて)。メソッドは、変換されたペイロードまたは完全な Message オブジェクトのみを返すことができます。後者の場合、コンバーターは受信メッセージからすべてのヘッダーをコピーするように注意する必要があります。

あるいは、型 MessageConverter の <bean/> を ID datatypeChannelMessageConverter で宣言でき、そのコンバーターは datatype のすべてのチャネルで使用されます。

QueueChannel 設定

QueueChannel を作成するには、<queue/> サブ要素を使用します。次のようにチャンネルの容量を指定できます。

Java
@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
XML
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
この <queue/> サブエレメントの 'capacity' 属性に値を指定しない場合、結果のキューは無制限になります。メモリ不足などの課題を回避するために、制限キューに明示的な値を設定することを強くお勧めします。
永続的な QueueChannel 設定

QueueChannel はメッセージをバッファリングする機能を提供しますが、デフォルトではメモリ内でのみ行うため、システム障害が発生した場合にメッセージが失われる機能があります。このリスクを軽減するために、QueueChannel は、MessageGroupStore 戦略インターフェースの永続的な実装によって支援される場合があります。MessageGroupStore および MessageStore の詳細については、メッセージストアを参照してください。

message-store 属性が使用されている場合、capacity 属性は許可されません。

QueueChannel は Message を受信すると、メッセージをメッセージストアに追加します。Message が QueueChannel からポーリングされると、メッセージストアから削除されます。

デフォルトでは、QueueChannel はメッセージをインメモリキューに格納します。これにより、前述のメッセージの損失シナリオが発生する可能性があります。ただし、Spring Integration は JdbcChannelMessageStore などの永続ストアを提供します。

次の例に示すように、message-store 属性を追加することにより、任意の QueueChannel のメッセージストアを設定できます。

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(Java/Kotlin 構成オプションについては、以下のサンプルを参照してください)

Spring Integration JDBC モジュールは、多くの一般的なデータベース用のスキーマデータ定義言語 (DDL) も提供します。これらのスキーマは、そのモジュール (spring-integration-jdbc) の org.springframework.integration.jdbc.store.channel パッケージにあります。

重要な機能の 1 つは、トランザクション永続ストア(JdbcChannelMessageStore など)で、ポーラーにトランザクションが構成されている限り、ストアから削除されたメッセージは、トランザクションが正常に完了した場合にのみ永久に削除できることです。それ以外の場合、トランザクションはロールバックされ、Message は失われません。

"NoSQL" データストアに関連する Spring プロジェクトの数が増え、これらのストアの基礎となるサポートが提供されるようになったため、メッセージストアの他の多くの実装が利用可能になりました。特定のニーズを満たすものが見つからない場合は、MessageGroupStore インターフェースの独自の実装を提供することもできます。

バージョン 4.0 以降、可能であれば ChannelMessageStore を使用するように QueueChannel インスタンスを構成することをお勧めします。これらは、一般的なメッセージストアと比較して、一般的にこの用途に最適化されています。ChannelMessageStore が ChannelPriorityMessageStore の場合、メッセージは優先順位内で FIFO で受信されます。優先順位の概念は、メッセージストアの実装によって決まります。例: 次の例は、MongoDB チャネルメッセージストアの Java 構成を示しています。

Java
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
Java DSL
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlows.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
Kotlin DSL
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
MessageGroupQueue クラスに注意してください。これは、MessageGroupStore 操作を使用する BlockingQueue 実装です。

QueueChannel 環境をカスタマイズする別のオプションは、<int:queue> サブエレメントまたはその特定のコンストラクターの ref 属性によって提供されます。この属性は、java.util.Queue 実装への参照を提供します。例: Hazelcast 分散 IQueue (英語) は、次のように構成できます。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel 設定

PublishSubscribeChannel を作成するには、<publish-subscribe-channel/> 要素を使用します。この要素を使用する場合、次のように、メッセージの公開に使用する task-executor を指定することもできます(指定しない場合、送信者のスレッドで公開します)。

Java
@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
XML
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

PublishSubscribeChannel の下流にリシーケンサーまたはアグリゲーターを提供する場合、チャネルの "apply-sequence" プロパティを true に設定できます。そうすることは、メッセージを渡す前に、チャネルが sequence-size および sequence-number メッセージヘッダーと相関 ID を設定する必要があることを示します。例: サブスクライバーが 5 人の場合、sequence-size は 5 に設定され、メッセージは 1 から 5 の範囲の sequence-number ヘッダー値を持ちます。

Executor とともに、ErrorHandler を構成することもできます。デフォルトでは、PublishSubscribeChannel は MessagePublishingErrorHandler 実装を使用して、errorChannel ヘッダーから MessageChannel またはグローバル errorChannel インスタンスにエラーを送信します。Executor が構成されていない場合、ErrorHandler は無視され、呼び出し元のスレッドに例外が直接スローされます。

PublishSubscribeChannel から Resequencer または Aggregator ダウンストリームを提供する場合、チャネルの 'apply-sequence' プロパティを true に設定できます。そうすることは、チャネルがメッセージを渡す前にシーケンス ID とシーケンス番号のメッセージヘッダーと相関 ID を設定する必要があることを示します。例: サブスクライバーが 5 人の場合、sequence-size は 5 に設定され、メッセージは 1 から 5 の範囲のシーケンス番号ヘッダー値を持ちます。

次の例は、apply-sequence ヘッダーを true に設定する方法を示しています。

Java
@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(false);
    return channel;
}
XML
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
パブリッシュ / サブスクライブチャネルがまったく同じメッセージインスタンスを複数の送信チャネルに送信できるように、apply-sequence 値はデフォルトで false です。Spring Integration はペイロードとヘッダー参照の不変性を強制するため、フラグが true に設定されると、チャネルは同じペイロード参照で異なるヘッダー値を持つ新しい Message インスタンスを作成します。

バージョン 5.4.3 以降、PublishSubscribeChannel は、BroadcastingDispatcher の requireSubscribers オプションを使用して構成することもでき、サブスクライバーがない場合にこのチャネルがメッセージをサイレントに無視しないことを示します。サブスクライバーがなく、このオプションが true に設定されている場合、Dispatcher has no subscribers メッセージを含む MessageDispatchingException がスローされます。

ExecutorChannel

ExecutorChannel を作成するには、task-executor 属性を持つ <dispatcher> サブ要素を追加します。属性の値は、コンテキスト内の任意の TaskExecutor を参照できます。例: これにより、サブスクライブされたハンドラーにメッセージをディスパッチするためのスレッドプールの構成が可能になります。前述のように、これにより、送信者と受信者の間のシングルスレッド実行コンテキストが中断され、アクティブなトランザクションコンテキストがハンドラーの呼び出しで共有されなくなります(つまり、ハンドラーは Exception をスローしますが、send 呼び出しはすでに返されます)正常に)。次の例は、dispatcher 要素を使用して、task-executor 属性でエグゼキュータを指定する方法を示しています。

Java
@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
XML
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

DirectChannel 設定で前述したように、load-balancer オプションと failover オプションは両方とも <dispatcher/> サブ要素でも使用できます。同じデフォルトが適用されます。その結果、次の例に示すように、これらの属性の一方または両方に対して明示的な構成が提供されない限り、チャネルにはフェールオーバーが有効なラウンドロビンロードバランシング戦略があります。

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>
PriorityChannel 設定

PriorityChannel を作成するには、次の例に示すように、<priority-queue/> サブ要素を使用します。

Java
@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
XML
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

デフォルトでは、チャネルはメッセージの priority ヘッダーを調べます。ただし、代わりにカスタム Comparator 参照を提供できます。また、PriorityChannel は(他の型と同様に) datatype 属性をサポートしていることに注意してください。QueueChannel と同様に、capacity 属性もサポートします。次の例は、これらすべてを示しています。

Java
@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
XML
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

バージョン 4.0 以降、priority-channel 子エレメントは message-store オプションをサポートします(その場合、comparator および capacity は許可されません)。メッセージストアは PriorityCapableChannelMessageStore である必要があります。PriorityCapableChannelMessageStore の実装は、現在 RedisJDBCMongoDB 用に提供されています。詳細については、QueueChannel の構成およびメッセージストアを参照してください。サンプル構成はバッキングメッセージチャネルにあります。

RendezvousChannel 設定

キューのサブ要素が <rendezvous-queue> の場合、RendezvousChannel が作成されます。前述のオプションには追加の設定オプションはありません。また、キューは容量ゼロの直接ハンドオフキューであるため、容量値を受け入れません。次の例は、RendezvousChannel を宣言する方法を示しています。

Java
@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
XML
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>
スコープチャネル構成

次の例に示すように、scope 属性を使用して任意のチャネルを構成できます。

<int:channel id="threadLocalChannel" scope="thread"/>
チャネルインターセプターの構成

チャネルインターセプターに従って、メッセージチャネルにはインターセプターもあります。<interceptors/> サブ要素は、<channel/> (またはより具体的な要素型)に追加できます。次の例に示すように、ref 属性を指定して、ChannelInterceptor インターフェースを実装する Spring 管理対象オブジェクトを参照できます。

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

通常、インターセプター実装は通常、複数のチャネルで再利用できる共通の動作を提供するため、別の場所でインターセプター実装を定義することをお勧めします。

グローバルチャネルインターセプターの構成

チャネルインターセプターは、個々のチャネルごとに横断的動作を適用するためのクリーンで簡潔な方法を提供します。同じ動作を複数のチャネルに適用する必要がある場合、各チャネルに同じインターセプターのセットを構成することは最も効率的な方法ではありません。インターセプターを複数のチャネルに適用できるようにする一方で、構成の繰り返しを避けるために、Spring Integration はグローバルインターセプターを提供します。次のペアの例を検討してください。

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

各 <channel-interceptor/> 要素を使用すると、pattern 属性で定義されたパターンに一致するすべてのチャネルに適用されるグローバルインターセプターを定義できます。上記の場合、グローバルインターセプターは、"thing1" チャネルと "thing2" または "input" で始まる他のすべてのチャネルに適用されますが、"thing3" で始まるチャネルには適用されません(バージョン 5.0 以降)。

この構文をパターンに追加すると、1 つの可能性のある(おそらくありそうもない)問題が発生します。!thing1 という名前の Bean があり、チャンネルインターセプターの pattern パターンに !thing1 のパターンを含めた場合、一致しなくなります。パターンは thing1 という名前ではないすべての Bean に一致するようになりました。この場合、\ を使用して、パターン内の ! をエスケープできます。パターン \!thing1 は、!thing1 という名前の Bean と一致します。

order 属性を使用すると、特定のチャネルに複数のインターセプターがある場合に、このインターセプターが挿入される場所を管理できます。例: 次の例に示すように、チャネル "inputChannel" には個別にインターセプターをローカルに構成できます(以下を参照)。

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

合理的な質問は、「グローバルインターセプターは、他のインターセプターに関連してローカルまたは他のグローバルインターセプター定義を介してどのように注入されますか?」です。現在の実装では、インターセプターの実行順序を定義する簡単なメカニズムを提供します。order 属性に正の数を指定すると、既存のインターセプターの後にインターセプターが挿入され、負の数を指定すると既存のインターセプターの前にインターセプターが挿入されます。これは、前の例では、グローバルインターセプターが(その order が 0 よりも大きいため)ローカルに構成された「ワイヤータップ」インターセプターの後に挿入されることを意味します。pattern に一致する別のグローバルインターセプターが存在する場合、その順序は、両方のインターセプターの order 属性の値を比較することにより決定されます。既存のインターセプターの前にグローバルインターセプターを挿入するには、order 属性に負の値を使用します。

order 属性と pattern 属性の両方がオプションであることに注意してください。order のデフォルト値は 0 で、pattern のデフォルト値は "*" です(すべてのチャネルに一致するため)。
ワイヤータップ

前述のように、Spring Integration はシンプルなワイヤータップインターセプターを提供します。<interceptors/> 要素内の任意のチャネルでワイヤタップを構成できます。これはデバッグに特に役立ち、次のように Spring Integration のロギングチャネルアダプターと組み合わせて使用できます。

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
"logging-channel-adapter" も 'expression' 属性を受け入れるため、"payload" および "headers" 変数に対して SpEL 式を評価できます。または、メッセージ toString() の結果全体をログに記録するには、'log-full-message' 属性に値 true を指定します。デフォルトでは、false であるため、ペイロードのみがログに記録されます。true に設定すると、ペイロードに加えてすべてのヘッダーのログが有効になります。「式」オプションは、最も柔軟性が高くなります(例: expression="payload.user.name")。

ワイヤタップおよびその他の類似のコンポーネント(メッセージ公開構成)に関する一般的な誤解の 1 つは、それらが本質的に自動的に非同期であるということです。デフォルトでは、コンポーネントとしてのワイヤタップは非同期で呼び出されません。代わりに、Spring Integration は、非同期動作を構成するための単一の統一アプローチであるメッセージチャネルに焦点を当てています。メッセージフローの特定の部分を同期または非同期にするのは、そのフロー内で構成されたメッセージチャネルの型です。これは、メッセージチャネルの抽象化の主な利点の 1 つです。フレームワークの開始から、フレームワークの第一級オブジェクトとしてのメッセージチャネルの必要性と価値を常に強調してきました。これは、EIP パターンの内部的な暗黙的な実現だけではありません。構成可能なコンポーネントとしてエンドユーザーに完全に公開されます。そのため、ワイヤータップコンポーネントは、次のタスクの実行のみを担当します。

  • チャネルをタップしてメッセージフローをインターセプトする (たとえば、channelA)

  • 各メッセージを取得する

  • 別のチャネルにメッセージを送信します (たとえば、channelB)

これは基本的にブリッジパターンのバリエーションですが、チャネル定義内にカプセル化されます(したがって、フローを中断せずに有効化および無効化するのが簡単です)。また、ブリッジとは異なり、基本的に別のメッセージフローを分岐します。そのフローは同期ですか、非同期ですか? 答えは、"channelB" であるメッセージチャネルの型によって異なります。次のオプションがあります: ダイレクトチャンネル、ポーリング可能チャンネル、エグゼキューターチャンネル。最後の 2 つはスレッド境界を破り、そのようなチャネルを介した通信を非同期にします。これは、そのチャネルからサブスクライブされたハンドラーへのメッセージのディスパッチが、そのチャネルへのメッセージ送信に使用したスレッドとは異なるスレッドで発生するためです。それが、ワイヤタップフローを同期または非同期にするものです。フレームワーク内の他のコンポーネント(メッセージパブリッシャーなど)と一貫性があり、特定のコードを次のように実装する必要があるかどうかを事前に心配すること(スレッドセーフコードを書くこと以外)を回避することで、一定レベルの一貫性とシンプルさを追加します。同期または非同期。メッセージチャネルを介した 2 つのコード(たとえば、コンポーネント A とコンポーネント B)の実際の接続が、コラボレーションを同期または非同期にします。将来的には同期から非同期に変更することもできます。メッセージチャネルを使用すると、コードに触れることなく、迅速に変更できます。

ワイヤタップに関する最後のポイントは、デフォルトでは非同期ではないという上記の根拠にもかかわらず、通常はできるだけ早くメッセージをハンドオフすることが望ましいことを念頭に置いてください。ワイヤタップの送信チャネルとして非同期チャネルオプションを使用することは非常に一般的です。ただし、デフォルトでは非同期動作を強制しません。トランザクション境界を壊したくない場合を含めて、壊した場合に壊されるユースケースがいくつかあります。おそらく、監査目的でワイヤータップパターンを使用し、元のトランザクション内で監査メッセージを送信する必要があります。例として、ワイヤタップを JMS 発信チャネルアダプターに接続できます。これにより、両方の長所を活用できます。1)JMS メッセージの送信はトランザクション内で行うことができますが、2)まだ「ファイアアンドフォーゲット」アクションなので、メインメッセージフローの顕著な遅延を防ぎます。

バージョン 4.0 以降、インターセプター(WireTap クラス (英語) など)がチャネルを参照する場合、循環参照を回避することが重要です。現在のインターセプターによってインターセプトされているチャネルからこのようなチャネルを除外する必要があります。これは、適切なパターンで、またはプログラムで実行できます。channel を参照するカスタム ChannelInterceptor がある場合は、VetoCapableInterceptor の実装を検討してください。そのようにして、フレームワークはインターセプターに、提供されたパターンに基づいて、候補である各チャネルをインターセプトしてもよいかどうかを確認します。また、インターセプターメソッドにランタイム保護を追加して、インターセプターによって参照されるチャネルではないことを確認することもできます。WireTap はこれらの手法の両方を使用します。

バージョン 4.3 以降、WireTap には、MessageChannel インスタンスの代わりに channelName を使用する追加のコンストラクターがあります。これは、Java 構成およびチャネル自動作成ロジックが使用されている場合に便利です。ターゲット MessageChannel Bean は、後でインターセプターとの最初の対話で、提供された channelName から解決されます。

チャネル解決には BeanFactory が必要であるため、ワイヤータップインスタンスは Spring 管理の Bean でなければなりません。

このレイトバインディングアプローチにより、次の例に示すように、Java DSL 構成を使用して一般的な盗聴パターンを簡素化することもできます。

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}
条件付きワイヤタップ

selector または selector-expression 属性を使用して、ワイヤタップを条件付きにすることができます。selector は MessageSelector Bean を参照します。MessageSelector は、実行時にメッセージをタップチャネルに送信するかどうかを決定できます。同様に、selector-expression は同じ目的を実行するブール SpEL 式です。式が true に評価される場合、メッセージはタップチャネルに送信されます。

グローバルワイヤタップ構成

グローバルチャネルインターセプターの構成の特殊なケースとしてグローバルワイヤタップを設定することが可能です。そのためには、最上位の wire-tap 要素を構成します。現在、通常の wire-tap 名前空間のサポートに加えて、pattern および order 属性がサポートされており、channel-interceptor の場合とまったく同じように機能します。次の例は、グローバルワイヤタップを設定する方法を示しています。

Java
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
XML
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
グローバルワイヤタップは、既存のチャネル構成を変更せずに、外部でシングルチャネルワイヤタップを構成する便利な方法を提供します。これを行うには、pattern 属性をターゲットチャネル名に設定します。例: この手法を使用して、チャネル上のメッセージを検証するテストケースを構成できます。

特別チャンネル

デフォルトでは、errorChannel と nullChannel の 2 つの特別なチャネルがアプリケーションコンテキスト内で定義されています。'nullChannel' (NullChannel のインスタンス)は /dev/null のように機能し、DEBUG レベルで送信されたメッセージをログに記録し、すぐに戻ります。送信されたメッセージの org.reactivestreams.Publisher ペイロードには特別な処理が適用されます。データは破棄されますが、リアクティブストリーム処理を開始するために、このチャネルですぐにサブスクライブされます。リアクティブストリーム処理(Subscriber.onError(Throwable) を参照)からスローされたエラーは、調査の可能性があるため、警告レベルでログに記録されます。このようなエラーで何かを行う必要がある場合は、Mono.doOnError() をカスタマイズした ReactiveRequestHandlerAdvice をメッセージハンドラーに適用して、この nullChannel に Mono 応答を生成できます。気にしない応答でチャネル解決エラーが発生した場合はいつでも、影響を受けるコンポーネントの output-channel 属性を "nullChannel" に設定できます(名前 "nullChannel" はアプリケーションコンテキスト内で予約されています)。

'errorChannel' は、エラーメッセージを送信するために内部的に使用され、カスタム構成でオーバーライドされる場合があります。これについては、エラー処理で詳しく説明されています。

メッセージチャネルとインターセプターの詳細については、Java DSL の章のメッセージチャンネルも参照してください。