このバージョンはまだ開発中であり、まだ安定しているとは見なされていません。最新の安定バージョンについては、Spring Integration 6.5.3 を使用してください! |
メッセージチャネルの実装
Spring Integration は、さまざまなメッセージチャネルの実装を提供します。次のセクションでは、それぞれについて簡単に説明します。
PublishSubscribeChannel
PublishSubscribeChannel 実装は、送信された Message をそのサブスクライブされたすべてのハンドラーにブロードキャストします。これは、主に通知がイベントメッセージの送信に最もよく使用されます(通常、単一のハンドラーによって処理されることを意図したドキュメントメッセージとは対照的です)。PublishSubscribeChannel は送信専用であることに注意してください。send(Message) メソッドが呼び出されると、サブスクライバーに直接ブロードキャストするため、コンシューマーはメッセージをポーリングできません(PollableChannel を実装していないため、receive() メソッドはありません)。代わりに、サブスクライバー自体が MessageHandler でなければならず、サブスクライバーの handleMessage(Message) メソッドが順番に呼び出されます。
バージョン 3.0 より前は、サブスクライバーがいない PublishSubscribeChannel で send メソッドを呼び出すと false が返されました。MessagingTemplate と組み合わせて使用すると、MessageDeliveryException がスローされました。バージョン 3.0 から、少なくとも最小サブスクライバーが存在する(およびメッセージを正常に処理する)場合に send が常に成功したと見なされるように、動作が変更されました。この動作を変更するには、minSubscribers プロパティを設定します。デフォルトは 0 です。
TaskExecutor を使用する場合、メッセージの実際の処理は非同期に実行されるため、この決定には正しい数のサブスクライバーの存在のみが使用されます。 |
QueueChannel
QueueChannel 実装はキューをラップします。PublishSubscribeChannel とは異なり、QueueChannel にはポイントツーポイントのセマンティクスがあります。つまり、チャンネルに複数のコンシューマーがある場合でも、そのチャンネルに送信された Message を受信できるのはそのうちの 1 つだけです。次のように、デフォルトの引数なしのコンストラクター(本質的に Integer.MAX_VALUE の無制限の容量を提供)と、キュー容量を受け入れるコンストラクターを提供します。
public QueueChannel(int capacity) 容量制限に達していないチャネルは、メッセージを内部キューに格納し、send(Message<?>) メソッドは、メッセージを処理する受信側がなくてもすぐに戻ります。キューが容量に達した場合、送信側はキューに空きができるまでブロックします。または、追加のタイムアウトパラメーターを持つ send メソッドを使用する場合は、空きができるまで、またはタイムアウト期間が経過するまで、キューはブロックされます (どちらか早い方)。同様に、receive() 呼び出しはキューにメッセージがある場合はすぐに戻りますが、キューが空の場合、receive 呼び出しは、メッセージが使用可能になるか、タイムアウトが指定されている場合はタイムアウトが経過するまでブロックされることがあります。いずれの場合でも、タイムアウト値に 0 を渡すことで、キューの状態に関係なく、すぐに戻るように強制できます。ただし、timeout パラメーターを指定しない send() および receive() のバージョンを呼び出すと、無期限にブロックされることに注意してください。
PriorityChannel
QueueChannel は先入れ先出し(FIFO)の順序付けを実行しますが、PriorityChannel は優先度に基づいてチャネル内でメッセージを順序付けできる代替実装です。デフォルトでは、優先度は各メッセージ内の priority ヘッダーによって決定されます。ただし、カスタム優先順位決定ロジックの場合、Comparator<Message<?>> 型のコンパレーターを PriorityChannel コンストラクターに提供できます。
RendezvousChannel
RendezvousChannel は、「直接ハンドオフ」シナリオを可能にします。このシナリオでは、送信者は、別のパーティがチャネルの receive() メソッドを呼び出すまでブロックします。相手は、送信者がメッセージを送信するまでブロックします。内部的には、この実装は SynchronousQueue (BlockingQueue のゼロ容量実装)を使用することを除いて、QueueChannel と非常に似ています。これは、送信者と受信者が異なるスレッドで動作する状況ではうまく機能しますが、メッセージを非同期的にキューにドロップすることは適切ではありません。つまり、RendezvousChannel では、送信者は一部の受信者がメッセージを受け入れたことを認識しますが、QueueChannel では、メッセージは内部キューに格納され、受信されない可能性があります。
これらのキューベースのチャネルはすべて、デフォルトでのみメッセージをメモリに保存していることに注意してください。永続性が必要な場合は、'queue' 要素内で 'message-store' 属性を提供して永続的な MessageStore 実装を参照するか、ローカルチャネルを JMS で裏付けられたチャネルやチャネルアダプターなどの永続的なブローカによってバッキングされたものに置き換えることができます。後者のオプションを使用すると、JMS サポートで説明したように、JMS プロバイダーのメッセージ永続性の実装を利用できます。ただし、キューにバッファリングする必要がない場合、最も簡単なアプローチは、次のセクションで説明する DirectChannel に依存することです。 |
RendezvousChannel は、リクエスト応答操作の実装にも役立ちます。送信者は、RendezvousChannel の一時的な匿名インスタンスを作成し、Message を構築するときにそれを 'replyChannel' ヘッダーとして設定できます。その Message を送信した後、送信者はすぐに receive を呼び出し (オプションでタイムアウト値を指定)、応答 Message を待機している間ブロックできます。これは、多くの Spring Integration のリクエスト応答コンポーネントで内部的に使用されている実装と非常によく似ています。
DirectChannel
DirectChannel にはポイントツーポイントセマンティクスがありますが、それ以外の点では、前述のキューベースのチャネル実装のいずれよりも PublishSubscribeChannel に類似しています。PollableChannel インターフェースの代わりに SubscribableChannel インターフェースを実装するため、サブスクライバーに直接メッセージをディスパッチします。ただし、ポイントツーポイントチャネルとして、各 Message を単一のサブスクライブされた MessageHandler に送信するという点で、PublishSubscribeChannel とは異なります。
最もシンプルなポイントツーポイントチャネルオプションであることに加えて、最も重要な機能の 1 つは、単一スレッドがチャネルの「両側」で操作を実行できるようにすることです。例: ハンドラーが DirectChannel をサブスクライブする場合、Message をそのチャネルに送信すると、send() メソッドの呼び出しが戻る前に、そのハンドラーの handleMessage(Message) メソッドの呼び出しが送信者のスレッドで直接トリガーされます。
The key motivation for providing a channel implementation with this behavior is to support transactions that must span across the channel while still benefiting from the abstraction and loose coupling that the channel provides. If the send() call is invoked within the scope of a transaction, the outcome of the handler’s invocation, (for example, updating a database record) plays a role in determining the ultimate result of that transaction (commit or rollback).
Since the DirectChannel is the simplest option and does not add any additional overhead that would be required for scheduling and managing the threads of a poller, it is the default channel type within Spring Integration. The general idea is to define the channels for an application, consider which of those need to provide buffering or to throttle input, and modify those to be queue-based PollableChannels. Likewise, if a channel needs to broadcast messages, it should not be a DirectChannel but rather a PublishSubscribeChannel. Later we show how each of these channels can be configured. |
DirectChannel は、サブスクライブされたメッセージハンドラーを呼び出すために、メッセージディスパッチャーに内部的に委譲します。そのディスパッチャーは、load-balancer または load-balancer-ref 属性(相互に排他的)によって公開される負荷分散戦略を持つことができます。負荷分散戦略は、複数のメッセージハンドラーが同じチャネルにサブスクライブするときに、メッセージがメッセージハンドラー間でどのように分散されるかを決定するためにメッセージディスパッチャーによって使用されます。便宜上、load-balancer 属性は、LoadBalancingStrategy の既存の実装を指す値の列挙を公開します。使用可能な値は、round-robin (ローテーション中のハンドラー間の負荷分散)と none (負荷分散を明示的に無効にする場合)のみです。他の戦略の実装は、将来のバージョンで追加される可能性があります。ただし、バージョン 3.0 以降、LoadBalancingStrategy の独自の実装を提供し、次の例に示すように、LoadBalancingStrategy を実装する Bean を指す load-balancer-ref 属性を使用してそれを挿入できます。
FixedSubscriberChannel は、サブスクライブ解除できない単一の MessageHandler サブスクライバーのみをサポートする SubscribableChannel です。これは、他のサブスクライバーが関与せず、チャネルインターセプターが必要ない、高スループットパフォーマンスのユースケースに役立ちます。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>load-balancer 属性と load-balancer-ref 属性は相互に排他的であることに注意してください。
ロードバランシングは、ブール failover プロパティと連携して機能します。failover 値が true(デフォルト)の場合、前のハンドラーが例外をスローすると、ディスパッチャーは(必要に応じて)後続のハンドラーにフォールバックします。順序は、ハンドラー自体で定義されたオプションの順序値、またはそのような値が存在しない場合、ハンドラーがサブスクライブする順序によって決定されます。
特定の状況で、ディスパッチャーが常に最初のハンドラーを呼び出して、エラーが発生するたびに同じ固定順序でフォールバックすることを必要とする場合、負荷分散戦略は提供されません。つまり、ロードバランシングが有効になっていない場合でも、ディスパッチャーは failover ブールプロパティを引き続きサポートします。ただし、負荷分散を行わない場合、ハンドラーの呼び出しは、順序に従って常に最初から開始されます。例: プライマリ、セカンダリ、ターシャリなどの明確な定義がある場合、このアプローチはうまく機能します。名前空間のサポートを使用する場合、エンドポイントの order 属性が順序を決定します。
負荷分散と failover は、チャネルに複数のサブスクライブされたメッセージハンドラーがある場合にのみ適用されることに注意してください。ネームスペースサポートを使用する場合、これは、複数のエンドポイントが input-channel 属性で定義された同じチャネル参照を共有することを意味します。 |
バージョン 5.2 以降、failover が true の場合、現在のハンドラーの失敗と失敗したメッセージは、それぞれ構成されている場合、debug または info に記録されます。
ExecutorChannel
ExecutorChannel は、DirectChannel (負荷分散戦略および failover ブールプロパティ)と同じディスパッチャー構成をサポートするポイントツーポイントチャネルです。これら 2 つのディスパッチチャネル型の主な違いは、ExecutorChannel が TaskExecutor のインスタンスに委譲してディスパッチを実行することです。これは、通常、send メソッドがブロックしないことを意味しますが、ハンドラーの呼び出しが送信者のスレッドで発生しない可能性があることも意味します。送信側と受信側のハンドラーにまたがるトランザクションはサポートしていません。
送信者がブロックすることがあります。例: TaskExecutor を使用して、クライアントを調整する拒否ポリシー(ThreadPoolExecutor.CallerRunsPolicy など)を使用すると、送信者のスレッドは、スレッドプールが最大容量になり、エグゼキューターの作業キューがいっぱいになるといつでもメソッドを実行できます。そのような状況は予測不可能な方法でのみ発生するため、トランザクションに依存するべきではありません。 |
PartitionedChannel
バージョン 6.1 以降、PartitionedChannel 実装が提供されます。これは AbstractExecutorChannel の拡張であり、実際の消費が特定のスレッドで処理されるポイントツーポイントディスパッチングロジックを表し、このチャネルに送信されたメッセージから評価されたパーティションキーによって決定されます。このチャネルは前述の ExecutorChannel に似ていますが、同じパーティションキーを持つメッセージが常に同じスレッドで処理され、順序が維持される点が異なります。外部 TaskExecutor は必要ありませんが、カスタム ThreadFactory (例: Thread.ofVirtual().name("partition-", 0).factory()) で構成できます。このファクトリは、パーティションごとにシングルスレッドエグゼキュータを MessageDispatcher デリゲートに設定するために使用されます。デフォルトでは、IntegrationMessageHeaderAccessor.CORRELATION_ID メッセージヘッダーがパーティションキーとして使用されます。このチャネルは、単純な Bean として構成できます。
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
} チャネルには 3 パーティション (専用スレッド) が含まれます。partitionKey ヘッダーを使用して、メッセージがどのパーティションで処理されるかを決定します。詳細については、PartitionedChannel クラスの Javadoc を参照してください。
FluxMessageChannel
FluxMessageChannel は、ダウンストリームのリアクティブサブスクライバーによるオンデマンド消費のために、"sinking" が内部 reactor.core.publisher.Flux にメッセージを送信するための org.reactivestreams.Publisher 実装です。このチャネル実装は SubscribableChannel でも PollableChannel でもないため、org.reactivestreams.Subscriber インスタンスのみを使用して、このチャネルからリアクティブストリームのバックプレッシャー特性を尊重することができます。一方、FluxMessageChannel は subscribeTo(Publisher<Message<?>>) 契約で ReactiveStreamsSubscribableChannel を実装し、リアクティブソースパブリッシャーからのイベントの受信を可能にし、リアクティブストリームを統合フローにブリッジします。統合フロー全体に対して完全にリアクティブな動作を実現するには、フロー内のすべてのエンドポイント間にそのようなチャネルを配置する必要があります。
Reactive Streams との相互作用の詳細については、Reactive Streams サポートを参照してください。
スコープチャネル
Spring Integration 1.0 は ThreadLocalChannel 実装を提供していましたが、2.0 から削除されました。同じ要件を処理するより一般的な方法は、チャネルに scope 属性を追加することです。属性の値には、コンテキスト内で使用可能なスコープの名前を指定できます。例: Web 環境では、特定のスコープを使用でき、カスタムスコープの実装をコンテキストに登録できます。次の例は、スコープ自体の登録を含め、スレッドローカルスコープがチャネルに適用されていることを示しています。
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean> 前の例で定義したチャネルも内部的にキューに委譲しますが、チャネルは現在のスレッドにバインドされているため、キューの内容も同様にバインドされます。そうすれば、チャネルに送信するスレッドは後で同じメッセージを受信できますが、他のスレッドはそれらにアクセスできません。スレッドスコープのチャネルが必要になることはめったにありませんが、DirectChannel インスタンスを使用して単一のスレッドオペレーションを実施しているが、応答メッセージは「ターミナル」チャネルに送信する必要がある状況で役立ちます。そのターミナルチャネルがスレッドスコープの場合、元の送信スレッドはターミナルチャネルから応答を収集できます。
これで、任意のチャネルのスコープを設定できるため、スレッドローカルに加えて独自のスコープを定義できます。