重要な GemFire および Apache Geode サポート

Spring Integration は、Pivotal GemFire および Apache Geode のサポートを提供します。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-gemfire</artifactId>
    <version>5.5.8</version>
</dependency>
compile "org.springframework.integration:spring-integration-gemfire:5.5.8"

GemFire は、イベント処理、連続クエリ、リモート関数の実行などの高度な分散システム機能とともにキーと値のデータグリッドを提供する分散データ管理プラットフォームです。このガイドは、有償 Pivotal GemFire またはオープンソース Apache Geode (英語) にある程度精通していることを前提としています。

Spring 統合は、エントリおよび連続照会イベント用の受信アダプター、キャッシュにエントリを書き込む送信アダプター、メッセージとメタデータストアと GemfireLockRegistry 実装を実装することにより、GemFire のサポートを提供します。Spring 統合では、Spring Data for Pivotal GemFire プロジェクトを活用して、コンポーネントの薄いラッパーを提供します。

バージョン 5.1 以降、Spring Integration GemFire モジュールは、デフォルトで Spring Data for Apache Geode [GitHub] (英語) 推移的依存関係を使用します。Pivotal GemFire の有償 Pivotal GemFire ベースの Spring Data に切り替えるには、次の Maven スニペットが示すように、spring-data-geode を依存関係から除外し、spring-data-gemfire を追加します。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-gemfire</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-geode</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-gemfire</artifactId>
</dependency>

'int-gfe' 名前空間を構成するには、XML 構成ファイルのヘッダー内に次の要素を含めます。

xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
	https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd"

受信チャネルアダプター

GemFire EntryEvent によってトリガーされると、受信チャネルアダプターはチャネル上にメッセージを生成します。GemFire は、エントリが関連領域の CREATEDUPDATEDDESTROYED または INVALIDATED の場合にイベントを生成します。受信チャネルアダプターを使用すると、これらのイベントのサブセットでフィルタリングできます。例: エントリの作成に応じてのみメッセージを生成したい場合があります。さらに、たとえば、メッセージペイロードに新しいエントリ値などのイベントプロパティを含める場合、受信チャネルアダプターは SpEL 式を評価できます。次の例は、(expression 属性の)SpEL 言語で受信チャネルアダプターを構成する方法を示しています。

<gfe:cache/>
<gfe:replicated-region id="region"/>
<int-gfe:inbound-channel-adapter id="inputChannel" region="region"
    cache-events="CREATED" expression="newValue"/>

上記の構成では、Spring GemFire の "gfe" 名前空間を使用して、GemFire Cache および Region を作成します。inbound-channel-adapter 要素には、アダプターがイベントをリッスンする GemFire 領域への参照が必要です。オプションの属性には cache-events が含まれます。これには、入力チャネルでメッセージが生成されるイベント型のコンマ区切りリストを含めることができます。デフォルトでは、CREATED と UPDATED は有効になっています。channel 属性が指定されていない場合、チャネルは id 属性から作成されます。このアダプターは error-channel もサポートしています。GemFire EntryEvent [Apache] (英語) は、expression 評価の #root オブジェクトです。次の例は、キーの値を置き換える式を示しています。

expression="new something.MyEvent(key, oldValue, newValue)"

expression 属性が提供されない場合、メッセージペイロードは GemFire EntryEvent 自体です。

このアダプターは、Spring Integration 規則に準拠しています。

連続クエリ受信チャネルアダプター

連続クエリ受信チャネルアダプターは、GemFire 連続クエリまたは CqEvent イベントによってトリガーされると、チャネル上にメッセージを生成します。リリース 1.1 で、Spring Data は GemFire ネイティブ API の優れた抽象化を提供する ContinuousQueryListenerContainer を含む連続クエリサポートを導入しました。このアダプターは、ContinuousQueryListenerContainer インスタンスへの参照を必要とし、特定の query のリスナーを作成し、クエリを実行します。連続クエリは、結果セットの状態が変わるたびに起動するイベントソースとして機能します。

GemFire クエリは OQL で記述され、1 つのリージョンだけでなくキャッシュ全体にスコープされます。さらに、連続クエリにはリモート(つまり、別のプロセスまたはリモートホストで実行)キャッシュサーバーが必要です。連続クエリの実装の詳細については、GemFire ドキュメント (英語) を参照してください。

次の構成では、GemFire クライアントキャッシュ(この実装にはリモートキャッシュサーバーが必要であり、そのアドレスはプールの子要素として構成されていることを思い出してください)、クライアント領域、Spring Data を使用する ContinuousQueryListenerContainer を作成します。

<gfe:client-cache id="client-cache" pool-name="client-pool"/>

<gfe:pool id="client-pool" subscription-enabled="true" >
    <!--configure server or locator here required to address the cache server -->
</gfe:pool>

<gfe:client-region id="test" cache-ref="client-cache" pool-name="client-pool"/>

<gfe:cq-listener-container id="queryListenerContainer" cache="client-cache"
    pool-name="client-pool"/>

<int-gfe:cq-inbound-channel-adapter id="inputChannel"
    cq-listener-container="queryListenerContainer"
    query="select * from /test"/>

連続照会受信・チャネルアダプターには cq-listener-container 属性が必要であり、これには ContinuousQueryListenerContainer への参照が含まれている必要があります。オプションで、SpEL を使用して CqEvent を変換したり、必要に応じて個々のプロパティを抽出したりする expression 属性を受け入れます。cq-inbound-channel-adapter は、入力チャネルでメッセージが生成されるイベント・型のコンマ区切りリストを含む query-events 属性を提供します。使用可能なイベント型は CREATEDUPDATEDDESTROYEDREGION_DESTROYEDREGION_INVALIDATED です。デフォルトでは、CREATED と UPDATED が有効になっています。追加のオプション属性には、query-name (オプションのクエリ名を提供)、expression (前のセクションで説明したように機能)、および durable (クエリが永続的かどうかを示すブール値 - デフォルトでは false)が含まれます。channel を指定しない場合、チャネルは id 属性から作成されます。このアダプターは error-channel もサポートします。

このアダプターは、Spring Integration 規則に準拠しています。

送信チャネルアダプター

送信チャネルアダプターは、メッセージペイロードからマップされたキャッシュエントリを書き込みます。最も単純な形式では、型 java.util.Map のペイロードを想定し、マップエントリを構成済みの領域に配置します。次の例は、送信チャネルアダプターを構成する方法を示しています。

<int-gfe:outbound-channel-adapter id="cacheChannel" region="region"/>

上記の構成を前提として、ペイロードが Map でない場合、例外がスローされます。さらに、SpEL を使用してキャッシュエントリのマップを作成するように送信チャネルアダプターを構成できます。次の例は、その方法を示しています。

<int-gfe:outbound-channel-adapter id="cacheChannel" region="region">
    <int-gfe:cache-entries>
        <entry key="payload.toUpperCase()" value="payload.toLowerCase()"/>
        <entry key="'thing1'" value="'thing2'"/>
    </int-gfe:cache-entries>
</int-gfe:outbound-channel-adapter>

上記の構成では、内部要素(cache-entries)は Spring 'map' 要素と意味的に同等です。アダプターは、key および value 属性を、メッセージを評価コンテキストとして含む SpEL 式として解釈します。これには、メッセージから派生したものだけでなく、任意のキャッシュエントリを含めることができ、リテラル値は一重引用符で囲む必要があることに注意してください。上記の例では、cacheChannel に送信されたメッセージに値 Hello の String ペイロードがある場合、キャッシュ領域に 2 つのエントリ([HELLO:hello, thing1:thing2])が書き込まれます(作成または更新されます)。このアダプターは order 属性もサポートします。これは、PublishSubscribeChannel にバインドされている場合に役立ちます。

Gemfire メッセージストア

EIP で説明したように、メッセージストア (英語) を使用すると、メッセージを保持 (英語) できます。これは、信頼性が懸念される場合にメッセージをバッファリングする機能(QueueChannelAggregatorResequencer など)を備えたコンポーネントを扱うときに役立ちます。Spring Integration では、MessageStore ストラテジーインターフェースはクレームチェック (英語) パターンの基盤も提供します。これは EIP でも説明されています。

Spring Integration の Gemfire モジュールは GemfireMessageStore を提供します。これは、MessageStore 戦略(主に QueueChannel および ClaimCheck パターンで使用)と MessageGroupStore 戦略(主に Aggregator および Resequencer パターンで使用)の両方の実装です。

次の例では、spring-gemfire 名前空間を使用してキャッシュと領域を構成します(spring-integration-gemfire 名前空間と混同しないでください)。

<bean id="gemfireMessageStore" class="o.s.i.gemfire.store.GemfireMessageStore">
    <constructor-arg ref="myRegion"/>
</bean>

<gfe:cache/>

<gfe:replicated-region id="myRegion"/>


<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="gemfireMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
    message-store="gemfireMessageStore"/>

多くの場合、クライアント / サーバー構成の 1 つ以上のリモートキャッシュサーバーでメッセージストアを維持することが望ましいです。この場合、クライアントキャッシュ、クライアントリージョン、クライアントプールを構成し、リージョンを MessageStore に挿入する必要があります。次の例は、その方法を示しています。

<bean id="gemfireMessageStore"
    class="org.springframework.integration.gemfire.store.GemfireMessageStore">
    <constructor-arg ref="myRegion"/>
</bean>

<gfe:client-cache/>

<gfe:client-region id="myRegion" shortcut="PROXY" pool-name="messageStorePool"/>

<gfe:pool id="messageStorePool">
    <gfe:server host="localhost" port="40404" />
</gfe:pool>

pool 要素はキャッシュサーバーのアドレスで構成されていることに注意してください(ここでロケーターを置き換えることができます)。領域は "PROXY" として構成されているため、データはローカルに保存されません。リージョンの id は、キャッシュサーバー内の同じ名前のリージョンに対応しています。

バージョン 4.3.12 以降、GemfireMessageStore はキー prefix オプションをサポートし、同じ GemFire リージョン上のストアのインスタンスを区別できるようになりました。

Gemfire Lock レジストリ

バージョン 4.0 から、GemfireLockRegistry が利用可能になりました。特定のコンポーネント(アグリゲーターやリシーケンサーなど)は、LockRegistry インスタンスから取得したロックを使用して、常に 1 つのスレッドのみがグループを操作するようにします。DefaultLockRegistry は、単一のコンポーネント内でこの機能を実行します。これらのコンポーネントで外部ロックレジストリを構成できるようになりました。GemfireLockRegistry で共有 MessageGroupStore を使用すると、一度に 1 つのインスタンスのみがグループを操作できるように、複数のアプリケーションインスタンスにわたってこの機能を提供できます。

GemfireLockRegistry コンストラクターの 1 つは、引数として Region を必要とします。getDistributedLock() メソッドから Lock を取得するために使用されます。この操作には、Region の GLOBAL スコープが必要です。別のコンストラクターには Cache が必要であり、Region は GLOBAL スコープと名前 LockRegistry で作成されます。

Gemfire メタデータストア

バージョン 4.0 は、Gemfire ベースの新しい MetadataStore (メタデータストア)実装を導入しました。GemfireMetadataStore を使用して、アプリケーションの再起動後もメタデータの状態を維持できます。この新しい MetadataStore 実装は、次のようなアダプターで使用できます。

これらのアダプターに新しい GemfireMetadataStore を使用させるには、Bean 名が metadataStore の Spring Bean を宣言します。フィード受信チャネルアダプターは、宣言された GemfireMetadataStore を自動的に取得して使用します。

GemfireMetadataStore は ConcurrentMetadataStore も実装しており、キーの値を保存または変更できるインスタンスは 1 つだけであるため、複数のアプリケーションインスタンス間で確実に共有できます。これらの方法は、領域の範囲とデータポリシーに基づいて、さまざまなレベルの同時実行性を保証します。これらはピアキャッシュとクライアントサーバーキャッシュに実装されますが、NORMAL または EMPTY データポリシーを持つピア領域では許可されません。
バージョン 5.0 以降、GemfireMetadataStore は ListenableMetadataStore も実装しています。これにより、次の例に示すように、MetadataStoreListener インスタンスをストアに提供することでキャッシュイベントをリッスンできます。
GemfireMetadataStore metadataStore = new GemfireMetadataStore(cache);
metadataStore.addListener(new MetadataStoreListenerAdapter() {

    @Override
    public void onAdd(String key, String value) {
         ...
    }

});