システムマネジメント
指標と管理
このセクションでは、Spring Integration のメトリクスをキャプチャーする方法について説明します。最近のバージョンでは、Micrometer(https://micrometer.io (英語) を参照)にさらに依存しており、将来のリリースでは Micrometer をさらに使用する予定です。
大量の環境でのロギングの無効化
メインメッセージフローでデバッグログを制御できます。非常に大量のアプリケーションでは、isDebugEnabled() の呼び出しは、一部のロギングサブシステムでは非常にコストがかかる可能性があります。このオーバーヘッドを回避するために、このようなすべてのロギングを無効にすることができます。例外ログ(デバッグまたはその他)は、この設定の影響を受けません。
次のリストは、ロギングを制御するために使用可能なオプションを示しています。
@Configuration
@EnableIntegration
@EnableIntegrationManagement(
defaultLoggingEnabled = "true" <1>)
public static class ContextConfiguration {
...
}
<int:management default-logging-enabled="true"/> (1)| 1 | false に設定すると、ログシステムのカテゴリ設定に関係なく、メインメッセージフローのすべてのロギングが無効になります。デバッグログを有効にするには、"true" に設定します(ログサブシステムでも有効になっている場合)。Bean 定義で設定を明示的に構成していない場合にのみ適用されます。デフォルトは true です。 |
defaultLoggingEnabled は、Bean 定義で対応する設定を明示的に構成していない場合にのみ適用されます。 |
Micrometer 統合
概要
バージョン 5.0.3 以降、アプリケーションコンテキストに Micrometer (英語) MeterRegistry が存在すると、Micrometer メトリクスのサポートがトリガーされます。
Micrometer を使用するには、MeterRegistry Bean の 1 つをアプリケーションコンテキストに追加します。
MessageHandler および MessageChannel ごとに、タイマーが登録されます。MessageSource ごとに、カウンターが登録されます。
これは、AbstractMessageHandler、AbstractMessageChannel、AbstractMessageSource を継承するオブジェクトにのみ適用されます(ほとんどのフレームワークコンポーネントに当てはまります)。
メッセージチャネルの送信操作用の Timer メーターには、次の名前またはタグがあります。
name:spring.integration.sendtag:type:channeltag:name:<componentName>tag:result:(success|failure)tag:exception:(none|exception simple class name)description:Send processing time
(none 例外を含む failure 結果は、チャネルの send() 操作が false を返したことを意味します)
ポーリング可能なメッセージチャネルでの受信操作用の Counter メーターには、次の名前またはタグがあります。
name:spring.integration.receivetag:type:channeltag:name:<componentName>tag:result:(success|failure)tag:exception:(none|exception simple class name)description:Messages received
メッセージハンドラーの操作用の Timer メーターには、次の名前またはタグがあります。
name:spring.integration.sendtag:type:handlertag:name:<componentName>tag:result:(success|failure)tag:exception:(none|exception simple class name)description:Send processing time
メッセージソースの Counter メーターには、次の名前 / タグがあります。
name:spring.integration.receivetag:type:sourcetag:name:<componentName>tag:result:successtag:exception:nonedescription:Messages received
さらに、3 つの Gauge メーターがあります。
spring.integration.channels: アプリケーション内のMessageChannelsの数。spring.integration.handlers: アプリケーション内のMessageHandlersの数。spring.integration.sources: アプリケーション内のMessageSourcesの数。
MicrometerMetricsCaptor のサブクラスを提供することにより、統合コンポーネントによって作成された Meters の名前とタグをカスタマイズできます。MicrometerCustomMetricsTests [GitHub] (英語) テストケースは、その方法の簡単な例を示しています。ビルダーサブクラスの build() メソッドをオーバーロードすることにより、メーターをさらにカスタマイズすることもできます。
バージョン 5.1.13 以降、QueueChannel は、キューサイズと残り容量の Micrometer ゲージを公開します。
name:spring.integration.channel.queue.sizetag:type:channeltag:name:<componentName>description:The size of the queue channel
および
name:spring.integration.channel.queue.remaining.capacitytag:type:channeltag:name:<componentName>description:The remaining capacity of the queue channel
メーターの無効化
デフォルトでは、最初の使用時にすべてのメーターが登録されます。Micrometer を使用すると、MeterFilter を MeterRegistry に追加して、一部またはすべてが登録されないようにすることができます。name、tag など、提供されている任意のプロパティでメーターを除外 (拒否) できます。詳細については、Micrometer ドキュメントのメーターフィルター (英語) を参照してください。
例: 与えられた:
@Bean
public QueueChannel noMeters() {
return new QueueChannel(10);
}
次の方法で、このチャネルのみのメーターの登録を抑制することができます。
registry.config().meterFilter(MeterFilter.deny(id ->
"channel".equals(id.getTag("type")) &&
"noMeters".equals(id.getTag("name"))));
Micrometer Observation
バージョン 6.0 から、Spring Integration は Micrometer Observation 抽象化を利用し、適切な ObservationHandler 構成を介してメトリクスとトレース (英語) を処理できます。
ObservationRegistry Bean がアプリケーションコンテキストに存在し、@EnableIntegrationManagement が構成されている場合は常に、IntegrationManagement コンポーネントで監視処理が有効になります。計測するコンポーネントのセットをカスタマイズするために、observationPatterns() 属性が @EnableIntegrationManagement アノテーションで公開されます。パターンマッチングアルゴリズムについては、javadoc を参照してください。
デフォルトでは、ObservationRegistry Bean で計測される IntegrationManagement コンポーネントはありません。すべてのコンポーネントに一致するように * として構成できます。 |
この場合、メーターは個別に収集されませんが、提供された ObservationRegistry で構成された適切な ObservationHandler に委譲されます。
次の Spring Integration コンポーネントは、それぞれの規則を持つ監視ロジックを備えています。
フローの受信エンドポイントである
MessageProducerSupportは、CONSUMERスパン型と見なされ、IntegrationObservation.HANDLERAPI を使用します。MessagingGatewaySupport ` は受信 リクエスト / リプライエンドポイントであり、
SERVERスパン型と見なされます。IntegrationObservation.GATEWAYAPI を使用します。AbstractMessageChannel.send()操作は、メッセージを生成する唯一の Spring Integration API です。そのため、PRODUCERスパン型として扱われ、IntegrationObservation.PRODCUERAPI を使用します。これは、チャネルが分散実装 (例:PublishSubscribeKafkaChannelまたはZeroMqChannel) であり、トレース情報をメッセージに追加する必要がある場合に、より意味があります。IntegrationObservation.PRODUCER観測はMessageSenderContextに基づいており、Spring Integration がMutableMessageを提供して、後続の追跡Propagatorがヘッダーを追加できるようにして、コンシューマーが使用できるようにします。AbstractMessageHandlerはCONSUMERスパン型であり、IntegrationObservation.HANDLERAPI を使用します。
IntegrationManagement コンポーネントの観測生成は、ObservationConvention 構成を介してカスタマイズできます。たとえば、AbstractMessageHandler は、その setObservationConvention() API を介して MessageReceiverObservationConvention を期待します。
以下は、Observation API でサポートされているメトリクス、スパン、規則です。
可観測性 - メトリクス
以下に、このプロジェクトで宣言されたすべての指標のリストを示します。
ゲートウェイ
受信メッセージゲートウェイの監視。
指標名 spring.integration.gateway (規約クラス o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention で定義)。タイプ timer.
指標名 spring.integration.gateway.active (規約クラス o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention で定義)。タイプ long task timer.
| 観測の開始後に追加された KeyValues は、*.active メトリクスから欠落している可能性があります。 |
Micrometer は、ベースユニットに nanoseconds を内部的に使用します。ただし、各バックエンドが実際のベースユニットを決定します。(つまり、Prometheus は秒を使用します) |
外側のクラス o.s.i.support.management.observation.IntegrationObservation の完全修飾名。
すべてのタグには、spring.integration. プレフィックスを付ける必要があります。 |
名前 | 説明 |
| メッセージゲートウェイコンポーネントの名前。 |
| リクエスト / リプライ実行の結果。 |
| コンポーネントの型 - 「ゲートウェイ」。 |
ハンドラー
メッセージハンドラーの観測。
指標名 spring.integration.handler (規約クラス o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention で定義)。タイプ timer.
指標名 spring.integration.handler.active (規約クラス o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention で定義)。タイプ long task timer.
| 観測の開始後に追加された KeyValues は、*.active メトリクスから欠落している可能性があります。 |
Micrometer は、ベースユニットに nanoseconds を内部的に使用します。ただし、各バックエンドが実際のベースユニットを決定します。(つまり、Prometheus は秒を使用します) |
外側のクラス o.s.i.support.management.observation.IntegrationObservation の完全修飾名。
すべてのタグには、spring.integration. プレフィックスを付ける必要があります。 |
名前 | 説明 |
| メッセージハンドラーコンポーネントの名前。 |
| コンポーネントの型 - 「ハンドラー」。 |
プロデューサー
メッセージプロデューサーの観測。チャンネル。
指標名 spring.integration.producer (規約クラス o.s.i.support.management.observation.DefaultMessageSenderObservationConvention で定義)。タイプ timer.
指標名 spring.integration.producer.active (規約クラス o.s.i.support.management.observation.DefaultMessageSenderObservationConvention で定義)。タイプ long task timer.
| 観測の開始後に追加された KeyValues は、*.active メトリクスから欠落している可能性があります。 |
Micrometer は、ベースユニットに nanoseconds を内部的に使用します。ただし、各バックエンドが実際のベースユニットを決定します。(つまり、Prometheus は秒を使用します) |
外側のクラス o.s.i.support.management.observation.IntegrationObservation の完全修飾名。
すべてのタグには、spring.integration. プレフィックスを付ける必要があります。 |
名前 | 説明 |
| メッセージハンドラーコンポーネントの名前。 |
| コンポーネントの型 - 「プロデューサー」。 |
可観測性 - スパン
以下に、このプロジェクトで宣言されたすべてのスパンのリストを示します。
ゲートウェイスパン
受信メッセージゲートウェイの監視。
スパン名 spring.integration.gateway (規約クラス o.s.i.support.management.observation.DefaultMessageRequestReplyReceiverObservationConvention で定義)。
外側のクラス o.s.i.support.management.observation.IntegrationObservation の完全修飾名。
すべてのタグには、spring.integration. プレフィックスを付ける必要があります。 |
名前 | 説明 |
|
Name of the message gateway component. |
|
Outcome of the request/reply execution. |
|
Type of the component - 'gateway'. |
Handler Span
Observation for message handlers.
Span name spring.integration.handler (defined by convention class o.s.i.support.management.observation.DefaultMessageReceiverObservationConvention).
Fully qualified name of the enclosing class o.s.i.support.management.observation.IntegrationObservation.
All tags must be prefixed with spring.integration. prefix!
|
Name |
Description |
|
Name of the message handler component. |
|
Type of the component - 'handler'. |
Producer Span
Observation for message producers, e.g. channels.
Span name spring.integration.producer (defined by convention class o.s.i.support.management.observation.DefaultMessageSenderObservationConvention).
Fully qualified name of the enclosing class o.s.i.support.management.observation.IntegrationObservation.
All tags must be prefixed with spring.integration. prefix!
|
Name |
Description |
|
Name of the message handler component. |
|
Type of the component - 'producer'. |
Observability - Conventions
Below you can find a list of all GlobalObservationConvention and ObservationConvention declared by this project.
ObservationConvention Class Name |
Applicable ObservationContext Class Name |
|
|
|
|
|
|
|
|
|
|
|
|
観測伝播
メッセージングフローの性質とは無関係に、1 つのトレースで接続されたスパンの チェーンを提供するために、Spring Integration は ObservationPropagationChannelInterceptor 実装を提供します。これは、個別に MessageChannnel Bean で構成することも、それぞれの MessageChannnel Bean 名パターンマッチングを使用して @GlobalChannelInterceptor として構成することもできます。このインターセプターの目的は、MessageChannnel の実装と性質に関係なく、プロデューサースレッドからコンシューマースレッドに Observation を伝達することです。ただし、DirectChannel は無視されます。これは、そのコンシューマーがプロデューサースレッドで直接実行されるためです。
Spring Integration JMX サポート
JMX サポートも参照してください。
メッセージ履歴
メッセージングアーキテクチャの主な利点は、参加しているコンポーネントが相互の認識を維持できないように、疎結合です。この事実だけで、アプリケーションは非常に柔軟になり、残りのフローに影響を与えずにコンポーネントを変更したり、メッセージングルートを変更したり、メッセージ消費スタイルを変更したりできます(ポーリングとイベントドリブン)。しかし、この控えめなスタイルのアーキテクチャは、物事がうまくいかない場合には難しいことがわかります。デバッグするときは、おそらくメッセージに関する情報(その発信元、通過したチャネル、その他の詳細)をできるだけ多く取得する必要があります。
メッセージ履歴は、デバッグまたは監査証跡を維持するために、メッセージパスの認識レベルを維持するオプションを提供することで役立つパターンの 1 つです。Spring 統合は、メッセージにヘッダーを追加し、メッセージが追跡対象コンポーネントを通過するたびにそのヘッダーを更新することにより、メッセージフローを設定してメッセージ履歴を維持する簡単な方法を提供します。
メッセージ履歴の構成
メッセージ履歴を有効にするには、次の例に示すように、構成で message-history 要素 (または @EnableMessageHistory) を定義するだけで済みます。
@Configuration
@EnableIntegration
@EnableMessageHistory
<int:message-history/> これで、すべての名前付きコンポーネント ( "id" が定義されている) が追跡されます。フレームワークは、メッセージに「履歴」ヘッダーを設定します。その値は List<Properties> です。
次の構成例を検討してください。
@MessagingGateway(defaultRequestChannel = "bridgeInChannel")
public interface SampleGateway {
...
}
@Bean
@Transformer(inputChannel = "enricherChannel", outputChannel="filterChannel")
HeaderEnricher sampleEnricher() {
HeaderEnricher enricher =
new HeaderEnricher(Collections.singletonMap("baz", new StaticHeaderValueMessageProcessor("baz")));
return enricher;
}
<int:gateway id="sampleGateway"
service-interface="org.springframework.integration.history.sample.SampleGateway"
default-request-channel="bridgeInChannel"/>
<int:header-enricher id="sampleEnricher" input-channel="enricherChannel" output-channel="filterChannel">
<int:header name="baz" value="baz"/>
</int:header-enricher>上記の構成により、単純なメッセージ履歴構造が生成され、出力は次のようになります。
[{name=sampleGateway, type=gateway, timestamp=1283281668091},
{name=sampleEnricher, type=header-enricher, timestamp=1283281668094}] メッセージ履歴にアクセスするには、MessageHistory ヘッダーにアクセスするだけで済みます。次の例は、その方法を示しています。
Iterator<Properties> historyIterator =
message.getHeaders().get(MessageHistory.HEADER_NAME, MessageHistory.class).iterator();
assertTrue(historyIterator.hasNext());
Properties gatewayHistory = historyIterator.next();
assertEquals("sampleGateway", gatewayHistory.get("name"));
assertTrue(historyIterator.hasNext());
Properties chainHistory = historyIterator.next();
assertEquals("sampleChain", chainHistory.get("name"));
すべてのコンポーネントを追跡したくない場合があります。名前に基づいて履歴を特定のコンポーネントに限定するには、tracked-components 属性を指定し、追跡するコンポーネントに一致するコンポーネント名とパターンのコンマ区切りリストを指定します。次の例は、その方法を示しています。
@Configuration
@EnableIntegration
@EnableMessageHistory("*Gateway", "sample*", "aName")
<int:message-history tracked-components="*Gateway, sample*, aName"/>前の例では、'Gateway' で終わる、'sample' で始まる、または名前 'aName' と完全に一致するコンポーネントに対してのみ、メッセージ履歴が保持されます。
さらに、MessageHistoryConfigurer Bean が IntegrationMBeanExporter によって JMX MBean として公開され ( MBean エクスポーターを参照)、実行時にパターンを変更できるようになりました。ただし、パターンを変更するには、Bean を停止 (メッセージ履歴をオフにする) する必要があることに注意してください。この機能は、システムを分析するために履歴を一時的にオンにするのに役立つ場合があります。MBean のオブジェクト名は <domain>:name=messageHistoryConfigurer,type=MessageHistoryConfigurer です。
1 つの @EnableMessageHistory (または <message-history/>) のみを、コンポーネント追跡構成の単一ソースとしてアプリケーションコンテキストで宣言する必要があります。MessageHistoryConfigurer に汎用 Bean 定義を使用しないでください。 |
| 定義により、メッセージ履歴ヘッダーは不変です(履歴を書き換えることはできません)。メッセージ履歴値を書き込むとき、コンポーネントは新しいメッセージを作成するか(コンポーネントがオリジンの場合)、リクエストメッセージから履歴をコピーし、それを変更して応答メッセージに新しいリストを設定します。どちらの場合でも、メッセージ自体がスレッドの境界を越えている場合でも、値を追加できます。つまり、履歴値により、非同期メッセージフローのデバッグが大幅に簡素化されます。 |
メッセージストア
エンタープライズ統合パターン (英語) (EIP)ブックは、メッセージをバッファリングする機能を持ついくつかのパターンを識別します。例: アグリゲーターは、解放できるまでメッセージをバッファリングし、QueueChannel は、コンシューマーがそのチャネルからメッセージを明示的に受信するまでメッセージをバッファリングします。メッセージフロー内の任意の時点で発生する可能性がある障害のため、メッセージをバッファリングする EIP コンポーネントは、メッセージが失われる可能性のあるポイントも導入します。
メッセージを失うリスクを軽減するために、EIP はメッセージストア (英語) パターンを定義します。これにより、EIP コンポーネントは通常、ある種の永続ストア(RDBMS など)にメッセージを保存 (英語) できます。
Spring Integration は、以下によってメッセージストアパターンのサポートを提供します。
org.springframework.integration.store.MessageStore戦略インターフェースの定義このインターフェースのいくつかの実装を提供する
MessageStoreインターフェースを実装するインスタンスを挿入できるように、メッセージをバッファリングする機能を持つすべてのコンポーネントでmessage-store属性を公開します。
特定のメッセージストア実装の設定方法と特定のバッファリングコンポーネントへの MessageStore 実装の注入方法の詳細は、マニュアル全体で説明されています(QueueChannel、アグリゲーター、遅延器などの特定のコンポーネントを参照)。次の例のペアは、QueueChannel およびアグリゲーターのメッセージストアへの参照を追加する方法を示しています。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel><int:aggregator … message-store="refToMessageStore"/> デフォルトでは、メッセージは MessageStore の実装である o.s.i.store.SimpleMessageStore を使用してメモリに保存されます。これは、非永続メッセージの潜在的な損失が懸念されない開発環境または単純な低ボリューム環境では問題ない場合があります。ただし、一般的な本番アプリケーションには、メッセージ損失のリスクを軽減するだけでなく、潜在的なメモリ不足エラーを回避するために、より堅牢なオプションが必要です。そのため、さまざまなデータストア用の MessageStore 実装も提供しています。以下は、サポートされている実装の完全なリストです。
Hazelcast メッセージストア : Hazelcast 分散キャッシュを使用してメッセージを保存します
JDBC メッセージストア : RDBMS を使用してメッセージを保存する
Redis メッセージストア : Redis キー / 値データストアを使用してメッセージを保存する
MongoDB メッセージストア : MongoDB ドキュメントストアを使用してメッセージを保存する
ただし、 メッセージデータ(ペイロードとヘッダー)は、 特定の型のデータを表すヘッダーに特に注意してください。例: ヘッダーの 1 つに Spring Bean のインスタンスが含まれている場合、逆直列化すると、その Bean の別のインスタンスになり、フレームワークによって作成された暗黙的なヘッダーの一部( Spring Integration バージョン 3.0 から、 また、次のようにメッセージフローを構成するときに何が起こるかを考慮してください: ゲートウェイ→キューチャネル(永続的なメッセージストアによってバッキング)→サービスアクティベーター。そのゲートウェイは一時的な応答チャネルを作成しますが、サービスアクティベーターのポーラーがキューから読み取るまでに失われます。この場合も、ヘッダーエンリッチャーを使用して、ヘッダーを 詳しくは、ヘッダーエンリッチャーを参照してください。 |
Spring Integration 4.0 は 2 つの新しいインターフェースを導入しました:
ChannelMessageStore:QueueChannelインスタンスに固有の操作を実装するにはPriorityCapableChannelMessageStore:PriorityChannelインスタンスに使用されるMessageStore実装をマークし、持続メッセージの優先順位を提供するため。
実際の動作は実装によって異なります。このフレームワークは、QueueChannel および PriorityChannel の永続的な MessageStore として使用できる次の実装を提供します。
SimpleMessageStore に関する注意 バージョン 4.1 以降、 アグリゲーターなどのコンポーネントの外部でグループストアにアクセスするユーザーは、コピーではなく、アグリゲーターによって使用されているグループへの直接参照を取得するようになりました。アグリゲーターの外部でグループを操作すると、予期しない結果が生じる可能性があります。 このため、このような操作を実行しないか、 |
MessageGroupFactory を使用する
バージョン 4.3 以降では、一部の MessageGroupStore 実装にカスタム MessageGroupFactory 戦略を挿入して、MessageGroupStore で使用される MessageGroup インスタンスを作成およびカスタマイズできます。これは、GroupType.HASH_SET (LinkedHashSet) 内部コレクションに基づいて SimpleMessageGroup インスタンスを生成する SimpleMessageGroupFactory にデフォルト設定されます。他に考えられるオプションは SYNCHRONISED_SET と BLOCKING_QUEUE で、最後のオプションを使用して以前の SimpleMessageGroup の動作を復元できます。また、PERSISTENT オプションも用意されています。詳細については、次のセクションを参照してください。バージョン 5.0.1 以降、LIST オプションは、グループ内のメッセージの順序と一意性が問題にならない場合にも使用できます。
永続的な MessageGroupStore および遅延ロード
バージョン 4.3 以降、すべての永続的な MessageGroupStore インスタンスは、遅延ロード方式でストアから MessageGroup インスタンスとその messages を取得します。ほとんどの場合、各相関操作でストアから MessageGroup 全体をロードするオーバーヘッドを追加する場合、相関 MessageHandler インスタンス(アグリゲーターおよびリシーケンサーを参照)に役立ちます。
AbstractMessageGroupStore.setLazyLoadMessageGroups(false) オプションを使用して、構成から遅延ロード動作をオフに切り替えることができます。
MongoDB MessageStore (MongoDB メッセージストア)および <aggregator> (アグリゲーター)での遅延ロードのパフォーマンステストでは、次のようなカスタム release-strategy を使用します。
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>1000 個の単純なメッセージに対して、次のような結果が生成されます。
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
... ただし、バージョン 5.5 以降、永続的な MessageGroupStore 実装はすべて、ターゲットデータベースストリーミング API に基づく streamMessagesForGroup(Object groupId) 契約を提供します。これにより、ストア内のグループが非常に大きい場合に、リソースの使用率が向上します。内部的には、この新しい API は、起動時に永続メッセージを再スケジュールするときに、遅延器 (たとえば) で使用されます。返された Stream<Message<?>> は、処理の最後に閉じる必要があります。try-with-resources による自動クローズ経由。PersistentMessageGroup が使用されるたびに、その streamMessages() は MessageGroupStore.streamMessagesForGroup() に委譲されます。
メッセージグループの状態
バージョン 5.5 以降、MessageGroup 抽象化は condition 文字列オプションを提供します。このオプションの値は、グループの決定を行うために何らかの理由で後で解析できるものであれば何でもかまいません。たとえば、相関メッセージハンドラーからの ReleaseStrategy は、グループ内のすべてのメッセージを繰り返す代わりに、グループからこのプロパティを参照する場合があります。MessageGroupStore は setGroupCondition(Object groupId, String condition) API を公開します。この目的のために、setGroupConditionSupplier(BiFunction<Message<?>, String, String>) オプションが AbstractCorrelatingMessageHandler に追加されました。この関数は、グループに追加された後の各メッセージと、グループの既存の状態に対して評価されます。実装は、新しい値または既存の値を返すか、ターゲット条件を null にリセットすることを決定する場合があります。condition の値は、JSON、SpEL 式、数値、文字列として直列化して後で解析できるものであれば何でもかまいません。例: ファイルアグリゲーターコンポーネントの FileMarkerReleaseStrategy は、FileSplitter.FileMarker.Mark.END メッセージの FileHeaders.LINE_COUNT ヘッダーからグループに条件を入力し、canRelease() からそれを参照して、グループサイズをこの条件の値と比較します。このように、グループ内のすべてのメッセージを繰り返して、FileHeaders.LINE_COUNT ヘッダーを持つ FileSplitter.FileMarker.Mark.END メッセージを見つけることはありません。また、他のすべてのレコードの前に終了マーカーがアグリゲーターに到達できるようにします。たとえば、マルチスレッド環境でファイルを処理する場合です。
さらに、構成の便宜上、GroupConditionProvider 契約が導入されました。AbstractCorrelatingMessageHandler は、提供された ReleaseStrategy がこのインターフェースを実装しているかどうかを確認し、グループ条件評価ロジック用に conditionSupplier を抽出します。
メタデータストア
多くの外部システム、サービス、リソースはトランザクションではなく(Twitter、RSS、ファイルシステムなど)、データを既読としてマークする機能はありません。また、一部の統合ソリューションでは、エンタープライズ統合パターンべき等レシーバー (英語) を実装する必要がある場合があります。このゴールを達成し、外部システムとの次の対話の前にエンドポイントの以前の状態を保存するため、または次のメッセージを処理するために、Spring Integration は、一般的なキー値契約を備えた org.springframework.integration.metadata.MetadataStore インターフェースの実装としてメタデータストアコンポーネントを提供します。
メタデータストアは、さまざまな種類の汎用メタデータ(たとえば、処理された最後のフィードエントリの公開日)を格納して、フィードアダプターなどのコンポーネントが重複を処理できるように設計されています。コンポーネントに MetadataStore への参照が直接提供されない場合、メタデータストアを見つけるためのアルゴリズムは次のとおりです。最初に、アプリケーションコンテキストで metadataStore ID を持つ Bean を探します。見つかった場合は、それを使用します。そうでない場合は、SimpleMetadataStore の新しいインスタンスを作成します。これは、現在実行中のアプリケーションコンテキストのライフサイクル内でメタデータのみを永続化するメモリ内実装です。つまり、再起動すると、エントリが重複する可能性があります。
アプリケーションコンテキストの再起動間でメタデータを保持する必要がある場合、フレームワークは次の永続的な MetadataStores を提供します。
PropertiesPersistingMetadataStore
PropertiesPersistingMetadataStore は、プロパティファイルと PropertiesPersister (Javadoc) によってサポートされています。
デフォルトでは、アプリケーションコンテキストが正常に閉じられたときの状態のみを保持します。Flushable を実装しているため、flush() を呼び出すことで、状態を自由に維持できます。次の例は、XML で "PropertiesPersistingMetadataStore" を構成する方法を示しています。
<bean id="metadataStore"
class="org.springframework.integration.metadata.PropertiesPersistingMetadataStore"/> または、MetadataStore インターフェースの独自の実装(たとえば JdbcMetadataStore)を提供し、それをアプリケーションコンテキストで Bean として構成できます。
バージョン 4.0 以降、SimpleMetadataStore、PropertiesPersistingMetadataStore、RedisMetadataStore は ConcurrentMetadataStore を実装します。これらはアトミックアップデートを提供し、複数のコンポーネントまたはアプリケーションインスタンスで使用できます。
べき等レシーバーとメタデータストア
メタデータストアは、受信メッセージがすでに処理されており、破棄するか、破棄時に他のロジックを実行できる場合、受信メッセージをフィルターする必要がある場合に、EIP べき等レシーバー (英語) パターンを実装できます。次の構成は、その方法の例を示しています。
<int:filter input-channel="serviceChannel"
output-channel="idempotentServiceChannel"
discard-channel="discardChannel"
expression="@metadataStore.get(headers.businessKey) == null"/>
<int:publish-subscribe-channel id="idempotentServiceChannel"/>
<int:outbound-channel-adapter channel="idempotentServiceChannel"
expression="@metadataStore.put(headers.businessKey, '')"/>
<int:service-activator input-channel="idempotentServiceChannel" ref="service"/> べき等エントリの value は有効期限になる可能性があり、その後、そのエントリは何らかのスケジュールされたリーパーによってメタデータストアから削除される必要があります。
べき等レシーバーエンタープライズ統合パターンも参照してください。
MetadataStoreListener
次の例に示すように、一部のメタデータストア(現在は zookeeper のみ)は、アイテムの変更時にイベントを受信するリスナーの登録をサポートしています。
public interface MetadataStoreListener {
void onAdd(String key, String value);
void onRemove(String key, String oldValue);
void onUpdate(String key, String newValue);
}
詳細については、Javadoc を参照してください。イベントのサブセットのみに関心がある場合は、MetadataStoreListenerAdapter をサブクラス化できます。
制御バス
エンタープライズ統合パターン (英語) (EIP)ブックに従って、コントロールバスの背景となる考え方は、「アプリケーションレベル」メッセージングに使用されるのと同じメッセージングシステムをフレームワーク内のコンポーネントの監視と管理に使用できるということです。Spring Integration では、公開された操作を呼び出す手段としてメッセージを送信できるように、上記のアダプターに基づいて構築しています。
次の例は、XML で制御バスを構成する方法を示しています。
<int:control-bus input-channel="operationChannel"/>制御バスには、アプリケーションコンテキストで Bean の操作を呼び出すためにアクセスできる入力チャネルがあります。また、エンドポイントをアクティブにするサービスのすべての共通プロパティもあります。例: 操作の結果に、ダウンストリームチャネルに送信する戻り値がある場合、出力チャネルを指定できます。
制御バスは、Spring Expression Language(SpEL)式として入力チャネルでメッセージを実行します。メッセージを受け取り、本文を式にコンパイルし、コンテキストを追加してから実行します。デフォルトのコンテキストは、@ManagedAttribute または @ManagedOperation でアノテーションが付けられたメソッドをサポートします。また、Spring の Lifecycle インターフェース(およびバージョン 5.2 以降の Pausable 拡張)のメソッドをサポートし、Spring の TaskExecutor および TaskScheduler 実装のいくつかを構成するために使用されるメソッドをサポートします。制御バスで独自のメソッドを使用できるようにする最も簡単な方法は、@ManagedAttribute または @ManagedOperation アノテーションを使用することです。これらのアノテーションは、メソッドを JMX MBean レジストリに公開するためにも使用されるため、便利な副産物を提供します。多くの場合、コントロールバスに公開するのと同じ型の操作は、JMX を介して公開するのに適しています。アプリケーションコンテキスト内の特定のインスタンスの解決は、一般的な SpEL 構文で実現されます。これを行うには、Bean の SpEL 接頭辞(@)を Bean 名に指定します。例: Spring Bean でメソッドを実行するために、クライアントは次のように操作チャネルにメッセージを送信できます。
Message operation = MessageBuilder.withPayload("@myServiceBean.shutdown()").build();
operationChannel.send(operation)
式のコンテキストのルートは Message 自体であるため、式内の変数として payload および headers にもアクセスできます。これは、Spring Integration エンドポイントの他のすべての式サポートと一致しています。
Java アノテーションを使用すると、コントロールバスを次のように構成できます。
@Bean
@ServiceActivator(inputChannel = "operationChannel")
public ExpressionControlBusFactoryBean controlBus() {
return new ExpressionControlBusFactoryBean();
}
同様に、Java DSL フロー定義を次のように構成できます。
@Bean
public IntegrationFlow controlBusFlow() {
return IntegrationFlow.from("controlBus")
.controlBus()
.get();
}
DirectChannel の自動作成でラムダを使用する場合は、次のように制御バスを作成できます。
@Bean
public IntegrationFlow controlBus() {
return IntegrationFlowDefinition::controlBus;
}
この場合、チャネルの名前は controlBus.input です。
正常なシャットダウン
"MBean エクスポーター" に従って、MBean エクスポーターは stopActiveComponents と呼ばれる JMX 操作を提供します。これは、アプリケーションを正常に停止するために使用されます。操作には単一の Long パラメーターがあります。このパラメーターは、処理中のメッセージの完了を待機する時間(ミリ秒単位)を示します。操作は次のように機能します。
OrderlyShutdownCapableを実装するすべての Bean でbeforeShutdown()を呼び出します。そうすることで、そのようなコンポーネントはシャットダウンの準備ができます。このインターフェースを実装するコンポーネントの例と、この呼び出しで行うことには、リスナーコンテナーを停止する JMS および AMQP メッセージ駆動型アダプター、新しい接続の受け入れを停止する(既存の接続を開いたまま)TCP サーバー接続ファクトリ、ドロップする TCP 受信エンドポイントが含まれます(ログ)受信した新しいメッセージ、新しいリクエストに対して
503 - Service Unavailableを返す HTTP 受信エンドポイント。JMS または AMQP-backed チャネルなどのアクティブなチャネルを停止します。
すべての
MessageSourceインスタンスを停止します。すべての受信
MessageProducerを停止します(OrderlyShutdownCapableではありません)。操作に渡される
Longパラメーターの値で定義されているように、残り時間が残っているのを待ちます。そうすることで、飛行中のメッセージがすべての旅を完了することができます。この操作を呼び出すときに適切なタイムアウトを選択することが重要です。
すべての
OrderlyShutdownCapableコンポーネントでafterShutdown()を呼び出します。これにより、そのようなコンポーネントは最終的なシャットダウンタスクを実行できます(たとえば、開いているすべてのソケットを閉じる)。
正常なシャットダウン管理操作で説明したように、この操作は JMX を使用して呼び出すことができます。プログラムでメソッドを呼び出す場合は、IntegrationMBeanExporter への参照を挿入するか、取得する必要があります。id 属性が <int-jmx:mbean-export/> 定義で指定されていない場合、Bean には名前が生成されます。この名前には、同じ JVM(MBeanServer)に複数の Spring Integration コンテキストが存在する場合に ObjectName の衝突を回避するためのランダムなコンポーネントが含まれています。
このため、プログラムでメソッドを呼び出す場合は、アプリケーションコンテキストで簡単にアクセスできるように、エクスポーターに id 属性を提供することをお勧めします。
最後に、<control-bus> 要素を使用して操作を呼び出すことができます。詳細については、モニタリング Spring Integration サンプルアプリケーション [GitHub] (英語) を参照してください。
前述のアルゴリズムは、バージョン 4.1 で改善されました。以前は、すべてのタスクエグゼキューターとスケジューラーが停止していました。これにより、QueueChannel インスタンスの中間フローメッセージが残る可能性があります。これで、シャットダウンによりポーラーが実行されたままになり、これらのメッセージを排出して処理できるようになります。 |
統合グラフ
バージョン 4.3 から、Spring Integration はアプリケーションのランタイムオブジェクトモデルへのアクセスを提供します。これには、オプションでコンポーネントメトリクスを含めることができます。グラフとして公開され、統合アプリケーションの現在の状態を視覚化するために使用できます。o.s.i.support.management.graph パッケージには、Spring Integration コンポーネントのランタイム状態を単一のツリーのような Graph オブジェクトとして収集、構築、レンダリングするために必要なすべてのクラスが含まれています。IntegrationGraphServer は、Graph オブジェクトを作成、取得、リフレッシュするために Bean として宣言する必要があります。結果の Graph オブジェクトは任意の形式に直列化できますが、JSON はクライアント側で解析および表現するのに柔軟で便利です。デフォルトのコンポーネントのみを持つ Spring Integration アプリケーションは、次のようなグラフを公開します。
{
"contentDescriptor" : {
"providerVersion" : "6.1.4",
"providerFormatVersion" : 1.2,
"provider" : "spring-integration",
"name" : "myAppName:1.0"
},
"nodes" : [ {
"nodeId" : 1,
"componentType" : "null-channel",
"integrationPatternType" : "null_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 0.0,
"max" : 0.0
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"receiveCounters" : {
"successes" : 0,
"failures" : 0
},
"name" : "nullChannel"
}, {
"nodeId" : 2,
"componentType" : "publish-subscribe-channel",
"integrationPatternType" : "publish_subscribe_channel",
"integrationPatternCategory" : "messaging_channel",
"properties" : { },
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 7.807002,
"max" : 7.807002
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorChannel"
}, {
"nodeId" : 3,
"componentType" : "logging-channel-adapter",
"integrationPatternType" : "outbound_channel_adapter",
"integrationPatternCategory" : "messaging_endpoint",
"properties" : { },
"output" : null,
"input" : "errorChannel",
"sendTimers" : {
"successes" : {
"count" : 1,
"mean" : 6.742722,
"max" : 6.742722
},
"failures" : {
"count" : 0,
"mean" : 0.0,
"max" : 0.0
}
},
"name" : "errorLogger"
} ],
"links" : [ {
"from" : 2,
"to" : 3,
"type" : "input"
} ]
}| バージョン 5.2 は、メトリクス管理に従って、Micrometer メーターを優先してレガシーメトリクスを廃止しました。レガシーメトリクスはバージョン 5.4 で削除され、グラフに表示されなくなります。 |
前の例では、グラフは 3 つの最上位要素で構成されています。
contentDescriptor グラフ要素には、データを提供するアプリケーションに関する一般情報が含まれています。name は、IntegrationGraphServer Bean または spring.application.name アプリケーションコンテキスト環境プロパティでカスタマイズできます。他のプロパティはフレームワークによって提供され、同様のモデルを他のソースと区別できます。
links グラフ要素は、nodes グラフ要素のノード間の接続を表します。ソース Spring Integration アプリケーションの統合コンポーネント間の接続を表します。例: MessageChannel から MessageHandler を含む EventDrivenConsumer へ、または AbstractReplyProducingMessageHandler から MessageChannel へ。便宜上、リンクの目的を判断できるように、モデルには type 属性が含まれています。可能な型は次のとおりです。
input:MessageChannelからエンドポイント、inputChannel、requestChannelプロパティへの方向を識別しますoutput:MessageHandler、MessageProducer、SourcePollingChannelAdapterからoutputChannelまたはreplyChannelプロパティを介したMessageChannelへの方向error:PollingConsumerまたはMessageProducerまたはSourcePollingChannelAdapter上のMessageHandlerからerrorChannelプロパティを介してMessageChannelへ。discard:errorChannelプロパティを介してDiscardingMessageHandler(MessageFilterなど)からMessageChannelへ。route:AbstractMappingMessageRouter(HeaderValueRouterなど) からMessageChannelへ。outputに似ていますが、実行時に決定されます。おそらく、構成されたチャネルマッピングまたは動的に解決されたチャネルです。通常、ルーターはこの目的のために最大 100 個の動的ルートのみを保持しますが、dynamicChannelLimitプロパティを設定することでこの値を変更できます。
この要素からの情報を視覚化ツールで使用して、nodes グラフ要素のノード間の接続をレンダリングできます。from および to 番号は、リンクされたノードの nodeId プロパティの値を表します。例: link 要素を使用して、ターゲットノードで適切な port を決定できます。
次の「テキストイメージ」は、型間の関連を示しています。
+---(discard)
|
+----o----+
| |
| |
| |
(input)--o o---(output)
| |
| |
| |
+----o----+
|
+---(error)nodes グラフ要素は、おそらく最も興味深いものです。その要素には、componentType インスタンスと name 値を持つランタイムコンポーネントが含まれるだけでなく、オプションでコンポーネントによって公開されるメトリクスも含まれる可能性があるためです。ノード要素には、一般的に一目でわかるさまざまなプロパティが含まれています。例: 式ベースのコンポーネントには、コンポーネントの 1 次式文字列を含む expression プロパティが含まれます。メトリクスを有効にするには、@EnableIntegrationManagement を @Configuration クラスに追加するか、<int:management/> 要素を XML 構成に追加します。詳細については、指標と管理を参照してください。
nodeId は、あるコンポーネントと別のコンポーネントを区別できるようにする一意の増分識別子を表します。links 要素でも使用され、このコンポーネントと他のコンポーネント(ある場合)との関連(接続)を表します。input および output 属性は、AbstractEndpoint、MessageHandler、SourcePollingChannelAdapter または MessageProducerSupport の inputChannel および outputChannel プロパティ用です。詳細については、次のセクションを参照してください。
バージョン 5.1 以降、IntegrationGraphServer は、特定の NamedComponent の IntegrationNode に追加のプロパティを設定するために Function<NamedComponent, Map<String, Object>> additionalPropertiesCallback を受け入れます。例: SmartLifecycle autoStartup、running プロパティをターゲットグラフに公開できます。
server.setAdditionalPropertiesCallback(namedComponent -> {
Map<String, Object> properties = null;
if (namedComponent instanceof SmartLifecycle) {
SmartLifecycle smartLifecycle = (SmartLifecycle) namedComponent;
properties = new HashMap<>();
properties.put("auto-startup", smartLifecycle.isAutoStartup());
properties.put("running", smartLifecycle.isRunning());
}
return properties;
});
グラフランタイムモデル
Spring Integration コンポーネントには、さまざまなレベルの複雑さがあります。例: ポーリングされた MessageSource には、ソースデータからメッセージを定期的に送信する SourcePollingChannelAdapter と MessageChannel もあります。他のコンポーネントは、メッセージ用の requestChannel (input)にサブスクライブ(またはポーリング)するための消費 AbstractEndpoint と、ダウンストリームに送信するための応答メッセージを生成する replyChannel (output)を備えたミドルウェアのリクエスト / 応答コンポーネント(JmsOutboundGateway など)です。一方、MessageProducerSupport 実装(ApplicationEventListeningMessageProducer など)は、一部のソースプロトコルリスニングロジックをラップし、outputChannel にメッセージを送信します。
グラフ内で、Spring Integration コンポーネントは、IntegrationNode クラス階層を使用して表されます。これは、o.s.i.support.management.graph パッケージに含まれています。例: AggregatingMessageHandler には ErrorCapableDiscardingMessageHandlerNode を使用でき(discardChannel オプションがあるため)、PollingConsumer を使用して PollableChannel からコンシュームするときにエラーを生成できます。もう 1 つの例は CompositeMessageHandlerNode です。EventDrivenConsumer を使用して SubscribableChannel にサブスクライブした場合の MessageHandlerChain 用です。
@MessagingGateway (メッセージングゲートウェイを参照)は、各メソッドにノードを提供します。name 属性は、ゲートウェイの Bean 名と短いメソッドシグネチャーに基づいています。ゲートウェイの次の例を検討してください。 |
@MessagingGateway(defaultRequestChannel = "four")
public interface Gate {
void foo(String foo);
void foo(Integer foo);
void bar(String bar);
}
前述のゲートウェイは、次のようなノードを生成します。
{
"nodeId" : 10,
"name" : "gate.bar(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 11,
"name" : "gate.foo(class java.lang.String)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
},
{
"nodeId" : 12,
"name" : "gate.foo(class java.lang.Integer)",
"stats" : null,
"componentType" : "gateway",
"integrationPatternType" : "gateway",
"integrationPatternCategory" : "messaging_endpoint",
"output" : "four",
"errors" : null
} この IntegrationNode 階層を使用して、クライアント側でグラフモデルを解析したり、一般的な Spring Integration ランタイムの動作を理解したりできます。詳細については、プログラミングのヒントとコツも参照してください。
バージョン 5.3 は、IntegrationPattern 抽象化と、エンタープライズ統合パターン(EIP)を表すすべての標準コンポーネントを導入し、この抽象化を実装し、IntegrationPatternType 列挙値を提供します。この情報は、ターゲットアプリケーションのいくつかの分類ロジックに役立ちます。または、グラフノードに公開されているため、UI がコンポーネントの描画方法を決定するために使用できます。
統合グラフコントローラー
アプリケーションが Web ベース(または Web コンテナーが埋め込まれた Spring Boot 上に構築されている)であり、Spring Integration HTTP または WebFlux モジュール(それぞれ HTTP サポートおよび WebFlux サポートを参照)がクラスパスに存在する場合、IntegrationGraphController を使用して IntegrationGraphServer を公開できます。REST サービスとしての機能。この目的のために、@EnableIntegrationGraphController および @Configuration クラスのアノテーションと <int-http:graph-controller/> XML 要素が HTTP モジュールで使用可能です。この構成は、@EnableWebMvc アノテーション(または XML 定義の場合は <mvc:annotation-driven/>)とともに、IntegrationGraphController @RestController を登録します。この構成では、@RequestMapping.path を @EnableIntegrationGraphController アノテーションまたは <int-http:graph-controller/> 要素で構成できます。デフォルトのパスは /integration です。
IntegrationGraphController @RestController は、次のサービスを提供します。
@GetMapping(name = "getGraph"): 最後のIntegrationGraphServerリフレッシュ以降の Spring Integration コンポーネントの状態を取得します。o.s.i.support.management.graph.Graphは、REST サービスの@ResponseBodyとして返されます。@GetMapping(path = "/refresh", name = "refreshGraph"): 実際のランタイム状態の現在のGraphをリフレッシュし、REST レスポンスとして返します。メトリクスのグラフをリフレッシュする必要はありません。グラフが取得されると、リアルタイムで提供されます。グラフが最後に取得されてからアプリケーションコンテキストが変更された場合、リフレッシュを呼び出すことができます。その場合、グラフは完全に再構築されます。
Spring Security および Spring MVC プロジェクトによって提供される標準の構成オプションとコンポーネントを使用して、IntegrationGraphController のセキュリティとクロスオリジンの制限を設定できます。次の例は、これらのゴールを達成します。
<mvc:annotation-driven />
<mvc:cors>
<mvc:mapping path="/myIntegration/**"
allowed-origins="http://localhost:9090"
allowed-methods="GET" />
</mvc:cors>
<security:http>
<security:intercept-url pattern="/myIntegration/**" access="ROLE_ADMIN" />
</security:http>
<int-http:graph-controller path="/myIntegration" />次の例は、Java 構成で同じことを行う方法を示しています。
@Configuration
@EnableWebMvc // or @EnableWebFlux
@EnableWebSecurity // or @EnableWebFluxSecurity
@EnableIntegration
@EnableIntegrationGraphController(path = "/testIntegration", allowedOrigins="http://localhost:9090")
public class IntegrationConfiguration extends WebSecurityConfigurerAdapter {
@Override
protected void configure(HttpSecurity http) throws Exception {
http
.authorizeRequests()
.antMatchers("/testIntegration/**").hasRole("ADMIN")
// ...
.formLogin();
}
//...
}
便宜上、@EnableIntegrationGraphController アノテーションは allowedOrigins 属性を提供することに注意してください。これにより、path への GET アクセスが提供されます。より高度にするために、標準の Spring MVC メカニズムを使用して CORS マッピングを構成できます。