Hazelcast サポート

Spring Integration は、メモリ内データグリッド Hazelcast (英語) と対話するためのチャネルアダプターおよびその他のユーティリティコンポーネントを提供します。

この依存関係をプロジェクトに含める必要があります。

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>6.0.5</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-hazelcast:6.0.5"

Hazelcast コンポーネントの XML 名前空間と schemaLocation の定義は次のとおりです。

xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
          https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"

Hazelcast イベント駆動型受信 チャネルアダプター

Hazelcast は、次のような分散データ構造を提供します。

  • com.hazelcast.map.IMap

  • com.hazelcast.multimap.MultiMap

  • com.hazelcast.collection.IList

  • com.hazelcast.collection.ISet

  • com.hazelcast.collection.IQueue

  • com.hazelcast.topic.ITopic

  • com.hazelcast.replicatedmap.ReplicatedMap

また、これらのデータ構造に加えられた変更をリッスンするためのイベントリスナーも提供します。

  • com.hazelcast.core.EntryListener<K, V>

  • com.hazelcast.collection.ItemListener

  • com.hazelcast.topic.MessageListener

Hazelcast イベントドリブン 受信 チャネルアダプターは、関連するキャッシュイベントをリッスンし、定義されたチャネルにイベントメッセージを送信します。XML と JavaConfig 駆動の構成の両方をサポートしています。

XML 構成:

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                      cache="map"
                      cache-events="UPDATED, REMOVED"
                      cache-listening-policy="SINGLE" />

Hazelcast イベント駆動型受信 チャネルアダプターには、次の属性が必要です。

  • channel: メッセージが送信されるチャネルを指定します。

  • cache: リッスンする分散オブジェクト参照を指定します。これは必須属性です。

  • cache-events: リッスンするキャッシュイベントを指定します。これはオプションの属性で、デフォルト値は ADDED です。サポートされている値は次のとおりです。

  • IMap および MultiMap でサポートされているキャッシュイベント型: ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL;

  • ReplicatedMap でサポートされているキャッシュイベント型: ADDEDREMOVEDUPDATEDEVICTED;

  • IListISetIQueue でサポートされているキャッシュイベント型: ADDEDREMOVEDITopic のキャッシュイベント型はありません。

  • cache-listening-policy: キャッシュリスニングポリシーを SINGLE または ALL として指定します。これはオプションの属性で、デフォルト値は SINGLE です。同じ cache-events 属性を持つ同じキャッシュオブジェクトをリッスンする各 Hazelcast 受信チャネルアダプターは、単一のイベントメッセージまたはすべてのイベントメッセージを受信できます。ALL の場合、同じ cache-events 属性を持つ同じキャッシュオブジェクトをリッスンするすべての Hazelcast 受信 チャネルアダプターは、すべてのイベントメッセージを受信します。SINGLE の場合、固有のイベントメッセージを受け取ります。

いくつかの構成サンプル:

分散マップ
<int:channel id="mapChannel"/>

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                              cache="map"
                              cache-events="UPDATED, REMOVED" />

<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="map"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
分散型 MultiMap
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
                              cache="multiMap"
                              cache-events="ADDED, REMOVED, CLEAR_ALL" />

<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
    <constructor-arg value="multiMap"/>
</bean>
配布リスト
<int-hazelcast:inbound-channel-adapter  channel="listChannel"
                               cache="list"
                               cache-events="ADDED, REMOVED"
                               cache-listening-policy="ALL" />

<bean id="list" factory-bean="instance" factory-method="getList">
    <constructor-arg value="list"/>
</bean>
分散セット
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />

<bean id="set" factory-bean="instance" factory-method="getSet">
    <constructor-arg value="set"/>
</bean>
分散キュー
<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
                               cache="queue"
                               cache-events="REMOVED"
                               cache-listening-policy="ALL" />

<bean id="queue" factory-bean="instance" factory-method="getQueue">
    <constructor-arg value="queue"/>
</bean>
分散トピック
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />

<bean id="topic" factory-bean="instance" factory-method="getTopic">
    <constructor-arg value="topic"/>
</bean>
複製された地図
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
                              cache="replicatedMap"
                              cache-events="ADDED, UPDATED, REMOVED"
                              cache-listening-policy="SINGLE"  />

<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
    <constructor-arg value="replicatedMap"/>
</bean>

Java 構成のサンプル:

次のサンプルは、DistributedMap 構成を示しています。他の分散データ構造 (IMapMultiMapReplicatedMapIListISetIQueueITopic) にも同じ構成を使用できます。

@Bean
public PollableChannel distributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hazelcastInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
    final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
    producer.setOutputChannel(distributedMapChannel());
    producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
    producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);

    return producer;
}

Hazelcast 連続クエリ 受信 チャネルアダプター

Hazelcast 連続クエリを使用すると、特定のマップエントリに対して実行された変更をリッスンできます。Hazelcast 連続クエリ 受信 チャネルアダプターは、定義された述語に照らして、関連する分散マップイベントをリッスンするイベントドリブンチャネルアダプターです。

Java
@Bean
public PollableChannel cqDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> cqDistributedMap() {
    return hazelcastInstance().getMap("CQ_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
    final HazelcastContinuousQueryMessageProducer producer =
        new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
    producer.setOutputChannel(cqDistributedMapChannel());
    producer.setCacheEventTypes("UPDATED");
    producer.setIncludeValue(false);

    return producer;
}
XML
<int:channel id="cqMapChannel"/>

<int-hazelcast:cq-inbound-channel-adapter
                channel="cqMapChannel"
                cache="cqMap"
                cache-events="UPDATED, REMOVED"
                predicate="name=TestName AND surname=TestSurname"
                include-value="true"
                cache-listening-policy="SINGLE"/>

<bean id="cqMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="cqMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

次の 6 つの属性をサポートします。

  • channel: メッセージが送信されるチャネルを指定します。

  • cache: リッスンする分散 Map 参照を指定します。必須 ;

  • cache-events: リッスンするキャッシュイベントを指定します。ADDED がデフォルト値であるオプションの属性。サポートされている値は ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL です。

  • predicate: 特定のマップエントリに対して実行された変更をリッスンする述語を指定します。必須 ;

  • include-value: value と oldValue を連続クエリ結果に含めることを指定します。オプションで、true がデフォルトです。

  • cache-listening-policy: キャッシュリスニングポリシーを SINGLE または ALL として指定します。オプションで、デフォルト値は SINGLE です。同じ cache-events 属性を持つ同じキャッシュオブジェクトをリッスンする各 Hazelcast CQ 受信 チャネルアダプターは、単一のイベントメッセージまたはすべてのイベントメッセージを受信できます。ALL の場合、同じ cache-events 属性を持つ同じキャッシュオブジェクトをリッスンするすべての Hazelcast CQ 受信 チャネルアダプターは、すべてのイベントメッセージを受信します。SINGLE の場合、固有のイベントメッセージを受け取ります。

Hazelcast クラスタモニタ 受信 チャネルアダプター

Hazelcast Cluster Monitor は、クラスターで実行された変更のリッスンをサポートします。Hazelcast Cluster Monitor Inbound Channel Adapter は、イベント駆動型のチャネルアダプターであり、関連するメンバーシップ、分散オブジェクト、移行、ライフサイクル、クライアントイベントをリッスンします。

Java
@Bean
public PollableChannel eventChannel() {
    return new QueueChannel();
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
    HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
    producer.setOutputChannel(eventChannel());
    producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");

    return producer;
}
XML
<int:channel id="monitorChannel"/>

<int-hazelcast:cm-inbound-channel-adapter
                 channel="monitorChannel"
                 hazelcast-instance="instance"
                 monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

次の 3 つの属性をサポートします。

  • channel: メッセージが送信されるチャネルを指定します。

  • hazelcast-instance: クラスターイベントをリッスンする Hazelcast インスタンスリファレンスを指定します。これは必須属性です。

  • monitor-types: リッスンするモニター型を指定します。これはオプションの属性で、MEMBERSHIP がデフォルト値です。サポートされている値は MEMBERSHIPDISTRIBUTED_OBJECTMIGRATIONLIFECYCLECLIENT です。

Hazelcast 分散 SQL 受信 チャネルアダプター

Hazelcast を使用すると、分散マップで分散クエリを実行できます。Hazelcast 分散 SQL 受信 チャネルアダプターは、ポーリング 受信 チャネルアダプターです。定義済みの distributed-sql コマンドを実行し、反復型に応じて結果を返します。

Java
@Bean
public PollableChannel dsDistributedMapChannel() {
    return new QueueChannel();
}

@Bean
public IMap<Integer, String> dsDistributedMap() {
    return hazelcastInstance().getMap("DS_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
    final HazelcastDistributedSQLMessageSource messageSource =
        new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
            "name='TestName' AND surname='TestSurname'");
    messageSource.setIterationType(DistributedSQLIterationType.ENTRY);

    return messageSource;
}
XML
<int:channel id="dsMapChannel"/>

<int-hazelcast:ds-inbound-channel-adapter
            channel="dsMapChannel"
            cache="dsMap"
            iteration-type="ENTRY"
            distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
    <int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>

<bean id="dsMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="dsMap"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>

ポーラーが必要で、次の 4 つの属性をサポートしています。

  • channel: メッセージが送信されるチャネルを指定します。これは必須属性です。

  • cache: 照会される分散 IMap 参照を指定します。これは必須属性です。

  • iteration-type: 結果型を指定します。分散 SQL は EntrySetKeySetLocalKeySet または Values で実行できます。これはオプションの属性であり、VALUE がデフォルトです。サポートされている値は ENTRY, `KEYLOCAL_KEYVALUE です。

  • distributed-sql: SQL ステートメントの where 句を指定します。これは必須属性です。

Hazelcast 送信チャネルアダプター

Hazelcast 送信チャネルアダプターは、定義されたチャネルをリッスンし、受信メッセージを関連する分散キャッシュに書き込みます。分散オブジェクト定義には、cachecache-expression または HazelcastHeaders.CACHE_NAME のいずれかが必要です。サポートされている分散オブジェクトは次のとおりです: IMapMultiMapReplicatedMapIListISetIQueueITopic

Java
@Bean
public MessageChannel distributedMapChannel() {
    return new DirectChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
    return hzInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hzInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
    HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
    handler.setDistributedObject(distributedMap());
    handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
    handler.setExtractPayload(true);
    return handler;
}
XML
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
                    cache-expression="headers['CACHE_HEADER']"
                    key-expression="payload.key"
                    extract-payload="true"/>

次の属性が必要です。

  • channel: メッセージが送信されるチャネルを指定します。

  • cache: 分散オブジェクト参照を指定します。オプション。

  • cache-expression: Spring Expression Language (SpEL) を介して分散オブジェクトを指定します。オプション。

  • key-expression: Spring 式言語 (SpEL) を介してキーと値のペアのキーを指定します。オプションであり、IMapMultiMapReplicatedMap 分散データ構造に対してのみ必須です。

  • extract-payload: メッセージ全体を送信するか、ペイロードだけを送信するかを指定します。オプションの属性で、true がデフォルトです。true の場合、ペイロードだけが分散オブジェクトに書き込まれます。それ以外の場合、メッセージヘッダーとペイロードの両方を変換することによって、メッセージ全体が書き込まれます。

ヘッダーに分散オブジェクト名を設定することで、同じチャネルを介して異なる分散オブジェクトにメッセージを書き込むことができます。cache または cache-expression 属性が定義されていない場合は、リクエスト Message で HazelcastHeaders.CACHE_NAME ヘッダーを設定する必要があります。

Hazelcast リーダー選出

リーダーの選出が必要な場合 (たとえば、1 つのノードのみがメッセージを受信する高可用性メッセージコンシューマーの場合)、Hazelcast ベースの LeaderInitiator を使用できます。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public LeaderInitiator initiator() {
    return new LeaderInitiator(hazelcastInstance());
}

ノードがリーダーに選出されると、すべてのアプリケーションリスナーに OnGrantedEvent が送信されます。

Hazelcast メッセージストア

永続的な QueueChannel または追跡 Aggregator メッセージグループなどの分散メッセージング状態管理のために、HazelcastMessageStore 実装が提供されます。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MessageGroupStore messageStore() {
    return new HazelcastMessageStore(hazelcastInstance());
}

デフォルトでは、SPRING_INTEGRATION_MESSAGE_STOREIMap は、メッセージとグループをキー / 値として保存するために使用されます。任意のカスタム IMap を HazelcastMessageStore に提供できます。

Hazelcast メタデータストア

ListenableMetadataStore の実装は、バッキング Hazelcast IMap を使用して利用できます。デフォルトのマップは、カスタマイズ可能な SPRING_INTEGRATION_METADATA_STORE という名前で作成されます。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public MetadataStore metadataStore() {
    return new HazelcastMetadataStore(hazelcastInstance());
}

HazelcastMetadataStore は ListenableMetadataStore を実装します。これにより、型 MetadataStoreListener の独自のリスナーを登録して、addListener(MetadataStoreListener callback) を介してイベントをリッスンできます。

Hazelcast ロックレジストリ

LockRegistry の実装は、バッキング Hazelcast 分散 ILock サポートを使用して利用できます。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}

@Bean
public LockRegistry lockRegistry() {
    return new HazelcastLockRegistry(hazelcastInstance());
}

共有 MessageGroupStore (例: Aggregator ストア管理) と一緒に使用すると、HazelcastLockRegistry を使用して複数のアプリケーションインスタンスにわたってこの機能を提供し、一度に 1 つのインスタンスのみがグループを操作できるようにすることができます。

すべての分散操作では、CP サブシステムが HazelcastInstance で有効になっている必要があります。

Hazelcast を使用したメッセージチャネル

Hazelcast IQueue および ITopic 分散オブジェクトは、基本的にメッセージングプリミティブであり、この Hazelcast モジュールで追加の実装を行わなくても Spring Integration コアコンポーネントで使用できます。

QueueChannel は、前述の Hazelcast 分散 IQueue を含む、任意の java.util.Queue によって提供できます。

@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
    return new QueueChannel(hazelcastInstance.Message<?>>getQueue("springIntegrationQueue"));
}

この構成をアプリケーションの Hazelcast クラスター内の複数のノードに配置すると、QueueChannel が分散され、1 つのノードのみがその IQueue から単一の Message をポーリングできるようになります。これは、PollableJmsChannelPollableKafkaChannel または PollableAmqpChannel と同様に機能します。

プロデューサー側が Spring Integration アプリケーションでない場合、QueueChannel を構成する方法がないため、プレーンな Hazelcast IQueue API を使用してデータを生成します。この場合、QueueChannel アプローチはコンシューマー側で間違っています。代わりに受信チャネルアダプターソリューションを使用する必要があります。

@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue("springIntegrationQueue");
}

@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
    return myStringHzQueue::poll;
}

Hazelcast の ITopic 抽象化には、JMS の Topic と同様のセマンティクスがあります。つまり、すべてのサブスクライバーがパブリッシュされたメッセージを受け取ります。シンプルな MessageChannel Bean のペアを使用すると、このメカニズムはすぐに使用できる機能としてサポートされます。

@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
        MessageChannel fromHazelcastTopicChannel) {

    ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
	topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
	return topic;
}

@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
    return new FixedSubscriberChannel(springIntegrationTopic::publish);
}

@Bean
public MessageChannel fromHazelcastTopicChannel() {
    return new DirectChannel();
}

FixedSubscriberChannel は DirectChannel の最適化されたバリアントであり、初期化時に MessageHandler が必要です。MessageHandler は関数型インターフェースであるため、handleMessage メソッドの単純なラムダを提供できます。メッセージが publishToHazelcastTopicChannel に送信されると、そのメッセージは Hazelcast ITopic にパブリッシュされます。com.hazelcast.topic.MessageListener も関数インターフェースであるため、ITopic#addMessageListener へのラムダを提供できます。fromHazelcastTopicChannel のサブスクライバーは、前述の ITopic に送信されたすべてのメッセージを消費します。

ExecutorChannel には IExecutorService が付属しています。例: それぞれの構成で、クラスター全体のシングルトンを実現できます。

@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(
                new Config()
                    .addExecutorConfig(new ExecutorConfig()
                         .setName("singletonExecutor")
                         .setPoolSize(1)));
}

@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
    return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}