システムマネジメント

指標と管理

このセクションでは、Spring Integration のメトリクスをキャプチャーする方法について説明します。最近のバージョンでは、Micrometer(https://micrometer.io (英語) を参照)にさらに依存しており、将来のリリースでは Micrometer をさらに使用する予定です。

レガシーメトリクス

レガシーメトリクスはバージョン 5.4 で削除されました。以下の Micrometer 統合を参照してください。

大量の環境でのロギングの無効化

メインメッセージフローでデバッグログを制御できます。非常に大量のアプリケーションでは、isDebugEnabled() の呼び出しは、一部のロギングサブシステムでは非常にコストがかかる可能性があります。このオーバーヘッドを回避するために、このようなすべてのロギングを無効にすることができます。例外ログ(デバッグまたはその他)は、この設定の影響を受けません。

次のリストは、ロギングを制御するために使用可能なオプションを示しています。

Java
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
    defaultLoggingEnabled = "true" <1>)

public static class ContextConfiguration {
...
}
XML
<int:management default-logging-enabled="true"/> (1)
1false に設定すると、ログシステムのカテゴリ設定に関係なく、メインメッセージフローのすべてのロギングが無効になります。デバッグログを有効にするには、"true" に設定します(ログサブシステムでも有効になっている場合)。Bean 定義で設定を明示的に構成していない場合にのみ適用されます。デフォルトは true です。
defaultLoggingEnabled は、Bean 定義で対応する設定を明示的に構成していない場合にのみ適用されます。

Micrometer 統合

概要

バージョン 5.0.3 以降、アプリケーションコンテキストに Micrometer (英語)  MeterRegistry が存在すると、Micrometer メトリクスのサポートがトリガーされます。

Micrometer を使用するには、MeterRegistry Bean の 1 つをアプリケーションコンテキストに追加します。

MessageHandler および MessageChannel ごとに、タイマーが登録されます。MessageSource ごとに、カウンターが登録されます。

これは、AbstractMessageHandlerAbstractMessageChannelAbstractMessageSource を継承するオブジェクトにのみ適用されます(ほとんどのフレームワークコンポーネントに当てはまります)。

メッセージチャネルの送信操作用の Timer メーターには、次の名前またはタグがあります。

  • namespring.integration.send

  • tagtype:channel

  • tagname:<componentName>

  • tagresult:(success|failure)

  • tagexception:(none|exception simple class name)

  • descriptionSend processing time

none 例外を含む failure 結果は、チャネルの send() 操作が false を返したことを意味します)

ポーリング可能なメッセージチャネルでの受信操作用の Counter メーターには、次の名前またはタグがあります。

  • namespring.integration.receive

  • tagtype:channel

  • tagname:<componentName>

  • tagresult:(success|failure)

  • tagexception:(none|exception simple class name)

  • descriptionMessages received

メッセージハンドラーの操作用の Timer メーターには、次の名前またはタグがあります。

  • namespring.integration.send

  • tagtype:handler

  • tagname:<componentName>

  • tagresult:(success|failure)

  • tagexception:(none|exception simple class name)

  • descriptionSend processing time

メッセージソースの Counter メーターには、次の名前 / タグがあります。

  • namespring.integration.receive

  • tagtype:source

  • tagname:<componentName>

  • tagresult:success

  • tagexception:none

  • descriptionMessages received

さらに、3 つの Gauge メーターがあります。

  • spring.integration.channels: アプリケーション内の MessageChannels の数。

  • spring.integration.handlers: アプリケーション内の MessageHandlers の数。

  • spring.integration.sources: アプリケーション内の MessageSources の数。

MicrometerMetricsCaptor のサブクラスを提供することにより、統合コンポーネントによって作成された Meters の名前とタグをカスタマイズできます。MicrometerCustomMetricsTests [GitHub] (英語) テストケースは、その方法の簡単な例を示しています。ビルダーサブクラスの build() メソッドをオーバーロードすることにより、メーターをさらにカスタマイズすることもできます。

バージョン 5.1.13 以降、QueueChannel は、キューサイズと残り容量の Micrometer ゲージを公開します。

  • namespring.integration.channel.queue.size

  • tagtype:channel

  • tagname:<componentName>

  • descriptionThe size of the queue channel

および

  • namespring.integration.channel.queue.remaining.capacity

  • tagtype:channel

  • tagname:<componentName>

  • descriptionThe remaining capacity of the queue channel

メーターの無効化

レガシーメトリクス(現在は削除されています)を使用すると、メトリクスを収集する統合コンポーネントを指定できます。デフォルトでは、すべてのメーターは最初に使用されたときに登録されます。現在、Micrometer を使用すると、MeterFilter を MeterRegistry に追加して、一部またはすべてが登録されないようにすることができます。提供されている任意のプロパティ、nametag などでメーターをフィルターで除外(拒否)できます。詳細については、Micrometer ドキュメントのメーターフィルター (英語) を参照してください。

例: 与えられた:

@Bean
public QueueChannel noMeters() {
    return new QueueChannel(10);
}

次の方法で、このチャネルのみのメーターの登録を抑制することができます。

registry.config().meterFilter(MeterFilter.deny(id ->
        "channel".equals(id.getTag("type")) &&
        "noMeters".equals(id.getTag("name"))));

Spring Integration JMX サポート

JMX サポートも参照してください。

メッセージ履歴

メッセージングアーキテクチャの主な利点は、参加しているコンポーネントが相互の認識を維持できないように、疎結合です。この事実だけで、アプリケーションは非常に柔軟になり、残りのフローに影響を与えずにコンポーネントを変更したり、メッセージングルートを変更したり、メッセージ消費スタイルを変更したりできます(ポーリングとイベントドリブン)。しかし、この控えめなスタイルのアーキテクチャは、物事がうまくいかない場合には難しいことがわかります。デバッグするときは、おそらくメッセージに関する情報(その発信元、通過したチャネル、その他の詳細)をできるだけ多く取得する必要があります。

メッセージ履歴は、デバッグまたは監査証跡を維持するために、メッセージパスの認識レベルを維持するオプションを提供することで役立つパターンの 1 つです。Spring 統合は、メッセージにヘッダーを追加し、メッセージが追跡対象コンポーネントを通過するたびにそのヘッダーを更新することにより、メッセージフローを設定してメッセージ履歴を維持する簡単な方法を提供します。

メッセージ履歴の構成

メッセージ履歴を有効にするには、次の例に示すように、構成で message-history 要素 (または @EnableMessageHistory) を定義するだけで済みます。

@Configuration
@EnableIntegration
@EnableMessageHistory
<int:message-history/>

これで、すべての名前付きコンポーネント( "id" が定義されているコンポーネント)が追跡されます。フレームワークは、メッセージに "history" ヘッダーを設定します。その値は List<Properties> です。

次の構成例を検討してください。

@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
   ...
}

@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
    HeaderEnricher enricher =
           new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
    return enricher;
}
<int:gateway id="sampleGateway"
    service-interface="org.springframework.integration.history.sample.SampleGateway"
    default-request-channel="bridgeInChannel"/>

<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
    <int:header name="baz" value="baz"/>
</int:header-enricher>

上記の構成により、単純なメッセージ履歴構造が生成され、出力は次のようになります。

[{name=sampleGateway, type=gateway, timestamp=1283281668091},
 {name=sampleEnricher, type=header-enricher, timestamp=1283281668094}]

メッセージ履歴にアクセスするには、MessageHistory ヘッダーにアクセスするだけで済みます。次の例は、その方法を示しています。

Iterator<Properties> historyIterator =
    message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));

すべてのコンポーネントを追跡したくない場合があります。名前に基づいて特定のコンポーネントに履歴を制限するには、tracked-components 属性を指定し、追跡するコンポーネントに一致するコンポーネント名とパターンのコンマ区切りリストを指定できます。次の例は、その方法を示しています。

@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
<int:message-history tracked-components="*Gateway, sample*, aName"/>

上記の例では、メッセージの履歴は、"Gateway" で終わるコンポーネント、"sample" で始まるコンポーネント、名前 "aName" と完全に一致するコンポーネントについてのみ保持されます。

さらに、MessageHistoryConfigurer Bean が IntegrationMBeanExporter によって JMX MBean として公開され ( MBean エクスポーターを参照)、実行時にパターンを変更できるようになりました。ただし、パターンを変更するには、Bean を停止 (メッセージ履歴をオフにする) する必要があることに注意してください。この機能は、システムを分析するために履歴を一時的にオンにするのに役立つ場合があります。MBean のオブジェクト名は <domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer です。

1 つの @EnableMessageHistory (または <message-history/>) のみを、コンポーネント追跡構成の単一ソースとしてアプリケーションコンテキストで宣言する必要があります。MessageHistoryConfigurer に汎用 Bean 定義を使用しないでください。
定義により、メッセージ履歴ヘッダーは不変です(履歴を書き換えることはできません)。メッセージ履歴値を書き込むとき、コンポーネントは新しいメッセージを作成するか(コンポーネントがオリジンの場合)、リクエストメッセージから履歴をコピーし、それを変更して応答メッセージに新しいリストを設定します。どちらの場合でも、メッセージ自体がスレッドの境界を越えている場合でも、値を追加できます。つまり、履歴値により、非同期メッセージフローのデバッグが大幅に簡素化されます。

メッセージストア

エンタープライズ統合パターン (英語) (EIP)ブックは、メッセージをバッファリングする機能を持ついくつかのパターンを識別します。例: アグリゲーターは、解放できるまでメッセージをバッファリングし、QueueChannel は、コンシューマーがそのチャネルからメッセージを明示的に受信するまでメッセージをバッファリングします。メッセージフロー内の任意の時点で発生する可能性がある障害のため、メッセージをバッファリングする EIP コンポーネントは、メッセージが失われる可能性のあるポイントも導入します。

メッセージを失うリスクを軽減するために、EIP はメッセージストア (英語) パターンを定義します。これにより、EIP コンポーネントは通常、ある種の永続ストア(RDBMS など)にメッセージを保存 (英語) できます。

Spring Integration は、以下によってメッセージストアパターンのサポートを提供します。

  • org.springframework.integration.store.MessageStore 戦略インターフェースの定義

  • このインターフェースのいくつかの実装を提供する

  • MessageStore インターフェースを実装するインスタンスを挿入できるように、メッセージをバッファリングする機能を持つすべてのコンポーネントで message-store 属性を公開します。

特定のメッセージストア実装の設定方法と特定のバッファリングコンポーネントへの MessageStore 実装の注入方法の詳細は、マニュアル全体で説明されています(QueueChannelアグリゲーター遅延器などの特定のコンポーネントを参照)。次の例のペアは、QueueChannel およびアグリゲーターのメッセージストアへの参照を追加する方法を示しています。

例 1: QueueChannel
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
例 2: アグリゲーター
<int:aggregator … message-store="refToMessageStore"/>

デフォルトでは、メッセージは MessageStore の実装である o.s.i.store.SimpleMessageStore を使用してメモリに保存されます。これは、非永続メッセージの潜在的な損失が懸念されない開発環境または単純な低ボリューム環境では問題ない場合があります。ただし、一般的な本番アプリケーションには、メッセージ損失のリスクを軽減するだけでなく、潜在的なメモリ不足エラーを回避するために、より堅牢なオプションが必要です。そのため、さまざまなデータストア用の MessageStore 実装も提供しています。以下は、サポートされている実装の完全なリストです。

ただし、MessageStore の永続的な実装を使用するときは、いくつかの制限に注意してください。

メッセージデータ(ペイロードとヘッダー)は、MessageStore の実装に応じて、異なる直列化戦略を使用して直列化および非直列化されます。例: JdbcMessageStore を使用する場合、Serializable データのみがデフォルトで保持されます。この場合、直列化が発生する前に、直列化できないヘッダーが削除されます。また、トランスポートアダプター(FTP、HTTP、JMS など)によって挿入されるプロトコル固有のヘッダーにも注意してください。例: <http:inbound-channel-adapter/> は HTTP ヘッダーをメッセージヘッダーにマッピングします。そのうちの 1 つは、直列化できない org.springframework.http.MediaType インスタンスの ArrayList です。ただし、Serializer および Deserializer 戦略インターフェースの独自の実装をいくつかの MessageStore 実装(JdbcMessageStore など)に注入して、シリアライゼーションおよびデシリアライゼーションの動作を変更できます。

特定の型のデータを表すヘッダーに特に注意してください。例: ヘッダーの 1 つに Spring Bean のインスタンスが含まれている場合、逆直列化すると、その Bean の別のインスタンスになり、フレームワークによって作成された暗黙的なヘッダーの一部(REPLY_CHANNEL や ERROR_CHANNEL など)に直接影響する場合があります。現在、それらは直列化できませんが、仮にそうであっても、逆直列化されたチャネルは期待されるインスタンスを表しません。

Spring Integration バージョン 3.0 から、HeaderChannelRegistry でチャネルを登録した後にこれらのヘッダーを名前に置き換えるように構成されたヘッダーエンリッチャーでこの課題を解決できます。

また、次のようにメッセージフローを構成するときに何が起こるかを考慮してください: ゲートウェイ→キューチャネル(永続的なメッセージストアによってバッキング)→サービスアクティベーター。そのゲートウェイは一時的な応答チャネルを作成しますが、サービスアクティベーターのポーラーがキューから読み取るまでに失われます。この場合も、ヘッダーエンリッチャーを使用して、ヘッダーを String 表現に置き換えることができます。

詳しくは、ヘッダーエンリッチャーを参照してください。

Spring Integration 4.0 は 2 つの新しいインターフェースを導入しました:

  • ChannelMessageStoreQueueChannel インスタンスに固有の操作を実装するには

  • PriorityCapableChannelMessageStorePriorityChannel インスタンスに使用される MessageStore 実装をマークし、持続メッセージの優先順位を提供するため。

実際の動作は実装によって異なります。このフレームワークは、QueueChannel および PriorityChannel の永続的な MessageStore として使用できる次の実装を提供します。

SimpleMessageStore に関する注意

バージョン 4.1 以降、SimpleMessageStore は getMessageGroup() を呼び出したときにメッセージグループをコピーしなくなりました。大きなメッセージグループの場合、これは重大なパフォーマンスの問題でした。4.0.1 は、この動作を制御できるブール copyOnGet プロパティを導入しました。アグリゲーターによって内部的に使用される場合、このプロパティはパフォーマンスを改善するために false に設定されました。現在は、デフォルトで false です。

アグリゲーターなどのコンポーネントの外部のグループストアにアクセスするユーザーは、コピーの代わりにアグリゲーターが使用しているグループへの直接参照を取得するようになりました。アグリゲーターの外部でグループを操作すると、予測できない結果が生じる可能性があります。

このため、このような操作を実行しないか、copyOnGet プロパティを true に設定する必要があります。

MessageGroupFactory を使用する

バージョン 4.3 から、MessageGroupStore 実装の一部にカスタム MessageGroupFactory 戦略を注入して、MessageGroupStore が使用する MessageGroup インスタンスを作成およびカスタマイズできます。これはデフォルトで SimpleMessageGroupFactory になり、GroupType.HASH_SET (LinkedHashSet)内部コレクションに基づいて SimpleMessageGroup インスタンスを生成します。他の可能なオプションは SYNCHRONISED_SET および BLOCKING_QUEUE です。最後のオプションを使用して、以前の SimpleMessageGroup の動作を復元できます。PERSISTENT オプションも利用できます。詳細については、次のセクションを参照してください。バージョン 5.0.1 から、LIST オプションは、グループ内のメッセージの順序と一意性が重要でない場合にも使用できます。

永続的な MessageGroupStore および遅延ロード

バージョン 4.3 以降、すべての永続的な MessageGroupStore インスタンスは、遅延ロード方式でストアから MessageGroup インスタンスとその messages を取得します。ほとんどの場合、各相関操作でストアから MessageGroup 全体をロードするオーバーヘッドを追加する場合、相関 MessageHandler インスタンス(アグリゲーターおよびリシーケンサーを参照)に役立ちます。

AbstractMessageGroupStore.setLazyLoadMessageGroups(false) オプションを使用して、構成から遅延ロード動作をオフに切り替えることができます。

MongoDB MessageStore (MongoDB メッセージストア)および <aggregator> (アグリゲーター)での遅延ロードのパフォーマンステストでは、次のようなカスタム release-strategy を使用します。

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

1000 個の単純なメッセージに対して、次のような結果が生成されます。

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

ただし、バージョン 5.5 以降、すべての永続的な MessageGroupStore 実装は、ターゲットデータベースストリーミング API に基づいて streamMessagesForGroup(Object groupId) 契約を提供します。これにより、ストア内のグループが非常に大きい場合のリソース使用率が向上します。フレームワークの内部では、この新しい API は、起動時に永続化されたメッセージを再スケジュールするときに(たとえば)遅延器で使用されます。返された Stream<Message<?>> は、処理の最後に閉じる必要があります。try-with-resources による自動クローズを介して。PersistentMessageGroup が使用されるときはいつでも、その streamMessages() は MessageGroupStore.streamMessagesForGroup() に委譲します。

メッセージグループの状態

バージョン 5.5 以降、MessageGroup 抽象化は condition 文字列オプションを提供します。このオプションの値は、グループの決定を行うために何らかの理由で後で解析できるものであれば何でもかまいません。たとえば、相関メッセージハンドラーからの ReleaseStrategy は、グループ内のすべてのメッセージを繰り返す代わりに、グループからこのプロパティを参照する場合があります。MessageGroupStore は setGroupCondition(Object groupId, String condition) API を公開します。この目的のために、setGroupConditionSupplier(BiFunction<Message<?>, String, String>) オプションが AbstractCorrelatingMessageHandler に追加されました。この関数は、グループに追加された後の各メッセージと、グループの既存の状態に対して評価されます。実装は、新しい値または既存の値を返すか、ターゲット条件を null にリセットすることを決定する場合があります。condition の値は、JSON、SpEL 式、数値、文字列として直列化して後で解析できるものであれば何でもかまいません。例: ファイルアグリゲーターコンポーネントの FileMarkerReleaseStrategy は、FileSplitter.FileMarker.Mark.END メッセージの FileHeaders.LINE_COUNT ヘッダーからグループに条件を入力し、canRelease() からそれを参照して、グループサイズをこの条件の値と比較します。このように、グループ内のすべてのメッセージを繰り返して、FileHeaders.LINE_COUNT ヘッダーを持つ FileSplitter.FileMarker.Mark.END メッセージを見つけることはありません。また、他のすべてのレコードの前に終了マーカーがアグリゲーターに到達できるようにします。たとえば、マルチスレッド環境でファイルを処理する場合です。

さらに、構成の便宜上、GroupConditionProvider 契約が導入されました。AbstractCorrelatingMessageHandler は、提供された ReleaseStrategy がこのインターフェースを実装しているかどうかを確認し、グループ条件評価ロジック用に conditionSupplier を抽出します。

メタデータストア

多くの外部システム、サービス、リソースはトランザクションではなく(Twitter、RSS、ファイルシステムなど)、データを既読としてマークする機能はありません。また、一部の統合ソリューションでは、エンタープライズ統合パターンべき等レシーバー (英語) を実装する必要がある場合があります。このゴールを達成し、外部システムとの次の対話の前にエンドポイントの以前の状態を保存するため、または次のメッセージを処理するために、Spring Integration は、一般的なキー値契約を備えた org.springframework.integration.metadata.MetadataStore インターフェースの実装としてメタデータストアコンポーネントを提供します。

メタデータストアは、さまざまな種類の汎用メタデータ(たとえば、処理された最後のフィードエントリの公開日)を格納して、フィードアダプターなどのコンポーネントが重複を処理できるように設計されています。コンポーネントに MetadataStore への参照が直接提供されない場合、メタデータストアを見つけるためのアルゴリズムは次のとおりです。最初に、アプリケーションコンテキストで metadataStore ID を持つ Bean を探します。見つかった場合は、それを使用します。そうでない場合は、SimpleMetadataStore の新しいインスタンスを作成します。これは、現在実行中のアプリケーションコンテキストのライフサイクル内でメタデータのみを永続化するメモリ内実装です。つまり、再起動すると、エントリが重複する可能性があります。

アプリケーションコンテキストの再起動間でメタデータを保持する必要がある場合、フレームワークは次の永続的な MetadataStores を提供します。

PropertiesPersistingMetadataStore は、プロパティファイルと PropertiesPersister (Javadoc) によってサポートされています。

デフォルトでは、アプリケーションコンテキストが正常に閉じられたときの状態のみを保持します。Flushable を実装しているため、flush() を呼び出すことで、状態を自由に維持できます。次の例は、XML で "PropertiesPersistingMetadataStore" を構成する方法を示しています。

<bean id="metadataStore"
    class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/>

または、MetadataStore インターフェースの独自の実装(たとえば JdbcMetadataStore)を提供し、それをアプリケーションコンテキストで Bean として構成できます。

バージョン 4.0 以降、SimpleMetadataStorePropertiesPersistingMetadataStoreRedisMetadataStore は ConcurrentMetadataStore を実装します。これらはアトミックアップデートを提供し、複数のコンポーネントまたはアプリケーションインスタンスで使用できます。

べき等レシーバーとメタデータストア

メタデータストアは、受信メッセージがすでに処理されており、破棄するか、破棄時に他のロジックを実行できる場合、受信メッセージをフィルターする必要がある場合に、EIP べき等レシーバー (英語) パターンを実装できます。次の構成は、その方法の例を示しています。

<int:filter input-channel="serviceChannel"
			output-channel="idempotentServiceChannel"
			discard-channel="discardChannel"
			expression="@metadataStore.get(headers.businessKey) == null"/>

<int:publish-subscribe-channel id="idempotentServiceChannel"/>

<int:outbound-channel-adapter channel="idempotentServiceChannel"
                              expression="@metadataStore.put(headers.businessKey, '')"/>

<int:service-activator input-channel="idempotentServiceChannel" ref="service"/>

べき等エントリの value は有効期限になる可能性があり、その後、そのエントリは何らかのスケジュールされたリーパーによってメタデータストアから削除される必要があります。

MetadataStoreListener

次の例に示すように、一部のメタデータストア(現在は zookeeper のみ)は、アイテムの変更時にイベントを受信するリスナーの登録をサポートしています。

public interface MetadataStoreListener {

	void onAdd(String key, String value);

	void onRemove(String key, String oldValue);

	void onUpdate(String key, String newValue);
}

詳細については、Javadoc を参照してください。イベントのサブセットのみに関心がある場合は、MetadataStoreListenerAdapter をサブクラス化できます。

制御バス

エンタープライズ統合パターン (英語) (EIP)ブックに従って、コントロールバスの背景となる考え方は、「アプリケーションレベル」メッセージングに使用されるのと同じメッセージングシステムをフレームワーク内のコンポーネントの監視と管理に使用できるということです。Spring Integration では、公開された操作を呼び出す手段としてメッセージを送信できるように、上記のアダプターに基づいて構築しています。

次の例は、XML で制御バスを構成する方法を示しています。

<int:control-bus input-channel="operationChannel"/>

制御バスには、アプリケーションコンテキストで Bean の操作を呼び出すためにアクセスできる入力チャネルがあります。また、エンドポイントをアクティブにするサービスのすべての共通プロパティもあります。例: 操作の結果に、ダウンストリームチャネルに送信する戻り値がある場合、出力チャネルを指定できます。

制御バスは、Spring Expression Language(SpEL)式として入力チャネルでメッセージを実行します。メッセージを受け取り、本文を式にコンパイルし、コンテキストを追加してから実行します。デフォルトのコンテキストは、@ManagedAttribute または @ManagedOperation でアノテーションが付けられたメソッドをサポートします。また、Spring の Lifecycle インターフェース(およびバージョン 5.2 以降の Pausable 拡張)のメソッドをサポートし、Spring の TaskExecutor および TaskScheduler 実装のいくつかを構成するために使用されるメソッドをサポートします。制御バスで独自のメソッドを使用できるようにする最も簡単な方法は、@ManagedAttribute または @ManagedOperation アノテーションを使用することです。これらのアノテーションは、メソッドを JMX MBean レジストリに公開するためにも使用されるため、便利な副産物を提供します。多くの場合、コントロールバスに公開するのと同じ型の操作は、JMX を介して公開するのに適しています。アプリケーションコンテキスト内の特定のインスタンスの解決は、一般的な SpEL 構文で実現されます。これを行うには、Bean の SpEL 接頭辞(@)を Bean 名に指定します。例: Spring Bean でメソッドを実行するために、クライアントは次のように操作チャネルにメッセージを送信できます。

Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)

式のコンテキストのルートは Message 自体であるため、式内の変数として payload および headers にもアクセスできます。これは、Spring Integration エンドポイントの他のすべての式サポートと一致しています。

Java アノテーションを使用すると、コントロールバスを次のように構成できます。

@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
    return new ExpressionControlBusFactoryBean();
}

同様に、Java DSL フロー定義を次のように構成できます。

@Bean
public IntegrationFlow controlBusFlow() {
    return IntegrationFlows.from("controlBus")
              .controlBus()
              .get();
}

DirectChannel の自動作成でラムダを使用する場合は、次のように制御バスを作成できます。

@Bean
public IntegrationFlow controlBus() {
    return IntegrationFlowDefinition::controlBus;
}

この場合、チャネルの名前は controlBus.input です。

正常なシャットダウン

"MBean エクスポーター" に従って、MBean エクスポーターは stopActiveComponents と呼ばれる JMX 操作を提供します。これは、アプリケーションを正常に停止するために使用されます。操作には単一の Long パラメーターがあります。このパラメーターは、処理中のメッセージの補完を待機する時間(ミリ秒単位)を示します。操作は次のように機能します。

  1. OrderlyShutdownCapable を実装するすべての Bean で beforeShutdown() を呼び出します。

    そうすることで、そのようなコンポーネントはシャットダウンの準備ができます。このインターフェースを実装するコンポーネントの例と、この呼び出しで行うことには、リスナーコンテナーを停止する JMS および AMQP メッセージ駆動型アダプター、新しい接続の受け入れを停止する(既存の接続を開いたまま)TCP サーバー接続ファクトリ、ドロップする TCP 受信エンドポイントが含まれます(ログ)受信した新しいメッセージ、新しいリクエストに対して 503 - Service Unavailable を返す HTTP 受信エンドポイント。

  2. JMS または AMQP-backed チャネルなどのアクティブなチャネルを停止します。

  3. すべての MessageSource インスタンスを停止します。

  4. すべての受信 MessageProducer を停止します(OrderlyShutdownCapable ではありません)。

  5. 操作に渡される Long パラメーターの値で定義されているように、残り時間が残っているのを待ちます。

    そうすることで、飛行中のメッセージがすべての旅を完了することができます。この操作を呼び出すときに適切なタイムアウトを選択することが重要です。

  6. すべての OrderlyShutdownCapable コンポーネントで afterShutdown() を呼び出します。

    これにより、そのようなコンポーネントは最終的なシャットダウンタスクを実行できます(たとえば、開いているすべてのソケットを閉じる)。

正常なシャットダウン管理操作で説明したように、この操作は JMX を使用して呼び出すことができます。プログラムでメソッドを呼び出す場合は、IntegrationMBeanExporter への参照を挿入するか、取得する必要があります。id 属性が <int-jmx:mbean-export/> 定義で指定されていない場合、Bean には名前が生成されます。この名前には、同じ JVM(MBeanServer)に複数の Spring Integration コンテキストが存在する場合に ObjectName の衝突を回避するためのランダムなコンポーネントが含まれています。

このため、プログラムでメソッドを呼び出す場合は、アプリケーションコンテキストで簡単にアクセスできるように、エクスポーターに id 属性を提供することをお勧めします。

最後に、<control-bus> 要素を使用して操作を呼び出すことができます。詳細については、モニタリング Spring Integration サンプルアプリケーション [GitHub] (英語) を参照してください。

前述のアルゴリズムは、バージョン 4.1 で改善されました。以前は、すべてのタスクエグゼキューターとスケジューラーが停止していました。これにより、QueueChannel インスタンスの中間フローメッセージが残る可能性があります。これで、シャットダウンによりポーラーが実行されたままになり、これらのメッセージを排出して処理できるようになります。

統合グラフ

バージョン 4.3 から、Spring Integration はアプリケーションのランタイムオブジェクトモデルへのアクセスを提供します。これには、オプションでコンポーネントメトリクスを含めることができます。グラフとして公開され、統合アプリケーションの現在の状態を視覚化するために使用できます。o.s.i.support.management.graph パッケージには、Spring Integration コンポーネントのランタイム状態を単一のツリーのような Graph オブジェクトとして収集、構築、レンダリングするために必要なすべてのクラスが含まれています。IntegrationGraphServer は、Graph オブジェクトを作成、取得、リフレッシュするために Bean として宣言する必要があります。結果の Graph オブジェクトは任意の形式に直列化できますが、JSON はクライアント側で解析および表現するのに柔軟で便利です。デフォルトのコンポーネントのみを持つ Spring Integration アプリケーションは、次のようなグラフを公開します。

{
  "contentDescriptor" : {
    "providerVersion" : "5.5.8",
    "providerFormatVersion" : 1.2,
    "provider" : "spring-integration",
    "name" : "myAppName:1.0"
  },
  "nodes" : [ {
    "nodeId" : 1,
    "componentType" : "null-channel",
    "integrationPatternType" : "null_channel",
    "integrationPatternCategory" : "messaging_channel",
    "properties" : { },
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 0.0,
        "max" : 0.0
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "receiveCounters" : {
      "successes" : 0,
      "failures" : 0
    },
    "name" : "nullChannel"
  }, {
    "nodeId" : 2,
    "componentType" : "publish-subscribe-channel",
    "integrationPatternType" : "publish_subscribe_channel",
    "integrationPatternCategory" : "messaging_channel",
    "properties" : { },
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 7.807002,
        "max" : 7.807002
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "name" : "errorChannel"
  }, {
    "nodeId" : 3,
    "componentType" : "logging-channel-adapter",
    "integrationPatternType" : "outbound_channel_adapter",
    "integrationPatternCategory" : "messaging_endpoint",
    "properties" : { },
    "output" : null,
    "input" : "errorChannel",
    "sendTimers" : {
      "successes" : {
        "count" : 1,
        "mean" : 6.742722,
        "max" : 6.742722
      },
      "failures" : {
        "count" : 0,
        "mean" : 0.0,
        "max" : 0.0
      }
    },
    "name" : "errorLogger"
  } ],
  "links" : [ {
    "from" : 2,
    "to" : 3,
    "type" : "input"
  } ]
}
バージョン 5.2 は、メトリクス管理に従って、Micrometer メーターを優先してレガシーメトリクスを廃止しました。レガシーメトリクスはバージョン 5.4 で削除され、グラフに表示されなくなります。

前の例では、グラフは 3 つの最上位要素で構成されています。

contentDescriptor グラフ要素には、データを提供するアプリケーションに関する一般情報が含まれています。name は、IntegrationGraphServer Bean または spring.application.name アプリケーションコンテキスト環境プロパティでカスタマイズできます。他のプロパティはフレームワークによって提供され、同様のモデルを他のソースと区別できます。

links グラフ要素は、nodes グラフ要素のノード間の接続を表します。ソース Spring Integration アプリケーションの統合コンポーネント間の接続を表します。例: MessageChannel から MessageHandler を含む EventDrivenConsumer へ、または AbstractReplyProducingMessageHandler から MessageChannel へ。便宜上、リンクの目的を判断できるように、モデルには type 属性が含まれています。可能な型は次のとおりです。

  • inputMessageChannel からエンドポイント、inputChannelrequestChannel プロパティへの方向を識別します

  • outputMessageHandlerMessageProducerSourcePollingChannelAdapter から outputChannel または replyChannel プロパティを介した MessageChannel への方向

  • errorPollingConsumer または MessageProducer または SourcePollingChannelAdapter 上の MessageHandler から errorChannel プロパティを介して MessageChannel へ。

  • discarderrorChannel プロパティを介して DiscardingMessageHandler (MessageFilter など)から MessageChannel へ。

  • routeAbstractMappingMessageRouter (HeaderValueRouter など)から MessageChannel へ。output に似ていますが、実行時に決定されます。構成されたチャネルマッピングまたは動的に解決されたチャネルの場合があります。通常、ルーターはこの目的のために最大 100 個の動的ルートのみを保持しますが、dynamicChannelLimit プロパティを設定することでこの値を変更できます。

この要素からの情報を視覚化ツールで使用して、nodes グラフ要素のノード間の接続をレンダリングできます。from および to 番号は、リンクされたノードの nodeId プロパティの値を表します。例: link 要素を使用して、ターゲットノードで適切な port を決定できます。

次の「テキストイメージ」は、型間の関連を示しています。

              +---(discard)
              |
         +----o----+
         |         |
         |         |
         |         |
(input)--o         o---(output)
         |         |
         |         |
         |         |
         +----o----+
              |
              +---(error)

nodes グラフ要素は、おそらく最も興味深いものです。その要素には、componentType インスタンスと name 値を持つランタイムコンポーネントが含まれるだけでなく、オプションでコンポーネントによって公開されるメトリクスも含まれる可能性があるためです。ノード要素には、一般的に一目でわかるさまざまなプロパティが含まれています。例: 式ベースのコンポーネントには、コンポーネントの 1 次式文字列を含む expression プロパティが含まれます。メトリクスを有効にするには、@EnableIntegrationManagement を @Configuration クラスに追加するか、<int:management/> 要素を XML 構成に追加します。詳細については、指標と管理を参照してください。

nodeId は、あるコンポーネントと別のコンポーネントを区別できるようにする一意の増分識別子を表します。links 要素でも使用され、このコンポーネントと他のコンポーネント(ある場合)との関連(接続)を表します。input および output 属性は、AbstractEndpointMessageHandlerSourcePollingChannelAdapter または MessageProducerSupport の inputChannel および outputChannel プロパティ用です。詳細については、次のセクションを参照してください。

バージョン 5.1 以降、IntegrationGraphServer は、特定の NamedComponent の IntegrationNode 上の追加プロパティの作成に Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback を受け入れます。たとえば、SmartLifecycleautoStartuprunning プロパティをターゲットグラフに公開できます。

server.setAdditionalPropertiesCallback(namedComponent -> {
            Map<String, Object> properties = null;
            if (namedComponent instanceof SmartLifecycle) {
                SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
                properties = new HashMap<>();
                properties.put("auto-startup", smartLifecycle.isAutoStartup());
                properties.put("running", smartLifecycle.isRunning());
            }
            return properties;
        });

グラフランタイムモデル

Spring Integration コンポーネントには、さまざまなレベルの複雑さがあります。例: ポーリングされた MessageSource には、ソースデータからメッセージを定期的に送信する SourcePollingChannelAdapter と MessageChannel もあります。他のコンポーネントは、メッセージ用の requestChannel (input)にサブスクライブ(またはポーリング)するための消費 AbstractEndpoint と、ダウンストリームに送信するための応答メッセージを生成する replyChannel (output)を備えたミドルウェアのリクエスト / 応答コンポーネント(JmsOutboundGateway など)です。一方、MessageProducerSupport 実装(ApplicationEventListeningMessageProducer など)は、一部のソースプロトコルリスニングロジックをラップし、outputChannel にメッセージを送信します。

グラフ内で、Spring Integration コンポーネントは、IntegrationNode クラス階層を使用して表されます。これは、o.s.i.support.management.graph パッケージに含まれています。例: AggregatingMessageHandler には ErrorCapableDiscardingMessageHandlerNode を使用でき(discardChannel オプションがあるため)、PollingConsumer を使用して PollableChannel からコンシュームするときにエラーを生成できます。もう 1 つの例は CompositeMessageHandlerNode です。EventDrivenConsumer を使用して SubscribableChannel にサブスクライブした場合の MessageHandlerChain 用です。

@MessagingGateway (メッセージングゲートウェイを参照)は、各メソッドにノードを提供します。name 属性は、ゲートウェイの Bean 名と短いメソッドシグネチャーに基づいています。ゲートウェイの次の例を検討してください。
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {

	void foo(String foo);

	void foo(Integer foo);

	void bar(String bar);

}

前述のゲートウェイは、次のようなノードを生成します。

{
  "nodeId" : 10,
  "name" : "gate.bar(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 11,
  "name" : "gate.foo(class java.lang.String)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
},
{
  "nodeId" : 12,
  "name" : "gate.foo(class java.lang.Integer)",
  "stats" : null,
  "componentType" : "gateway",
  "integrationPatternType" : "gateway",
  "integrationPatternCategory" : "messaging_endpoint",
  "output" : "four",
  "errors" : null
}

この IntegrationNode 階層を使用して、クライアント側でグラフモデルを解析したり、一般的な Spring Integration ランタイムの動作を理解したりできます。詳細については、プログラミングのヒントとコツも参照してください。

バージョン 5.3 は、IntegrationPattern 抽象化と、エンタープライズ統合パターン(EIP)を表すすべての標準コンポーネントを導入し、この抽象化を実装し、IntegrationPatternType 列挙値を提供します。この情報は、ターゲットアプリケーションのいくつかの分類ロジックに役立ちます。または、グラフノードに公開されているため、UI がコンポーネントの描画方法を決定するために使用できます。

統合グラフコントローラー

アプリケーションが Web ベース(または Web コンテナーが埋め込まれた Spring Boot 上に構築されている)であり、Spring Integration HTTP または WebFlux モジュール(それぞれ HTTP サポートおよび WebFlux サポートを参照)がクラスパスに存在する場合、IntegrationGraphController を使用して IntegrationGraphServer を公開できます。REST サービスとしての機能。この目的のために、@EnableIntegrationGraphController および @Configuration クラスのアノテーションと <int-http:graph-controller/> XML 要素が HTTP モジュールで使用可能です。この構成は、@EnableWebMvc アノテーション(または XML 定義の場合は <mvc:annotation-driven/>)とともに、IntegrationGraphController@RestController を登録します。この構成では、@RequestMapping.path を @EnableIntegrationGraphController アノテーションまたは <int-http:graph-controller/> 要素で構成できます。デフォルトのパスは /integration です。

IntegrationGraphController@RestController は、次のサービスを提供します。

  • @GetMapping(name = "getGraph"): 最後の IntegrationGraphServer リフレッシュ以降の Spring Integration コンポーネントの状態を取得します。o.s.i.support.management.graph.Graph は、REST サービスの @ResponseBody として返されます。

  • @GetMapping(path = "/refresh", name = "refreshGraph"): 実際のランタイム状態の現在の Graph をリフレッシュし、REST レスポンスとして返します。メトリクスのグラフをリフレッシュする必要はありません。グラフが取得されると、リアルタイムで提供されます。グラフが最後に取得されてからアプリケーションコンテキストが変更された場合、リフレッシュを呼び出すことができます。その場合、グラフは完全に再構築されます。

Spring Security および Spring MVC プロジェクトによって提供される標準の構成オプションとコンポーネントを使用して、IntegrationGraphController のセキュリティとクロスオリジンの制限を設定できます。次の例は、これらのゴールを達成します。

<mvc:annotation-driven />

<mvc:cors>
	<mvc:mapping path="/myIntegration/**"
				 allowed-origins="http://localhost:9090"
				 allowed-methods="GET" />
</mvc:cors>

<security:http>
    <security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>


<int-http:graph-controller path="/myIntegration" />

次の例は、Java 構成で同じことを行う方法を示しています。

@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {

    @Override
    protected void configure(HttpSecurity http) throws Exception {
	    http
            .authorizeRequests()
               .antMatchers("/testIntegration/**").hasRole("ADMIN")
            // ...
            .formLogin();
    }

    //...

}

便宜上、@EnableIntegrationGraphController アノテーションは allowedOrigins 属性を提供することに注意してください。これにより、path への GET アクセスが提供されます。より高度にするために、標準の Spring MVC メカニズムを使用して CORS マッピングを構成できます。