メッセージチャネルの構成
メッセージチャネルインスタンスを作成するには、次のように、xml の場合は <channel/>
要素を、Java 構成の場合は DirectChannel
インスタンスを使用できます。
Java
XML
@Bean
public MessageChannel exampleChannel() {
return new DirectChannel();
}
<int:channel id="exampleChannel"/>
サブエレメントなしで <channel/>
エレメントを使用すると、DirectChannel
インスタンス(SubscribableChannel
)が作成されます。
パブリッシュ / サブスクライブチャネルを作成するには、次のように <publish-subscribe-channel/>
要素(Java では PublishSubscribeChannel
)を使用します。
Java
XML
@Bean
public MessageChannel exampleChannel() {
return new PublishSubscribeChannel();
}
<int:publish-subscribe-channel id="exampleChannel"/>
あるいは、さまざまな <queue/>
サブ要素を提供して、ポーリング可能なチャネル型のいずれかを作成することもできます(メッセージチャネルの実装で説明されています)。次のセクションでは、各チャネル型の例を示します。
DirectChannel
設定
前述のように、DirectChannel
はデフォルトの型です。次のリストは、誰を定義するかを示しています。
Java
XML
@Bean
public MessageChannel directChannel() {
return new DirectChannel();
}
<int:channel id="directChannel"/>
デフォルトのチャネルにはラウンドロビンロードバランサーがあり、フェイルオーバーも有効になっています(詳細については DirectChannel
を参照してください)。これらの一方または両方を無効にするには、<dispatcher/>
サブ要素(DirectChannel
の LoadBalancingStrategy
コンストラクター)を追加し、次のように属性を構成します。
Java
XML
@Bean
public MessageChannel failFastChannel() {
DirectChannel channel = new DirectChannel();
channel.setFailover(false);
return channel;
}
@Bean
public MessageChannel failFastChannel() {
return new DirectChannel(null);
}
<int:channel id="failFastChannel">
<int:dispatcher failover="false"/>
</channel>
<int:channel id="channelWithFixedOrderSequenceFailover">
<int:dispatcher load-balancer="none"/>
</int:channel>
バージョン 6.3 以降、UnicastingDispatcher
に基づくすべての MessageChannel
実装は、単純な failover
オプションの代わりに Predicate<Exception> failoverStrategy
を使用して構成できます。この述語は、現在の MessageHandler
からスローされた例外に基づいて、次の MessageHandler
にフェイルオーバーするかどうかを決定します。より複雑なエラー分析は、ErrorMessageExceptionTypeRouter
を使用して実行する必要があります。
データ型チャネルの構成
コンシューマーは特定の型のペイロードのみを処理できる場合があり、入力メッセージのペイロード型を確認する必要があります。最初に思い浮かぶのは、メッセージフィルターを使用することです。ただし、メッセージフィルターでできることは、コンシューマーの要件に準拠していないメッセージを除外することだけです。別の方法は、コンテンツベースのルーターを使用し、非準拠のデータ型のメッセージを特定のトランスフォーマーにルーティングして、必要なデータ型への変換と変換を実施することです。これは機能しますが、同じことを達成するためのより簡単な方法は、データ型チャネル (英語) パターンを適用することです。特定のペイロードデータ型ごとに個別のデータ型チャネルを使用できます。
特定のペイロード型を含むメッセージのみを受け入れるデータ型チャネルを作成するには、次の例に示すように、チャネル要素の datatype
属性にデータ型の完全修飾クラス名を指定します。
Java
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(Number.class);
return channel;
}
<int:channel id="numberChannel" datatype="java.lang.Number"/>
型チェックは、チャネルのデータ型に割り当て可能なすべての型に合格することに注意してください。つまり、前述の例の numberChannel
は、ペイロードが java.lang.Integer
または java.lang.Double
であるメッセージを受け入れます。次の例に示すように、複数の型をコンマ区切りリストとして提供できます。
Java
XML
@Bean
public MessageChannel numberChannel() {
DirectChannel channel = new DirectChannel();
channel.setDatatypes(String.class, Number.class);
return channel;
}
<int:channel id="stringOrNumberChannel" datatype="java.lang.String,java.lang.Number"/>
前述の例の "numberChannel" は、データ型が java.lang.Number
のメッセージのみを受け入れます。しかし、メッセージのペイロードが必要な型でない場合はどうなるでしょうか。これは、Spring の変換サービスのインスタンスである integrationConversionService
という名前の Bean を定義したかどうかによって異なります。定義していない場合は、直ちに Exception
がスローされます。ただし、integrationConversionService
Bean を定義した場合は、メッセージのペイロードを受け入れ可能な型に変換するためにそれが使用されます。
カスタムコンバーターを登録することもできます。例: String
ペイロードを含むメッセージを上記で設定した "numberChannel" に送信するとします。メッセージを次のように処理できます。
MessageChannel inChannel = context.getBean("numberChannel", MessageChannel.class);
inChannel.send(new GenericMessage<String>("5"));
通常、これは完全に正当的な操作です。ただし、データ型チャネルを使用しているため、このような操作の結果、次のような例外が生成されます。
Exception in thread "main" org.springframework.integration.MessageDeliveryException:
Channel 'numberChannel'
expected one of the following datataypes [class java.lang.Number],
but received [class java.lang.String]
…
ペイロード型を Number
にする必要があるため、例外が発生しますが、String
を送信しました。そのため、String
を Number
に変換するものが必要です。そのために、次の例のようなコンバーターを実装できます。
public static class StringToIntegerConverter implements Converter<String, Integer> {
public Integer convert(String source) {
return Integer.parseInt(source);
}
}
次に、次の例に示すように、Integration Conversion Service でコンバーターとして登録できます。
Java
XML
@Bean
@IntegrationConverter
public StringToIntegerConverter strToInt {
return new StringToIntegerConverter();
}
<int:converter ref="strToInt"/>
<bean id="strToInt" class="org.springframework.integration.util.Demo.StringToIntegerConverter"/>
または、StringToIntegerConverter
クラスで、自動スキャン用に @Component
アノテーションが付けられている場合。
'converter' 要素が解析されると、integrationConversionService
Bean が作成されます(まだ定義されていない場合)。データ型チャネルはそのコンバーターを使用して String
ペイロードを Integer
に変換するため、そのコンバーターが適切に配置されていれば、send
操作は成功します。
ペイロード型の変換に関する詳細については、ペイロード型変換を参照してください。
バージョン 4.0 以降、integrationConversionService
は DefaultDatatypeChannelMessageConverter
によって呼び出され、DefaultDatatypeChannelMessageConverter
はアプリケーションコンテキストで変換サービスを検索します。別の変換手法を使用するには、チャネルで message-converter
属性を指定できます。これは、MessageConverter
実装への参照でなければなりません。fromMessage
メソッドのみが使用されます。コンバーターにメッセージヘッダーへのアクセスを提供します(content-type
などのヘッダーからの情報が必要な場合に備えて)。メソッドは、変換されたペイロードまたは完全な Message
オブジェクトのみを返すことができます。後者の場合、コンバーターは受信メッセージからすべてのヘッダーをコピーするように注意する必要があります。
あるいは、型 MessageConverter
の <bean/>
を ID datatypeChannelMessageConverter
で宣言でき、そのコンバーターは datatype
のすべてのチャネルで使用されます。
QueueChannel
設定
QueueChannel
を作成するには、<queue/>
サブ要素を使用します。次のようにチャンネルの容量を指定できます。
Java
XML
@Bean
public PollableChannel queueChannel() {
return new QueueChannel(25);
}
<int:channel id="queueChannel">
<queue capacity="25"/>
</int:channel>
この <queue/> サブエレメントの 'capacity' 属性に値を指定しない場合、結果のキューは無制限になります。メモリ不足などの課題を回避するために、制限キューに明示的な値を設定することを強くお勧めします。 |
永続的な QueueChannel
設定
QueueChannel
はメッセージをバッファリングする機能を提供しますが、デフォルトではメモリ内でのみ行うため、システム障害が発生した場合にメッセージが失われる機能があります。このリスクを軽減するために、QueueChannel
は、MessageGroupStore
戦略インターフェースの永続的な実装によって支援される場合があります。MessageGroupStore
および MessageStore
の詳細については、メッセージストアを参照してください。
message-store 属性が使用されている場合、capacity 属性は許可されません。 |
QueueChannel
は Message
を受信すると、メッセージをメッセージストアに追加します。Message
が QueueChannel
からポーリングされると、メッセージストアから削除されます。
デフォルトでは、QueueChannel
はメッセージをインメモリキューに格納します。これにより、前述のメッセージの損失シナリオが発生する可能性があります。ただし、Spring Integration は JdbcChannelMessageStore
などの永続ストアを提供します。
次の例に示すように、message-store
属性を追加することにより、任意の QueueChannel
のメッセージストアを設定できます。
<int:channel id="dbBackedChannel">
<int:queue message-store="channelStore"/>
</int:channel>
<bean id="channelStore" class="o.s.i.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource"/>
<property name="channelMessageStoreQueryProvider" ref="queryProvider"/>
</bean>
(Java/Kotlin 構成オプションについては、以下のサンプルを参照してください)
Spring Integration JDBC モジュールは、多くの一般的なデータベース用のスキーマデータ定義言語 (DDL) も提供します。これらのスキーマは、そのモジュール (spring-integration-jdbc
) の org.springframework.integration.jdbc.store.channel パッケージにあります。
重要な機能の 1 つは、トランザクション永続ストア(JdbcChannelMessageStore など)では、ポーラーにトランザクションが構成されている限り、トランザクションが正常に完了した場合にのみ、ストアから削除されたメッセージを完全に削除できることです。それ以外の場合、トランザクションはロールバックされ、Message は失われません。 |
"NoSQL" データストアに関連する Spring プロジェクトの数が増え、これらのストアの基礎となるサポートが提供されるようになったため、メッセージストアの他の多くの実装が利用可能になりました。特定のニーズを満たすものが見つからない場合は、MessageGroupStore
インターフェースの独自の実装を提供することもできます。
バージョン 4.0 以降、可能であれば ChannelMessageStore
を使用するように QueueChannel
インスタンスを構成することをお勧めします。これらは、一般的なメッセージストアと比較して、一般的にこの用途に最適化されています。ChannelMessageStore
が ChannelPriorityMessageStore
の場合、メッセージは優先順位内で FIFO で受信されます。優先順位の概念は、メッセージストアの実装によって決まります。例: 次の例は、MongoDB チャネルメッセージストアの Java 構成を示しています。
Java
Java DSL
Kotlin DSL
@Bean
public BasicMessageGroupStore mongoDbChannelMessageStore(MongoDbFactory mongoDbFactory) {
MongoDbChannelMessageStore store = new MongoDbChannelMessageStore(mongoDbFactory);
store.setPriorityEnabled(true);
return store;
}
@Bean
public PollableChannel priorityQueue(BasicMessageGroupStore mongoDbChannelMessageStore) {
return new PriorityChannel(new MessageGroupQueue(mongoDbChannelMessageStore, "priorityQueue"));
}
@Bean
public IntegrationFlow priorityFlow(PriorityCapableChannelMessageStore mongoDbChannelMessageStore) {
return IntegrationFlow.from((Channels c) ->
c.priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup"))
....
.get();
}
@Bean
fun priorityFlow(mongoDbChannelMessageStore: PriorityCapableChannelMessageStore) =
integrationFlow {
channel { priority("priorityChannel", mongoDbChannelMessageStore, "priorityGroup") }
}
MessageGroupQueue クラスに注意してください。これは、MessageGroupStore 操作を使用する BlockingQueue 実装です。 |
QueueChannel
環境をカスタマイズする別のオプションは、<int:queue>
サブエレメントまたはその特定のコンストラクターの ref
属性によって提供されます。この属性は、java.util.Queue
実装への参照を提供します。例: Hazelcast 分散 IQueue
(英語) は、次のように構成できます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(new Config()
.setProperty("hazelcast.logging.type", "log4j"));
}
@Bean
public PollableChannel distributedQueue() {
return new QueueChannel(hazelcastInstance()
.getQueue("springIntegrationQueue"));
}
PublishSubscribeChannel
設定
PublishSubscribeChannel
を作成するには、<publish-subscribe-channel/> 要素を使用します。この要素を使用する場合、次のように、メッセージの公開に使用する task-executor
を指定することもできます(指定しない場合、送信者のスレッドで公開します)。
Java
XML
@Bean
public MessageChannel pubsubChannel() {
return new PublishSubscribeChannel(someExecutor());
}
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
PublishSubscribeChannel
の下流にリシーケンサーまたはアグリゲーターを提供する場合、チャネルの "apply-sequence" プロパティを true
に設定できます。そうすることは、メッセージを渡す前に、チャネルが sequence-size
および sequence-number
メッセージヘッダーと相関 ID を設定する必要があることを示します。例: サブスクライバーが 5 人の場合、sequence-size
は 5
に設定され、メッセージは 1
から 5
の範囲の sequence-number
ヘッダー値を持ちます。
Executor
とともに、ErrorHandler
を構成することもできます。デフォルトでは、PublishSubscribeChannel
は MessagePublishingErrorHandler
実装を使用して、errorChannel
ヘッダーから MessageChannel
またはグローバル errorChannel
インスタンスにエラーを送信します。Executor
が構成されていない場合、ErrorHandler
は無視され、呼び出し元のスレッドに例外が直接スローされます。
PublishSubscribeChannel
から Resequencer
または Aggregator
ダウンストリームを提供する場合、チャネルの 'apply-sequence' プロパティを true
に設定できます。そうすることは、チャネルがメッセージを渡す前にシーケンス ID とシーケンス番号のメッセージヘッダーと相関 ID を設定する必要があることを示します。例: サブスクライバーが 5 人の場合、sequence-size は 5
に設定され、メッセージは 1
から 5
の範囲のシーケンス番号ヘッダー値を持ちます。
次の例は、apply-sequence
ヘッダーを true
に設定する方法を示しています。
Java
XML
@Bean
public MessageChannel pubsubChannel() {
PublishSubscribeChannel channel = new PublishSubscribeChannel();
channel.setApplySequence(true);
return channel;
}
<int:publish-subscribe-channel id="pubsubChannel" apply-sequence="true"/>
パブリッシュ / サブスクライブチャネルがまったく同じメッセージインスタンスを複数の送信チャネルに送信できるように、apply-sequence 値はデフォルトで false です。Spring Integration はペイロードとヘッダー参照の不変性を強制するため、フラグが true に設定されると、チャネルは同じペイロード参照で異なるヘッダー値を持つ新しい Message インスタンスを作成します。 |
バージョン 5.4.3 以降、PublishSubscribeChannel
は、BroadcastingDispatcher
の requireSubscribers
オプションを使用して構成することもでき、サブスクライバーがない場合にこのチャネルがメッセージをサイレントに無視しないことを示します。サブスクライバーがなく、このオプションが true
に設定されている場合、Dispatcher has no subscribers
メッセージを含む MessageDispatchingException
がスローされます。
ExecutorChannel
ExecutorChannel
を作成するには、task-executor
属性を持つ <dispatcher>
サブ要素を追加します。属性の値は、コンテキスト内の任意の TaskExecutor
を参照できます。例: これにより、サブスクライブされたハンドラーにメッセージをディスパッチするためのスレッドプールの構成が可能になります。前述のように、これにより、送信者と受信者の間のシングルスレッド実行コンテキストが中断され、アクティブなトランザクションコンテキストがハンドラーの呼び出しで共有されなくなります(つまり、ハンドラーは Exception
をスローしますが、send
呼び出しはすでに返されます)正常に)。次の例は、dispatcher
要素を使用して、task-executor
属性でエグゼキュータを指定する方法を示しています。
Java
XML
@Bean
public MessageChannel executorChannel() {
return new ExecutorChannel(someExecutor());
}
<int:channel id="executorChannel">
<int:dispatcher task-executor="someExecutor"/>
</int:channel>
|
PriorityChannel
設定
PriorityChannel
を作成するには、次の例に示すように、<priority-queue/>
サブ要素を使用します。
Java
XML
@Bean
public PollableChannel priorityChannel() {
return new PriorityChannel(20);
}
<int:channel id="priorityChannel">
<int:priority-queue capacity="20"/>
</int:channel>
デフォルトでは、チャネルはメッセージの priority
ヘッダーを調べます。ただし、代わりにカスタム Comparator
参照を提供できます。また、PriorityChannel
は(他の型と同様に) datatype
属性をサポートしていることに注意してください。QueueChannel
と同様に、capacity
属性もサポートします。次の例は、これらすべてを示しています。
Java
XML
@Bean
public PollableChannel priorityChannel() {
PriorityChannel channel = new PriorityChannel(20, widgetComparator());
channel.setDatatypes(example.Widget.class);
return channel;
}
<int:channel id="priorityChannel" datatype="example.Widget">
<int:priority-queue comparator="widgetComparator"
capacity="10"/>
</int:channel>
バージョン 4.0 以降、priority-channel
子エレメントは message-store
オプションをサポートします(その場合、comparator
および capacity
は許可されません)。メッセージストアは PriorityCapableChannelMessageStore
である必要があります。PriorityCapableChannelMessageStore
の実装は、現在 Redis
、JDBC
、MongoDB
用に提供されています。詳細については、QueueChannel
の構成およびメッセージストアを参照してください。サンプル構成はバッキングメッセージチャネルにあります。
RendezvousChannel
設定
キューのサブ要素が <rendezvous-queue>
の場合、RendezvousChannel
が作成されます。前述のオプションには追加の設定オプションはありません。また、キューは容量ゼロの直接ハンドオフキューであるため、容量値を受け入れません。次の例は、RendezvousChannel
を宣言する方法を示しています。
Java
XML
@Bean
public PollableChannel rendezvousChannel() {
return new RendezvousChannel();
}
<int:channel id="rendezvousChannel"/>
<int:rendezvous-queue/>
</int:channel>
スコープチャネル構成
次の例に示すように、scope
属性を使用して任意のチャネルを構成できます。
<int:channel id="threadLocalChannel" scope="thread"/>
チャネルインターセプターの構成
チャネルインターセプターに従って、メッセージチャネルにはインターセプターもあります。<interceptors/>
サブ要素は、<channel/>
(またはより具体的な要素型)に追加できます。次の例に示すように、ref
属性を指定して、ChannelInterceptor
インターフェースを実装する Spring 管理対象オブジェクトを参照できます。
<int:channel id="exampleChannel">
<int:interceptors>
<ref bean="trafficMonitoringInterceptor"/>
</int:interceptors>
</int:channel>
通常、インターセプター実装は通常、複数のチャネルで再利用できる共通の動作を提供するため、別の場所でインターセプター実装を定義することをお勧めします。
グローバルチャネルインターセプターの構成
チャネルインターセプターは、個々のチャネルごとに横断的動作を適用するためのクリーンで簡潔な方法を提供します。同じ動作を複数のチャネルに適用する必要がある場合、各チャネルに同じインターセプターのセットを構成することは最も効率的な方法ではありません。インターセプターを複数のチャネルに適用できるようにする一方で、構成の繰り返しを避けるために、Spring Integration はグローバルインターセプターを提供します。次のペアの例を検討してください。
<int:channel-interceptor pattern="input*, thing2*, thing1, !cat*" order="3">
<bean class="thing1.thing2SampleInterceptor"/>
</int:channel-interceptor>
<int:channel-interceptor ref="myInterceptor" pattern="input*, thing2*, thing1, !cat*" order="3"/>
<bean id="myInterceptor" class="thing1.thing2SampleInterceptor"/>
各 <channel-interceptor/>
要素を使用すると、pattern
属性で定義されたパターンに一致するすべてのチャネルに適用されるグローバルインターセプターを定義できます。上記の場合、グローバルインターセプターは、"thing1" チャネルと "thing2" または "input" で始まる他のすべてのチャネルに適用されますが、"thing3" で始まるチャネルには適用されません(バージョン 5.0 以降)。
この構文をパターンに追加すると、1 つの可能性のある(おそらくありそうもない)問題が発生します。!thing1 という名前の Bean があり、チャンネルインターセプターの pattern パターンに !thing1 のパターンを含めた場合、一致しなくなります。パターンは thing1 という名前ではないすべての Bean に一致するようになりました。この場合、\ を使用して、パターン内の ! をエスケープできます。パターン \!thing1 は、!thing1 という名前の Bean と一致します。 |
order 属性を使用すると、特定のチャネルに複数のインターセプターがある場合に、このインターセプターが挿入される場所を管理できます。例: チャネル 'inputChannel' には、次の例に示すように、個別のインターセプターがローカルに構成されている可能性があります (下記を参照)。
<int:channel id="inputChannel">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
合理的な質問は、「グローバルインターセプターは、他のインターセプターに関連してローカルまたは他のグローバルインターセプター定義を介してどのように注入されますか?」です。現在の実装では、インターセプターの実行順序を定義する簡単なメカニズムを提供します。order
属性に正の数を指定すると、既存のインターセプターの後にインターセプターが挿入され、負の数を指定すると既存のインターセプターの前にインターセプターが挿入されます。これは、前の例では、グローバルインターセプターが(その order
が 0
よりも大きいため)ローカルに構成された「ワイヤータップ」インターセプターの後に挿入されることを意味します。pattern
に一致する別のグローバルインターセプターが存在する場合、その順序は、両方のインターセプターの order
属性の値を比較することにより決定されます。既存のインターセプターの前にグローバルインターセプターを挿入するには、order
属性に負の値を使用します。
order 属性と pattern 属性の両方がオプションであることに注意してください。order のデフォルト値は 0 で、pattern のデフォルト値は "*" です(すべてのチャネルに一致するため)。 |
ワイヤータップ
前述のように、Spring Integration はシンプルなワイヤータップインターセプターを提供します。<interceptors/>
要素内の任意のチャネルでワイヤタップを構成できます。これはデバッグに特に役立ち、次のように Spring Integration のロギングチャネルアダプターと組み合わせて使用できます。
<int:channel id="in">
<int:interceptors>
<int:wire-tap channel="logger"/>
</int:interceptors>
</int:channel>
<int:logging-channel-adapter id="logger" level="DEBUG"/>
"logging-channel-adapter" も 'expression' 属性を受け入れるため、"payload" および "headers" 変数に対して SpEL 式を評価できます。または、メッセージ toString() の結果全体をログに記録するには、'log-full-message' 属性に値 true を指定します。デフォルトでは、false であるため、ペイロードのみがログに記録されます。true に設定すると、ペイロードに加えてすべてのヘッダーのログが有効になります。「式」オプションは、最も柔軟性が高くなります(例: expression="payload.user.name" )。 |
ワイヤタップおよびその他の類似のコンポーネント(メッセージ公開構成)に関する一般的な誤解の 1 つは、それらが本質的に自動的に非同期であるということです。デフォルトでは、コンポーネントとしてのワイヤタップは非同期で呼び出されません。代わりに、Spring Integration は、非同期動作を構成するための単一の統一アプローチであるメッセージチャネルに焦点を当てています。メッセージフローの特定の部分を同期または非同期にするのは、そのフロー内で構成されたメッセージチャネルの型です。これは、メッセージチャネルの抽象化の主な利点の 1 つです。フレームワークの開始から、フレームワークの第一級オブジェクトとしてのメッセージチャネルの必要性と価値を常に強調してきました。これは、EIP パターンの内部的な暗黙的な実現だけではありません。構成可能なコンポーネントとしてエンドユーザーに完全に公開されます。そのため、ワイヤータップコンポーネントは、次のタスクの実行のみを担当します。
チャネルをタップしてメッセージフローをインターセプトする (たとえば、
channelA
)各メッセージを取得する
別のチャネルにメッセージを送信します (たとえば、
channelB
)
これは本質的にブリッジパターンのバリエーションですが、チャネル定義内にカプセル化されます(したがって、フローを中断することなく有効化および無効化が容易になります)。また、ブリッジとは異なり、基本的に別のメッセージフローをフォークします。そのフローは同期ですか、それとも非同期ですか? 答えは、"channelB" であるメッセージチャネルの型によって異なります。次のオプションがあります: ダイレクトチャネル、ポーリング可能チャネル、エグゼキュータチャネル。最後の 2 つはスレッドの境界を破り、そのチャネルを介した通信を非同期にします。これは、そのチャネルからサブスクライブされたハンドラーへのメッセージのディスパッチが、そのチャネルへのメッセージの送信に使用されたスレッドとは異なるスレッドで行われるためです。これにより、ワイヤータップフローが同期または非同期になります。フレームワーク内の他のコンポーネント(メッセージパブリッシャーなど)と一貫性があり、特定のコードを次のように実装する必要があるかどうかについて事前に心配する必要がないようにすることで、一貫性とシンプルさのレベルを追加します(スレッドセーフコードの記述以外)。同期または非同期。メッセージチャネルを介した 2 つのコード(たとえば、コンポーネント A とコンポーネント B)の実際の接続が、コラボレーションを同期または非同期にします。将来的には同期から非同期に変更することもできます。メッセージチャネルを使用すると、コードに触れることなくすばやく変更できます。
ワイヤータップに関する最後のポイントの 1 つは、デフォルトでは非同期ではないという上記の理由にもかかわらず、通常はできるだけ早くメッセージを渡すことが望ましいことを覚えておく必要があります。ワイヤタップの送信チャネルとして非同期チャネルオプションを使用することは非常に一般的です。ただし、非同期動作はデフォルトでは強制されません。トランザクションの境界を破りたくない場合など、やった場合に破られるユースケースはたくさんあります。おそらく、監査目的でワイヤータップパターンを使用し、監査メッセージを元のトランザクション内で送信する必要があります。例として、ワイヤータップを JMS 送信チャネルアダプターに接続する場合があります。このようにして、両方の長所を活用できます。1)JMS メッセージの送信はトランザクション内で発生する可能性がありますが、2)それは依然として「ファイアアンドフォーゲット」アクションであるため、メインメッセージフローの顕著な遅延を防ぐことができます。
バージョン 4.0 以降、インターセプター(WireTap クラス (英語) など)がチャネルを参照する場合、循環参照を回避することが重要です。現在のインターセプターによってインターセプトされているチャネルからこのようなチャネルを除外する必要があります。これは、適切なパターンで、またはプログラムで実行できます。channel を参照するカスタム ChannelInterceptor がある場合は、VetoCapableInterceptor の実装を検討してください。そのようにして、フレームワークはインターセプターに、提供されたパターンに基づいて、候補である各チャネルをインターセプトしてもよいかどうかを確認します。また、インターセプターメソッドにランタイム保護を追加して、インターセプターによって参照されるチャネルではないことを確認することもできます。WireTap はこれらの手法の両方を使用します。 |
バージョン 4.3 以降、WireTap
には、MessageChannel
インスタンスの代わりに channelName
を使用する追加のコンストラクターがあります。これは、Java 構成およびチャネル自動作成ロジックが使用されている場合に便利です。ターゲット MessageChannel
Bean は、後でインターセプターとの最初の対話で、提供された channelName
から解決されます。
チャネル解決には BeanFactory が必要であるため、ワイヤータップインスタンスは Spring 管理の Bean でなければなりません。 |
このレイトバインディングアプローチにより、次の例に示すように、Java DSL 構成を使用して一般的な盗聴パターンを簡素化することもできます。
@Bean
public PollableChannel myChannel() {
return MessageChannels.queue()
.wireTap("loggingFlow.input")
.get();
}
@Bean
public IntegrationFlow loggingFlow() {
return f -> f.log();
}
条件付きワイヤタップ
selector
または selector-expression
属性を使用して、ワイヤタップを条件付きにすることができます。selector
は MessageSelector
Bean を参照します。MessageSelector
は、実行時にメッセージをタップチャネルに送信するかどうかを決定できます。同様に、selector-expression
は同じ目的を実行するブール SpEL 式です。式が true
に評価される場合、メッセージはタップチャネルに送信されます。
グローバルワイヤタップ構成
グローバルチャネルインターセプターの構成の特殊なケースとしてグローバルワイヤタップを設定することが可能です。そのためには、最上位の wire-tap
要素を構成します。現在、通常の wire-tap
名前空間のサポートに加えて、pattern
および order
属性がサポートされており、channel-interceptor
の場合とまったく同じように機能します。次の例は、グローバルワイヤタップを設定する方法を示しています。
Java
XML
@Bean
@GlobalChannelInterceptor(patterns = "input*,thing2*,thing1", order = 3)
public WireTap wireTap(MessageChannel wiretapChannel) {
return new WireTap(wiretapChannel);
}
<int:wire-tap pattern="input*, thing2*, thing1" order="3" channel="wiretapChannel"/>
グローバルワイヤタップは、既存のチャネル構成を変更せずに、外部でシングルチャネルワイヤタップを構成する便利な方法を提供します。これを行うには、pattern 属性をターゲットチャネル名に設定します。例: この手法を使用して、チャネル上のメッセージを検証するテストケースを構成できます。 |