メッセージストア
エンタープライズ統合パターン (英語) (EIP)ブックは、メッセージをバッファリングする機能を持ついくつかのパターンを識別します。例: アグリゲーターは、解放できるまでメッセージをバッファリングし、QueueChannel は、コンシューマーがそのチャネルからメッセージを明示的に受信するまでメッセージをバッファリングします。メッセージフロー内の任意の時点で発生する可能性がある障害のため、メッセージをバッファリングする EIP コンポーネントは、メッセージが失われる可能性のあるポイントも導入します。
メッセージを失うリスクを軽減するために、EIP はメッセージストア (英語) パターンを定義します。これにより、EIP コンポーネントは通常、ある種の永続ストア(RDBMS など)にメッセージを保存 (英語) できます。
Spring Integration は、以下によってメッセージストアパターンのサポートを提供します。
org.springframework.integration.store.MessageStore戦略インターフェースの定義このインターフェースのいくつかの実装を提供する
MessageStoreインターフェースを実装するインスタンスを挿入できるように、メッセージをバッファリングする機能を持つすべてのコンポーネントでmessage-store属性を公開します。
特定のメッセージストア実装の設定方法と特定のバッファリングコンポーネントへの MessageStore 実装の注入方法の詳細は、マニュアル全体で説明されています(QueueChannel、アグリゲーター、遅延器などの特定のコンポーネントを参照)。次の例のペアは、QueueChannel およびアグリゲーターのメッセージストアへの参照を追加する方法を示しています。
<int:channel id="myQueueChannel">
<int:queue message-store="refToMessageStore"/>
<int:channel><int:aggregator message-store="refToMessageStore"/> デフォルトでは、メッセージは MessageStore の実装である o.s.i.store.SimpleMessageStore を使用してメモリに保存されます。これは、非永続メッセージの潜在的な損失が懸念されない開発環境または単純な低ボリューム環境では問題ない場合があります。ただし、一般的な本番アプリケーションには、メッセージ損失のリスクを軽減するだけでなく、潜在的なメモリ不足エラーを回避するために、より堅牢なオプションが必要です。そのため、さまざまなデータストア用の MessageStore 実装も提供しています。以下は、サポートされている実装の完全なリストです。
Hazelcast メッセージストア : Hazelcast 分散キャッシュを使用してメッセージを保存します
JDBC メッセージストア : RDBMS を使用してメッセージを保存する
Redis メッセージストア : Redis キー / 値データストアを使用してメッセージを保存する
MongoDB メッセージストア : MongoDB ドキュメントストアを使用してメッセージを保存する
ただし、 メッセージデータ(ペイロードとヘッダー)は、 特定の型のデータを表すヘッダーに特に注意してください。例: ヘッダーの 1 つに Spring Bean のインスタンスが含まれている場合、逆直列化すると、その Bean の別のインスタンスになり、フレームワークによって作成された暗黙的なヘッダーの一部( Spring Integration バージョン 3.0 から、 また、メッセージフローを次のように構成した場合どうなるか考えてみましょう: 詳しくは、ヘッダーエンリッチャーを参照してください。 |
Spring Integration 4.0 は 2 つの新しいインターフェースを導入しました:
ChannelMessageStore:QueueChannelインスタンスに固有の操作を実装するにはPriorityCapableChannelMessageStore:PriorityChannelインスタンスに使用されるMessageStore実装をマークし、持続メッセージの優先順位を提供するため。
実際の動作は実装によって異なります。このフレームワークは、QueueChannel および PriorityChannel の永続的な MessageStore として使用できる次の実装を提供します。
SimpleMessageStore に関する注意 バージョン 4.1 以降、 アグリゲーターなどのコンポーネントの外部でグループストアにアクセスするユーザーは、コピーではなく、アグリゲーターによって使用されているグループへの直接参照を取得するようになりました。アグリゲーターの外部でグループを操作すると、予期しない結果が生じる可能性があります。 このため、このような操作を実行しないか、 |
MessageGroupFactory を使用する
バージョン 4.3 以降では、一部の MessageGroupStore 実装にカスタム MessageGroupFactory 戦略を挿入して、MessageGroupStore で使用される MessageGroup インスタンスを作成およびカスタマイズできます。これは、GroupType.HASH_SET (LinkedHashSet) 内部コレクションに基づいて SimpleMessageGroup インスタンスを生成する SimpleMessageGroupFactory にデフォルト設定されます。他に考えられるオプションは SYNCHRONISED_SET と BLOCKING_QUEUE で、最後のオプションを使用して以前の SimpleMessageGroup の動作を復元できます。また、PERSISTENT オプションも用意されています。詳細については、次のセクションを参照してください。バージョン 5.0.1 以降、LIST オプションは、グループ内のメッセージの順序と一意性が問題にならない場合にも使用できます。
永続的な MessageGroupStore および遅延ロード
バージョン 4.3 以降、すべての永続的な MessageGroupStore インスタンスは、遅延ロード方式でストアから MessageGroup インスタンスとその messages を取得します。ほとんどの場合、各相関操作でストアから MessageGroup 全体をロードするオーバーヘッドを追加する場合、相関 MessageHandler インスタンス(アグリゲーターおよびリシーケンサーを参照)に役立ちます。
AbstractMessageGroupStore.setLazyLoadMessageGroups(false) オプションを使用して、構成から遅延ロード動作をオフに切り替えることができます。
MongoDB MessageStore (MongoDB メッセージストア)および <aggregator> (アグリゲーター)での遅延ロードのパフォーマンステストでは、次のようなカスタム release-strategy を使用します。
<int:aggregator input-channel="inputChannel"
output-channel="outputChannel"
message-store="mongoStore"
release-strategy-expression="size() == 1000"/>1000 個の単純なメッセージに対して、次のような結果が生成されます。
...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms % Task name
-----------------------------------------
02652 007% Lazy-Load
36266 093% Eager
... しかし、バージョン 5.5 以降、すべての永続 MessageGroupStore 実装は、ターゲットデータベースストリーミング API に基づく streamMessagesForGroup(Object groupId) 契約を提供します。これにより、ストア内のグループが非常に大きい場合のリソース利用率が向上します。フレームワーク内部では、この新しい API は遅延器で使用され、たとえば起動時に永続化されたメッセージを再スケジュールする場合などに使用されます。返された Stream<Message<?>> は、処理終了時に閉じる必要があります(たとえば、try-with-resources による自動クローズなど)。PersistentMessageGroup が使用されるたびに、その streamMessages() は MessageGroupStore.streamMessagesForGroup() に委譲されます。
メッセージグループの状態
バージョン 5.5 以降、MessageGroup 抽象化は condition 文字列オプションを提供します。このオプションの値は、グループの決定を行うために、後で何らかの理由で解析できるものであれば何でも構いません。例: 相関メッセージハンドラーからの ReleaseStrategy は、グループ内のすべてのメッセージを反復する代わりに、グループからこのプロパティを参照する場合があります。MessageGroupStore は setGroupCondition(Object groupId, String condition) API を公開します。この目的のために、setGroupConditionSupplier(BiFunction<Message<?>, String, String>) オプションが AbstractCorrelatingMessageHandler に追加されました。この関数は、グループに追加された後の各メッセージと、グループの既存の状態に対して評価されます。実装では、新しい値または既存の値を返すか、ターゲットの状態を null にリセットするかを決定できます。condition の値は、JSON、SpEL 式、数値など、文字列として直列化して後で解析できるものであれば何でも構いません。例: ファイルアグリゲーターコンポーネントの FileMarkerReleaseStrategy は、FileSplitter.FileMarker.Mark.END メッセージの FileHeaders.LINE_COUNT ヘッダーから条件をグループに取り込み、その canRelease() を参照してグループサイズとこの条件の値を比較します。これにより、FileHeaders.LINE_COUNT ヘッダーを持つ FileSplitter.FileMarker.Mark.END メッセージを見つけるために、グループ内のすべてのメッセージを反復処理する必要がなくなります。また、たとえばマルチスレッド環境でファイルを処理する場合など、終了マーカーが他のすべてのレコードよりも先にアグリゲータに到達できるようになります。
さらに、構成の便宜上、GroupConditionProvider 契約が導入されました。AbstractCorrelatingMessageHandler は、提供された ReleaseStrategy がこのインターフェースを実装しているかどうかを確認し、グループ条件評価ロジック用に conditionSupplier を抽出します。
LockRegistry を使用する
バージョン 6.5 以降、AbstractMessageGroupStore 抽象化は、ロックを使用してメッセージグループのメタデータを操作します。このロックは groupId を取得し、LockRegister によって生成されます。その目的は、メッセージおよびメッセージグループのアトミック性を操作することです。複数のスレッドで同時にメッセージを追加または削除したり、メタデータを更新したりすると、ロックがない場合、一部の実装でメッセージグループエラーが発生する可能性があります。デフォルトでは DefaultLockRegistry が使用されますが、任意の LockRegister は AbstractMessageGroupStore.setLockRegistry() を介して注入できます。AbstractMessageGroupStore.setLockRegistry() は通常、同じ永続ストアの実装です。詳細については、分散ロックを参照してください。