システムマネジメント
指標と管理
このセクションでは、Spring Integration のメトリクスをキャプチャーする方法について説明します。最近のバージョンでは、Micrometer(https://micrometer.io (英語) を参照)にさらに依存しており、将来のリリースでは Micrometer をさらに使用する予定です。
大量の環境でのロギングの無効化
メインメッセージフローでデバッグログを制御できます。非常に大量のアプリケーションでは、isDebugEnabled()
の呼び出しは、一部のロギングサブシステムでは非常にコストがかかる可能性があります。このオーバーヘッドを回避するために、このようなすべてのロギングを無効にすることができます。例外ログ(デバッグまたはその他)は、この設定の影響を受けません。
次のリストは、ロギングを制御するために使用可能なオプションを示しています。
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
defaultLoggingEnabled = "true" <1>)
public static class ContextConfiguration {
...
}
<int:management default-logging-enabled="true"/> (1)
1 | false に設定すると、ログシステムのカテゴリ設定に関係なく、メインメッセージフローのすべてのロギングが無効になります。デバッグログを有効にするには、"true" に設定します(ログサブシステムでも有効になっている場合)。Bean 定義で設定を明示的に構成していない場合にのみ適用されます。デフォルトは true です。 |
defaultLoggingEnabled は、Bean 定義で対応する設定を明示的に構成していない場合にのみ適用されます。 |
Micrometer 統合
概要
バージョン 5.0.3 以降、アプリケーションコンテキストに Micrometer (英語) MeterRegistry
が存在すると、Micrometer メトリクスのサポートがトリガーされます。
Micrometer を使用するには、MeterRegistry
Bean の 1 つをアプリケーションコンテキストに追加します。
MessageHandler
および MessageChannel
ごとに、タイマーが登録されます。MessageSource
ごとに、カウンターが登録されます。
これは、AbstractMessageHandler
、AbstractMessageChannel
、AbstractMessageSource
を継承するオブジェクトにのみ適用されます(ほとんどのフレームワークコンポーネントに当てはまります)。
メッセージチャネルの送信操作用の Timer
メーターには、次の名前またはタグがあります。
name
:spring.integration.send
tag
:type:channel
tag
:name:<componentName>
tag
:result:(success|failure)
tag
:exception:(none|exception simple class name)
description
:Send processing time
(none
例外を含む failure
結果は、チャネルの send()
操作が false
を返したことを意味します)
ポーリング可能なメッセージチャネルでの受信操作用の Counter
メーターには、次の名前またはタグがあります。
name
:spring.integration.receive
tag
:type:channel
tag
:name:<componentName>
tag
:result:(success|failure)
tag
:exception:(none|exception simple class name)
description
:Messages received
メッセージハンドラーの操作用の Timer
メーターには、次の名前またはタグがあります。
name
:spring.integration.send
tag
:type:handler
tag
:name:<componentName>
tag
:result:(success|failure)
tag
:exception:(none|exception simple class name)
description
:Send processing time
メッセージソースの Counter
メーターには、次の名前 / タグがあります。
name
:spring.integration.receive
tag
:type:source
tag
:name:<componentName>
tag
:result:success
tag
:exception:none
description
:Messages received
さらに、3 つの Gauge
メーターがあります。
spring.integration.channels
: アプリケーション内のMessageChannels
の数。spring.integration.handlers
: アプリケーション内のMessageHandlers
の数。spring.integration.sources
: アプリケーション内のMessageSources
の数。
MicrometerMetricsCaptor
のサブクラスを提供することにより、統合コンポーネントによって作成された Meters
の名前とタグをカスタマイズできます。MicrometerCustomMetricsTests [GitHub] (英語) テストケースは、その方法の簡単な例を示しています。ビルダーサブクラスの build()
メソッドをオーバーロードすることにより、メーターをさらにカスタマイズすることもできます。
バージョン 5.1.13 以降、QueueChannel
は、キューサイズと残り容量の Micrometer ゲージを公開します。
name
:spring.integration.channel.queue.size
tag
:type:channel
tag
:name:<componentName>
description
:The size of the queue channel
および
name
:spring.integration.channel.queue.remaining.capacity
tag
:type:channel
tag
:name:<componentName>
description
:The remaining capacity of the queue channel
メーターの無効化
レガシーメトリクス(現在は削除されています)を使用すると、メトリクスを収集する統合コンポーネントを指定できます。デフォルトでは、すべてのメーターは最初に使用されたときに登録されます。現在、Micrometer を使用すると、MeterFilter
を MeterRegistry
に追加して、一部またはすべてが登録されないようにすることができます。提供されている任意のプロパティ、name
、tag
などでメーターをフィルターで除外(拒否)できます。詳細については、Micrometer ドキュメントのメーターフィルター (英語) を参照してください。
例: 与えられた:
@Bean
public QueueChannel noMeters() {
return new QueueChannel(10);
}
次の方法で、このチャネルのみのメーターの登録を抑制することができます。
registry.config().meterFilter(MeterFilter.deny(id ->
"channel".equals(id.getTag("type")) &&
"noMeters".equals(id.getTag("name"))));
Spring Integration JMX サポート
JMX サポートも参照してください。
メッセージ履歴
メッセージングアーキテクチャの主な利点は、参加しているコンポーネントが相互の認識を維持できないように、疎結合です。この事実だけで、アプリケーションは非常に柔軟になり、残りのフローに影響を与えずにコンポーネントを変更したり、メッセージングルートを変更したり、メッセージ消費スタイルを変更したりできます(ポーリングとイベントドリブン)。しかし、この控えめなスタイルのアーキテクチャは、物事がうまくいかない場合には難しいことがわかります。デバッグするときは、おそらくメッセージに関する情報(その発信元、通過したチャネル、その他の詳細)をできるだけ多く取得する必要があります。
メッセージ履歴は、デバッグまたは監査証跡を維持するために、メッセージパスの認識レベルを維持するオプションを提供することで役立つパターンの 1 つです。Spring 統合は、メッセージにヘッダーを追加し、メッセージが追跡対象コンポーネントを通過するたびにそのヘッダーを更新することにより、メッセージフローを設定してメッセージ履歴を維持する簡単な方法を提供します。
メッセージ履歴の構成
メッセージ履歴を有効にするには、次の例に示すように、構成で message-history
要素 (または @EnableMessageHistory
) を定義するだけで済みます。
@Configuration
@EnableIntegration
@EnableMessageHistory
<int:message-history/>
これで、すべての名前付きコンポーネント( "id" が定義されているコンポーネント)が追跡されます。フレームワークは、メッセージに "history" ヘッダーを設定します。その値は List<Properties>
です。
次の構成例を検討してください。
@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
...
}
@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
HeaderEnricher enricher =
new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
return enricher;
}
<int:gateway id="sampleGateway"
service-interface="org.springframework.integration.history.sample.SampleGateway"
default-request-channel="bridgeInChannel"/>
<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
<int:header name="baz" value="baz"/>
</int:header-enricher>
上記の構成により、単純なメッセージ履歴構造が生成され、出力は次のようになります。
[{name=sampleGateway, type=gateway, timestamp=1283281668091},
{name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]
メッセージ履歴にアクセスするには、MessageHistory
ヘッダーにアクセスするだけで済みます。次の例は、その方法を示しています。
Iterator<Properties> historyIterator =
message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));
すべてのコンポーネントを追跡したくない場合があります。名前に基づいて特定のコンポーネントに履歴を制限するには、tracked-components
属性を指定し、追跡するコンポーネントに一致するコンポーネント名とパターンのコンマ区切りリストを指定できます。次の例は、その方法を示しています。
@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
<int:message-history tracked-components="*Gateway, sample*, aName"/>
上記の例では、メッセージの履歴は、"Gateway" で終わるコンポーネント、"sample" で始まるコンポーネント、名前 "aName" と完全に一致するコンポーネントについてのみ保持されます。
さらに、MessageHistoryConfigurer
Bean が IntegrationMBeanExporter
によって JMX MBean として公開され ( MBean エクスポーターを参照)、実行時にパターンを変更できるようになりました。ただし、パターンを変更するには、Bean を停止 (メッセージ履歴をオフにする) する必要があることに注意してください。この機能は、システムを分析するために履歴を一時的にオンにするのに役立つ場合があります。MBean のオブジェクト名は <domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer
です。
1 つの @EnableMessageHistory (または <message-history/> ) のみを、コンポーネント追跡構成の単一ソースとしてアプリケーションコンテキストで宣言する必要があります。MessageHistoryConfigurer に汎用 Bean 定義を使用しないでください。 |
定義により、メッセージ履歴ヘッダーは不変です(履歴を書き換えることはできません)。メッセージ履歴値を書き込むとき、コンポーネントは新しいメッセージを作成するか(コンポーネントがオリジンの場合)、リクエストメッセージから履歴をコピーし、それを変更して応答メッセージに新しいリストを設定します。どちらの場合でも、メッセージ自体がスレッドの境界を越えている場合でも、値を追加できます。つまり、履歴値により、非同期メッセージフローのデバッグが大幅に簡素化されます。 |
メッセージストア
エンタープライズ統合パターン (英語) (EIP)ブックは、メッセージをバッファリングする機能を持ついくつかのパターンを識別します。例: アグリゲーターは、解放できるまでメッセージをバッファリングし、QueueChannel
は、コンシューマーがそのチャネルからメッセージを明示的に受信するまでメッセージをバッファリングします。メッセージフロー内の任意の時点で発生する可能性がある障害のため、メッセージをバッファリングする EIP コンポーネントは、メッセージが失われる可能性のあるポイントも導入します。
メッセージを失うリスクを軽減するために、EIP はメッセージストア (英語) パターンを定義します。これにより、EIP コンポーネントは通常、ある種の永続ストア(RDBMS など)にメッセージを保存 (英語) できます。
Spring Integration は、以下によってメッセージストアパターンのサポートを提供します。
org.springframework.integration.store.MessageStore
戦略インターフェースの定義このインターフェースのいくつかの実装を提供する
MessageStore
インターフェースを実装するインスタンスを挿入できるように、メッセージをバッファリングする機能を持つすべてのコンポーネントでmessage-store
属性を公開します。
特定のメッセージストア実装の設定方法と特定のバッファリングコンポーネントへの MessageStore
実装の注入方法の詳細は、マニュアル全体で説明されています(QueueChannel、アグリゲーター、遅延器などの特定のコンポーネントを参照)。次の例のペアは、QueueChannel
およびアグリゲーターのメッセージストアへの参照を追加する方法を示しています。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator … message-store="refToMessageStore"/>
デフォルトでは、メッセージは MessageStore
の実装である o.s.i.store.SimpleMessageStore
を使用してメモリに保存されます。これは、非永続メッセージの潜在的な損失が懸念されない開発環境または単純な低ボリューム環境では問題ない場合があります。ただし、一般的な本番アプリケーションには、メッセージ損失のリスクを軽減するだけでなく、潜在的なメモリ不足エラーを回避するために、より堅牢なオプションが必要です。そのため、さまざまなデータストア用の MessageStore
実装も提供しています。以下は、サポートされている実装の完全なリストです。
JDBC メッセージストア : RDBMS を使用してメッセージを保存する
Redis メッセージストア : Redis キー / 値データストアを使用してメッセージを保存する
MongoDB メッセージストア : MongoDB ドキュメントストアを使用してメッセージを保存する
Gemfire メッセージストア : Gemfire 分散キャッシュを使用してメッセージを保存します
ただし、 メッセージデータ(ペイロードとヘッダー)は、 特定の型のデータを表すヘッダーに特に注意してください。例: ヘッダーの 1 つに Spring Bean のインスタンスが含まれている場合、逆直列化すると、その Bean の別のインスタンスになり、フレームワークによって作成された暗黙的なヘッダーの一部( Spring Integration バージョン 3.0 から、 また、次のようにメッセージフローを構成するときに何が起こるかを考慮してください: ゲートウェイ→キューチャネル(永続的なメッセージストアによってバッキング)→サービスアクティベーター。そのゲートウェイは一時的な応答チャネルを作成しますが、サービスアクティベーターのポーラーがキューから読み取るまでに失われます。この場合も、ヘッダーエンリッチャーを使用して、ヘッダーを 詳しくは、ヘッダーエンリッチャーを参照してください。 |
Spring Integration 4.0 は 2 つの新しいインターフェースを導入しました:
ChannelMessageStore
:QueueChannel
インスタンスに固有の操作を実装するにはPriorityCapableChannelMessageStore
:PriorityChannel
インスタンスに使用されるMessageStore
実装をマークし、持続メッセージの優先順位を提供するため。
実際の動作は実装によって異なります。このフレームワークは、QueueChannel
および PriorityChannel
の永続的な MessageStore
として使用できる次の実装を提供します。
SimpleMessageStore に関する注意 バージョン 4.1 以降、 アグリゲーターなどのコンポーネントの外部のグループストアにアクセスするユーザーは、コピーの代わりにアグリゲーターが使用しているグループへの直接参照を取得するようになりました。アグリゲーターの外部でグループを操作すると、予測できない結果が生じる可能性があります。 このため、このような操作を実行しないか、 |
MessageGroupFactory
を使用する
バージョン 4.3 から、MessageGroupStore
実装の一部にカスタム MessageGroupFactory
戦略を注入して、MessageGroupStore
が使用する MessageGroup
インスタンスを作成およびカスタマイズできます。これはデフォルトで SimpleMessageGroupFactory
になり、GroupType.HASH_SET
(LinkedHashSet
)内部コレクションに基づいて SimpleMessageGroup
インスタンスを生成します。他の可能なオプションは SYNCHRONISED_SET
および BLOCKING_QUEUE
です。最後のオプションを使用して、以前の SimpleMessageGroup
の動作を復元できます。PERSISTENT
オプションも利用できます。詳細については、次のセクションを参照してください。バージョン 5.0.1 から、LIST
オプションは、グループ内のメッセージの順序と一意性が重要でない場合にも使用できます。
永続的な MessageGroupStore
および遅延ロード
バージョン 4.3 以降、すべての永続的な MessageGroupStore
インスタンスは、遅延ロード方式でストアから MessageGroup
インスタンスとその messages
を取得します。ほとんどの場合、各相関操作でストアから MessageGroup
全体をロードするオーバーヘッドを追加する場合、相関 MessageHandler
インスタンス(アグリゲーターおよびリシーケンサーを参照)に役立ちます。
AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
オプションを使用して、構成から遅延ロード動作をオフに切り替えることができます。
MongoDB MessageStore
(MongoDB メッセージストア)および <aggregator>
(アグリゲーター)での遅延ロードのパフォーマンステストでは、次のようなカスタム release-strategy
を使用します。
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
1000 個の単純なメッセージに対して、次のような結果が生成されます。
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
ただし、バージョン 5.5 以降、すべての永続的な MessageGroupStore
実装は、ターゲットデータベースストリーミング API に基づいて streamMessagesForGroup(Object groupId)
契約を提供します。これにより、ストア内のグループが非常に大きい場合のリソース使用率が向上します。フレームワークの内部では、この新しい API は、起動時に永続化されたメッセージを再スケジュールするときに(たとえば)遅延器で使用されます。返された Stream<Message<?>>
は、処理の最後に閉じる必要があります。try-with-resources
による自動クローズを介して。PersistentMessageGroup
が使用されるときはいつでも、その streamMessages()
は MessageGroupStore.streamMessagesForGroup()
に委譲します。
メッセージグループの状態
バージョン 5.5 以降、MessageGroup
抽象化は condition
文字列オプションを提供します。このオプションの値は、グループの決定を行うために何らかの理由で後で解析できるものであれば何でもかまいません。たとえば、相関メッセージハンドラーからの ReleaseStrategy
は、グループ内のすべてのメッセージを繰り返す代わりに、グループからこのプロパティを参照する場合があります。MessageGroupStore
は setGroupCondition(Object groupId, String condition)
API を公開します。この目的のために、setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
オプションが AbstractCorrelatingMessageHandler
に追加されました。この関数は、グループに追加された後の各メッセージと、グループの既存の状態に対して評価されます。実装は、新しい値または既存の値を返すか、ターゲット条件を null
にリセットすることを決定する場合があります。condition
の値は、JSON、SpEL 式、数値、文字列として直列化して後で解析できるものであれば何でもかまいません。例: ファイルアグリゲーターコンポーネントの FileMarkerReleaseStrategy
は、FileSplitter.FileMarker.Mark.END
メッセージの FileHeaders.LINE_COUNT
ヘッダーからグループに条件を入力し、canRelease()
からそれを参照して、グループサイズをこの条件の値と比較します。このように、グループ内のすべてのメッセージを繰り返して、FileHeaders.LINE_COUNT
ヘッダーを持つ FileSplitter.FileMarker.Mark.END
メッセージを見つけることはありません。また、他のすべてのレコードの前に終了マーカーがアグリゲーターに到達できるようにします。たとえば、マルチスレッド環境でファイルを処理する場合です。
さらに、構成の便宜上、GroupConditionProvider
契約が導入されました。AbstractCorrelatingMessageHandler
は、提供された ReleaseStrategy
がこのインターフェースを実装しているかどうかを確認し、グループ条件評価ロジック用に conditionSupplier
を抽出します。
メタデータストア
多くの外部システム、サービス、リソースはトランザクションではなく(Twitter、RSS、ファイルシステムなど)、データを既読としてマークする機能はありません。また、一部の統合ソリューションでは、エンタープライズ統合パターンべき等レシーバー (英語) を実装する必要がある場合があります。このゴールを達成し、外部システムとの次の対話の前にエンドポイントの以前の状態を保存するため、または次のメッセージを処理するために、Spring Integration は、一般的なキー値契約を備えた org.springframework.integration.metadata.MetadataStore
インターフェースの実装としてメタデータストアコンポーネントを提供します。
メタデータストアは、さまざまな種類の汎用メタデータ(たとえば、処理された最後のフィードエントリの公開日)を格納して、フィードアダプターなどのコンポーネントが重複を処理できるように設計されています。コンポーネントに MetadataStore
への参照が直接提供されない場合、メタデータストアを見つけるためのアルゴリズムは次のとおりです。最初に、アプリケーションコンテキストで metadataStore
ID を持つ Bean を探します。見つかった場合は、それを使用します。そうでない場合は、SimpleMetadataStore
の新しいインスタンスを作成します。これは、現在実行中のアプリケーションコンテキストのライフサイクル内でメタデータのみを永続化するメモリ内実装です。つまり、再起動すると、エントリが重複する可能性があります。
アプリケーションコンテキストの再起動間でメタデータを保持する必要がある場合、フレームワークは次の永続的な MetadataStores
を提供します。
PropertiesPersistingMetadataStore
PropertiesPersistingMetadataStore
は、プロパティファイルと PropertiesPersister
(Javadoc) によってサポートされています。
デフォルトでは、アプリケーションコンテキストが正常に閉じられたときの状態のみを保持します。Flushable
を実装しているため、flush()
を呼び出すことで、状態を自由に維持できます。次の例は、XML で "PropertiesPersistingMetadataStore" を構成する方法を示しています。
<bean id="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>
または、MetadataStore
インターフェースの独自の実装(たとえば JdbcMetadataStore
)を提供し、それをアプリケーションコンテキストで Bean として構成できます。
バージョン 4.0 以降、SimpleMetadataStore
、PropertiesPersistingMetadataStore
、RedisMetadataStore
は ConcurrentMetadataStore
を実装します。これらはアトミックアップデートを提供し、複数のコンポーネントまたはアプリケーションインスタンスで使用できます。
べき等レシーバーとメタデータストア
メタデータストアは、受信メッセージがすでに処理されており、破棄するか、破棄時に他のロジックを実行できる場合、受信メッセージをフィルターする必要がある場合に、EIP べき等レシーバー (英語) パターンを実装できます。次の構成は、その方法の例を示しています。
<int:filter input-channel="serviceChannel"
output-channel="idempotentServiceChannel"
discard-channel="discardChannel"
expression="@metadataStore.get(headers.businessKey) == null"/>
<int:publish-subscribe-channel id="idempotentServiceChannel"/>
<int:outbound-channel-adapter channel="idempotentServiceChannel"
expression="@metadataStore.put(headers.businessKey, '')"/>
<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>
べき等エントリの value
は有効期限になる可能性があり、その後、そのエントリは何らかのスケジュールされたリーパーによってメタデータストアから削除される必要があります。
べき等レシーバーエンタープライズ統合パターンも参照してください。
MetadataStoreListener
次の例に示すように、一部のメタデータストア(現在は zookeeper のみ)は、アイテムの変更時にイベントを受信するリスナーの登録をサポートしています。
public interface MetadataStoreListener {
void onAdd(String key, String value);
void onRemove(String key, String oldValue);
void onUpdate(String key, String newValue);
}
詳細については、Javadoc を参照してください。イベントのサブセットのみに関心がある場合は、MetadataStoreListenerAdapter
をサブクラス化できます。
制御バス
エンタープライズ統合パターン (英語) (EIP)ブックに従って、コントロールバスの背景となる考え方は、「アプリケーションレベル」メッセージングに使用されるのと同じメッセージングシステムをフレームワーク内のコンポーネントの監視と管理に使用できるということです。Spring Integration では、公開された操作を呼び出す手段としてメッセージを送信できるように、上記のアダプターに基づいて構築しています。
次の例は、XML で制御バスを構成する方法を示しています。
<int:control-bus input-channel="operationChannel"/>
制御バスには、アプリケーションコンテキストで Bean の操作を呼び出すためにアクセスできる入力チャネルがあります。また、エンドポイントをアクティブにするサービスのすべての共通プロパティもあります。例: 操作の結果に、ダウンストリームチャネルに送信する戻り値がある場合、出力チャネルを指定できます。
制御バスは、Spring Expression Language(SpEL)式として入力チャネルでメッセージを実行します。メッセージを受け取り、本文を式にコンパイルし、コンテキストを追加してから実行します。デフォルトのコンテキストは、@ManagedAttribute
または @ManagedOperation
でアノテーションが付けられたメソッドをサポートします。また、Spring の Lifecycle
インターフェース(およびバージョン 5.2 以降の Pausable
拡張)のメソッドをサポートし、Spring の TaskExecutor
および TaskScheduler
実装のいくつかを構成するために使用されるメソッドをサポートします。制御バスで独自のメソッドを使用できるようにする最も簡単な方法は、@ManagedAttribute
または @ManagedOperation
アノテーションを使用することです。これらのアノテーションは、メソッドを JMX MBean レジストリに公開するためにも使用されるため、便利な副産物を提供します。多くの場合、コントロールバスに公開するのと同じ型の操作は、JMX を介して公開するのに適しています。アプリケーションコンテキスト内の特定のインスタンスの解決は、一般的な SpEL 構文で実現されます。これを行うには、Bean の SpEL 接頭辞(@
)を Bean 名に指定します。例: Spring Bean でメソッドを実行するために、クライアントは次のように操作チャネルにメッセージを送信できます。
Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)
式のコンテキストのルートは Message
自体であるため、式内の変数として payload
および headers
にもアクセスできます。これは、Spring Integration エンドポイントの他のすべての式サポートと一致しています。
Java アノテーションを使用すると、コントロールバスを次のように構成できます。
@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
同様に、Java DSL フロー定義を次のように構成できます。
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlows.from("controlBus")
.controlBus()
.get();
}
DirectChannel
の自動作成でラムダを使用する場合は、次のように制御バスを作成できます。
@Bean
public IntegrationFlow controlBus() {
return IntegrationFlowDefinition::controlBus;
}
この場合、チャネルの名前は controlBus.input
です。
正常なシャットダウン
"MBean エクスポーター" に従って、MBean エクスポーターは stopActiveComponents
と呼ばれる JMX 操作を提供します。これは、アプリケーションを正常に停止するために使用されます。操作には単一の Long
パラメーターがあります。このパラメーターは、処理中のメッセージの補完を待機する時間(ミリ秒単位)を示します。操作は次のように機能します。
OrderlyShutdownCapable
を実装するすべての Bean でbeforeShutdown()
を呼び出します。そうすることで、そのようなコンポーネントはシャットダウンの準備ができます。このインターフェースを実装するコンポーネントの例と、この呼び出しで行うことには、リスナーコンテナーを停止する JMS および AMQP メッセージ駆動型アダプター、新しい接続の受け入れを停止する(既存の接続を開いたまま)TCP サーバー接続ファクトリ、ドロップする TCP 受信エンドポイントが含まれます(ログ)受信した新しいメッセージ、新しいリクエストに対して
503 - Service Unavailable
を返す HTTP 受信エンドポイント。JMS または AMQP-backed チャネルなどのアクティブなチャネルを停止します。
すべての
MessageSource
インスタンスを停止します。すべての受信
MessageProducer
を停止します(OrderlyShutdownCapable
ではありません)。操作に渡される
Long
パラメーターの値で定義されているように、残り時間が残っているのを待ちます。そうすることで、飛行中のメッセージがすべての旅を完了することができます。この操作を呼び出すときに適切なタイムアウトを選択することが重要です。
すべての
OrderlyShutdownCapable
コンポーネントでafterShutdown()
を呼び出します。これにより、そのようなコンポーネントは最終的なシャットダウンタスクを実行できます(たとえば、開いているすべてのソケットを閉じる)。
正常なシャットダウン管理操作で説明したように、この操作は JMX を使用して呼び出すことができます。プログラムでメソッドを呼び出す場合は、IntegrationMBeanExporter
への参照を挿入するか、取得する必要があります。id
属性が <int-jmx:mbean-export/>
定義で指定されていない場合、Bean には名前が生成されます。この名前には、同じ JVM(MBeanServer
)に複数の Spring Integration コンテキストが存在する場合に ObjectName
の衝突を回避するためのランダムなコンポーネントが含まれています。
このため、プログラムでメソッドを呼び出す場合は、アプリケーションコンテキストで簡単にアクセスできるように、エクスポーターに id
属性を提供することをお勧めします。
最後に、<control-bus>
要素を使用して操作を呼び出すことができます。詳細については、モニタリング Spring Integration サンプルアプリケーション [GitHub] (英語) を参照してください。
前述のアルゴリズムは、バージョン 4.1 で改善されました。以前は、すべてのタスクエグゼキューターとスケジューラーが停止していました。これにより、QueueChannel インスタンスの中間フローメッセージが残る可能性があります。これで、シャットダウンによりポーラーが実行されたままになり、これらのメッセージを排出して処理できるようになります。 |
統合グラフ
バージョン 4.3 から、Spring Integration はアプリケーションのランタイムオブジェクトモデルへのアクセスを提供します。これには、オプションでコンポーネントメトリクスを含めることができます。グラフとして公開され、統合アプリケーションの現在の状態を視覚化するために使用できます。o.s.i.support.management.graph
パッケージには、Spring Integration コンポーネントのランタイム状態を単一のツリーのような Graph
オブジェクトとして収集、構築、レンダリングするために必要なすべてのクラスが含まれています。IntegrationGraphServer
は、Graph
オブジェクトを作成、取得、リフレッシュするために Bean として宣言する必要があります。結果の Graph
オブジェクトは任意の形式に直列化できますが、JSON はクライアント側で解析および表現するのに柔軟で便利です。デフォルトのコンポーネントのみを持つ Spring Integration アプリケーションは、次のようなグラフを公開します。
{
"contentDescriptor" : {
"providerVersion" : "5.5.8",
"providerFormatVersion" : 1.2,
"provider" : "spring-integration",
"name" : "myAppName:1.0"
},
"nodes" : [ {
"nodeId" : 1,
"componentType" : "null-channel",
"integrationPatternType" : "null_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 0.0,
"max" : 0.0
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"receiveCounters" : {
"successes" : 0,
"failures" : 0
},
"name" : "nullChannel"
}, {
"nodeId" : 2,
"componentType" : "publish-subscribe-channel",
"integrationPatternType" : "publish_subscribe_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 7.807002,
"max" : 7.807002
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorChannel"
}, {
"nodeId" : 3,
"componentType" : "logging-channel-adapter",
"integrationPatternType" : "outbound_channel_adapter",
"integrationPatternCategory" : "messaging_endpoint",
"properties" : { },
"output" : null,
"input" : "errorChannel",
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 6.742722,
"max" : 6.742722
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorLogger"
} ],
"links" : [ {
"from" : 2,
"to" : 3,
"type" : "input"
} ]
}
バージョン 5.2 は、メトリクス管理に従って、Micrometer メーターを優先してレガシーメトリクスを廃止しました。レガシーメトリクスはバージョン 5.4 で削除され、グラフに表示されなくなります。 |
前の例では、グラフは 3 つの最上位要素で構成されています。
contentDescriptor
グラフ要素には、データを提供するアプリケーションに関する一般情報が含まれています。name
は、IntegrationGraphServer
Bean または spring.application.name
アプリケーションコンテキスト環境プロパティでカスタマイズできます。他のプロパティはフレームワークによって提供され、同様のモデルを他のソースと区別できます。
links
グラフ要素は、nodes
グラフ要素のノード間の接続を表します。ソース Spring Integration アプリケーションの統合コンポーネント間の接続を表します。例: MessageChannel
から MessageHandler
を含む EventDrivenConsumer
へ、または AbstractReplyProducingMessageHandler
から MessageChannel
へ。便宜上、リンクの目的を判断できるように、モデルには type
属性が含まれています。可能な型は次のとおりです。
input
:MessageChannel
からエンドポイント、inputChannel
、requestChannel
プロパティへの方向を識別しますoutput
:MessageHandler
、MessageProducer
、SourcePollingChannelAdapter
からoutputChannel
またはreplyChannel
プロパティを介したMessageChannel
への方向error
:PollingConsumer
またはMessageProducer
またはSourcePollingChannelAdapter
上のMessageHandler
からerrorChannel
プロパティを介してMessageChannel
へ。discard
:errorChannel
プロパティを介してDiscardingMessageHandler
(MessageFilter
など)からMessageChannel
へ。route
:AbstractMappingMessageRouter
(HeaderValueRouter
など)からMessageChannel
へ。output
に似ていますが、実行時に決定されます。構成されたチャネルマッピングまたは動的に解決されたチャネルの場合があります。通常、ルーターはこの目的のために最大 100 個の動的ルートのみを保持しますが、dynamicChannelLimit
プロパティを設定することでこの値を変更できます。
この要素からの情報を視覚化ツールで使用して、nodes
グラフ要素のノード間の接続をレンダリングできます。from
および to
番号は、リンクされたノードの nodeId
プロパティの値を表します。例: link
要素を使用して、ターゲットノードで適切な port
を決定できます。
次の「テキストイメージ」は、型間の関連を示しています。
+---(discard) | +----o----+ | | | | | | (input)--o o---(output) | | | | | | +----o----+ | +---(error)
nodes
グラフ要素は、おそらく最も興味深いものです。その要素には、componentType
インスタンスと name
値を持つランタイムコンポーネントが含まれるだけでなく、オプションでコンポーネントによって公開されるメトリクスも含まれる可能性があるためです。ノード要素には、一般的に一目でわかるさまざまなプロパティが含まれています。例: 式ベースのコンポーネントには、コンポーネントの 1 次式文字列を含む expression
プロパティが含まれます。メトリクスを有効にするには、@EnableIntegrationManagement
を @Configuration
クラスに追加するか、<int:management/>
要素を XML 構成に追加します。詳細については、指標と管理を参照してください。
nodeId
は、あるコンポーネントと別のコンポーネントを区別できるようにする一意の増分識別子を表します。links
要素でも使用され、このコンポーネントと他のコンポーネント(ある場合)との関連(接続)を表します。input
および output
属性は、AbstractEndpoint
、MessageHandler
、SourcePollingChannelAdapter
または MessageProducerSupport
の inputChannel
および outputChannel
プロパティ用です。詳細については、次のセクションを参照してください。
バージョン 5.1 以降、IntegrationGraphServer
は、特定の NamedComponent
の IntegrationNode
上の追加プロパティの作成に Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback
を受け入れます。たとえば、SmartLifecycle
autoStartup
、running
プロパティをターゲットグラフに公開できます。
server.setAdditionalPropertiesCallback(namedComponent -> {
Map<String, Object> properties = null;
if (namedComponent instanceof SmartLifecycle) {
SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
properties = new HashMap<>();
properties.put("auto-startup", smartLifecycle.isAutoStartup());
properties.put("running", smartLifecycle.isRunning());
}
return properties;
});
グラフランタイムモデル
Spring Integration コンポーネントには、さまざまなレベルの複雑さがあります。例: ポーリングされた MessageSource
には、ソースデータからメッセージを定期的に送信する SourcePollingChannelAdapter
と MessageChannel
もあります。他のコンポーネントは、メッセージ用の requestChannel
(input
)にサブスクライブ(またはポーリング)するための消費 AbstractEndpoint
と、ダウンストリームに送信するための応答メッセージを生成する replyChannel
(output
)を備えたミドルウェアのリクエスト / 応答コンポーネント(JmsOutboundGateway
など)です。一方、MessageProducerSupport
実装(ApplicationEventListeningMessageProducer
など)は、一部のソースプロトコルリスニングロジックをラップし、outputChannel
にメッセージを送信します。
グラフ内で、Spring Integration コンポーネントは、IntegrationNode
クラス階層を使用して表されます。これは、o.s.i.support.management.graph
パッケージに含まれています。例: AggregatingMessageHandler
には ErrorCapableDiscardingMessageHandlerNode
を使用でき(discardChannel
オプションがあるため)、PollingConsumer
を使用して PollableChannel
からコンシュームするときにエラーを生成できます。もう 1 つの例は CompositeMessageHandlerNode
です。EventDrivenConsumer
を使用して SubscribableChannel
にサブスクライブした場合の MessageHandlerChain
用です。
@MessagingGateway (メッセージングゲートウェイを参照)は、各メソッドにノードを提供します。name 属性は、ゲートウェイの Bean 名と短いメソッドシグネチャーに基づいています。ゲートウェイの次の例を検討してください。 |
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {
void foo(String foo);
void foo(Integer foo);
void bar(String bar);
}
前述のゲートウェイは、次のようなノードを生成します。
{
"nodeId" : 10,
"name" : "gate.bar(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 11,
"name" : "gate.foo(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 12,
"name" : "gate.foo(class java.lang.Integer)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
}
この IntegrationNode
階層を使用して、クライアント側でグラフモデルを解析したり、一般的な Spring Integration ランタイムの動作を理解したりできます。詳細については、プログラミングのヒントとコツも参照してください。
バージョン 5.3 は、IntegrationPattern
抽象化と、エンタープライズ統合パターン(EIP)を表すすべての標準コンポーネントを導入し、この抽象化を実装し、IntegrationPatternType
列挙値を提供します。この情報は、ターゲットアプリケーションのいくつかの分類ロジックに役立ちます。または、グラフノードに公開されているため、UI がコンポーネントの描画方法を決定するために使用できます。
統合グラフコントローラー
アプリケーションが Web ベース(または Web コンテナーが埋め込まれた Spring Boot 上に構築されている)であり、Spring Integration HTTP または WebFlux モジュール(それぞれ HTTP サポートおよび WebFlux サポートを参照)がクラスパスに存在する場合、IntegrationGraphController
を使用して IntegrationGraphServer
を公開できます。REST サービスとしての機能。この目的のために、@EnableIntegrationGraphController
および @Configuration
クラスのアノテーションと <int-http:graph-controller/>
XML 要素が HTTP モジュールで使用可能です。この構成は、@EnableWebMvc
アノテーション(または XML 定義の場合は <mvc:annotation-driven/>
)とともに、IntegrationGraphController
@RestController
を登録します。この構成では、@RequestMapping.path
を @EnableIntegrationGraphController
アノテーションまたは <int-http:graph-controller/>
要素で構成できます。デフォルトのパスは /integration
です。
IntegrationGraphController
@RestController
は、次のサービスを提供します。
@GetMapping(name = "getGraph")
: 最後のIntegrationGraphServer
リフレッシュ以降の Spring Integration コンポーネントの状態を取得します。o.s.i.support.management.graph.Graph
は、REST サービスの@ResponseBody
として返されます。@GetMapping(path = "/refresh", name = "refreshGraph")
: 実際のランタイム状態の現在のGraph
をリフレッシュし、REST レスポンスとして返します。メトリクスのグラフをリフレッシュする必要はありません。グラフが取得されると、リアルタイムで提供されます。グラフが最後に取得されてからアプリケーションコンテキストが変更された場合、リフレッシュを呼び出すことができます。その場合、グラフは完全に再構築されます。
Spring Security および Spring MVC プロジェクトによって提供される標準の構成オプションとコンポーネントを使用して、IntegrationGraphController
のセキュリティとクロスオリジンの制限を設定できます。次の例は、これらのゴールを達成します。
<mvc:annotation-driven />
<mvc:cors>
<mvc:mapping path="/myIntegration/**"
allowed-origins="http://localhost:9090"
allowed-methods="GET" />
</mvc:cors>
<security:http>
<security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>
<int-http:graph-controller path="/myIntegration" />
次の例は、Java 構成で同じことを行う方法を示しています。
@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/testIntegration/**").hasRole("ADMIN")
// ...
.formLogin();
}
//...
}
便宜上、@EnableIntegrationGraphController
アノテーションは allowedOrigins
属性を提供することに注意してください。これにより、path
への GET
アクセスが提供されます。より高度にするために、標準の Spring MVC メカニズムを使用して CORS マッピングを構成できます。