Redis サポート

Spring Integration 2.1 は、Redis (英語) のサポートを導入しました: 「オープンソースの高度な Key-Value ストア」。このサポートは、Redis ベースの MessageStore と、Redis が PUBLISHSUBSCRIBEUNSUBSCRIBE (英語) コマンドを介してサポートするパブリッシュ / サブスクライブメッセージングアダプターの形式で提供されます。

この依存関係をプロジェクトに含める必要があります。

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-redis</artifactId>
    <version>5.5.8</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-redis:5.5.8"

Redis クライアントの依存関係も含める必要があります。Lettuce。

Redis をダウンロード、インストール、実行するには、Redis ドキュメント (英語) を参照してください。

Redis への接続

Redis との対話を開始するには、最初に接続する必要があります。Spring Integration は、別の Spring プロジェクトである Spring Data Redis [GitHub] (英語) が提供するサポートを使用します。Spring Data Redis [GitHub] (英語) は、典型的な Spring 構成体 ConnectionFactory および Template を提供します。これらの抽象化により、いくつかの Redis クライアント Java API との統合が簡素化されます。現在、Spring Data RedisJedis [GitHub] (英語) および Lettuce (英語) をサポートしています。

RedisConnectionFactory を使用する

Redis に接続するには、RedisConnectionFactory インターフェースの実装の 1 つを使用できます。次のリストは、インターフェース定義を示しています。

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

    /**
     * Provides a suitable connection for interacting with Redis.
     * @return connection for interacting with Redis.
     */
    RedisConnection getConnection();
}

次の例は、Java で LettuceConnectionFactory を作成する方法を示しています。

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();

次の例は、Spring の XML 構成で LettuceConnectionFactory を作成する方法を示しています。

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

RedisConnectionFactory の実装では、ポートやホストなど、必要に応じて設定できる一連のプロパティが提供されます。RedisConnectionFactory のインスタンスを取得したら、RedisTemplate のインスタンスを作成して、RedisConnectionFactory を注入できます。

RedisTemplate を使用する

Spring の他のテンプレートクラス(JdbcTemplate や JmsTemplate など)と同様に、RedisTemplate は Redis データアクセスコードを簡素化するヘルパークラスです。RedisTemplate およびそのバリエーション(StringRedisTemplate など)の詳細については、Spring Data Redis ドキュメントを参照してください。

次の例は、Java で RedisTemplate のインスタンスを作成する方法を示しています。

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);

次の例は、Spring の XML 構成で RedisTemplate のインスタンスを作成する方法を示しています。

<bean id="redisTemplate"
         class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

Redis でメッセージング

で記述されていたように導入、Redis はその PUBLISHSUBSCRIBEUNSUBSCRIBE コマンドによってメッセージング、パブリッシュサブスクライブのサポートを提供します。JMS および AMQP と同様に、Spring Integration は、Redis を介してメッセージを送受信するためのメッセージチャネルとアダプターを提供します。

Redis パブリッシュ / サブスクライブチャネル

JMS と同様に、プロデューサーとコンシューマーの両方が同じアプリケーション内にあり、同じプロセス内で実行されることが意図されている場合があります。これは、受信と送信のチャネルアダプターのペアを使用して実現できます。ただし、Spring Integration の JMS サポートと同様に、このユースケースに対処するより簡単な方法があります。次の例に示すように、パブリッシュ / サブスクライブチャネルを作成できます。

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>

publish-subscribe-channel は、メイン Spring Integration 名前空間の通常の <publish-subscribe-channel/> 要素とほとんど同じように動作します。任意のエンドポイントの input-channel 属性と output-channel 属性の両方で参照できます。違いは、このチャネルが Redis トピック名(topic-name 属性で指定された String 値)によって支えられていることです。ただし、JMS とは異なり、このトピックは事前に作成したり、Redis によって自動作成したりする必要はありません。Redis では、トピックはアドレスのロールを果たす単純な String 値です。プロデューサーとコンシューマーは、トピック名と同じ String 値を使用して通信できます。このチャネルへの単純なサブスクリプションは、プロデュースエンドポイントとコンシュームエンドポイント間で非同期パブリッシュ / サブスクライブメッセージングが可能であることを意味します。ただし、単純な Spring Integration <channel/> 要素内に <queue/> 要素を追加して作成された非同期メッセージチャネルとは異なり、メッセージはメモリ内キューに保存されません。代わりに、これらのメッセージは Redis を介して渡されるため、永続化とクラスタリングのサポート、他の非 Java プラットフォームとの相互運用性に依存できます。

Redis 受信チャネルアダプター

Redis 受信チャネルアダプター(RedisInboundChannelAdapter)は、他の受信アダプターと同じ方法で、受信 Redis メッセージを Spring メッセージに適合させます。プラットフォーム固有のメッセージ(この場合は Redis)を受信し、MessageConverter 戦略を使用して Spring メッセージに変換します。次の例は、Redis 受信チャネルアダプターを構成する方法を示しています。

<int-redis:inbound-channel-adapter id="redisAdapter"
       topics="thing1, thing2"
       channel="receiveChannel"
       error-channel="testErrorChannel"
       message-converter="testConverter" />

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

上記の例は、Redis 受信チャネルアダプターのシンプルだが完全な構成を示しています。前述の構成は、特定の Bean を自動検出する、よく知られた Spring パラダイムに依存していることに注意してください。この場合、redisConnectionFactory は暗黙的にアダプターに挿入されます。代わりに connection-factory 属性を使用して明示的に指定できます。

また、上記の構成では、アダプターにカスタム MessageConverter が挿入されることに注意してください。このアプローチは、MessageConverter インスタンスを使用して Redis メッセージと Spring Integration メッセージペイロードを変換する JMS に似ています。デフォルトは SimpleMessageConverter です。

受信アダプターは、複数のトピック名をサブスクライブできます。topics 属性のコンマ区切り値のセットです。

バージョン 3.0 以降、受信アダプターは、既存の topics 属性に加えて、topic-patterns 属性を持つようになりました。この属性には、Redis トピックパターンのコンマ区切りセットが含まれています。Redis のパブリッシュ / サブスクライブに関する詳細については、Redis Pub/Sub (英語) を参照してください。

受信アダプターは、RedisSerializer を使用して Redis メッセージの本文を逆直列化できます。<int-redis:inbound-channel-adapter> の serializer 属性を空の文字列に設定すると、RedisSerializer プロパティの null 値が得られます。この場合、Redis メッセージの未加工の byte[] 本体は、メッセージペイロードとして提供されます。

バージョン 5.0 以降、<int-redis:inbound-channel-adapter> の task-executor 属性を使用して、受信アダプターに Executor インスタンスを提供できます。また、受信した Spring Integration メッセージには、公開されたメッセージのソースを示す RedisHeaders.MESSAGE_SOURCE ヘッダーが含まれるようになりました: トピックまたはパターン。このダウンストリームをルーティングロジックに使用できます。

Redis 送信チャネルアダプター

Redis 送信チャネルアダプターは、他の送信アダプターと同じ方法で、発信 Spring Integration メッセージを Redis メッセージに適合させます。Spring Integration メッセージを受信し、MessageConverter 戦略を使用してプラットフォーム固有のメッセージ(この場合は Redis)に変換します。次の例は、Redis 送信チャネルアダプターを構成する方法を示しています。

<int-redis:outbound-channel-adapter id="outboundAdapter"
    channel="sendChannel"
    topic="thing1"
    message-converter="testConverter"/>

<bean id="redisConnectionFactory"
    class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
    <property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />

この構成は、Redis 受信チャネルアダプターに対応しています。アダプターには、RedisConnectionFactory が暗黙的に挿入されます。これは、Bean 名として redisConnectionFactory で定義されています。この例には、オプションの(およびカスタムの) MessageConverter (testConverter Bean)も含まれています。

Spring Integration 3.0 以降、<int-redis:outbound-channel-adapter> は topic 属性に代わるものを提供します。topic-expression 属性を使用して、実行時にメッセージの Redis トピックを決定できます。これらの属性は相互に排他的です。

Redis キュー受信チャネルアダプター

Spring Integration 3.0 は、Redis リストからメッセージを「ポップ」するキュー受信チャネルアダプターを導入しました。デフォルトでは「右ポップ」を使用しますが、「左ポップ」を使用するように構成できます。アダプターはメッセージ駆動型です。内部リスナースレッドを使用し、ポーラーは使用しません。

次のリストは、queue-inbound-channel-adapter で使用可能なすべての属性を示しています。

<int-redis:queue-inbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    auto-startup=""  (3)
                    phase=""  (4)
                    connection-factory=""  (5)
                    queue=""  (6)
                    error-channel=""  (7)
                    serializer=""  (8)
                    receive-timeout=""  (9)
                    recovery-interval=""  (10)
                    expect-message=""  (11)
                    task-executor=""  (12)
                    right-pop=""/>  (13)
1 コンポーネント Bean 名。channel 属性を指定しない場合、DirectChannel が作成され、この id 属性を Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイント自体は、Bean 名 id および .adapter で登録されます。(Bean 名が thing1 であった場合、エンドポイントは thing1.adapter として登録されます。)
2 このエンドポイントから Message インスタンスを送信する MessageChannel
3 アプリケーションコンテキストの開始後にこのエンドポイントを自動的に開始するかどうかを指定する SmartLifecycle 属性。デフォルトは true です。
4 このエンドポイントが開始されるフェーズを指定する SmartLifecycle 属性。デフォルトは 0 です。
5RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。
6Redis メッセージを取得するためにキューベースの「ポップ」操作が実行される Redis リストの名前。
7 エンドポイントのリスニングタスクから例外を受信したときに ErrorMessage インスタンスを送信する MessageChannel。デフォルトでは、基礎となる MessagePublishingErrorHandler はアプリケーションコンテキストのデフォルト errorChannel を使用します。
8RedisSerializer Bean リファレンス。空の文字列にすることもできます。これは、「シリアライザなし」を意味します。この場合、受信 Redis メッセージからの生の byte[] は、Message ペイロードとして channel に送信されます。デフォルトでは JdkSerializationRedisSerializer です。
9 キューからの Redis メッセージを待機する「ポップ」操作のタイムアウト(ミリ秒)。デフォルトは 1 秒です。
10 "pop" 操作の例外の後、リスナータスクを再起動するまでにリスナータスクがスリープする時間(ミリ秒)。
11 このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。デフォルトは false です。
12Spring TaskExecutor (または標準 JDK 1.5 + Executor)Bean への参照。これは、基礎となるリスニングタスクに使用されます。デフォルトは SimpleAsyncTaskExecutor です。
13 このエンドポイントが Redis リストからメッセージを読み取るために「右ポップ」(true の場合)または「左ポップ」(false の場合)を使用するかどうかを指定します。true の場合、Redis リストは、デフォルトの Redis キュー送信チャネルアダプターと共に使用されると、FIFO キューとして機能します。false に設定すると、「右プッシュ」でリストに書き込むソフトウェアで使用したり、スタックのようなメッセージの順序を実現したりできます。デフォルトは true です。バージョン 4.3 以降。
task-executor は、処理のために複数のスレッドで構成する必要があります。そうでない場合、RedisQueueMessageDrivenEndpoint がエラー後にリスナータスクを再起動しようとすると、デッドロックが発生する可能性があります。errorChannel を使用してこれらのエラーを処理し、再起動を回避することができますが、アプリケーションをデッドロック状態にさらさないことが望ましいです。可能な TaskExecutor 実装については、Spring Framework 参考マニュアルを参照してください。

Redis キュー送信チャネルアダプター

Spring Integration 3.0 は、Spring Integration メッセージから Redis リストに「プッシュ」するためのキュー送信チャネルアダプターを導入しました。デフォルトでは「左プッシュ」を使用しますが、代わりに「右プッシュ」を使用するように構成できます。次のリストは、Redis queue-outbound-channel-adapter で使用可能なすべての属性を示しています。

<int-redis:queue-outbound-channel-adapter id=""  (1)
                    channel=""  (2)
                    connection-factory=""  (3)
                    queue=""  (4)
                    queue-expression=""  (5)
                    serializer=""  (6)
                    extract-payload=""  (7)
                    left-push=""/>  (8)
1 コンポーネント Bean 名。channel 属性を指定しない場合、DirectChannel が作成され、この id 属性を Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイントは id に .adapter を加えた Bean 名で登録されます。(Bean 名が thing1 であった場合、エンドポイントは thing1.adapter として登録されます。)
2 このエンドポイントが Message インスタンスを受信する MessageChannel
3RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。
4Redis メッセージを送信するためにキューベースの「プッシュ」操作が実行される Redis リストの名前。この属性は queue-expression と相互に排他的です。
5Redis リストの名前を決定する SpEL Expression。実行時に受信 Message を #root 変数として使用します。この属性は queue と相互に排他的です。
6RedisSerializer Bean リファレンス。デフォルトは JdkSerializationRedisSerializer です。ただし、String ペイロードの場合、serializer 参照が提供されていない場合は、StringRedisSerializer が使用されます。
7 このエンドポイントがペイロードのみを送信するか、Message 全体を Redis キューに送信するかを指定します。デフォルトは true です。
8 このエンドポイントが Redis リストにメッセージを書き込むために「左プッシュ」(true の場合)または「右プッシュ」(false の場合)を使用するかどうかを指定します。true の場合、Redis リストは、デフォルトの Redis キュー受信チャネルアダプターと共に使用されると、FIFO キューとして機能します。false に設定すると、「左ポップ」でリストから読み取るソフトウェアで使用したり、スタックのようなメッセージの順序を実現したりできます。デフォルトは true です。バージョン 4.3 以降。

Redis アプリケーションイベント

Spring Integration 3.0 以降、Redis モジュールは org.springframework.context.ApplicationEvent である IntegrationEvent の実装を提供します。RedisExceptionEvent は、Redis 操作からの例外をカプセル化します(エンドポイントはイベントの「ソース」です)。例: <int-redis:queue-inbound-channel-adapter/> は、BoundListOperations.rightPop 操作からの例外をキャッチした後にこれらのイベントを発行します。例外は、一般的な org.springframework.data.redis.RedisSystemException または org.springframework.data.redis.RedisConnectionFailureException です。これらのイベントを <int-event:inbound-channel-adapter/> で処理すると、バックグラウンドの Redis タスクの問題を特定し、管理アクションを実行できます。

Redis メッセージストア

エンタープライズ統合パターン(EIP)ブックに従って、メッセージストア (英語) を使用するとメッセージを保持できます。これは、信頼性が懸念される場合にメッセージ(アグリゲーター、リシーケンサーなど)をバッファリングする機能を備えたコンポーネントを扱うときに役立ちます。Spring Integration では、MessageStore 戦略はクレームチェック (英語) パターンの基盤も提供します。これは EIP でも説明されています。

Spring Integration の Redis モジュールは RedisMessageStore を提供します。次の例は、アグリゲーターでそれを使用する方法を示しています。

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
    <constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
         message-store="redisMessageStore"/>

上記の例は Bean 構成であり、コンストラクター引数として RedisConnectionFactory を想定しています。

デフォルトでは、RedisMessageStore は Java 直列化を使用してメッセージを直列化します。ただし、別の直列化手法(JSON など)を使用する場合は、RedisMessageStore の valueSerializer プロパティを設定して独自のシリアライザーを提供できます。

バージョン 4.3.10 以降、フレームワークは、Message インスタンスおよび MessageHeaders インスタンス(それぞれ MessageJacksonDeserializer および MessageHeadersJacksonSerializer)の Jackson シリアライザーおよびデシリアライザーの実装を提供します。ObjectMapper の SimpleModule オプションで構成する必要があります。さらに、ObjectMapper で enableDefaultTyping を設定して、直列化された各複合オブジェクトの型情報を追加する必要があります(ソースが信頼できる場合)。その後、その型情報は逆直列化中に使用されます。フレームワークは、JacksonJsonUtils.messagingAwareMapper() と呼ばれるユーティリティメソッドを提供します。これには、前述のすべてのプロパティとシリアライザがすでに提供されています。このユーティリティメソッドには trustedPackages 引数が付属しており、デシリアライゼーションの Java パッケージを制限して、セキュリティの脆弱性を回避します。デフォルトの信頼できるパッケージ: java.utiljava.langorg.springframework.messaging.supportorg.springframework.integration.supportorg.springframework.integration.messageorg.springframework.integration.storeRedisMessageStore で JSON 直列化を管理するには、次の例のような方法で構成する必要があります。

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);

バージョン 4.3.12 から、RedisMessageStore は prefix オプションをサポートし、同じ Redis サーバー上のストアのインスタンスを区別できるようにします。

Redis チャネルメッセージストア

前に示した  RedisMessageStore は、各グループを単一のキー(グループ ID)の値として維持します。これを使用して永続化のために QueueChannel をバックアップできますが、その目的のために専用の RedisChannelMessageStore が提供されます(バージョン 4.0 以降)。このストアは、各チャネルに LIST を使用し、メッセージを送信するときに LPUSH を使用し、メッセージを受信するときに RPOP を使用します。デフォルトでは、このストアも JDK シリアライゼーションを使用しますが、前述のように値シリアライザーを変更できます。

一般的な RedisMessageStore を使用する代わりに、このストアバッキングチャネルを使用することをお勧めします。次の例では、Redis メッセージストアを定義し、それをキューのあるチャネルで使用します。

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
	<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="redisMessageStore"/>
<int:channel>

データの保存に使用されるキーの形式は、<storeBeanName>:<channelId> (前の例では redisMessageStore:somePersistentQueueChannel)です。

さらに、サブクラス RedisChannelPriorityMessageStore も提供されます。QueueChannel でこれを使用すると、メッセージは(FIFO)優先順位で受信されます。標準の IntegrationMessageHeaderAccessor.PRIORITY ヘッダーを使用し、優先度値(0 - 9)をサポートします。他の優先度のメッセージ(および優先度のないメッセージ)は、優先度のあるメッセージの後に FIFO 順に取得されます。

これらのストアは BasicMessageGroupStore のみを実装し、MessageGroupStore は実装しません。それらは、QueueChannel のバックアップなどの状況にのみ使用できます。

Redis メタデータストア

Spring Integration 3.0 は、新しい Redis ベースの MetadataStore (Javadoc) メタデータストアを参照)実装を導入しました。RedisMetadataStore を使用して、アプリケーションの再起動後も MetadataStore の状態を維持できます。この新しい MetadataStore 実装は、次のようなアダプターで使用できます。

これらのアダプターに新しい RedisMetadataStore を使用するよう指示するには、metadataStore という名前の Spring Bean を宣言します。フィード受信チャネルアダプターとフィード受信チャネルアダプターの両方が、宣言された RedisMetadataStore を自動的に取得して使用します。次の例は、そのような Bean を宣言する方法を示しています。

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
    <constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>

RedisMetadataStore は RedisProperties (Javadoc) によってサポートされています。それとの相互作用は BoundHashOperations (Javadoc) を使用します。これは、Properties ストア全体に対して key を必要とします。MetadataStore の場合、この key はリージョンのロールを果たします。これは、いくつかのアプリケーションが同じ Redis サーバーを使用する分散環境で役立ちます。デフォルトでは、この key の値は MetaData です。

バージョン 4.0 以降、このストアは ConcurrentMetadataStore を実装し、キーの値を保存または変更できるインスタンスが 1 つだけである複数のアプリケーションインスタンス間で確実に共有できるようにします。

アトミック性のための WATCH コマンドは現在サポートされていないため、Redis クラスターで RedisMetadataStore.replace() (たとえば、AbstractPersistentAcceptOnceFileListFilter 内)を使用することはできません。

Redis ストア受信チャネルアダプター

Redis ストア受信チャネルアダプターは、Redis コレクションからデータを読み取り、Message ペイロードとして送信するポーリングコンシューマーです。次の例は、Redis ストア受信チャネルアダプターを構成する方法を示しています。

<int-redis:store-inbound-channel-adapter id="listAdapter"
    connection-factory="redisConnectionFactory"
    key="myCollection"
    channel="redisChannel"
    collection-type="LIST" >
    <int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>

上記の例は、store-inbound-channel-adapter 要素を使用して Redis ストア受信チャネルアダプターを構成し、次のようなさまざまな属性の値を提供する方法を示しています。

  • key または key-expression: 使用されているコレクションのキーの名前。

  • collection-type: このアダプターでサポートされているコレクション型の列挙。サポートされているコレクションは LISTSETZSETPROPERTIESMAP です。

  • connection-factoryo.s.data.redis.connection.RedisConnectionFactory のインスタンスへの参照。

  • redis-templateo.s.data.redis.core.RedisTemplate のインスタンスへの参照。

  • 他のすべての受信アダプターに共通のその他の属性(「チャネル」など)。

redis-template と connection-factory の両方を設定することはできません。

デフォルトでは、アダプターは StringRedisTemplate を使用します。これは、キー、値、ハッシュキー、ハッシュ値に StringRedisSerializer インスタンスを使用します。Redis ストアに他の手法で直列化されたオブジェクトが含まれる場合、適切なシリアライザーで構成された RedisTemplate を提供する必要があります。例: extract-payload-elements が false に設定されている Redis ストア送信アダプターを使用してストアが書き込まれる場合、次のように構成された RedisTemplate を提供する必要があります。

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
    <property name="connectionFactory" ref="redisConnectionFactory"/>
    <property name="keySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
    <property name="hashKeySerializer">
        <bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
    </property>
</bean>

RedisTemplate は、キーとハッシュキーに String シリアライザーを使用し、値とハッシュ値にデフォルトの JDK 直列化シリアライザーを使用します。

key のリテラル値があるため、前述の例は比較的単純で静的です。ある条件に基づいて、実行時にキーの値を変更する必要がある場合があります。そのためには、代わりに key-expression を使用します。指定された式は、有効な SpEL 式であればどれでもかまいません。

また、Redis コレクションから読み取られ、正常に処理されたデータに対して後処理を実行することもできます。例: 処理後に値を移動または削除することができます。そのためには、Spring Integration 2.2 で追加されたトランザクション同期機能を使用します。次の例では、key-expression とトランザクション同期を使用しています。

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
        connection-factory="redisConnectionFactory"
        key-expression="'presidents'"
        channel="otherRedisChannel"
        auto-startup="false"
        collection-type="ZSET">
            <int:poller fixed-rate="1000" max-messages-per-poll="2">
                <int:transactional synchronization-factory="syncFactory"/>
            </int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
	<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>

transactional 要素を使用して、ポーラーをトランザクション対応として宣言できます。この要素は、実際のトランザクションマネージャーを参照できます(たとえば、フローの他の部分が JDBC を呼び出す場合)。「実際の」トランザクションがない場合は、Spring の PlatformTransactionManager の実装である o.s.i.transaction.PseudoTransactionManager を使用でき、実際のトランザクションがない場合に Redis アダプターのトランザクション同期機能を使用できます。

これにより、Redis アクティビティ自体がトランザクションになりません。成功(コミット)または失敗(ロールバック)の前後にアクションの同期を取ることができます。

ポーラーがトランザクション対応になると、o.s.i.transaction.TransactionSynchronizationFactory のインスタンスを transactional 要素に設定できます。TransactionSynchronizationFactory は、TransactionSynchronization のインスタンスを作成します。便宜上、デフォルトの SpEL ベースの TransactionSynchronizationFactory を公開しました。これにより、SpEL 式を構成し、その実行をトランザクションと調整(同期)できます。評価結果(存在する場合)が送信されるチャネル(イベントの種類ごとに 1 つ)とともに、コミット前、コミット後、ロールバック後の式がサポートされています。子要素ごとに、expression および channel 属性を指定できます。channel 属性のみが存在する場合、受信したメッセージは特定の同期シナリオの一部としてそこに送信されます。expression 属性のみが存在し、式の結果が NULL 以外の値である場合、ペイロードとしての結果を含むメッセージが生成され、デフォルトチャネル(NullChannel)に送信され、ログ(DEBUG レベル)に表示されます。評価結果を特定のチャネルに移動する場合は、channel 属性を追加します。式の結果が null または void の場合、メッセージは生成されません。

トランザクション同期の詳細については、トランザクションの同期を参照してください。

RedisStore 送信チャネルアダプター

RedisStore 送信チャネルアダプターを使用すると、次の例に示すように、メッセージペイロードを Redis コレクションに書き込むことができます。

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
          collection-type="LIST"
          channel="requestChannel"
          key="myCollection" />

上記の構成では、store-inbound-channel-adapter 要素を使用して Redis ストア送信チャネルアダプターを保存します。次のようなさまざまな属性の値を提供します。

  • key または key-expression: 使用されているコレクションのキーの名前。

  • extract-payload-elementstrue (デフォルト)に設定され、ペイロードが「複数値」オブジェクト(つまり、Collection または Map)のインスタンスである場合、"addAll" および "putAll" セマンティクスを使用して格納されます。それ以外の場合、false に設定すると、ペイロードはその型に関係なく単一のエントリとして保存されます。ペイロードが「複数値」オブジェクトのインスタンスではない場合、この属性の値は無視され、ペイロードは常に単一のエントリとして保存されます。

  • collection-type: このアダプターでサポートされている Collection 型の列挙。サポートされているコレクションは LISTSETZSETPROPERTIESMAP です。

  • map-key-expression: 格納されているエントリのキーの名前を返す SpEL 式。collection-type が MAP または PROPERTIES であり、"extract-payload-elements" が false の場合にのみ適用されます。

  • connection-factoryo.s.data.redis.connection.RedisConnectionFactory のインスタンスへの参照。

  • redis-templateo.s.data.redis.core.RedisTemplate のインスタンスへの参照。

  • 他のすべての受信アダプターに共通のその他の属性(「チャネル」など)。

redis-template と connection-factory の両方を設定することはできません。
デフォルトでは、アダプターは StringRedisTemplate を使用します。これは、キー、値、ハッシュキー、ハッシュ値に StringRedisSerializer インスタンスを使用します。ただし、extract-payload-elements が false に設定されている場合、キーおよびハッシュキー用の StringRedisSerializer インスタンスと値およびハッシュ値用の JdkSerializationRedisSerializer インスタンスを持つ RedisTemplate が使用されます。JDK シリアライザーでは、値が実際にコレクションであるかどうかに関係なく、Java 直列化がすべての値に使用されることを理解することが重要です。値の直列化をさらに制御する必要がある場合は、これらのデフォルトに依存するのではなく、独自の RedisTemplate を提供することを検討してください。

key およびその他の属性のリテラル値があるため、前の例は比較的単純で静的です。場合によっては、条件に基づいて実行時に値を動的に変更する必要があります。これを行うには、同等の -expression (key-expressionmap-key-expression など)を使用します。ここで、提供される式は任意の有効な SpEL 式にすることができます。

Redis 送信コマンドゲートウェイ

Spring Integration 4.0 は Redis コマンドゲートウェイを導入し、汎用の RedisConnection#execute メソッドを使用して標準の Redis コマンドを実行できるようにしました。次のリストは、Redis 送信ゲートウェイで使用可能な属性を示しています。

<int-redis:outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        redis-template=""  (6)
        arguments-serializer=""  (7)
        command-expression=""  (8)
        argument-expressions=""  (9)
        use-command-variable=""  (10)
        arguments-strategy="" /> (11)
1 このエンドポイントが Message インスタンスを受信する MessageChannel
2 このエンドポイントが応答 Message インスタンスを送信する MessageChannel
3 この送信ゲートウェイが null 以外の値を返す必要があるかどうかを指定します。デフォルトは true です。Redis が null 値を返すと、ReplyRequiredException がスローされます。
4 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。
5RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。'redis-template' 属性と相互に排他的です。
6RedisTemplate Bean への参照。'connection-factory' 属性と相互に排他的です。
7org.springframework.data.redis.serializer.RedisSerializer のインスタンスへの参照。必要に応じて、各コマンド引数を byte[] に直列化するために使用されます。
8 コマンドキーを返す SpEL 式。デフォルトは redis_command メッセージヘッダーです。null に評価してはなりません。
9 コマンド引数として評価されるカンマ区切りの SpEL 式。arguments-strategy 属性と相互に排他的。どちらの属性も指定しない場合、payload がコマンド引数として使用されます。引数式は "null" と評価され、可変数の引数をサポートできます。
10argument-expressions が構成されているときに、評価された Redis コマンド文字列を、o.s.i.redis.outbound.ExpressionArgumentsStrategy の式評価コンテキストで #cmd 変数として使用可能にするかどうかを指定する boolean フラグ。それ以外の場合、この属性は無視されます。
11o.s.i.redis.outbound.ArgumentsStrategy のインスタンスへの参照。argument-expressions 属性と相互に排他的です。どちらの属性も指定しない場合、payload がコマンド引数として使用されます。

<int-redis:outbound-gateway> を共通コンポーネントとして使用して、任意の Redis 操作を実行できます。次の例は、Redis アトミック番号から増分値を取得する方法を示しています。

<int-redis:outbound-gateway request-channel="requestChannel"
    reply-channel="replyChannel"
    command-expression="'INCR'"/>

Message ペイロードの名前は redisCounter である必要があり、これは org.springframework.data.redis.support.atomic.RedisAtomicInteger Bean 定義によって提供される場合があります。

RedisConnection#execute メソッドには、戻り値の型として汎用 Object があります。実際の結果はコマンドの種類によって異なります。例: MGET は List<byte[]> を返します。コマンド、それらの引数、結果型の詳細については、Redis 仕様 (英語) を参照してください。

Redis キュー送信ゲートウェイ

Spring Integration は、リクエストおよび応答シナリオを実行するために Redis キュー送信ゲートウェイを導入しました。提供された queue に会話 UUID をプッシュし、その UUID をキーとして値を Redis リストにプッシュし、UUID' plus '.reply のキーを使用して Redis リストからの応答を待ちます。対話ごとに異なる UUID が使用されます。次のリストは、Redis 送信ゲートウェイで使用可能な属性を示しています。

<int-redis:queue-outbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        requires-reply=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        extract-payload=""/>  (9)
1 このエンドポイントが Message インスタンスを受信する MessageChannel
2 このエンドポイントが応答 Message インスタンスを送信する MessageChannel
3 この送信ゲートウェイが非 null 値を返す必要があるかどうかを指定します。この値は、デフォルトでは false です。そうでない場合、Redis が null 値を返すと、ReplyRequiredException がスローされます。
4 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。
5RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。'redis-template' 属性と相互に排他的です。
6 送信ゲートウェイが会話 UUID を送信する Redis リストの名前。
7 複数のゲートウェイが登録されている場合のこの送信ゲートウェイの順序。
8RedisSerializer Bean リファレンス。空の文字列にすることもできます。これは、「シリアライザなし」を意味します。この場合、受信 Redis メッセージからの生の byte[] は、Message ペイロードとして channel に送信されます。デフォルトでは、JdkSerializationRedisSerializer です。
9 このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。

Redis キュー受信ゲートウェイ

Spring Integration 4.1 は、リクエストと応答のシナリオを実行するために Redis キューの受信ゲートウェイを導入しました。提供された queue から会話 UUID をポップし、その UUID をキーとする値を Redis リストからポップし、UUID と .reply のキーを使用して Redis リストへの応答をプッシュします。次のリストは、Redis キューの受信ゲートウェイで使用可能な属性を示しています。

<int-redis:queue-inbound-gateway
        request-channel=""  (1)
        reply-channel=""  (2)
        executor=""  (3)
        reply-timeout=""  (4)
        connection-factory=""  (5)
        queue=""  (6)
        order=""  (7)
        serializer=""  (8)
        receive-timeout=""  (9)
        expect-message=""  (10)
        recovery-interval=""/>  (11)
1 このエンドポイントが Redis データから作成された Message インスタンスを送信する MessageChannel
2 このエンドポイントが応答 Message インスタンスを待機する MessageChannel。オプション - replyChannel ヘッダーはまだ使用されています。
3Spring TaskExecutor (または標準の JDK Executor)Bean への参照。基礎となるリスニングタスクに使用されます。デフォルトは SimpleAsyncTaskExecutor です。
4 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。
5RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。'redis-template' 属性と相互に排他的です。
6 会話 UUID の Redis リストの名前。
7 複数のゲートウェイが登録されている場合のこの受信ゲートウェイの順序。
8RedisSerializer Bean リファレンス。空の文字列にすることもできます。つまり、「シリアライザーなし」を意味します。この場合、受信 Redis メッセージからの生の byte[] は、Message ペイロードとして channel に送信されます。デフォルトは JdkSerializationRedisSerializer です。(バージョン 4.3 より前のリリースでは、デフォルトでは StringRedisSerializer でした。その動作を復元するには、StringRedisSerializer への参照を指定してください)。
9 受信メッセージが取得されるまで待機するタイムアウト(ミリ秒)。通常、キューベースの制限されたリクエストチャネルに適用されます。
10 このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。
11「右ポップ」操作の例外の後、リスナタスクを再起動する前にリスナタスクがスリープする時間(ミリ秒単位)。
task-executor は、処理のために複数のスレッドで構成する必要があります。そうでない場合、RedisQueueMessageDrivenEndpoint がエラー後にリスナータスクを再起動しようとすると、デッドロックが発生する可能性があります。errorChannel を使用してこれらのエラーを処理し、再起動を回避することができますが、アプリケーションをデッドロック状態にさらさないことが望ましいです。可能な TaskExecutor 実装については、Spring Framework 参考マニュアルを参照してください。

Redis ストリーム送信チャネルアダプター

Spring Integration 5.4 は、メッセージペイロードを Redis ストリームに書き込むためのリアクティブ Redis ストリーム送信チャネルアダプターを導入しました。送信チャネルアダプターは、ReactiveStreamOperations.add(…​) を使用して Record をストリームに追加します。次の例は、Redis ストリーム送信チャネルアダプターの Java 構成とサービスクラスを使用する方法を示しています。

@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
        ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
    ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
        new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
    reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
    reactiveStreamMessageHandler.setExtractPayload(true); (4)
    return reactiveStreamMessageHandler;
}
1ReactiveRedisConnectionFactory とストリーム名を使用して ReactiveRedisStreamMessageHandler のインスタンスを作成し、レコードを追加します。別のコンストラクターバリアントは、SpEL 式に基づいて、リクエストメッセージに対してストリームキーを評価します。
2 ストリームに追加する前に、レコードのキーと値を直列化するために使用する RedisSerializationContext を設定します。
3Java 型と Redis ハッシュ / マップ間の契約を提供する HashMapper を設定します。
4'true' の場合、チャネルアダプターは追加するストリームレコードのリクエストメッセージからペイロードを抽出します。または、メッセージ全体を値として使用します。デフォルトは true です。

Redis ストリーム受信チャネルアダプター

Spring Integration 5.4 は、Redis ストリームからメッセージを読み取るためのリアクティブストリーム受信チャネルアダプターを導入しました。受信・チャネルアダプターは、自動確認応答フラグに基づいて StreamReceiver.receive(…​) または StreamReceiver.receiveAutoAck() を使用して、Redis ストリームからレコードを読み取ります。次の例は、Redis ストリーム受信チャネルアダプターの Java 構成を使用する方法を示しています。

@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
       ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
            new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
    messageProducer.setStreamReceiverOptions( (2)
                StreamReceiver.StreamReceiverOptions.builder()
                      .pollTimeout(Duration.ofMillis(100))
                      .build());
    messageProducer.setAutoStartup(true); (3)
    messageProducer.setAutoAck(false); (4)
    messageProducer.setCreateConsumerGroup(true); (5)
    messageProducer.setConsumerGroup("my-group"); (6)
    messageProducer.setConsumerName("my-consumer"); (7)
    messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
    messageProducer.setReadOffset(ReadOffset.latest()); (9)
    messageProducer.extractPayload(true); (10)
    return messageProducer;
}
1ReactiveRedisConnectionFactory とストリームキーを使用してレコードを読み取ることにより、ReactiveRedisStreamMessageProducer のインスタンスを構築します。
2 リアクティブインフラストラクチャを使用して redis ストリームを消費する StreamReceiver.StreamReceiverOptions
3 アプリケーションコンテキストの開始後にこのエンドポイントを自動的に開始するかどうかを指定する SmartLifecycle 属性。デフォルトは true です。false の場合、RedisStreamMessageProducer は手動で messageProducer.start() を開始する必要があります。
4false の場合、受信したメッセージは自動確認応答されません。メッセージの確認応答は、クライアントがメッセージを消費するまで延期されます。デフォルトは true です。
5true の場合、コンシューマーグループが作成されます。コンシューマーグループの作成中に、ストリームも作成されます(まだ存在しない場合)。コンシューマーグループはメッセージ配信を追跡し、コンシューマーを区別します。デフォルトは false です。
6 コンシューマーグループ名を設定します。デフォルトでは、定義された Bean 名になります。
7 コンシューマー名を設定します。グループ my-group から my-consumer としてメッセージを読み取ります。
8 このエンドポイントからメッセージを送信するメッセージチャネル。
9 メッセージを読み取るためのオフセットを定義します。デフォルトは ReadOffset.latest() です。
10'true' の場合、チャネルアダプターは Record からペイロード値を抽出します。それ以外の場合は、Record 全体がペイロードとして使用されます。デフォルトは true です。

autoAck が false に設定されている場合、Redis ストリームの Record は Redis ドライバーによって自動的に確認応答されません。代わりに、IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK ヘッダーがメッセージに追加され、SimpleAcknowledgment インスタンスを値として生成します。このようなレコードに基づいてメッセージのビジネスロジックが実行されるたびに、acknowledge() コールバックを呼び出すことはターゲット統合フローの責任です。デシリアライズ中に例外が発生し、errorChannel が構成されている場合でも、同様のロジックが必要です。ターゲットエラーハンドラーは、そのような失敗したメッセージを確認するか、削除するかを決定する必要があります。IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK に加えて、ReactiveRedisStreamMessageProducer はこれらのヘッダーをメッセージに入力して、RedisHeaders.STREAM_KEYRedisHeaders.STREAM_MESSAGE_IDRedisHeaders.CONSUMER_GROUPRedisHeaders.CONSUMER を生成します。

バージョン 5.5 以降、ReactiveRedisStreamMessageProducer で StreamReceiver.StreamReceiverOptionsBuilder オプションを明示的に構成できます。これには、逆直列化エラーが発生したときに Redis ストリームコンシューマーがポーリングを続行する必要がある場合に必要な、新しく導入された onErrorResume 関数が含まれます。デフォルトの関数は、上記のように、失敗したメッセージの確認応答を含むメッセージをエラーチャネル(提供されている場合)に送信します。これらの StreamReceiver.StreamReceiverOptionsBuilder はすべて、外部から提供された StreamReceiver.StreamReceiverOptions と相互に排他的です。

Redis ロックレジストリ

Spring Integration 4.0 は RedisLockRegistry を導入しました。特定のコンポーネント(アグリゲーターやリシーケンサーなど)は、LockRegistry インスタンスから取得したロックを使用して、一度に 1 つのスレッドのみがグループを操作するようにします。DefaultLockRegistry は、単一のコンポーネント内でこの機能を実行します。これで、これらのコンポーネントで外部ロックレジストリを構成できます。共有 MessageGroupStore で使用する場合、RedisLockRegistry を使用して複数のアプリケーションインスタンスにこの機能を提供でき、一度に 1 つのインスタンスのみがグループを操作できます。

ローカルスレッドによってロックが解除されると、通常、別のローカルスレッドがすぐにロックを取得できます。別のレジストリインスタンスを使用するスレッドによってロックが解放された場合、ロックを取得するのに最大 100 ミリ秒かかることがあります。

「ハング」ロック(サーバーの障害時)を回避するために、このレジストリのロックはデフォルトの 60 秒後に期限切れになりますが、レジストリでこの値を構成できます。ロックは通常、はるかに短い時間保持されます。

キーは期限切れになる可能性があるため、期限切れのロックをロック解除しようとすると、例外がスローされます。ただし、このようなロックによって保護されているリソースは危険にさらされている可能性があるため、このような例外は深刻であると見なされる必要があります。この状態を防ぐには、有効期限を十分に大きい値に設定する必要がありますが、サーバー障害が発生した後、妥当な時間内にロックを回復できるように十分に低く設定する必要があります。

バージョン 5.0 から、RedisLockRegistry は ExpirableLockRegistry を実装します。ExpirableLockRegistry は、age より前に最後に取得され、現在ロックされていないロックを削除します。

バージョン 5.5.6 の文字列である RedisLockRegistry は、RedisLockRegistry.setCacheCapacity() を介して RedisLockRegistry.locks の redisLocks のキャッシュを自動的にクリーンアップすることをサポートしています。詳細については、JavaDocs を参照してください。