Hazelcast サポート
Spring Integration は、メモリ内データグリッド Hazelcast (英語) と対話するためのチャネルアダプターおよびその他のユーティリティコンポーネントを提供します。
この依存関係はプロジェクトに必要です:
Hazelcast コンポーネントの XML 名前空間と schemaLocation 定義は次のとおりです。
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"Hazelcast イベント駆動型受信 チャネルアダプター
Hazelcast は、次のような分散データ構造を提供します。
com.hazelcast.map.IMapcom.hazelcast.multimap.MultiMapcom.hazelcast.collection.IListcom.hazelcast.collection.ISetcom.hazelcast.collection.IQueuecom.hazelcast.topic.ITopiccom.hazelcast.replicatedmap.ReplicatedMap
また、これらのデータ構造に加えられた変更をリッスンするためのイベントリスナーも提供します。
com.hazelcast.core.EntryListener<K, V>com.hazelcast.collection.ItemListenercom.hazelcast.topic.MessageListener
Hazelcast イベントドリブン受信 チャネルアダプターは、関連するキャッシュイベントをリッスンし、定義されたチャネルにイベントメッセージを送信します。XML と JavaConfig 駆動の構成の両方をサポートしています。
XML 構成:
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />Hazelcast イベント駆動型受信 チャネルアダプターには、次の属性が必要です。
channel: メッセージが送信されるチャネルを指定します。cache: リッスンする分散オブジェクト参照を指定します。これは必須属性です。cache-events: リッスンするキャッシュイベントを指定します。これはオプションの属性で、デフォルト値はADDEDです。サポートされている値は次のとおりです。IMapおよびMultiMapでサポートされているキャッシュイベント型:ADDED、REMOVED、UPDATED、EVICTED、EVICT_ALL、CLEAR_ALL;ReplicatedMapでサポートされているキャッシュイベント型:ADDED、REMOVED、UPDATED、EVICTED;IList、ISet、IQueueでサポートされているキャッシュイベント型:ADDED、REMOVED。ITopicのキャッシュイベント型はありません。cache-listening-policy: キャッシュリスニングポリシーをSINGLEまたはALLに指定します。これはオプション属性で、デフォルト値はSINGLEです。同じキャッシュオブジェクトを同じ cache-events 属性でリスニングする各 Hazelcast 受信チャネルアダプターは、単一のイベントメッセージまたはすべてのイベントメッセージを受信できます。ALLの場合、同じキャッシュオブジェクトを同じ cache-events 属性でリスニングするすべての Hazelcast 受信チャネルアダプターは、すべてのイベントメッセージを受信します。SINGLEの場合、すべての Hazelcast 受信チャネルアダプターは、それぞれ固有のイベントメッセージを受信します。
いくつかの構成サンプル:
<int:channel id="mapChannel"/>
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />
<bean id="map" factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean><int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean><int-hazelcast:inbound-channel-adapter channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />
<bean id="list" factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean><int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean id="set" factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean><int-hazelcast:inbound-channel-adapter channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />
<bean id="queue" factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean><int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean id="topic" factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean><int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>Java 構成のサンプル:
次のサンプルは、DistributedMap 構成を示しています。他の分散データ構造 (IMap、MultiMap、ReplicatedMap、IList、ISet、IQueue、ITopic) にも同じ構成を使用できます。
@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
return producer;
}Hazelcast 連続クエリ受信 チャネルアダプター
Hazelcast Continuous Query は、特定のマップエントリに対して行われた変更をリッスンすることを可能にします。Hazelcast Continuous Query Inbound Channel Adapter は、定義された述語に基づいて関連する分散マップイベントをリッスンするイベント駆動型チャネルアダプターです。
Java
XML
@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);
return producer;
}
<int:channel id="cqMapChannel"/>
<int-hazelcast:cq-inbound-channel-adapter
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="cqMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
次の 6 つの属性をサポートします。
channel: メッセージが送信されるチャネルを指定します。cache: リッスンする分散 Map 参照を指定します。必須 ;cache-events: リッスンするキャッシュイベントを指定します。ADDEDがデフォルト値であるオプションの属性。サポートされている値はADDED、REMOVED、UPDATED、EVICTED、EVICT_ALL、CLEAR_ALLです。predicate: 特定のマップエントリに対して実行された変更をリッスンする述語を指定します。必須 ;include-value: 連続クエリ結果に値と oldValue を含めることを指定します。オプションで、デフォルトはtrueです。cache-listening-policy: キャッシュリスニングポリシーをSINGLEまたはALLで指定します。これはオプションで、デフォルト値はSINGLEです。同じキャッシュオブジェクトを同じ cache-events 属性でリスニングする各 Hazelcast CQ 受信チャネルアダプターは、単一のイベントメッセージまたはすべてのイベントメッセージを受信できます。ALLの場合、同じキャッシュオブジェクトを同じ cache-events 属性でリスニングするすべての Hazelcast CQ 受信チャネルアダプターは、すべてのイベントメッセージを受信します。SINGLEの場合、すべての Hazelcast CQ 受信チャネルアダプターは、それぞれ固有のイベントメッセージを受信します。
Hazelcast クラスタモニタ受信 チャネルアダプター
Hazelcast Cluster Monitor は、クラスターで実行された変更のリッスンをサポートします。Hazelcast Cluster Monitor Inbound Channel Adapter は、イベント駆動型のチャネルアダプターであり、関連するメンバーシップ、分散オブジェクト、移行、ライフサイクル、クライアントイベントをリッスンします。
Java
XML
@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
return producer;
}
<int:channel id="monitorChannel"/>
<int-hazelcast:cm-inbound-channel-adapter
channel="monitorChannel"
hazelcast-instance="instance"
monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
次の 3 つの属性をサポートします。
channel: メッセージが送信されるチャネルを指定します。hazelcast-instance: クラスターイベントをリッスンする Hazelcast インスタンスリファレンスを指定します。これは必須属性です。monitor-types: リッスンするモニター型を指定します。これはオプションの属性で、MEMBERSHIPがデフォルト値です。サポートされている値はMEMBERSHIP、DISTRIBUTED_OBJECT、MIGRATION、LIFECYCLE、CLIENTです。
Hazelcast 分散 SQL 受信 チャネルアダプター
Hazelcast を使用すると、分散マップで分散クエリを実行できます。Hazelcast 分散 SQL 受信 チャネルアダプターは、ポーリング受信 チャネルアダプターです。定義済みの distributed-sql コマンドを実行し、反復型に応じて結果を返します。
Java
XML
@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
return messageSource;
}
<int:channel id="dsMapChannel"/>
<int-hazelcast:ds-inbound-channel-adapter
channel="dsMapChannel"
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="dsMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
ポーラーが必要で、次の 4 つの属性をサポートしています。
channel: メッセージが送信されるチャネルを指定します。これは必須属性です。cache: クエリ対象となる分散IMap参照を指定します。これは必須属性です。iteration-type: 結果の型を指定します。分散 SQL はEntrySet、KeySet、LocalKeySetまたはValuesで実行できます。これはオプション属性で、デフォルトはVALUEです。サポートされる値はENTRY, `KEY、LOCAL_KEY、VALUEです。distributed-sql: SQL 文の WHERE 句を指定します。これは必須属性です。
Hazelcast 送信チャネルアダプター
Hazelcast 送信チャネルアダプターは、定義されたチャネルをリッスンし、受信メッセージを関連する分散キャッシュに書き込みます。分散オブジェクト定義には、cache、cache-expression、または HazelcastHeaders.CACHE_NAME のいずれかを使用します。サポートされる分散オブジェクトは IMap、MultiMap、ReplicatedMap、IList、ISet、IQueue、ITopic です。
Java
XML
@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
cache-expression="headers['CACHE_HEADER']"
key-expression="payload.key"
extract-payload="true"/>
次の属性が必要です。
channel: メッセージが送信されるチャネルを指定します。cache: 分散オブジェクト参照を指定します。オプション。cache-expression: Spring Expression Language (SpEL) を介して分散オブジェクトを指定します。オプション。key-expression: Spring 式言語 (SpEL) を介してキーと値のペアのキーを指定します。オプションであり、IMap、MultiMap、ReplicatedMap分散データ構造に対してのみ必須です。extract-payload: メッセージ全体を送信するか、ペイロードだけを送信するかを指定します。オプションの属性で、trueがデフォルトです。true の場合、ペイロードだけが分散オブジェクトに書き込まれます。それ以外の場合、メッセージヘッダーとペイロードの両方を変換することによって、メッセージ全体が書き込まれます。
ヘッダーに分散オブジェクト名を設定することで、同じチャネルを介して異なる分散オブジェクトにメッセージを書き込むことができます。cache または cache-expression 属性が定義されていない場合は、リクエスト Message に HazelcastHeaders.CACHE_NAME ヘッダーを設定する必要があります。
Hazelcast メッセージストア
永続的な QueueChannel または追跡 Aggregator メッセージグループなどの分散メッセージング状態管理のために、HazelcastMessageStore 実装が提供されます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
} デフォルトでは、SPRING_INTEGRATION_MESSAGE_STORE IMap は、メッセージとグループをキー / 値として保存するために使用されます。任意のカスタム IMap を HazelcastMessageStore に提供できます。
Hazelcast メタデータストア
ListenableMetadataStore の実装は、バッキング Hazelcast IMap を使用して利用できます。デフォルトのマップは、カスタマイズ可能な SPRING_INTEGRATION_METADATA_STORE という名前で作成されます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}HazelcastMetadataStore は ListenableMetadataStore を実装します。これにより、型 MetadataStoreListener の独自のリスナーを登録して、addListener(MetadataStoreListener callback) を介してイベントをリッスンできます。
Hazelcast を使用したメッセージチャネル
Hazelcast IQueue および ITopic 分散オブジェクトは、本質的にはメッセージングプリミティブであり、この Hazelcast モジュールで追加の実装を行わなくても Spring Integration コアコンポーネントで使用できます。
QueueChannel は、前述の Hazelcast 分散 IQueue を含む、任意の java.util.Queue によって提供できます。
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
return new QueueChannel(hazelcastInstance.getQueue("springIntegrationQueue"));
} この設定をアプリケーションの Hazelcast クラスタ内の複数のノードに配置すると、QueueChannel が分散型となり、IQueue から 1 つの Message をポーリングできるのは 1 つのノードのみになります。これは PollableJmsChannel、PollableKafkaChannel、PollableAmqpChannel と同様に機能します。
プロデューサー側が Spring Integration アプリケーションでない場合、QueueChannel を構成する方法がないため、プレーンな Hazelcast IQueue API を使用してデータを生成します。この場合、QueueChannel アプローチはコンシューマー側で間違っています。代わりに受信チャネルアダプターソリューションを使用する必要があります。
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
return hazelcastInstance.getQueue("springIntegrationQueue");
}
@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
return myStringHzQueue::poll;
}Hazelcast の ITopic 抽象化には、JMS の Topic と同様のセマンティクスがあります。つまり、すべてのサブスクライバーがパブリッシュされたメッセージを受け取ります。シンプルな MessageChannel Bean のペアを使用すると、このメカニズムはすぐに使用できる機能としてサポートされます。
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
MessageChannel fromHazelcastTopicChannel) {
ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
return topic;
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
return new FixedSubscriberChannel(springIntegrationTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}FixedSubscriberChannel は DirectChannel の最適化されたバリアントであり、初期化時に MessageHandler が必要です。MessageHandler は関数型インターフェースであるため、handleMessage メソッドの単純なラムダを提供できます。メッセージが publishToHazelcastTopicChannel に送信されると、そのメッセージは Hazelcast ITopic にパブリッシュされます。com.hazelcast.topic.MessageListener も関数インターフェースであるため、ITopic#addMessageListener へのラムダを提供できます。fromHazelcastTopicChannel のサブスクライバーは、前述の ITopic に送信されたすべてのメッセージを消費します。
ExecutorChannel は IExecutorService とともに提供できます。例: それぞれの構成により、クラスター全体のシングルトンを実現できます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(
new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}