メッセージチャネルの構成

メッセージチャネルインスタンスを作成するには、次のように、xml の場合は <channel/> 要素を、Java 構成の場合は DirectChannel インスタンスを使用できます。

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new DirectChannel();
}
<int:channel id="exampleChannel"/>

サブエレメントなしで <channel/> エレメントを使用すると、DirectChannel インスタンス(SubscribableChannel)が作成されます。

パブリッシュ / サブスクライブチャネルを作成するには、次のように <publish-subscribe-channel/> 要素(Java では PublishSubscribeChannel)を使用します。

  • Java

  • XML

@Bean
public MessageChannel exampleChannel() {
    return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>

あるいは、さまざまな <queue/> サブ要素を提供して、ポーリング可能なチャネル型のいずれかを作成することもできます(メッセージチャネルの実装で説明されています)。次のセクションでは、各チャネル型の例を示します。

DirectChannel 設定

前述のように、DirectChannel はデフォルトの型です。次のリストは、誰を定義するかを示しています。

  • Java

  • XML

@Bean
public MessageChannel directChannel() {
    return new DirectChannel();
}
<int:channel id="directChannel"/>

デフォルトのチャネルにはラウンドロビンロードバランサーがあり、フェイルオーバーも有効になっています(詳細については DirectChannel を参照してください)。これらの一方または両方を無効にするには、<dispatcher/> サブ要素(DirectChannel の LoadBalancingStrategy コンストラクター)を追加し、次のように属性を構成します。

  • Java

  • XML

@Bean
public MessageChannel failFastChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setFailover(false);
    return channel;
}

@Bean
public MessageChannel failFastChannel() {
    return new DirectChannel(null);
}
<int:channel id="failFastChannel">
    <int:dispatcher failover="false"/>
</channel>

<int:channel id="channelWithFixedOrderSequenceFailover">
    <int:dispatcher load-balancer="none"/>
</int:channel>

バージョン 6.3 以降、UnicastingDispatcher に基づくすべての MessageChannel 実装は、単純な failover オプションの代わりに Predicate<Exception> failoverStrategy を使用して構成できます。この述語は、現在の MessageHandler からスローされた例外に基づいて、次の MessageHandler にフェイルオーバーするかどうかを決定します。より複雑なエラー分析は、ErrorMessageExceptionTypeRouter を使用して実行する必要があります。

データ型チャネルの構成

コンシューマーは特定の型のペイロードのみを処理できる場合があり、入力メッセージのペイロード型を確認する必要があります。最初に思い浮かぶのは、メッセージフィルターを使用することです。ただし、メッセージフィルターでできることは、コンシューマーの要件に準拠していないメッセージを除外することだけです。別の方法は、コンテンツベースのルーターを使用し、非準拠のデータ型のメッセージを特定のトランスフォーマーにルーティングして、必要なデータ型への変換と変換を実施することです。これは機能しますが、同じことを達成するためのより簡単な方法は、データ型チャネル (英語) パターンを適用することです。特定のペイロードデータ型ごとに個別のデータ型チャネルを使用できます。

特定のペイロード型を含むメッセージのみを受け入れるデータ型チャネルを作成するには、次の例に示すように、チャネル要素の datatype 属性にデータ型の完全修飾クラス名を指定します。

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(Number.class);
    return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>

型チェックは、チャネルのデータ型に割り当て可能なすべての型に合格することに注意してください。つまり、前述の例の numberChannel は、ペイロードが java.lang.Integer または java.lang.Double であるメッセージを受け入れます。次の例に示すように、複数の型をコンマ区切りリストとして提供できます。

  • Java

  • XML

@Bean
public MessageChannel numberChannel() {
    DirectChannel channel = new DirectChannel();
    channel.setDatatypes(String.class, Number.class);
    return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>

前述の例の "numberChannel" は、java.lang.Number のデータ型のメッセージのみを受け入れます。しかし、メッセージのペイロードが必要な型ではない場合はどうなるでしょうか? Spring の変換サービスのインスタンスである integrationConversionService という名前の Bean を定義したかどうかによります。そうでない場合は、Exception がすぐにスローされます。ただし、integrationConversionService Bean を定義している場合は、メッセージのペイロードを許容可能な型に変換する試みで使用されます。

カスタムコンバーターを登録することもできます。例: String ペイロードを含むメッセージを、上記で設定した "numberChannel" に送信するとします。次のようにメッセージを処理できます。

MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));

通常、これは完全に正当的な操作です。ただし、データ型チャネルを使用しているため、このような操作の結果、次のような例外が生成されます。

Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…

ペイロード型を Number にする必要があるため、例外が発生しますが、String を送信しました。そのため、String を Number に変換するものが必要です。そのために、次の例のようなコンバーターを実装できます。

public static class StringToIntegerConverter implements Converter<String, Integer> {
    public Integer convert(String source) {
        return Integer.parseInt(source);
    }
}

次に、次の例に示すように、Integration Conversion Service でコンバーターとして登録できます。

  • Java

  • XML

@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
    return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>

<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>

または、StringToIntegerConverter クラスで、自動スキャン用に @Component アノテーションが付けられている場合。

'converter' 要素が解析されると、integrationConversionService Bean が作成されます(まだ定義されていない場合)。データ型チャネルはそのコンバーターを使用して String ペイロードを Integer に変換するため、そのコンバーターが適切に配置されていれば、send 操作は成功します。

ペイロード型の変換に関する詳細については、ペイロード型変換を参照してください。

バージョン 4.0 以降、integrationConversionService は DefaultDatatypeChannelMessageConverter によって呼び出され、DefaultDatatypeChannelMessageConverter はアプリケーションコンテキストで変換サービスを検索します。別の変換手法を使用するには、チャネルで message-converter 属性を指定できます。これは、MessageConverter 実装への参照でなければなりません。fromMessage メソッドのみが使用されます。コンバーターにメッセージヘッダーへのアクセスを提供します(content-type などのヘッダーからの情報が必要な場合に備えて)。メソッドは、変換されたペイロードまたは完全な Message オブジェクトのみを返すことができます。後者の場合、コンバーターは受信メッセージからすべてのヘッダーをコピーするように注意する必要があります。

あるいは、型 MessageConverter の <bean/> を ID datatypeChannelMessageConverter で宣言でき、そのコンバーターは datatype のすべてのチャネルで使用されます。

QueueChannel 設定

QueueChannel を作成するには、<queue/> サブ要素を使用します。次のようにチャンネルの容量を指定できます。

  • Java

  • XML

@Bean
public PollableChannel queueChannel() {
    return new QueueChannel(25);
}
<int:channel id="queueChannel">
    <queue capacity="25"/>
</int:channel>
この <queue/> サブエレメントの 'capacity' 属性に値を指定しない場合、結果のキューは無制限になります。メモリ不足などの課題を回避するために、制限キューに明示的な値を設定することを強くお勧めします。

永続的な QueueChannel 設定

QueueChannel はメッセージをバッファリングする機能を提供しますが、デフォルトではメモリ内でのみ行うため、システム障害が発生した場合にメッセージが失われる機能があります。このリスクを軽減するために、QueueChannel は、MessageGroupStore 戦略インターフェースの永続的な実装によって支援される場合があります。MessageGroupStore および MessageStore の詳細については、メッセージストアを参照してください。

message-store 属性が使用されている場合、capacity 属性は許可されません。

QueueChannel は Message を受信すると、メッセージをメッセージストアに追加します。Message が QueueChannel からポーリングされると、メッセージストアから削除されます。

デフォルトでは、QueueChannel はメッセージをインメモリキューに格納します。これにより、前述のメッセージの損失シナリオが発生する可能性があります。ただし、Spring Integration は JdbcChannelMessageStore などの永続ストアを提供します。

次の例に示すように、message-store 属性を追加することにより、任意の QueueChannel のメッセージストアを設定できます。

<int:channel id="dbBackedChannel">
    <int:queue message-store="channelStore"/>
</int:channel>

<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource"/>
    <property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>

(Java/Kotlin 構成オプションについては、以下のサンプルを参照してください)

Spring Integration JDBC モジュールは、多くの一般的なデータベース用のスキーマデータ定義言語 (DDL) も提供します。これらのスキーマは、そのモジュール (spring-integration-jdbc) の org.springframework.integration.jdbc.store.channel パッケージにあります。

重要な機能の 1 つは、トランザクション永続ストア(JdbcChannelMessageStore など)では、ポーラーにトランザクションが構成されている限り、トランザクションが正常に完了した場合にのみ、ストアから削除されたメッセージを完全に削除できることです。それ以外の場合、トランザクションはロールバックされ、Message は失われません。

"NoSQL" データストアに関連する Spring プロジェクトの数が増え、これらのストアの基礎となるサポートが提供されるようになったため、メッセージストアの他の多くの実装が利用可能になりました。特定のニーズを満たすものが見つからない場合は、MessageGroupStore インターフェースの独自の実装を提供することもできます。

バージョン 4.0 以降、可能であれば ChannelMessageStore を使用するように QueueChannel インスタンスを構成することをお勧めします。これらは、一般的なメッセージストアと比較して、一般的にこの用途に最適化されています。ChannelMessageStore が ChannelPriorityMessageStore の場合、メッセージは優先順位内で FIFO で受信されます。優先順位の概念は、メッセージストアの実装によって決まります。例: 次の例は、MongoDB チャネルメッセージストアの Java 構成を示しています。

  • Java

  • Java DSL

  • Kotlin DSL

@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
    MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
    store.setPriorityEnabled(true);
    return store;
}

@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
    return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
    return IntegrationFlow.from((Channels c) ->
            c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
            ....
            .get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
    integrationFlow {
        channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
    }
MessageGroupQueue クラスに注意してください。これは、MessageGroupStore 操作を使用する BlockingQueue 実装です。

QueueChannel 環境をカスタマイズする別のオプションは、<int:queue> サブエレメントまたはその特定のコンストラクターの ref 属性によって提供されます。この属性は、java.util.Queue 実装への参照を提供します。例: Hazelcast 分散 IQueue (英語) は、次のように構成できます。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(new Config()
                                           .setProperty("hazelcast.logging.type", "log4j"));
}

@Bean
public PollableChannel distributedQueue() {
    return new QueueChannel(hazelcastInstance()
                              .getQueue("springIntegrationQueue"));
}

PublishSubscribeChannel 設定

PublishSubscribeChannel を作成するには、<publish-subscribe-channel/> 要素を使用します。この要素を使用する場合、次のように、メッセージの公開に使用する task-executor を指定することもできます(指定しない場合、送信者のスレッドで公開します)。

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>

PublishSubscribeChannel の下流にリシーケンサーまたはアグリゲーターを提供する場合、チャネルの "apply-sequence" プロパティを true に設定できます。そうすることは、メッセージを渡す前に、チャネルが sequence-size および sequence-number メッセージヘッダーと相関 ID を設定する必要があることを示します。例: サブスクライバーが 5 人の場合、sequence-size は 5 に設定され、メッセージは 1 から 5 の範囲の sequence-number ヘッダー値を持ちます。

Executor とともに、ErrorHandler を構成することもできます。デフォルトでは、PublishSubscribeChannel は MessagePublishingErrorHandler 実装を使用して、errorChannel ヘッダーから MessageChannel またはグローバル errorChannel インスタンスにエラーを送信します。Executor が構成されていない場合、ErrorHandler は無視され、呼び出し元のスレッドに例外が直接スローされます。

PublishSubscribeChannel から Resequencer または Aggregator ダウンストリームを提供する場合、チャネルの 'apply-sequence' プロパティを true に設定できます。そうすることは、チャネルがメッセージを渡す前にシーケンス ID とシーケンス番号のメッセージヘッダーと相関 ID を設定する必要があることを示します。例: サブスクライバーが 5 人の場合、sequence-size は 5 に設定され、メッセージは 1 から 5 の範囲のシーケンス番号ヘッダー値を持ちます。

次の例は、apply-sequence ヘッダーを true に設定する方法を示しています。

  • Java

  • XML

@Bean
public MessageChannel pubsubChannel() {
    PublishSubscribeChannel channel = new PublishSubscribeChannel();
    channel.setApplySequence(true);
    return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
パブリッシュ / サブスクライブチャネルがまったく同じメッセージインスタンスを複数の送信チャネルに送信できるように、apply-sequence 値はデフォルトで false です。Spring Integration はペイロードとヘッダー参照の不変性を強制するため、フラグが true に設定されると、チャネルは同じペイロード参照で異なるヘッダー値を持つ新しい Message インスタンスを作成します。

バージョン 5.4.3 以降、PublishSubscribeChannel は、BroadcastingDispatcher の requireSubscribers オプションを使用して構成することもでき、サブスクライバーがない場合にこのチャネルがメッセージをサイレントに無視しないことを示します。サブスクライバーがなく、このオプションが true に設定されている場合、Dispatcher has no subscribers メッセージを含む MessageDispatchingException がスローされます。

ExecutorChannel

ExecutorChannel を作成するには、task-executor 属性を持つ <dispatcher> サブ要素を追加します。属性の値は、コンテキスト内の任意の TaskExecutor を参照できます。例: これにより、サブスクライブされたハンドラーにメッセージをディスパッチするためのスレッドプールの構成が可能になります。前述のように、これにより、送信者と受信者の間のシングルスレッド実行コンテキストが中断され、アクティブなトランザクションコンテキストがハンドラーの呼び出しで共有されなくなります(つまり、ハンドラーは Exception をスローしますが、send 呼び出しはすでに返されます)正常に)。次の例は、dispatcher 要素を使用して、task-executor 属性でエグゼキュータを指定する方法を示しています。

  • Java

  • XML

@Bean
public MessageChannel executorChannel() {
    return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
    <int:dispatcher task-executor="someExecutor"/>
</int:channel>

DirectChannel 設定で前述したように、load-balancer オプションと failover オプションは両方とも <dispatcher/> サブ要素でも使用できます。同じデフォルトが適用されます。その結果、次の例に示すように、これらの属性の一方または両方に対して明示的な構成が提供されない限り、チャネルにはフェールオーバーが有効なラウンドロビンロードバランシング戦略があります。

<int:channel id="executorChannelWithoutFailover">
    <int:dispatcher task-executor="someExecutor" failover="false"/>
</int:channel>

PriorityChannel 設定

PriorityChannel を作成するには、次の例に示すように、<priority-queue/> サブ要素を使用します。

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
    <int:priority-queue capacity="20"/>
</int:channel>

デフォルトでは、チャネルはメッセージの priority ヘッダーを調べます。ただし、代わりにカスタム Comparator 参照を提供できます。また、PriorityChannel は(他の型と同様に) datatype 属性をサポートしていることに注意してください。QueueChannel と同様に、capacity 属性もサポートします。次の例は、これらすべてを示しています。

  • Java

  • XML

@Bean
public PollableChannel priorityChannel() {
    PriorityChannel channel = new PriorityChannel(20, widgetComparator());
    channel.setDatatypes(example.Widget.class);
    return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
    <int:priority-queue comparator="widgetComparator"
                    capacity="10"/>
</int:channel>

バージョン 4.0 以降、priority-channel 子エレメントは message-store オプションをサポートします(その場合、comparator および capacity は許可されません)。メッセージストアは PriorityCapableChannelMessageStore である必要があります。PriorityCapableChannelMessageStore の実装は、現在 RedisJDBCMongoDB 用に提供されています。詳細については、QueueChannel の構成およびメッセージストアを参照してください。サンプル構成はバッキングメッセージチャネルにあります。

RendezvousChannel 設定

キューのサブ要素が <rendezvous-queue> の場合、RendezvousChannel が作成されます。前述のオプションには追加の設定オプションはありません。また、キューは容量ゼロの直接ハンドオフキューであるため、容量値を受け入れません。次の例は、RendezvousChannel を宣言する方法を示しています。

  • Java

  • XML

@Bean
public PollableChannel rendezvousChannel() {
    return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
    <int:rendezvous-queue/>
</int:channel>

スコープチャネル構成

次の例に示すように、scope 属性を使用して任意のチャネルを構成できます。

<int:channel id="threadLocalChannel" scope="thread"/>

チャネルインターセプターの構成

チャネルインターセプターに従って、メッセージチャネルにはインターセプターもあります。<interceptors/> サブ要素は、<channel/> (またはより具体的な要素型)に追加できます。次の例に示すように、ref 属性を指定して、ChannelInterceptor インターフェースを実装する Spring 管理対象オブジェクトを参照できます。

<int:channel id="exampleChannel">
    <int:interceptors>
        <ref bean="trafficMonitoringInterceptor"/>
    </int:interceptors>
</int:channel>

通常、インターセプター実装は通常、複数のチャネルで再利用できる共通の動作を提供するため、別の場所でインターセプター実装を定義することをお勧めします。

グローバルチャネルインターセプターの構成

チャネルインターセプターは、個々のチャネルごとに横断的動作を適用するためのクリーンで簡潔な方法を提供します。同じ動作を複数のチャネルに適用する必要がある場合、各チャネルに同じインターセプターのセットを構成することは最も効率的な方法ではありません。インターセプターを複数のチャネルに適用できるようにする一方で、構成の繰り返しを避けるために、Spring Integration はグローバルインターセプターを提供します。次のペアの例を検討してください。

<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
    <bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>

<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>

各 <channel-interceptor/> 要素を使用すると、pattern 属性で定義されたパターンに一致するすべてのチャネルに適用されるグローバルインターセプターを定義できます。上記の場合、グローバルインターセプターは、"thing1" チャネルと "thing2" または "input" で始まる他のすべてのチャネルに適用されますが、"thing3" で始まるチャネルには適用されません(バージョン 5.0 以降)。

この構文をパターンに追加すると、1 つの可能性のある(おそらくありそうもない)問題が発生します。!thing1 という名前の Bean があり、チャンネルインターセプターの pattern パターンに !thing1 のパターンを含めた場合、一致しなくなります。パターンは thing1 という名前ではないすべての Bean に一致するようになりました。この場合、\ を使用して、パターン内の ! をエスケープできます。パターン \!thing1 は、!thing1 という名前の Bean と一致します。

order 属性を使用すると、特定のチャネルに複数のインターセプターがある場合に、このインターセプターが挿入される場所を管理できます。例: 次の例に示すように、チャネル "inputChannel" には個別にインターセプターをローカルに構成できます(以下を参照)。

<int:channel id="inputChannel">
  <int:interceptors>
    <int:wire-tap channel="logger"/>
  </int:interceptors>
</int:channel>

合理的な質問は、「グローバルインターセプターは、他のインターセプターに関連してローカルまたは他のグローバルインターセプター定義を介してどのように注入されますか?」です。現在の実装では、インターセプターの実行順序を定義する簡単なメカニズムを提供します。order 属性に正の数を指定すると、既存のインターセプターの後にインターセプターが挿入され、負の数を指定すると既存のインターセプターの前にインターセプターが挿入されます。これは、前の例では、グローバルインターセプターが(その order が 0 よりも大きいため)ローカルに構成された「ワイヤータップ」インターセプターの後に挿入されることを意味します。pattern に一致する別のグローバルインターセプターが存在する場合、その順序は、両方のインターセプターの order 属性の値を比較することにより決定されます。既存のインターセプターの前にグローバルインターセプターを挿入するには、order 属性に負の値を使用します。

order 属性と pattern 属性の両方がオプションであることに注意してください。order のデフォルト値は 0 で、pattern のデフォルト値は "*" です(すべてのチャネルに一致するため)。

ワイヤータップ

前述のように、Spring Integration はシンプルなワイヤータップインターセプターを提供します。<interceptors/> 要素内の任意のチャネルでワイヤタップを構成できます。これはデバッグに特に役立ち、次のように Spring Integration のロギングチャネルアダプターと組み合わせて使用できます。

<int:channel id="in">
    <int:interceptors>
        <int:wire-tap channel="logger"/>
    </int:interceptors>
</int:channel>

<int:logging-channel-adapter id="logger" level="DEBUG"/>
"logging-channel-adapter" も 'expression' 属性を受け入れるため、"payload" および "headers" 変数に対して SpEL 式を評価できます。または、メッセージ toString() の結果全体をログに記録するには、'log-full-message' 属性に値 true を指定します。デフォルトでは、false であるため、ペイロードのみがログに記録されます。true に設定すると、ペイロードに加えてすべてのヘッダーのログが有効になります。「式」オプションは、最も柔軟性が高くなります(例: expression="payload.user.name")。

ワイヤタップおよびその他の類似のコンポーネント(メッセージ公開構成)に関する一般的な誤解の 1 つは、それらが本質的に自動的に非同期であるということです。デフォルトでは、コンポーネントとしてのワイヤタップは非同期で呼び出されません。代わりに、Spring Integration は、非同期動作を構成するための単一の統一アプローチであるメッセージチャネルに焦点を当てています。メッセージフローの特定の部分を同期または非同期にするのは、そのフロー内で構成されたメッセージチャネルの型です。これは、メッセージチャネルの抽象化の主な利点の 1 つです。フレームワークの開始から、フレームワークの第一級オブジェクトとしてのメッセージチャネルの必要性と価値を常に強調してきました。これは、EIP パターンの内部的な暗黙的な実現だけではありません。構成可能なコンポーネントとしてエンドユーザーに完全に公開されます。そのため、ワイヤータップコンポーネントは、次のタスクの実行のみを担当します。

  • チャネルをタップしてメッセージフローをインターセプトする (たとえば、channelA)

  • 各メッセージを取得する

  • 別のチャネルにメッセージを送信します (たとえば、channelB)

これは本質的にブリッジパターンのバリエーションですが、チャネル定義内にカプセル化されます(したがって、フローを中断することなく有効化および無効化が容易になります)。また、ブリッジとは異なり、基本的に別のメッセージフローをフォークします。そのフローは同期ですか、それとも非同期ですか? 答えは、"channelB" であるメッセージチャネルの型によって異なります。次のオプションがあります: ダイレクトチャネル、ポーリング可能チャネル、エグゼキュータチャネル。最後の 2 つはスレッドの境界を破り、そのチャネルを介した通信を非同期にします。これは、そのチャネルからサブスクライブされたハンドラーへのメッセージのディスパッチが、そのチャネルへのメッセージの送信に使用されたスレッドとは異なるスレッドで行われるためです。これにより、ワイヤータップフローが同期または非同期になります。フレームワーク内の他のコンポーネント(メッセージパブリッシャーなど)と一貫性があり、特定のコードを次のように実装する必要があるかどうかについて事前に心配する必要がないようにすることで、一貫性とシンプルさのレベルを追加します(スレッドセーフコードの記述以外)。同期または非同期。メッセージチャネルを介した 2 つのコード(たとえば、コンポーネント A とコンポーネント B)の実際の接続が、コラボレーションを同期または非同期にします。将来的には同期から非同期に変更することもできます。メッセージチャネルを使用すると、コードに触れることなくすばやく変更できます。

ワイヤータップに関する最後のポイントの 1 つは、デフォルトでは非同期ではないという上記の理由にもかかわらず、通常はできるだけ早くメッセージを渡すことが望ましいことを覚えておく必要があります。ワイヤタップの送信チャネルとして非同期チャネルオプションを使用することは非常に一般的です。ただし、非同期動作はデフォルトでは強制されません。トランザクションの境界を破りたくない場合など、やった場合に破られるユースケースはたくさんあります。おそらく、監査目的でワイヤータップパターンを使用し、監査メッセージを元のトランザクション内で送信する必要があります。例として、ワイヤータップを JMS 送信チャネルアダプターに接続する場合があります。このようにして、両方の長所を活用できます。1)JMS メッセージの送信はトランザクション内で発生する可能性がありますが、2)それは依然として「ファイアアンドフォーゲット」アクションであるため、メインメッセージフローの顕著な遅延を防ぐことができます。

バージョン 4.0 以降、インターセプター(WireTap クラス (英語) など)がチャネルを参照する場合、循環参照を回避することが重要です。現在のインターセプターによってインターセプトされているチャネルからこのようなチャネルを除外する必要があります。これは、適切なパターンで、またはプログラムで実行できます。channel を参照するカスタム ChannelInterceptor がある場合は、VetoCapableInterceptor の実装を検討してください。そのようにして、フレームワークはインターセプターに、提供されたパターンに基づいて、候補である各チャネルをインターセプトしてもよいかどうかを確認します。また、インターセプターメソッドにランタイム保護を追加して、インターセプターによって参照されるチャネルではないことを確認することもできます。WireTap はこれらの手法の両方を使用します。

バージョン 4.3 以降、WireTap には、MessageChannel インスタンスの代わりに channelName を使用する追加のコンストラクターがあります。これは、Java 構成およびチャネル自動作成ロジックが使用されている場合に便利です。ターゲット MessageChannel Bean は、後でインターセプターとの最初の対話で、提供された channelName から解決されます。

チャネル解決には BeanFactory が必要であるため、ワイヤータップインスタンスは Spring 管理の Bean でなければなりません。

このレイトバインディングアプローチにより、次の例に示すように、Java DSL 構成を使用して一般的な盗聴パターンを簡素化することもできます。

@Bean
public PollableChannel myChannel() {
    return MessageChannels.queue()
            .wireTap("loggingFlow.input")
            .get();
}

@Bean
public IntegrationFlow loggingFlow() {
    return f -> f.log();
}

条件付きワイヤタップ

selector または selector-expression 属性を使用して、ワイヤタップを条件付きにすることができます。selector は MessageSelector Bean を参照します。MessageSelector は、実行時にメッセージをタップチャネルに送信するかどうかを決定できます。同様に、selector-expression は同じ目的を実行するブール SpEL 式です。式が true に評価される場合、メッセージはタップチャネルに送信されます。

グローバルワイヤタップ構成

グローバルチャネルインターセプターの構成の特殊なケースとしてグローバルワイヤタップを設定することが可能です。そのためには、最上位の wire-tap 要素を構成します。現在、通常の wire-tap 名前空間のサポートに加えて、pattern および order 属性がサポートされており、channel-interceptor の場合とまったく同じように機能します。次の例は、グローバルワイヤタップを設定する方法を示しています。

  • Java

  • XML

@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
    return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
グローバルワイヤタップは、既存のチャネル構成を変更せずに、外部でシングルチャネルワイヤタップを構成する便利な方法を提供します。これを行うには、pattern 属性をターゲットチャネル名に設定します。例: この手法を使用して、チャネル上のメッセージを検証するテストケースを構成できます。