重要な GemFire および Apache Geode サポート
Spring Integration は、Pivotal GemFire および Apache Geode のサポートを提供します。
この依存関係をプロジェクトに含める必要があります。
GemFire は、イベント処理、連続クエリ、リモート関数の実行などの高度な分散システム機能とともにキーと値のデータグリッドを提供する分散データ管理プラットフォームです。このガイドは、有償 Pivotal GemFire またはオープンソース Apache Geode (英語) にある程度精通していることを前提としています。
Spring 統合は、エントリおよび連続照会イベント用の受信アダプター、キャッシュにエントリを書き込む送信アダプター、メッセージとメタデータストアと GemfireLockRegistry
実装を実装することにより、GemFire のサポートを提供します。Spring 統合では、Spring Data for Pivotal GemFire プロジェクトを活用して、コンポーネントの薄いラッパーを提供します。
バージョン 5.1 以降、Spring Integration GemFire モジュールは、デフォルトで Spring Data for Apache Geode [GitHub] (英語) 推移的依存関係を使用します。Pivotal GemFire の有償 Pivotal GemFire ベースの Spring Data に切り替えるには、次の Maven スニペットが示すように、spring-data-geode
を依存関係から除外し、spring-data-gemfire
を追加します。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-gemfire</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-geode</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-gemfire</artifactId>
</dependency>
'int-gfe' 名前空間を構成するには、XML 構成ファイルのヘッダー内に次の要素を含めます。
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd"
受信チャネルアダプター
GemFire EntryEvent
によってトリガーされると、受信チャネルアダプターはチャネル上にメッセージを生成します。GemFire は、エントリが関連領域の CREATED
、UPDATED
、DESTROYED
または INVALIDATED
の場合にイベントを生成します。受信チャネルアダプターを使用すると、これらのイベントのサブセットでフィルタリングできます。例: エントリの作成に応じてのみメッセージを生成したい場合があります。さらに、たとえば、メッセージペイロードに新しいエントリ値などのイベントプロパティを含める場合、受信チャネルアダプターは SpEL 式を評価できます。次の例は、(expression
属性の)SpEL 言語で受信チャネルアダプターを構成する方法を示しています。
<gfe:cache/>
<gfe:replicated-region id="region"/>
<int-gfe:inbound-channel-adapter id="inputChannel" region="region"
cache-events="CREATED" expression="newValue"/>
上記の構成では、Spring GemFire の "gfe" 名前空間を使用して、GemFire Cache
および Region
を作成します。inbound-channel-adapter
要素には、アダプターがイベントをリッスンする GemFire 領域への参照が必要です。オプションの属性には cache-events
が含まれます。これには、入力チャネルでメッセージが生成されるイベント型のコンマ区切りリストを含めることができます。デフォルトでは、CREATED
と UPDATED
は有効になっています。channel
属性が指定されていない場合、チャネルは id
属性から作成されます。このアダプターは error-channel
もサポートしています。GemFire EntryEvent
[Apache] (英語) は、expression
評価の #root
オブジェクトです。次の例は、キーの値を置き換える式を示しています。
expression="new something.MyEvent(key, oldValue, newValue)"
expression
属性が提供されない場合、メッセージペイロードは GemFire EntryEvent
自体です。
このアダプターは、Spring Integration 規則に準拠しています。 |
連続クエリ受信チャネルアダプター
連続クエリ受信チャネルアダプターは、GemFire 連続クエリまたは CqEvent
イベントによってトリガーされると、チャネル上にメッセージを生成します。リリース 1.1
で、Spring Data は GemFire ネイティブ API の優れた抽象化を提供する ContinuousQueryListenerContainer
を含む連続クエリサポートを導入しました。このアダプターは、ContinuousQueryListenerContainer
インスタンスへの参照を必要とし、特定の query
のリスナーを作成し、クエリを実行します。連続クエリは、結果セットの状態が変わるたびに起動するイベントソースとして機能します。
GemFire クエリは OQL で記述され、1 つのリージョンだけでなくキャッシュ全体にスコープされます。さらに、連続クエリにはリモート(つまり、別のプロセスまたはリモートホストで実行)キャッシュサーバーが必要です。連続クエリの実装の詳細については、GemFire ドキュメント (英語) を参照してください。 |
次の構成では、GemFire クライアントキャッシュ(この実装にはリモートキャッシュサーバーが必要であり、そのアドレスはプールの子要素として構成されていることを思い出してください)、クライアント領域、Spring Data を使用する ContinuousQueryListenerContainer
を作成します。
<gfe:client-cache id="client-cache" pool-name="client-pool"/>
<gfe:pool id="client-pool" subscription-enabled="true" >
<!--configure server or locator here required to address the cache server -->
</gfe:pool>
<gfe:client-region id="test" cache-ref="client-cache" pool-name="client-pool"/>
<gfe:cq-listener-container id="queryListenerContainer" cache="client-cache"
pool-name="client-pool"/>
<int-gfe:cq-inbound-channel-adapter id="inputChannel"
cq-listener-container="queryListenerContainer"
query="select * from /test"/>
連続照会受信・チャネルアダプターには cq-listener-container
属性が必要であり、これには ContinuousQueryListenerContainer
への参照が含まれている必要があります。オプションで、SpEL を使用して CqEvent
を変換したり、必要に応じて個々のプロパティを抽出したりする expression
属性を受け入れます。cq-inbound-channel-adapter
は、入力チャネルでメッセージが生成されるイベント・型のコンマ区切りリストを含む query-events
属性を提供します。使用可能なイベント型は CREATED
、UPDATED
、DESTROYED
、REGION_DESTROYED
、REGION_INVALIDATED
です。デフォルトでは、CREATED
と UPDATED
が有効になっています。追加のオプション属性には、query-name
(オプションのクエリ名を提供)、expression
(前のセクションで説明したように機能)、および durable
(クエリが永続的かどうかを示すブール値 - デフォルトでは false)が含まれます。channel
を指定しない場合、チャネルは id
属性から作成されます。このアダプターは error-channel
もサポートします。
このアダプターは、Spring Integration 規則に準拠しています。 |
送信チャネルアダプター
送信チャネルアダプターは、メッセージペイロードからマップされたキャッシュエントリを書き込みます。最も単純な形式では、型 java.util.Map
のペイロードを想定し、マップエントリを構成済みの領域に配置します。次の例は、送信チャネルアダプターを構成する方法を示しています。
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region"/>
上記の構成を前提として、ペイロードが Map
でない場合、例外がスローされます。さらに、SpEL を使用してキャッシュエントリのマップを作成するように送信チャネルアダプターを構成できます。次の例は、その方法を示しています。
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region">
<int-gfe:cache-entries>
<entry key="payload.toUpperCase()" value="payload.toLowerCase()"/>
<entry key="'thing1'" value="'thing2'"/>
</int-gfe:cache-entries>
</int-gfe:outbound-channel-adapter>
上記の構成では、内部要素(cache-entries
)は Spring 'map' 要素と意味的に同等です。アダプターは、key
および value
属性を、メッセージを評価コンテキストとして含む SpEL 式として解釈します。これには、メッセージから派生したものだけでなく、任意のキャッシュエントリを含めることができ、リテラル値は一重引用符で囲む必要があることに注意してください。上記の例では、cacheChannel
に送信されたメッセージに値 Hello
の String
ペイロードがある場合、キャッシュ領域に 2 つのエントリ([HELLO:hello, thing1:thing2]
)が書き込まれます(作成または更新されます)。このアダプターは order
属性もサポートします。これは、PublishSubscribeChannel
にバインドされている場合に役立ちます。
Gemfire メッセージストア
EIP で説明したように、メッセージストア (英語) を使用すると、メッセージを保持 (英語) できます。これは、信頼性が懸念される場合にメッセージをバッファリングする機能(QueueChannel
、Aggregator
、Resequencer
など)を備えたコンポーネントを扱うときに役立ちます。Spring Integration では、MessageStore
ストラテジーインターフェースはクレームチェック (英語) パターンの基盤も提供します。これは EIP でも説明されています。
Spring Integration の Gemfire モジュールは GemfireMessageStore
を提供します。これは、MessageStore
戦略(主に QueueChannel
および ClaimCheck
パターンで使用)と MessageGroupStore
戦略(主に Aggregator
および Resequencer
パターンで使用)の両方の実装です。
次の例では、spring-gemfire
名前空間を使用してキャッシュと領域を構成します(spring-integration-gemfire
名前空間と混同しないでください)。
<bean id="gemfireMessageStore" class="o.s.i.gemfire.store.GemfireMessageStore">
<constructor-arg ref="myRegion"/>
</bean>
<gfe:cache/>
<gfe:replicated-region id="myRegion"/>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="gemfireMessageStore"/>
<int:channel>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="gemfireMessageStore"/>
多くの場合、クライアント / サーバー構成の 1 つ以上のリモートキャッシュサーバーでメッセージストアを維持することが望ましいです。この場合、クライアントキャッシュ、クライアントリージョン、クライアントプールを構成し、リージョンを MessageStore
に挿入する必要があります。次の例は、その方法を示しています。
<bean id="gemfireMessageStore"
class="org.springframework.integration.gemfire.store.GemfireMessageStore">
<constructor-arg ref="myRegion"/>
</bean>
<gfe:client-cache/>
<gfe:client-region id="myRegion" shortcut="PROXY" pool-name="messageStorePool"/>
<gfe:pool id="messageStorePool">
<gfe:server host="localhost" port="40404" />
</gfe:pool>
pool
要素はキャッシュサーバーのアドレスで構成されていることに注意してください(ここでロケーターを置き換えることができます)。領域は "PROXY" として構成されているため、データはローカルに保存されません。リージョンの id
は、キャッシュサーバー内の同じ名前のリージョンに対応しています。
バージョン 4.3.12 以降、GemfireMessageStore
はキー prefix
オプションをサポートし、同じ GemFire リージョン上のストアのインスタンスを区別できるようになりました。
Gemfire Lock レジストリ
バージョン 4.0 から、GemfireLockRegistry
が利用可能になりました。特定のコンポーネント(アグリゲーターやリシーケンサーなど)は、LockRegistry
インスタンスから取得したロックを使用して、常に 1 つのスレッドのみがグループを操作するようにします。DefaultLockRegistry
は、単一のコンポーネント内でこの機能を実行します。これらのコンポーネントで外部ロックレジストリを構成できるようになりました。GemfireLockRegistry
で共有 MessageGroupStore
を使用すると、一度に 1 つのインスタンスのみがグループを操作できるように、複数のアプリケーションインスタンスにわたってこの機能を提供できます。
GemfireLockRegistry コンストラクターの 1 つは、引数として Region を必要とします。getDistributedLock() メソッドから Lock を取得するために使用されます。この操作には、Region の GLOBAL スコープが必要です。別のコンストラクターには Cache が必要であり、Region は GLOBAL スコープと名前 LockRegistry で作成されます。 |
Gemfire メタデータストア
バージョン 4.0 は、Gemfire ベースの新しい MetadataStore
(メタデータストア)実装を導入しました。GemfireMetadataStore
を使用して、アプリケーションの再起動後もメタデータの状態を維持できます。この新しい MetadataStore
実装は、次のようなアダプターで使用できます。
これらのアダプターに新しい GemfireMetadataStore
を使用させるには、Bean 名が metadataStore
の Spring Bean を宣言します。フィード受信チャネルアダプターは、宣言された GemfireMetadataStore
を自動的に取得して使用します。
GemfireMetadataStore は ConcurrentMetadataStore も実装しており、キーの値を保存または変更できるインスタンスは 1 つだけであるため、複数のアプリケーションインスタンス間で確実に共有できます。これらの方法は、領域の範囲とデータポリシーに基づいて、さまざまなレベルの同時実行性を保証します。これらはピアキャッシュとクライアントサーバーキャッシュに実装されますが、NORMAL または EMPTY データポリシーを持つピア領域では許可されません。 |
バージョン 5.0 以降、GemfireMetadataStore は ListenableMetadataStore も実装しています。これにより、次の例に示すように、MetadataStoreListener インスタンスをストアに提供することでキャッシュイベントをリッスンできます。 |
GemfireMetadataStore metadataStore = new GemfireMetadataStore(cache);
metadataStore.addListener(new MetadataStoreListenerAdapter() {
@Override
public void onAdd(String key, String value) {
...
}
});