メッセージストア
エンタープライズ統合パターン (英語) (EIP)ブックは、メッセージをバッファリングする機能を持ついくつかのパターンを識別します。例: アグリゲーターは、解放できるまでメッセージをバッファリングし、QueueChannel
は、コンシューマーがそのチャネルからメッセージを明示的に受信するまでメッセージをバッファリングします。メッセージフロー内の任意の時点で発生する可能性がある障害のため、メッセージをバッファリングする EIP コンポーネントは、メッセージが失われる可能性のあるポイントも導入します。
メッセージを失うリスクを軽減するために、EIP はメッセージストア (英語) パターンを定義します。これにより、EIP コンポーネントは通常、ある種の永続ストア(RDBMS など)にメッセージを保存 (英語) できます。
Spring Integration は、以下によってメッセージストアパターンのサポートを提供します。
org.springframework.integration.store.MessageStore
戦略インターフェースの定義このインターフェースのいくつかの実装を提供する
MessageStore
インターフェースを実装するインスタンスを挿入できるように、メッセージをバッファリングする機能を持つすべてのコンポーネントでmessage-store
属性を公開します。
特定のメッセージストア実装の設定方法と特定のバッファリングコンポーネントへの MessageStore
実装の注入方法の詳細は、マニュアル全体で説明されています(QueueChannel、アグリゲーター、遅延器などの特定のコンポーネントを参照)。次の例のペアは、QueueChannel
およびアグリゲーターのメッセージストアへの参照を追加する方法を示しています。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel>
<int:aggregator … message-store="refToMessageStore"/>
デフォルトでは、メッセージは MessageStore
の実装である o.s.i.store.SimpleMessageStore
を使用してメモリに保存されます。これは、非永続メッセージの潜在的な損失が懸念されない開発環境または単純な低ボリューム環境では問題ない場合があります。ただし、一般的な本番アプリケーションには、メッセージ損失のリスクを軽減するだけでなく、潜在的なメモリ不足エラーを回避するために、より堅牢なオプションが必要です。そのため、さまざまなデータストア用の MessageStore
実装も提供しています。以下は、サポートされている実装の完全なリストです。
JDBC メッセージストア : RDBMS を使用してメッセージを保存する
Redis メッセージストア : Redis キー / 値データストアを使用してメッセージを保存する
MongoDB メッセージストア : MongoDB ドキュメントストアを使用してメッセージを保存する
Gemfire メッセージストア : Gemfire 分散キャッシュを使用してメッセージを保存します
ただし、 メッセージデータ(ペイロードとヘッダー)は、 特定の型のデータを表すヘッダーに特に注意してください。例: ヘッダーの 1 つに Spring Bean のインスタンスが含まれている場合、逆直列化すると、その Bean の別のインスタンスになり、フレームワークによって作成された暗黙的なヘッダーの一部( Spring Integration バージョン 3.0 から、 また、次のようにメッセージフローを構成するときに何が起こるかを考慮してください: ゲートウェイ→キューチャネル(永続的なメッセージストアによってバッキング)→サービスアクティベーター。そのゲートウェイは一時的な応答チャネルを作成しますが、サービスアクティベーターのポーラーがキューから読み取るまでに失われます。この場合も、ヘッダーエンリッチャーを使用して、ヘッダーを 詳しくは、ヘッダーエンリッチャーを参照してください。 |
Spring Integration 4.0 は 2 つの新しいインターフェースを導入しました:
ChannelMessageStore
:QueueChannel
インスタンスに固有の操作を実装するにはPriorityCapableChannelMessageStore
:PriorityChannel
インスタンスに使用されるMessageStore
実装をマークし、持続メッセージの優先順位を提供するため。
実際の動作は実装によって異なります。このフレームワークは、QueueChannel
および PriorityChannel
の永続的な MessageStore
として使用できる次の実装を提供します。
SimpleMessageStore に関する注意 バージョン 4.1 以降、 アグリゲーターなどのコンポーネントの外部のグループストアにアクセスするユーザーは、コピーの代わりにアグリゲーターが使用しているグループへの直接参照を取得するようになりました。アグリゲーターの外部でグループを操作すると、予測できない結果が生じる可能性があります。 このため、このような操作を実行しないか、 |
MessageGroupFactory
を使用する
バージョン 4.3 から、MessageGroupStore
実装の一部にカスタム MessageGroupFactory
戦略を注入して、MessageGroupStore
が使用する MessageGroup
インスタンスを作成およびカスタマイズできます。これはデフォルトで SimpleMessageGroupFactory
になり、GroupType.HASH_SET
(LinkedHashSet
)内部コレクションに基づいて SimpleMessageGroup
インスタンスを生成します。他の可能なオプションは SYNCHRONISED_SET
および BLOCKING_QUEUE
です。最後のオプションを使用して、以前の SimpleMessageGroup
の動作を復元できます。PERSISTENT
オプションも利用できます。詳細については、次のセクションを参照してください。バージョン 5.0.1 から、LIST
オプションは、グループ内のメッセージの順序と一意性が重要でない場合にも使用できます。
永続的な MessageGroupStore
および遅延ロード
バージョン 4.3 以降、すべての永続的な MessageGroupStore
インスタンスは、遅延ロード方式でストアから MessageGroup
インスタンスとその messages
を取得します。ほとんどの場合、各相関操作でストアから MessageGroup
全体をロードするオーバーヘッドを追加する場合、相関 MessageHandler
インスタンス(アグリゲーターおよびリシーケンサーを参照)に役立ちます。
AbstractMessageGroupStore.setLazyLoadMessageGroups(false)
オプションを使用して、構成から遅延ロード動作をオフに切り替えることができます。
MongoDB MessageStore
(MongoDB メッセージストア)および <aggregator>
(アグリゲーター)での遅延ロードのパフォーマンステストでは、次のようなカスタム release-strategy
を使用します。
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>
1000 個の単純なメッセージに対して、次のような結果が生成されます。
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
...
ただし、バージョン 5.5 以降、すべての永続的な MessageGroupStore
実装は、ターゲットデータベースストリーミング API に基づいて streamMessagesForGroup(Object groupId)
契約を提供します。これにより、ストア内のグループが非常に大きい場合のリソース使用率が向上します。フレームワークの内部では、この新しい API は、起動時に永続化されたメッセージを再スケジュールするときに(たとえば)遅延器で使用されます。返された Stream<Message<?>>
は、処理の最後に閉じる必要があります。try-with-resources
による自動クローズを介して。PersistentMessageGroup
が使用されるときはいつでも、その streamMessages()
は MessageGroupStore.streamMessagesForGroup()
に委譲します。
メッセージグループの状態
バージョン 5.5 以降、MessageGroup
抽象化は condition
文字列オプションを提供します。このオプションの値は、グループの決定を行うために何らかの理由で後で解析できるものであれば何でもかまいません。たとえば、相関メッセージハンドラーからの ReleaseStrategy
は、グループ内のすべてのメッセージを繰り返す代わりに、グループからこのプロパティを参照する場合があります。MessageGroupStore
は setGroupCondition(Object groupId, String condition)
API を公開します。この目的のために、setGroupConditionSupplier(BiFunction<Message<?>, String, String>)
オプションが AbstractCorrelatingMessageHandler
に追加されました。この関数は、グループに追加された後の各メッセージと、グループの既存の状態に対して評価されます。実装は、新しい値または既存の値を返すか、ターゲット条件を null
にリセットすることを決定する場合があります。condition
の値は、JSON、SpEL 式、数値、文字列として直列化して後で解析できるものであれば何でもかまいません。例: ファイルアグリゲーターコンポーネントの FileMarkerReleaseStrategy
は、FileSplitter.FileMarker.Mark.END
メッセージの FileHeaders.LINE_COUNT
ヘッダーからグループに条件を入力し、canRelease()
からそれを参照して、グループサイズをこの条件の値と比較します。このように、グループ内のすべてのメッセージを繰り返して、FileHeaders.LINE_COUNT
ヘッダーを持つ FileSplitter.FileMarker.Mark.END
メッセージを見つけることはありません。また、他のすべてのレコードの前に終了マーカーがアグリゲーターに到達できるようにします。たとえば、マルチスレッド環境でファイルを処理する場合です。
さらに、構成の便宜上、GroupConditionProvider
契約が導入されました。AbstractCorrelatingMessageHandler
は、提供された ReleaseStrategy
がこのインターフェースを実装しているかどうかを確認し、グループ条件評価ロジック用に conditionSupplier
を抽出します。