Hazelcast サポート
Spring Integration は、メモリ内データグリッド Hazelcast (英語) と対話するためのチャネルアダプターおよびその他のユーティリティコンポーネントを提供します。
この依存関係をプロジェクトに含める必要があります。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-hazelcast</artifactId>
<version>6.0.5</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:6.0.5"
Hazelcast コンポーネントの XML 名前空間と schemaLocation の定義は次のとおりです。
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"
Hazelcast イベント駆動型受信 チャネルアダプター
Hazelcast は、次のような分散データ構造を提供します。
com.hazelcast.map.IMap
com.hazelcast.multimap.MultiMap
com.hazelcast.collection.IList
com.hazelcast.collection.ISet
com.hazelcast.collection.IQueue
com.hazelcast.topic.ITopic
com.hazelcast.replicatedmap.ReplicatedMap
また、これらのデータ構造に加えられた変更をリッスンするためのイベントリスナーも提供します。
com.hazelcast.core.EntryListener<K, V>
com.hazelcast.collection.ItemListener
com.hazelcast.topic.MessageListener
Hazelcast イベントドリブン 受信 チャネルアダプターは、関連するキャッシュイベントをリッスンし、定義されたチャネルにイベントメッセージを送信します。XML と JavaConfig 駆動の構成の両方をサポートしています。
XML 構成:
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
Hazelcast イベント駆動型受信 チャネルアダプターには、次の属性が必要です。
channel
: メッセージが送信されるチャネルを指定します。cache
: リッスンする分散オブジェクト参照を指定します。これは必須属性です。cache-events
: リッスンするキャッシュイベントを指定します。これはオプションの属性で、デフォルト値はADDED
です。サポートされている値は次のとおりです。IMap
およびMultiMap
でサポートされているキャッシュイベント型:ADDED
、REMOVED
、UPDATED
、EVICTED
、EVICT_ALL
、CLEAR_ALL
;ReplicatedMap
でサポートされているキャッシュイベント型:ADDED
、REMOVED
、UPDATED
、EVICTED
;IList
、ISet
、IQueue
でサポートされているキャッシュイベント型:ADDED
、REMOVED
。ITopic
のキャッシュイベント型はありません。cache-listening-policy
: キャッシュリスニングポリシーをSINGLE
またはALL
として指定します。これはオプションの属性で、デフォルト値はSINGLE
です。同じ cache-events 属性を持つ同じキャッシュオブジェクトをリッスンする各 Hazelcast 受信チャネルアダプターは、単一のイベントメッセージまたはすべてのイベントメッセージを受信できます。ALL
の場合、同じ cache-events 属性を持つ同じキャッシュオブジェクトをリッスンするすべての Hazelcast 受信 チャネルアダプターは、すべてのイベントメッセージを受信します。SINGLE
の場合、固有のイベントメッセージを受け取ります。
いくつかの構成サンプル:
<int:channel id="mapChannel"/>
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />
<bean id="map" factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />
<bean id="list" factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean id="set" factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />
<bean id="queue" factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean id="topic" factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>
Java 構成のサンプル:
次のサンプルは、DistributedMap
構成を示しています。他の分散データ構造 (IMap
、MultiMap
、ReplicatedMap
、IList
、ISet
、IQueue
、ITopic
) にも同じ構成を使用できます。
@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
return producer;
}
Hazelcast 連続クエリ 受信 チャネルアダプター
Hazelcast 連続クエリを使用すると、特定のマップエントリに対して実行された変更をリッスンできます。Hazelcast 連続クエリ 受信 チャネルアダプターは、定義された述語に照らして、関連する分散マップイベントをリッスンするイベントドリブンチャネルアダプターです。
@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);
return producer;
}
<int:channel id="cqMapChannel"/>
<int-hazelcast:cq-inbound-channel-adapter
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="cqMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
次の 6 つの属性をサポートします。
channel
: メッセージが送信されるチャネルを指定します。cache
: リッスンする分散 Map 参照を指定します。必須 ;cache-events
: リッスンするキャッシュイベントを指定します。ADDED
がデフォルト値であるオプションの属性。サポートされている値はADDED
、REMOVED
、UPDATED
、EVICTED
、EVICT_ALL
、CLEAR_ALL
です。predicate
: 特定のマップエントリに対して実行された変更をリッスンする述語を指定します。必須 ;include-value
: value と oldValue を連続クエリ結果に含めることを指定します。オプションで、true
がデフォルトです。cache-listening-policy
: キャッシュリスニングポリシーをSINGLE
またはALL
として指定します。オプションで、デフォルト値はSINGLE
です。同じ cache-events 属性を持つ同じキャッシュオブジェクトをリッスンする各 Hazelcast CQ 受信 チャネルアダプターは、単一のイベントメッセージまたはすべてのイベントメッセージを受信できます。ALL
の場合、同じ cache-events 属性を持つ同じキャッシュオブジェクトをリッスンするすべての Hazelcast CQ 受信 チャネルアダプターは、すべてのイベントメッセージを受信します。SINGLE
の場合、固有のイベントメッセージを受け取ります。
Hazelcast クラスタモニタ 受信 チャネルアダプター
Hazelcast Cluster Monitor は、クラスターで実行された変更のリッスンをサポートします。Hazelcast Cluster Monitor Inbound Channel Adapter は、イベント駆動型のチャネルアダプターであり、関連するメンバーシップ、分散オブジェクト、移行、ライフサイクル、クライアントイベントをリッスンします。
@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
return producer;
}
<int:channel id="monitorChannel"/>
<int-hazelcast:cm-inbound-channel-adapter
channel="monitorChannel"
hazelcast-instance="instance"
monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
次の 3 つの属性をサポートします。
channel
: メッセージが送信されるチャネルを指定します。hazelcast-instance
: クラスターイベントをリッスンする Hazelcast インスタンスリファレンスを指定します。これは必須属性です。monitor-types
: リッスンするモニター型を指定します。これはオプションの属性で、MEMBERSHIP
がデフォルト値です。サポートされている値はMEMBERSHIP
、DISTRIBUTED_OBJECT
、MIGRATION
、LIFECYCLE
、CLIENT
です。
Hazelcast 分散 SQL 受信 チャネルアダプター
Hazelcast を使用すると、分散マップで分散クエリを実行できます。Hazelcast 分散 SQL 受信 チャネルアダプターは、ポーリング 受信 チャネルアダプターです。定義済みの distributed-sql コマンドを実行し、反復型に応じて結果を返します。
@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
return messageSource;
}
<int:channel id="dsMapChannel"/>
<int-hazelcast:ds-inbound-channel-adapter
channel="dsMapChannel"
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="dsMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
ポーラーが必要で、次の 4 つの属性をサポートしています。
channel
: メッセージが送信されるチャネルを指定します。これは必須属性です。cache
: 照会される分散IMap
参照を指定します。これは必須属性です。iteration-type
: 結果型を指定します。分散 SQL はEntrySet
、KeySet
、LocalKeySet
またはValues
で実行できます。これはオプションの属性であり、VALUE
がデフォルトです。サポートされている値はENTRY, `KEY
、LOCAL_KEY
、VALUE
です。distributed-sql
: SQL ステートメントの where 句を指定します。これは必須属性です。
Hazelcast 送信チャネルアダプター
Hazelcast 送信チャネルアダプターは、定義されたチャネルをリッスンし、受信メッセージを関連する分散キャッシュに書き込みます。分散オブジェクト定義には、cache
、cache-expression
または HazelcastHeaders.CACHE_NAME
のいずれかが必要です。サポートされている分散オブジェクトは次のとおりです: IMap
、MultiMap
、ReplicatedMap
、IList
、ISet
、IQueue
、ITopic
。
@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
cache-expression="headers['CACHE_HEADER']"
key-expression="payload.key"
extract-payload="true"/>
次の属性が必要です。
channel
: メッセージが送信されるチャネルを指定します。cache
: 分散オブジェクト参照を指定します。オプション。cache-expression
: Spring Expression Language (SpEL) を介して分散オブジェクトを指定します。オプション。key-expression
: Spring 式言語 (SpEL) を介してキーと値のペアのキーを指定します。オプションであり、IMap
、MultiMap
、ReplicatedMap
分散データ構造に対してのみ必須です。extract-payload
: メッセージ全体を送信するか、ペイロードだけを送信するかを指定します。オプションの属性で、true
がデフォルトです。true の場合、ペイロードだけが分散オブジェクトに書き込まれます。それ以外の場合、メッセージヘッダーとペイロードの両方を変換することによって、メッセージ全体が書き込まれます。
ヘッダーに分散オブジェクト名を設定することで、同じチャネルを介して異なる分散オブジェクトにメッセージを書き込むことができます。cache
または cache-expression
属性が定義されていない場合は、リクエスト Message
で HazelcastHeaders.CACHE_NAME
ヘッダーを設定する必要があります。
Hazelcast リーダー選出
リーダーの選出が必要な場合 (たとえば、1 つのノードのみがメッセージを受信する高可用性メッセージコンシューマーの場合)、Hazelcast ベースの LeaderInitiator
を使用できます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LeaderInitiator initiator() {
return new LeaderInitiator(hazelcastInstance());
}
ノードがリーダーに選出されると、すべてのアプリケーションリスナーに OnGrantedEvent
が送信されます。
Hazelcast メッセージストア
永続的な QueueChannel
または追跡 Aggregator
メッセージグループなどの分散メッセージング状態管理のために、HazelcastMessageStore
実装が提供されます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
}
デフォルトでは、SPRING_INTEGRATION_MESSAGE_STORE
IMap
は、メッセージとグループをキー / 値として保存するために使用されます。任意のカスタム IMap
を HazelcastMessageStore
に提供できます。
Hazelcast メタデータストア
ListenableMetadataStore
の実装は、バッキング Hazelcast IMap
を使用して利用できます。デフォルトのマップは、カスタマイズ可能な SPRING_INTEGRATION_METADATA_STORE
という名前で作成されます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}
HazelcastMetadataStore
は ListenableMetadataStore
を実装します。これにより、型 MetadataStoreListener
の独自のリスナーを登録して、addListener(MetadataStoreListener callback)
を介してイベントをリッスンできます。
Hazelcast ロックレジストリ
LockRegistry
の実装は、バッキング Hazelcast 分散 ILock
サポートを使用して利用できます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LockRegistry lockRegistry() {
return new HazelcastLockRegistry(hazelcastInstance());
}
共有 MessageGroupStore
(例: Aggregator
ストア管理) と一緒に使用すると、HazelcastLockRegistry
を使用して複数のアプリケーションインスタンスにわたってこの機能を提供し、一度に 1 つのインスタンスのみがグループを操作できるようにすることができます。
すべての分散操作では、CP サブシステムが HazelcastInstance で有効になっている必要があります。 |
Hazelcast を使用したメッセージチャネル
Hazelcast IQueue
および ITopic
分散オブジェクトは、基本的にメッセージングプリミティブであり、この Hazelcast モジュールで追加の実装を行わなくても Spring Integration コアコンポーネントで使用できます。
QueueChannel
は、前述の Hazelcast 分散 IQueue
を含む、任意の java.util.Queue
によって提供できます。
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
return new QueueChannel(hazelcastInstance.Message<?>>getQueue("springIntegrationQueue"));
}
この構成をアプリケーションの Hazelcast クラスター内の複数のノードに配置すると、QueueChannel
が分散され、1 つのノードのみがその IQueue
から単一の Message
をポーリングできるようになります。これは、PollableJmsChannel
、PollableKafkaChannel
または PollableAmqpChannel
と同様に機能します。
プロデューサー側が Spring Integration アプリケーションでない場合、QueueChannel
を構成する方法がないため、プレーンな Hazelcast IQueue
API を使用してデータを生成します。この場合、QueueChannel
アプローチはコンシューマー側で間違っています。代わりに受信チャネルアダプターソリューションを使用する必要があります。
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
return hazelcastInstance.getQueue("springIntegrationQueue");
}
@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
return myStringHzQueue::poll;
}
Hazelcast の ITopic
抽象化には、JMS の Topic
と同様のセマンティクスがあります。つまり、すべてのサブスクライバーがパブリッシュされたメッセージを受け取ります。シンプルな MessageChannel
Bean のペアを使用すると、このメカニズムはすぐに使用できる機能としてサポートされます。
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
MessageChannel fromHazelcastTopicChannel) {
ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
return topic;
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
return new FixedSubscriberChannel(springIntegrationTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}
FixedSubscriberChannel
は DirectChannel
の最適化されたバリアントであり、初期化時に MessageHandler
が必要です。MessageHandler
は関数型インターフェースであるため、handleMessage
メソッドの単純なラムダを提供できます。メッセージが publishToHazelcastTopicChannel
に送信されると、そのメッセージは Hazelcast ITopic
にパブリッシュされます。com.hazelcast.topic.MessageListener
も関数インターフェースであるため、ITopic#addMessageListener
へのラムダを提供できます。fromHazelcastTopicChannel
のサブスクライバーは、前述の ITopic
に送信されたすべてのメッセージを消費します。
ExecutorChannel
には IExecutorService
が付属しています。例: それぞれの構成で、クラスター全体のシングルトンを実現できます。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(
new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}