メッセージストア

エンタープライズ統合パターン (英語) (EIP)ブックは、メッセージをバッファリングする機能を持ついくつかのパターンを識別します。例: アグリゲーターは、解放できるまでメッセージをバッファリングし、QueueChannel は、コンシューマーがそのチャネルからメッセージを明示的に受信するまでメッセージをバッファリングします。メッセージフロー内の任意の時点で発生する可能性がある障害のため、メッセージをバッファリングする EIP コンポーネントは、メッセージが失われる可能性のあるポイントも導入します。

メッセージを失うリスクを軽減するために、EIP はメッセージストア (英語) パターンを定義します。これにより、EIP コンポーネントは通常、ある種の永続ストア(RDBMS など)にメッセージを保存 (英語) できます。

Spring Integration は、以下によってメッセージストアパターンのサポートを提供します。

  • org.springframework.integration.store.MessageStore 戦略インターフェースの定義

  • このインターフェースのいくつかの実装を提供する

  • MessageStore インターフェースを実装するインスタンスを挿入できるように、メッセージをバッファリングする機能を持つすべてのコンポーネントで message-store 属性を公開します。

特定のメッセージストア実装の設定方法と特定のバッファリングコンポーネントへの MessageStore 実装の注入方法の詳細は、マニュアル全体で説明されています(QueueChannelアグリゲーター遅延器などの特定のコンポーネントを参照)。次の例のペアは、QueueChannel およびアグリゲーターのメッセージストアへの参照を追加する方法を示しています。

例 1: QueueChannel
<int:channel id="myQueueChannel">
    <int:queue message-store="refToMessageStore"/>
<int:channel>
例 2: アグリゲーター
<int:aggregator … message-store="refToMessageStore"/>

デフォルトでは、メッセージは MessageStore の実装である o.s.i.store.SimpleMessageStore を使用してメモリに保存されます。これは、非永続メッセージの潜在的な損失が懸念されない開発環境または単純な低ボリューム環境では問題ない場合があります。ただし、一般的な本番アプリケーションには、メッセージ損失のリスクを軽減するだけでなく、潜在的なメモリ不足エラーを回避するために、より堅牢なオプションが必要です。そのため、さまざまなデータストア用の MessageStore 実装も提供しています。以下は、サポートされている実装の完全なリストです。

ただし、MessageStore の永続的な実装を使用するときは、いくつかの制限に注意してください。

メッセージデータ(ペイロードとヘッダー)は、MessageStore の実装に応じて、異なる直列化戦略を使用して直列化および非直列化されます。例: JdbcMessageStore を使用する場合、Serializable データのみがデフォルトで保持されます。この場合、直列化が発生する前に、直列化できないヘッダーが削除されます。また、トランスポートアダプター(FTP、HTTP、JMS など)によって挿入されるプロトコル固有のヘッダーにも注意してください。例: <http:inbound-channel-adapter/> は HTTP ヘッダーをメッセージヘッダーにマッピングします。そのうちの 1 つは、直列化できない org.springframework.http.MediaType インスタンスの ArrayList です。ただし、Serializer および Deserializer 戦略インターフェースの独自の実装をいくつかの MessageStore 実装(JdbcMessageStore など)に注入して、シリアライゼーションおよびデシリアライゼーションの動作を変更できます。

特定の型のデータを表すヘッダーに特に注意してください。例: ヘッダーの 1 つに Spring Bean のインスタンスが含まれている場合、逆直列化すると、その Bean の別のインスタンスになり、フレームワークによって作成された暗黙的なヘッダーの一部(REPLY_CHANNEL や ERROR_CHANNEL など)に直接影響する場合があります。現在、それらは直列化できませんが、仮にそうであっても、逆直列化されたチャネルは期待されるインスタンスを表しません。

Spring Integration バージョン 3.0 から、HeaderChannelRegistry でチャネルを登録した後にこれらのヘッダーを名前に置き換えるように構成されたヘッダーエンリッチャーでこの課題を解決できます。

また、次のようにメッセージフローを構成するときに何が起こるかを考慮してください: ゲートウェイ→キューチャネル(永続的なメッセージストアによってバッキング)→サービスアクティベーター。そのゲートウェイは一時的な応答チャネルを作成しますが、サービスアクティベーターのポーラーがキューから読み取るまでに失われます。この場合も、ヘッダーエンリッチャーを使用して、ヘッダーを String 表現に置き換えることができます。

詳しくは、ヘッダーエンリッチャーを参照してください。

Spring Integration 4.0 は 2 つの新しいインターフェースを導入しました:

  • ChannelMessageStoreQueueChannel インスタンスに固有の操作を実装するには

  • PriorityCapableChannelMessageStorePriorityChannel インスタンスに使用される MessageStore 実装をマークし、持続メッセージの優先順位を提供するため。

実際の動作は実装によって異なります。このフレームワークは、QueueChannel および PriorityChannel の永続的な MessageStore として使用できる次の実装を提供します。

SimpleMessageStore に関する注意

バージョン 4.1 以降、SimpleMessageStore は getMessageGroup() を呼び出したときにメッセージグループをコピーしなくなりました。大きなメッセージグループの場合、これは重大なパフォーマンスの問題でした。4.0.1 は、この動作を制御できるブール copyOnGet プロパティを導入しました。アグリゲーターによって内部的に使用される場合、このプロパティはパフォーマンスを改善するために false に設定されました。現在は、デフォルトで false です。

アグリゲーターなどのコンポーネントの外部のグループストアにアクセスするユーザーは、コピーの代わりにアグリゲーターが使用しているグループへの直接参照を取得するようになりました。アグリゲーターの外部でグループを操作すると、予測できない結果が生じる可能性があります。

このため、このような操作を実行しないか、copyOnGet プロパティを true に設定する必要があります。

MessageGroupFactory を使用する

バージョン 4.3 から、MessageGroupStore 実装の一部にカスタム MessageGroupFactory 戦略を注入して、MessageGroupStore が使用する MessageGroup インスタンスを作成およびカスタマイズできます。これはデフォルトで SimpleMessageGroupFactory になり、GroupType.HASH_SET (LinkedHashSet)内部コレクションに基づいて SimpleMessageGroup インスタンスを生成します。他の可能なオプションは SYNCHRONISED_SET および BLOCKING_QUEUE です。最後のオプションを使用して、以前の SimpleMessageGroup の動作を復元できます。PERSISTENT オプションも利用できます。詳細については、次のセクションを参照してください。バージョン 5.0.1 から、LIST オプションは、グループ内のメッセージの順序と一意性が重要でない場合にも使用できます。

永続的な MessageGroupStore および遅延ロード

バージョン 4.3 以降、すべての永続的な MessageGroupStore インスタンスは、遅延ロード方式でストアから MessageGroup インスタンスとその messages を取得します。ほとんどの場合、各相関操作でストアから MessageGroup 全体をロードするオーバーヘッドを追加する場合、相関 MessageHandler インスタンス(アグリゲーターおよびリシーケンサーを参照)に役立ちます。

AbstractMessageGroupStore.setLazyLoadMessageGroups(false) オプションを使用して、構成から遅延ロード動作をオフに切り替えることができます。

MongoDB MessageStore (MongoDB メッセージストア)および <aggregator> (アグリゲーター)での遅延ロードのパフォーマンステストでは、次のようなカスタム release-strategy を使用します。

<int:aggregator input-channel="inputChannel"
                output-channel="outputChannel"
                message-store="mongoStore"
                release-strategy-expression="size() == 1000"/>

1000 個の単純なメッセージに対して、次のような結果が生成されます。

...
StopWatch 'Lazy-Load Performance': running time (millis) = 38918
-----------------------------------------
ms     %     Task name
-----------------------------------------
02652  007%  Lazy-Load
36266  093%  Eager
...

ただし、バージョン 5.5 以降、すべての永続的な MessageGroupStore 実装は、ターゲットデータベースストリーミング API に基づいて streamMessagesForGroup(Object groupId) 契約を提供します。これにより、ストア内のグループが非常に大きい場合のリソース使用率が向上します。フレームワークの内部では、この新しい API は、起動時に永続化されたメッセージを再スケジュールするときに(たとえば)遅延器で使用されます。返された Stream<Message<?>> は、処理の最後に閉じる必要があります。try-with-resources による自動クローズを介して。PersistentMessageGroup が使用されるときはいつでも、その streamMessages() は MessageGroupStore.streamMessagesForGroup() に委譲します。

メッセージグループの状態

バージョン 5.5 以降、MessageGroup 抽象化は condition 文字列オプションを提供します。このオプションの値は、グループの決定を行うために何らかの理由で後で解析できるものであれば何でもかまいません。たとえば、相関メッセージハンドラーからの ReleaseStrategy は、グループ内のすべてのメッセージを繰り返す代わりに、グループからこのプロパティを参照する場合があります。MessageGroupStore は setGroupCondition(Object groupId, String condition) API を公開します。この目的のために、setGroupConditionSupplier(BiFunction<Message<?>, String, String>) オプションが AbstractCorrelatingMessageHandler に追加されました。この関数は、グループに追加された後の各メッセージと、グループの既存の状態に対して評価されます。実装は、新しい値または既存の値を返すか、ターゲット条件を null にリセットすることを決定する場合があります。condition の値は、JSON、SpEL 式、数値、文字列として直列化して後で解析できるものであれば何でもかまいません。例: ファイルアグリゲーターコンポーネントの FileMarkerReleaseStrategy は、FileSplitter.FileMarker.Mark.END メッセージの FileHeaders.LINE_COUNT ヘッダーからグループに条件を入力し、canRelease() からそれを参照して、グループサイズをこの条件の値と比較します。このように、グループ内のすべてのメッセージを繰り返して、FileHeaders.LINE_COUNT ヘッダーを持つ FileSplitter.FileMarker.Mark.END メッセージを見つけることはありません。また、他のすべてのレコードの前に終了マーカーがアグリゲーターに到達できるようにします。たとえば、マルチスレッド環境でファイルを処理する場合です。

さらに、構成の便宜上、GroupConditionProvider 契約が導入されました。AbstractCorrelatingMessageHandler は、提供された ReleaseStrategy がこのインターフェースを実装しているかどうかを確認し、グループ条件評価ロジック用に conditionSupplier を抽出します。