メッセージチャネルの実装

Spring Integration は、さまざまなメッセージチャネルの実装を提供します。次のセクションでは、それぞれについて簡単に説明します。

PublishSubscribeChannel

PublishSubscribeChannel 実装は、送信された Message をそのサブスクライブされたすべてのハンドラーにブロードキャストします。これは、主に通知がイベントメッセージの送信に最もよく使用されます(通常、単一のハンドラーによって処理されることを意図したドキュメントメッセージとは対照的です)。PublishSubscribeChannel は送信専用であることに注意してください。send(Message) メソッドが呼び出されると、サブスクライバーに直接ブロードキャストするため、コンシューマーはメッセージをポーリングできません(PollableChannel を実装していないため、receive() メソッドはありません)。代わりに、サブスクライバー自体が MessageHandler でなければならず、サブスクライバーの handleMessage(Message) メソッドが順番に呼び出されます。

バージョン 3.0 より前は、サブスクライバーがいない PublishSubscribeChannel で send メソッドを呼び出すと false が返されました。MessagingTemplate と組み合わせて使用すると、MessageDeliveryException がスローされました。バージョン 3.0 から、少なくとも最小サブスクライバーが存在する(およびメッセージを正常に処理する)場合に send が常に成功したと見なされるように、動作が変更されました。この動作を変更するには、minSubscribers プロパティを設定します。デフォルトは 0 です。

TaskExecutor を使用する場合、メッセージの実際の処理は非同期に実行されるため、この決定には正しい数のサブスクライバーの存在のみが使用されます。

QueueChannel

QueueChannel 実装はキューをラップします。PublishSubscribeChannel とは異なり、QueueChannel にはポイントツーポイントのセマンティクスがあります。つまり、チャンネルに複数のコンシューマーがある場合でも、そのチャンネルに送信された Message を受信できるのはそのうちの 1 つだけです。次のように、デフォルトの引数なしのコンストラクター(本質的に Integer.MAX_VALUE の無制限の容量を提供)と、キュー容量を受け入れるコンストラクターを提供します。

public QueueChannel(int capacity)

容量制限に達していないチャネルは、内部キューにメッセージを格納し、メッセージを処理する準備ができている受信者がいない場合でも、send(Message<?>) メソッドはすぐに戻ります。キューが容量に達すると、送信者はキューに空きができるまでブロックします。または、追加のタイムアウトパラメーターを持つ send メソッドを使用する場合、キューは、空きができるかタイムアウト期間が経過するかのどちらか早い方までブロックされます。同様に、receive() 呼び出しは、メッセージがキューで利用可能な場合はすぐに戻りますが、キューが空の場合、メッセージが利用可能になるか、タイムアウトが提供される場合はタイムアウトになるまで受信呼び出しはブロックされます。どちらの場合でも、タイムアウト値 0 を渡すことで、キューの状態に関係なくすぐに強制的に戻ることができます。ただし、timeout パラメーターブロックを使用せずに send() および receive() のバージョンを呼び出すことに注意してください。

PriorityChannel

QueueChannel は先入れ先出し(FIFO)の順序付けを実行しますが、PriorityChannel は優先度に基づいてチャネル内でメッセージを順序付けできる代替実装です。デフォルトでは、優先度は各メッセージ内の priority ヘッダーによって決定されます。ただし、カスタム優先順位決定ロジックの場合、Comparator<Message<?>> 型のコンパレーターを PriorityChannel コンストラクターに提供できます。

RendezvousChannel

RendezvousChannel は、「直接ハンドオフ」シナリオを可能にします。このシナリオでは、送信者は、別のパーティがチャネルの receive() メソッドを呼び出すまでブロックします。相手は、送信者がメッセージを送信するまでブロックします。内部的には、この実装は SynchronousQueue (BlockingQueue のゼロ容量実装)を使用することを除いて、QueueChannel と非常に似ています。これは、送信者と受信者が異なるスレッドで動作する状況ではうまく機能しますが、メッセージを非同期的にキューにドロップすることは適切ではありません。つまり、RendezvousChannel では、送信者は一部の受信者がメッセージを受け入れたことを認識しますが、QueueChannel では、メッセージは内部キューに格納され、受信されない可能性があります。

これらのキューベースのチャネルはすべて、デフォルトでのみメッセージをメモリに保存していることに注意してください。永続性が必要な場合は、'queue' 要素内で 'message-store' 属性を提供して永続的な MessageStore 実装を参照するか、ローカルチャネルを JMS で裏付けられたチャネルやチャネルアダプターなどの永続的なブローカによってバッキングされたものに置き換えることができます。後者のオプションを使用すると、JMS サポートで説明したように、JMS プロバイダーのメッセージ永続性の実装を利用できます。ただし、キューにバッファリングする必要がない場合、最も簡単なアプローチは、次のセクションで説明する DirectChannel に依存することです。

RendezvousChannel は、リクエスト / 応答操作の実装にも役立ちます。送信者は、RendezvousChannel の一時的な匿名インスタンスを作成し、Message を構築するときに "replyChannel" ヘッダーとして設定します。その Message を送信した後、送信者はすぐに receive (オプションでタイムアウト値を提供)を呼び出して、応答 Message を待っている間にブロックできます。これは、Spring Integration のリクエスト / 応答コンポーネントの多くで内部的に使用される実装に非常に似ています。

DirectChannel

DirectChannel にはポイントツーポイントセマンティクスがありますが、それ以外の点では、前述のキューベースのチャネル実装のいずれよりも PublishSubscribeChannel に類似しています。PollableChannel インターフェースの代わりに SubscribableChannel インターフェースを実装するため、サブスクライバーに直接メッセージをディスパッチします。ただし、ポイントツーポイントチャネルとして、各 Message を単一のサブスクライブされた MessageHandler に送信するという点で、PublishSubscribeChannel とは異なります。

最もシンプルなポイントツーポイントチャネルオプションであることに加えて、最も重要な機能の 1 つは、単一スレッドがチャネルの「両側」で操作を実行できるようにすることです。例: ハンドラーが DirectChannel をサブスクライブする場合、Message をそのチャネルに送信すると、send() メソッドの呼び出しが戻る前に、そのハンドラーの handleMessage(Message) メソッドの呼び出しが送信者のスレッドで直接トリガーされます。

この動作を備えたチャネル実装を提供する主な動機は、チャネルが提供する抽象化と緩い結合の恩恵を受けながら、チャネル全体にまたがる必要があるトランザクションをサポートすることです。send() 呼び出しがトランザクションのスコープ内で呼び出された場合、ハンドラーの呼び出しの結果(たとえば、データベースレコードの更新)は、そのトランザクションの最終的な結果(コミットまたはロールバック)を決定する上でロールを果たします。

DirectChannel は最も単純なオプションであり、ポーラーのスレッドのスケジューリングと管理に必要な追加のオーバーヘッドを追加しないため、Spring Integration 内のデフォルトのチャネル型です。一般的な考え方は、アプリケーションのチャネルを定義し、バッファリングを提供する必要があるか、入力を調整する必要があるかを検討し、キューベースの PollableChannels に変更することです。同様に、チャネルがメッセージをブロードキャストする必要がある場合、DirectChannel ではなく PublishSubscribeChannel である必要があります。後で、これらの各チャネルの構成方法を示します。

DirectChannel は、サブスクライブされたメッセージハンドラーを呼び出すために、メッセージディスパッチャーに内部的に委譲します。そのディスパッチャーは、load-balancer または load-balancer-ref 属性(相互に排他的)によって公開される負荷分散戦略を持つことができます。負荷分散戦略は、複数のメッセージハンドラーが同じチャネルにサブスクライブするときに、メッセージがメッセージハンドラー間でどのように分散されるかを決定するためにメッセージディスパッチャーによって使用されます。便宜上、load-balancer 属性は、LoadBalancingStrategy の既存の実装を指す値の列挙を公開します。使用可能な値は、round-robin (ローテーション中のハンドラー間の負荷分散)と none (負荷分散を明示的に無効にする場合)のみです。他の戦略の実装は、将来のバージョンで追加される可能性があります。ただし、バージョン 3.0 以降、LoadBalancingStrategy の独自の実装を提供し、次の例に示すように、LoadBalancingStrategy を実装する Bean を指す load-balancer-ref 属性を使用してそれを挿入できます。

FixedSubscriberChannel は、サブスクライブ解除できない単一の MessageHandler サブスクライバーのみをサポートする SubscribableChannel です。これは、他のサブスクライバーが関与せず、チャネルインターセプターが必要ない、高スループットパフォーマンスのユースケースに役立ちます。

<int:channel id="lbRefChannel">
  <int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>

load-balancer 属性と load-balancer-ref 属性は相互に排他的であることに注意してください。

ロードバランシングは、ブール failover プロパティと連携して機能します。failover 値が true(デフォルト)の場合、前のハンドラーが例外をスローすると、ディスパッチャーは(必要に応じて)後続のハンドラーにフォールバックします。順序は、ハンドラー自体で定義されたオプションの順序値、またはそのような値が存在しない場合、ハンドラーがサブスクライブする順序によって決定されます。

特定の状況で、ディスパッチャーが常に最初のハンドラーを呼び出して、エラーが発生するたびに同じ固定順序でフォールバックすることを必要とする場合、負荷分散戦略は提供されません。つまり、ロードバランシングが有効になっていない場合でも、ディスパッチャーは failover ブールプロパティを引き続きサポートします。ただし、負荷分散を行わない場合、ハンドラーの呼び出しは、順序に従って常に最初から開始されます。例: プライマリ、セカンダリ、ターシャリなどの明確な定義がある場合、このアプローチはうまく機能します。名前空間のサポートを使用する場合、エンドポイントの order 属性が順序を決定します。

負荷分散と failover は、チャネルに複数のサブスクライブされたメッセージハンドラーがある場合にのみ適用されることに注意してください。ネームスペースサポートを使用する場合、これは、複数のエンドポイントが input-channel 属性で定義された同じチャネル参照を共有することを意味します。

バージョン 5.2 以降、failover が true の場合、現在のハンドラーの失敗と失敗したメッセージは、それぞれ構成されている場合、debug または info に記録されます。

ExecutorChannel

ExecutorChannel は、DirectChannel (負荷分散戦略および failover ブールプロパティ)と同じディスパッチャー構成をサポートするポイントツーポイントチャネルです。これら 2 つのディスパッチチャネル型の主な違いは、ExecutorChannel が TaskExecutor のインスタンスに委譲してディスパッチを実行することです。これは、通常、send メソッドがブロックしないことを意味しますが、ハンドラーの呼び出しが送信者のスレッドで発生しない可能性があることも意味します。送信側と受信側のハンドラーにまたがるトランザクションはサポートしていません。

送信者がブロックすることがあります。例: TaskExecutor を使用して、クライアントを調整する拒否ポリシー(ThreadPoolExecutor.CallerRunsPolicy など)を使用すると、送信者のスレッドは、スレッドプールが最大容量になり、エグゼキューターの作業キューがいっぱいになるといつでもメソッドを実行できます。そのような状況は予測不可能な方法でのみ発生するため、トランザクションに依存するべきではありません。

PartitionedChannel

バージョン 6.1 以降、PartitionedChannel 実装が提供されます。これは AbstractExecutorChannel の拡張であり、実際の消費が特定のスレッドで処理されるポイントツーポイントディスパッチングロジックを表し、このチャネルに送信されたメッセージから評価されたパーティションキーによって決定されます。このチャネルは前述の ExecutorChannel に似ていますが、同じパーティションキーを持つメッセージが常に同じスレッドで処理され、順序が維持される点が異なります。外部 TaskExecutor は必要ありませんが、カスタム ThreadFactory (例: Thread.ofVirtual().name("partition-", 0).factory()) で構成できます。このファクトリは、パーティションごとにシングルスレッドエグゼキュータを MessageDispatcher デリゲートに設定するために使用されます。デフォルトでは、IntegrationMessageHeaderAccessor.CORRELATION_ID メッセージヘッダーがパーティションキーとして使用されます。このチャネルは、単純な Bean として構成できます。

@Bean
PartitionedChannel somePartitionedChannel() {
    return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}

チャネルには 3 パーティション (専用スレッド) が含まれます。partitionKey ヘッダーを使用して、メッセージがどのパーティションで処理されるかを決定します。詳細については、PartitionedChannel クラスの Javadoc を参照してください。

FluxMessageChannel

FluxMessageChannel は、ダウンストリームのリアクティブサブスクライバーによるオンデマンド消費のために、"sinking" が内部 reactor.core.publisher.Flux にメッセージを送信するための org.reactivestreams.Publisher 実装です。このチャネル実装は SubscribableChannel でも PollableChannel でもないため、org.reactivestreams.Subscriber インスタンスのみを使用して、このチャネルからリアクティブストリームのバックプレッシャー特性を尊重することができます。一方、FluxMessageChannel は subscribeTo(Publisher<Message<?>>) 契約で ReactiveStreamsSubscribableChannel を実装し、リアクティブソースパブリッシャーからのイベントの受信を可能にし、リアクティブストリームを統合フローにブリッジします。統合フロー全体に対して完全にリアクティブな動作を実現するには、フロー内のすべてのエンドポイント間にそのようなチャネルを配置する必要があります。

Reactive Streams との相互作用の詳細については、Reactive Streams サポートを参照してください。

スコープチャネル

Spring Integration 1.0 は ThreadLocalChannel 実装を提供していましたが、2.0 から削除されました。同じ要件を処理するより一般的な方法は、チャネルに scope 属性を追加することです。属性の値には、コンテキスト内で使用可能なスコープの名前を指定できます。例: Web 環境では、特定のスコープを使用でき、カスタムスコープの実装をコンテキストに登録できます。次の例は、スコープ自体の登録を含め、スレッドローカルスコープがチャネルに適用されていることを示しています。

<int:channel id="threadScopedChannel" scope="thread">
     <int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
    <property name="scopes">
        <map>
            <entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
        </map>
    </property>
</bean>

前の例で定義したチャネルも内部的にキューに委譲しますが、チャネルは現在のスレッドにバインドされているため、キューの内容も同様にバインドされます。そうすれば、チャネルに送信するスレッドは後で同じメッセージを受信できますが、他のスレッドはそれらにアクセスできません。スレッドスコープのチャネルが必要になることはめったにありませんが、DirectChannel インスタンスを使用して単一のスレッドオペレーションを実施しているが、応答メッセージは「ターミナル」チャネルに送信する必要がある状況で役立ちます。そのターミナルチャネルがスレッドスコープの場合、元の送信スレッドはターミナルチャネルから応答を収集できます。

これで、任意のチャネルのスコープを設定できるため、スレッドローカルに加えて独自のスコープを定義できます。