Redis サポート
Spring Integration 2.1 は、Redis (英語) のサポートを導入しました: 「オープンソースの高度な Key-Value ストア」。このサポートは、Redis ベースの MessageStore
と、Redis が PUBLISH
、SUBSCRIBE
、UNSUBSCRIBE
(英語) コマンドを介してサポートするパブリッシュ / サブスクライブメッセージングアダプターの形式で提供されます。
この依存関係をプロジェクトに含める必要があります。
Redis クライアントの依存関係も含める必要があります。Lettuce。
Redis をダウンロード、インストール、実行するには、Redis ドキュメント (英語) を参照してください。
Redis への接続
Redis との対話を開始するには、まず接続する必要があります。Spring Integration は、典型的な Spring コンストラクト ( ConnectionFactory
および Template
) を提供する別の Spring プロジェクト Spring Data Redis [GitHub] (英語) によって提供されるサポートを使用します。これらの抽象化により、いくつかの Redis クライアント Java API との統合が簡素化されます。現在、Spring Data Redis は Jedis [GitHub] (英語) および Lettuce (英語) をサポートしています。
RedisConnectionFactory
を使用する
Redis に接続するには、RedisConnectionFactory
インターフェースの実装の 1 つを使用できます。次のリストは、インターフェース定義を示しています。
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
次の例は、Java で LettuceConnectionFactory
を作成する方法を示しています。
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
次の例は、Spring の XML 構成で LettuceConnectionFactory
を作成する方法を示しています。
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory
の実装では、ポートやホストなど、必要に応じて設定できる一連のプロパティが提供されます。RedisConnectionFactory
のインスタンスを取得したら、RedisTemplate
のインスタンスを作成して、RedisConnectionFactory
を注入できます。
RedisTemplate
を使用する
Spring の他のテンプレートクラス(JdbcTemplate
や JmsTemplate
など)と同様に、RedisTemplate
は Redis データアクセスコードを簡素化するヘルパークラスです。RedisTemplate
およびそのバリエーション(StringRedisTemplate
など)の詳細については、Spring Data Redis ドキュメントを参照してください。
次の例は、Java で RedisTemplate
のインスタンスを作成する方法を示しています。
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
次の例は、Spring の XML 構成で RedisTemplate
のインスタンスを作成する方法を示しています。
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
Redis でメッセージング
で記述されていたように導入、Redis はその PUBLISH
、SUBSCRIBE
、UNSUBSCRIBE
コマンドによってメッセージング、パブリッシュサブスクライブのサポートを提供します。JMS および AMQP と同様に、Spring Integration は、Redis を介してメッセージを送受信するためのメッセージチャネルとアダプターを提供します。
Redis パブリッシュ / サブスクライブチャネル
JMS と同様に、プロデューサーとコンシューマーの両方が同じアプリケーションの一部であり、同じプロセス内で実行される場合があります。これを実現するには、受信チャネルアダプターと送信チャネルアダプターのペアを使用します。ただし、Spring Integration の JMS サポートと同様に、このユースケースに対処する簡単な方法があります。次の例に示すように、パブリッシュ / サブスクライブチャネルを作成できます。
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel
は、メイン Spring Integration 名前空間の通常の <publish-subscribe-channel/>
要素とほとんど同じように動作します。任意のエンドポイントの input-channel
属性と output-channel
属性の両方で参照できます。違いは、このチャネルが Redis トピック名(topic-name
属性で指定された String
値)によって支えられていることです。ただし、JMS とは異なり、このトピックは事前に作成したり、Redis によって自動作成したりする必要はありません。Redis では、トピックはアドレスのロールを果たす単純な String
値です。プロデューサーとコンシューマーは、トピック名と同じ String
値を使用して通信できます。このチャネルへの単純なサブスクリプションは、プロデュースエンドポイントとコンシュームエンドポイント間で非同期パブリッシュ / サブスクライブメッセージングが可能であることを意味します。ただし、単純な Spring Integration <channel/>
要素内に <queue/>
要素を追加して作成された非同期メッセージチャネルとは異なり、メッセージはメモリ内キューに保存されません。代わりに、これらのメッセージは Redis を介して渡されるため、永続化とクラスタリングのサポート、他の非 Java プラットフォームとの相互運用性に依存できます。
Redis 受信チャネルアダプター
Redis 受信チャネルアダプター(RedisInboundChannelAdapter
)は、他の受信アダプターと同じ方法で、受信 Redis メッセージを Spring メッセージに適合させます。プラットフォーム固有のメッセージ(この場合は Redis)を受信し、MessageConverter
戦略を使用して Spring メッセージに変換します。次の例は、Redis 受信チャネルアダプターを構成する方法を示しています。
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
上記の例は、Redis 受信チャネルアダプターのシンプルだが完全な構成を示しています。前述の構成は、特定の Bean を自動検出する、よく知られた Spring パラダイムに依存していることに注意してください。この場合、redisConnectionFactory
は暗黙的にアダプターに挿入されます。代わりに connection-factory
属性を使用して明示的に指定できます。
また、上記の構成では、アダプターにカスタム MessageConverter
が挿入されることに注意してください。このアプローチは、MessageConverter
インスタンスを使用して Redis メッセージと Spring Integration メッセージペイロードを変換する JMS に似ています。デフォルトは SimpleMessageConverter
です。
受信アダプターは、複数のトピック名をサブスクライブできます。topics
属性のコンマ区切り値のセットです。
バージョン 3.0 以降、受信アダプターは、既存の topics
属性に加えて、topic-patterns
属性を持つようになりました。この属性には、Redis トピックパターンのコンマ区切りセットが含まれています。Redis のパブリッシュ / サブスクライブに関する詳細については、Redis Pub/Sub (英語) を参照してください。
受信アダプターは、RedisSerializer
を使用して Redis メッセージの本文を逆直列化できます。<int-redis:inbound-channel-adapter>
の serializer
属性を空の文字列に設定すると、RedisSerializer
プロパティの null
値が得られます。この場合、Redis メッセージの未加工の byte[]
本体は、メッセージペイロードとして提供されます。
バージョン 5.0 以降、<int-redis:inbound-channel-adapter>
の task-executor
属性を使用して、受信アダプターに Executor
インスタンスを提供できます。また、受信した Spring Integration メッセージには、公開されたメッセージのソースを示す RedisHeaders.MESSAGE_SOURCE
ヘッダーが含まれるようになりました: トピックまたはパターン。このダウンストリームをルーティングロジックに使用できます。
Redis 送信チャネルアダプター
Redis 送信チャネルアダプターは、他の送信アダプターと同じ方法で、発信 Spring Integration メッセージを Redis メッセージに適合させます。Spring Integration メッセージを受信し、MessageConverter
戦略を使用してプラットフォーム固有のメッセージ(この場合は Redis)に変換します。次の例は、Redis 送信チャネルアダプターを構成する方法を示しています。
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
この構成は、Redis 受信チャネルアダプターに対応しています。アダプターには、RedisConnectionFactory
が暗黙的に挿入されます。これは、Bean 名として redisConnectionFactory
で定義されています。この例には、オプションの(およびカスタムの) MessageConverter
(testConverter
Bean)も含まれています。
Spring Integration 3.0 以降、<int-redis:outbound-channel-adapter>
は topic
属性に代わるものを提供します。topic-expression
属性を使用して、実行時にメッセージの Redis トピックを決定できます。これらの属性は相互に排他的です。
Redis キュー受信チャネルアダプター
Spring Integration 3.0 は、Redis リストからメッセージを「ポップ」するキュー受信チャネルアダプターを導入しました。デフォルトでは「右ポップ」を使用しますが、「左ポップ」を使用するように構成できます。アダプターはメッセージ駆動型です。内部リスナースレッドを使用し、ポーラーは使用しません。
次のリストは、queue-inbound-channel-adapter
で使用可能なすべての属性を示しています。
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
1 | コンポーネント Bean 名。channel 属性を指定しない場合、DirectChannel が作成され、この id 属性を Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイント自体は、Bean 名 id および .adapter で登録されます。(Bean 名が thing1 であった場合、エンドポイントは thing1.adapter として登録されます。) |
2 | このエンドポイントから Message インスタンスを送信する MessageChannel 。 |
3 | アプリケーションコンテキストの開始後にこのエンドポイントを自動的に開始するかどうかを指定する SmartLifecycle 属性。デフォルトは true です。 |
4 | このエンドポイントが開始されるフェーズを指定する SmartLifecycle 属性。デフォルトは 0 です。 |
5 | RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。 |
6 | Redis メッセージを取得するためにキューベースの「ポップ」操作が実行される Redis リストの名前。 |
7 | エンドポイントのリスニングタスクから例外を受信したときに ErrorMessage インスタンスを送信する MessageChannel 。デフォルトでは、基礎となる MessagePublishingErrorHandler はアプリケーションコンテキストのデフォルト errorChannel を使用します。 |
8 | RedisSerializer Bean リファレンス。これは、「シリアライザーなし」を意味する空の文字列にすることができます。この場合、受信 Redis メッセージからの生の byte[] が Message ペイロードとして channel に送信されます。デフォルトでは JdkSerializationRedisSerializer です。 |
9 | キューからの Redis メッセージを待機する「ポップ」操作のタイムアウト(ミリ秒)。デフォルトは 1 秒です。 |
10 | "pop" 操作の例外の後、リスナータスクを再起動するまでにリスナータスクがスリープする時間(ミリ秒)。 |
11 | このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。デフォルトは false です。 |
12 | Spring TaskExecutor (または標準 JDK 1.5 + Executor )Bean への参照。これは、基礎となるリスニングタスクに使用されます。デフォルトは SimpleAsyncTaskExecutor です。 |
13 | このエンドポイントが Redis リストからメッセージを読み取るために「右ポップ」(true の場合)または「左ポップ」(false の場合)を使用するかどうかを指定します。true の場合、Redis リストは、デフォルトの Redis キュー送信チャネルアダプターと共に使用されると、FIFO キューとして機能します。false に設定すると、「右プッシュ」でリストに書き込むソフトウェアで使用したり、スタックのようなメッセージの順序を実現したりできます。デフォルトは true です。バージョン 4.3 以降。 |
task-executor は、処理のために複数のスレッドで構成する必要があります。そうしないと、RedisQueueMessageDrivenEndpoint がエラー後にリスナータスクを再起動しようとしたときに、デッドロックが発生する可能性があります。errorChannel を使用してこれらのエラーを処理し、再起動を回避できますが、アプリケーションをデッドロックの可能性のある状況にさらさないことをお勧めします。可能な TaskExecutor 実装については、Spring Framework 参考マニュアルを参照してください。 |
Redis キュー送信チャネルアダプター
Spring Integration 3.0 は、Spring Integration メッセージから Redis リストに「プッシュ」するためのキュー送信チャネルアダプターを導入しました。デフォルトでは「左プッシュ」を使用しますが、代わりに「右プッシュ」を使用するように構成できます。次のリストは、Redis queue-outbound-channel-adapter
で使用可能なすべての属性を示しています。
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
1 | コンポーネント Bean 名。channel 属性を指定しない場合、DirectChannel が作成され、この id 属性を Bean 名としてアプリケーションコンテキストに登録されます。この場合、エンドポイントは id に .adapter を加えた Bean 名で登録されます。(Bean 名が thing1 であった場合、エンドポイントは thing1.adapter として登録されます。) |
2 | このエンドポイントが Message インスタンスを受信する MessageChannel 。 |
3 | RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。 |
4 | Redis メッセージを送信するためにキューベースの「プッシュ」操作が実行される Redis リストの名前。この属性は queue-expression と相互に排他的です。 |
5 | Redis リストの名前を決定する SpEL Expression 。実行時に受信 Message を #root 変数として使用します。この属性は queue と相互に排他的です。 |
6 | RedisSerializer Bean リファレンス。デフォルトは JdkSerializationRedisSerializer です。ただし、String ペイロードの場合、serializer 参照が提供されていない場合は、StringRedisSerializer が使用されます。 |
7 | このエンドポイントがペイロードのみを送信するか、Message 全体を Redis キューに送信するかを指定します。デフォルトは true です。 |
8 | このエンドポイントが Redis リストにメッセージを書き込むために「左プッシュ」(true の場合)または「右プッシュ」(false の場合)を使用するかどうかを指定します。true の場合、Redis リストは、デフォルトの Redis キュー受信チャネルアダプターと共に使用されると、FIFO キューとして機能します。false に設定すると、「左ポップ」でリストから読み取るソフトウェアで使用したり、スタックのようなメッセージの順序を実現したりできます。デフォルトは true です。バージョン 4.3 以降。 |
Redis アプリケーションイベント
Spring Integration 3.0 以降、Redis モジュールは org.springframework.context.ApplicationEvent
である IntegrationEvent
の実装を提供します。RedisExceptionEvent
は、Redis 操作からの例外をカプセル化します(エンドポイントはイベントの「ソース」です)。例: <int-redis:queue-inbound-channel-adapter/>
は、BoundListOperations.rightPop
操作からの例外をキャッチした後にこれらのイベントを発行します。例外は、一般的な org.springframework.data.redis.RedisSystemException
または org.springframework.data.redis.RedisConnectionFailureException
です。これらのイベントを <int-event:inbound-channel-adapter/>
で処理すると、バックグラウンドの Redis タスクの問題を特定し、管理アクションを実行できます。
Redis メッセージストア
エンタープライズ統合パターン(EIP)ブックに従って、メッセージストア (英語) を使用するとメッセージを保持できます。これは、信頼性が懸念される場合にメッセージ(アグリゲーター、リシーケンサーなど)をバッファリングする機能を備えたコンポーネントを扱うときに役立ちます。Spring Integration では、MessageStore
戦略はクレームチェック (英語) パターンの基盤も提供します。これは EIP でも説明されています。
Spring Integration の Redis モジュールは RedisMessageStore
を提供します。次の例は、アグリゲーターでそれを使用する方法を示しています。
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
上記の例は Bean 構成であり、コンストラクター引数として RedisConnectionFactory
を想定しています。
デフォルトでは、RedisMessageStore
は Java 直列化を使用してメッセージを直列化します。ただし、別の直列化手法(JSON など)を使用する場合は、RedisMessageStore
の valueSerializer
プロパティを設定して独自のシリアライザーを提供できます。
バージョン 4.3.10 以降、フレームワークは、Message
インスタンスおよび MessageHeaders
インスタンス(それぞれ MessageJacksonDeserializer
および MessageHeadersJacksonSerializer
)の Jackson シリアライザーおよびデシリアライザーの実装を提供します。ObjectMapper
の SimpleModule
オプションで構成する必要があります。さらに、ObjectMapper
で enableDefaultTyping
を設定して、直列化された各複合オブジェクトの型情報を追加する必要があります(ソースが信頼できる場合)。その後、その型情報は逆直列化中に使用されます。フレームワークは、JacksonJsonUtils.messagingAwareMapper()
と呼ばれるユーティリティメソッドを提供します。これには、前述のすべてのプロパティとシリアライザがすでに提供されています。このユーティリティメソッドには trustedPackages
引数が付属しており、デシリアライゼーションの Java パッケージを制限して、セキュリティの脆弱性を回避します。デフォルトの信頼できるパッケージ: java.util
、java.lang
、org.springframework.messaging.support
、org.springframework.integration.support
、org.springframework.integration.message
、org.springframework.integration.store
。RedisMessageStore
で JSON 直列化を管理するには、次の例のような方法で構成する必要があります。
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
バージョン 4.3.12 から、RedisMessageStore
は prefix
オプションをサポートし、同じ Redis サーバー上のストアのインスタンスを区別できるようにします。
Redis チャネルメッセージストア
前に示した RedisMessageStore
は、各グループを単一のキー(グループ ID)の値として維持します。これを使用して永続化のために QueueChannel
をバックアップできますが、その目的のために専用の RedisChannelMessageStore
が提供されます(バージョン 4.0 以降)。このストアは、各チャネルに LIST
を使用し、メッセージを送信するときに LPUSH
を使用し、メッセージを受信するときに RPOP
を使用します。デフォルトでは、このストアも JDK シリアライゼーションを使用しますが、前述のように値シリアライザーを変更できます。
一般的な RedisMessageStore
を使用する代わりに、このストアバッキングチャネルを使用することをお勧めします。次の例では、Redis メッセージストアを定義し、それをキューのあるチャネルで使用します。
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
データの保存に使用されるキーの形式は、<storeBeanName>:<channelId>
(前の例では redisMessageStore:somePersistentQueueChannel
)です。
さらに、サブクラス RedisChannelPriorityMessageStore
も提供されます。QueueChannel
でこれを使用すると、メッセージは(FIFO)優先順位で受信されます。標準の IntegrationMessageHeaderAccessor.PRIORITY
ヘッダーを使用し、優先度値(0 - 9
)をサポートします。他の優先度のメッセージ(および優先度のないメッセージ)は、優先度のあるメッセージの後に FIFO 順に取得されます。
これらのストアは BasicMessageGroupStore のみを実装し、MessageGroupStore は実装しません。それらは、QueueChannel のバックアップなどの状況にのみ使用できます。 |
Redis メタデータストア
Spring Integration 3.0 は、新しい Redis ベースの MetadataStore
(Javadoc) (メタデータストアを参照)実装を導入しました。RedisMetadataStore
を使用して、アプリケーションの再起動後も MetadataStore
の状態を維持できます。この新しい MetadataStore
実装は、次のようなアダプターで使用できます。
これらのアダプターに新しい RedisMetadataStore
を使用するよう指示するには、metadataStore
という名前の Spring Bean を宣言します。フィード受信チャネルアダプターとフィード受信チャネルアダプターの両方が、宣言された RedisMetadataStore
を自動的に取得して使用します。次の例は、そのような Bean を宣言する方法を示しています。
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore
は RedisProperties
(Javadoc) によってサポートされています。それとの相互作用は BoundHashOperations
(Javadoc) を使用します。これは、Properties
ストア全体に対して key
を必要とします。MetadataStore
の場合、この key
はリージョンのロールを果たします。これは、いくつかのアプリケーションが同じ Redis サーバーを使用する分散環境で役立ちます。デフォルトでは、この key
の値は MetaData
です。
バージョン 4.0 以降、このストアは ConcurrentMetadataStore
を実装し、キーの値を保存または変更できるインスタンスが 1 つだけである複数のアプリケーションインスタンス間で確実に共有できるようにします。
アトミック性のための WATCH コマンドは現在サポートされていないため、Redis クラスターで RedisMetadataStore.replace() (たとえば、AbstractPersistentAcceptOnceFileListFilter 内)を使用することはできません。 |
Redis ストア受信チャネルアダプター
Redis ストア受信チャネルアダプターは、Redis コレクションからデータを読み取り、Message
ペイロードとして送信するポーリングコンシューマーです。次の例は、Redis ストア受信チャネルアダプターを構成する方法を示しています。
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
上記の例は、store-inbound-channel-adapter
要素を使用して Redis ストア受信チャネルアダプターを構成し、次のようなさまざまな属性の値を提供する方法を示しています。
key
またはkey-expression
: 使用されているコレクションのキーの名前。collection-type
: このアダプターでサポートされているコレクション型の列挙。サポートされているコレクションはLIST
、SET
、ZSET
、PROPERTIES
、MAP
です。connection-factory
:o.s.data.redis.connection.RedisConnectionFactory
のインスタンスへの参照。redis-template
:o.s.data.redis.core.RedisTemplate
のインスタンスへの参照。他のすべての受信アダプターに共通のその他の属性(「チャネル」など)。
redis-template と connection-factory の両方を設定することはできません。 |
デフォルトでは、アダプターは
|
key
のリテラル値があるため、前述の例は比較的単純で静的です。ある条件に基づいて、実行時にキーの値を変更する必要がある場合があります。そのためには、代わりに key-expression
を使用します。指定された式は、有効な SpEL 式であればどれでもかまいません。
また、Redis コレクションから読み取られた正常に処理されたデータに対して後処理を実行することもできます。例: 処理後に値を移動または削除することができます。これは、Spring Integration 2.2 で追加されたトランザクション同期機能を使用して行うことができます。次の例では、key-expression
とトランザクション同期を使用しています。
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
transactional
要素を使用して、ポーラーをトランザクション対応として宣言できます。この要素は、実際のトランザクションマネージャーを参照できます(たとえば、フローの他の部分が JDBC を呼び出す場合)。「実際の」トランザクションがない場合は、Spring の PlatformTransactionManager
の実装である o.s.i.transaction.PseudoTransactionManager
を使用でき、実際のトランザクションがない場合に Redis アダプターのトランザクション同期機能を使用できます。
これにより、Redis アクティビティ自体がトランザクションになりません。成功(コミット)または失敗(ロールバック)の前後にアクションの同期を取ることができます。 |
ポーラーがトランザクション対応になると、o.s.i.transaction.TransactionSynchronizationFactory
のインスタンスを transactional
要素に設定できます。TransactionSynchronizationFactory
は、TransactionSynchronization
のインスタンスを作成します。便宜上、デフォルトの SpEL ベースの TransactionSynchronizationFactory
を公開しました。これにより、SpEL 式を構成し、その実行をトランザクションと調整(同期)できます。評価結果(存在する場合)が送信されるチャネル(イベントの種類ごとに 1 つ)とともに、コミット前、コミット後、ロールバック後の式がサポートされています。子要素ごとに、expression
および channel
属性を指定できます。channel
属性のみが存在する場合、受信したメッセージは特定の同期シナリオの一部としてそこに送信されます。expression
属性のみが存在し、式の結果が NULL 以外の値である場合、ペイロードとしての結果を含むメッセージが生成され、デフォルトチャネル(NullChannel
)に送信され、ログ(DEBUG
レベル)に表示されます。評価結果を特定のチャネルに移動する場合は、channel
属性を追加します。式の結果が null または void の場合、メッセージは生成されません。
RedisStoreMessageSource
は、TransactionSynchronizationProcessor
実装からアクセスできるトランザクション IntegrationResourceHolder
にバインドされた RedisStore
インスタンスを持つ store
属性を追加します。
トランザクション同期の詳細については、トランザクションの同期を参照してください。
RedisStore 送信チャネルアダプター
RedisStore 送信チャネルアダプターを使用すると、次の例に示すように、メッセージペイロードを Redis コレクションに書き込むことができます。
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
上記の構成では、store-inbound-channel-adapter
要素を使用して Redis ストア送信チャネルアダプターを保存します。次のようなさまざまな属性の値を提供します。
key
またはkey-expression
: 使用されているコレクションのキーの名前。extract-payload-elements
:true
(デフォルト) に設定され、ペイロードが「複数値」オブジェクト (つまり、Collection
またはMap
) のインスタンスである場合は、"addAll" および "putAll" セマンティクスを使用して格納されます。それ以外の場合、false
に設定すると、ペイロードはその型に関係なく単一のエントリとして格納されます。ペイロードが「複数値」オブジェクトのインスタンスでない場合、この属性の値は無視され、ペイロードは常に単一のエントリとして格納されます。collection-type
: このアダプターでサポートされているCollection
型の列挙。サポートされているコレクションはLIST
、SET
、ZSET
、PROPERTIES
、MAP
です。map-key-expression
: 格納されているエントリのキーの名前を返す SpEL 式。collection-type
がMAP
またはPROPERTIES
であり、"extract-payload-elements" が false の場合にのみ適用されます。connection-factory
:o.s.data.redis.connection.RedisConnectionFactory
のインスタンスへの参照。redis-template
:o.s.data.redis.core.RedisTemplate
のインスタンスへの参照。他のすべての受信アダプターに共通のその他の属性(「チャネル」など)。
redis-template と connection-factory の両方を設定することはできません。 |
デフォルトでは、アダプターは StringRedisTemplate を使用します。これは、キー、値、ハッシュキー、ハッシュ値に StringRedisSerializer インスタンスを使用します。ただし、extract-payload-elements が false に設定されている場合、キーおよびハッシュキー用の StringRedisSerializer インスタンスと値およびハッシュ値用の JdkSerializationRedisSerializer インスタンスを持つ RedisTemplate が使用されます。JDK シリアライザーでは、値が実際にコレクションであるかどうかに関係なく、Java 直列化がすべての値に使用されることを理解することが重要です。値の直列化をさらに制御する必要がある場合は、これらのデフォルトに依存するのではなく、独自の RedisTemplate を提供することを検討してください。 |
key
およびその他の属性のリテラル値があるため、前の例は比較的単純で静的です。場合によっては、条件に基づいて実行時に値を動的に変更する必要があります。これを行うには、同等の -expression
(key-expression
、map-key-expression
など)を使用します。ここで、提供される式は任意の有効な SpEL 式にすることができます。
Redis 送信コマンドゲートウェイ
Spring Integration 4.0 は Redis コマンドゲートウェイを導入し、汎用の RedisConnection#execute
メソッドを使用して標準の Redis コマンドを実行できるようにしました。次のリストは、Redis 送信ゲートウェイで使用可能な属性を示しています。
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
1 | このエンドポイントが Message インスタンスを受信する MessageChannel 。 |
2 | このエンドポイントが応答 Message インスタンスを送信する MessageChannel 。 |
3 | この送信ゲートウェイが null 以外の値を返す必要があるかどうかを指定します。デフォルトは true です。Redis が null 値を返すと、ReplyRequiredException がスローされます。 |
4 | 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。 |
5 | RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。'redis-template' 属性と相互に排他的です。 |
6 | RedisTemplate Bean への参照。'connection-factory' 属性と相互に排他的です。 |
7 | org.springframework.data.redis.serializer.RedisSerializer のインスタンスへの参照。必要に応じて、各コマンド引数を byte[] に直列化するために使用されます。 |
8 | コマンドキーを返す SpEL 式。デフォルトは redis_command メッセージヘッダーです。null に評価してはなりません。 |
9 | コマンド引数として評価されるカンマ区切りの SpEL 式。arguments-strategy 属性と相互に排他的。どちらの属性も指定しない場合、payload がコマンド引数として使用されます。引数式は "null" と評価され、可変数の引数をサポートできます。 |
10 | argument-expressions が構成されている場合に、評価された Redis コマンド文字列を o.s.i.redis.outbound.ExpressionArgumentsStrategy の式評価コンテキストで #cmd 変数として使用できるようにするかどうかを指定する boolean フラグ。それ以外の場合、この属性は無視されます。 |
11 | o.s.i.redis.outbound.ArgumentsStrategy のインスタンスへの参照。argument-expressions 属性と相互に排他的です。どちらの属性も指定しない場合、payload がコマンド引数として使用されます。 |
<int-redis:outbound-gateway>
を共通コンポーネントとして使用して、任意の Redis 操作を実行できます。次の例は、Redis アトミック番号から増分値を取得する方法を示しています。
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message
ペイロードの名前は redisCounter
である必要があり、これは org.springframework.data.redis.support.atomic.RedisAtomicInteger
Bean 定義によって提供される場合があります。
RedisConnection#execute
メソッドには、戻り値の型として汎用 Object
があります。実際の結果はコマンドの種類によって異なります。例: MGET
は List<byte[]>
を返します。コマンド、それらの引数、結果型の詳細については、Redis 仕様 (英語) を参照してください。
Redis キュー送信ゲートウェイ
Spring Integration では、リクエストと応答のシナリオを実行するために Redis キュー送信ゲートウェイが導入されました。会話 UUID
を指定された queue
にプッシュし、その UUID
をキーとして持つ値を Redis リストにプッシュし、UUID
と .reply
のキーを持つ Redis リストからの応答を待ちます。インタラクションごとに異なる UUID が使用されます。次のリストは、Redis 送信ゲートウェイで使用可能な属性を示しています。
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
1 | このエンドポイントが Message インスタンスを受信する MessageChannel 。 |
2 | このエンドポイントが応答 Message インスタンスを送信する MessageChannel 。 |
3 | この送信ゲートウェイが非 null 値を返す必要があるかどうかを指定します。この値は、デフォルトでは false です。そうでない場合、Redis が null 値を返すと、ReplyRequiredException がスローされます。 |
4 | 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。 |
5 | RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。'redis-template' 属性と相互に排他的です。 |
6 | 送信ゲートウェイが会話 UUID を送信する Redis リストの名前。 |
7 | 複数のゲートウェイが登録されている場合のこの送信ゲートウェイの順序。 |
8 | RedisSerializer Bean リファレンス。空の文字列にすることもできます。これは、「シリアライザなし」を意味します。この場合、受信 Redis メッセージからの生の byte[] は、Message ペイロードとして channel に送信されます。デフォルトでは、JdkSerializationRedisSerializer です。 |
9 | このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。 |
Redis キュー受信ゲートウェイ
Spring Integration 4.1 は、リクエストと応答のシナリオを実行するために Redis キューの受信ゲートウェイを導入しました。提供された queue
から会話 UUID
をポップし、その UUID
をキーとする値を Redis リストからポップし、UUID
と .reply
のキーを使用して Redis リストへの応答をプッシュします。次のリストは、Redis キューの受信ゲートウェイで使用可能な属性を示しています。
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
1 | このエンドポイントが Redis データから作成された Message インスタンスを送信する MessageChannel 。 |
2 | このエンドポイントが応答 Message インスタンスを待機する MessageChannel 。オプション - replyChannel ヘッダーはまだ使用されています。 |
3 | Spring TaskExecutor (または標準の JDK Executor )Bean への参照。基礎となるリスニングタスクに使用されます。デフォルトは SimpleAsyncTaskExecutor です。 |
4 | 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。 |
5 | RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。'redis-template' 属性と相互に排他的です。 |
6 | 会話 UUID の Redis リストの名前。 |
7 | 複数のゲートウェイが登録されている場合のこの受信ゲートウェイの順序。 |
8 | RedisSerializer Bean リファレンス。これは、「シリアライザーなし」を意味する空の文字列にすることができます。この場合、受信 Redis メッセージからの生の byte[] が Message ペイロードとして channel に送信されます。デフォルトは JdkSerializationRedisSerializer です。(バージョン 4.3 より前のリリースでは、デフォルトで StringRedisSerializer であったことに注意してください。その動作を復元するには、StringRedisSerializer への参照を提供します)。 |
9 | 受信メッセージが取得されるまで待機するタイムアウト(ミリ秒)。通常、キューベースの制限されたリクエストチャネルに適用されます。 |
10 | このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。 |
11 | 「右ポップ」操作の例外の後、リスナタスクを再起動する前にリスナタスクがスリープする時間(ミリ秒単位)。 |
task-executor は、処理のために複数のスレッドで構成する必要があります。そうしないと、RedisQueueMessageDrivenEndpoint がエラー後にリスナータスクを再起動しようとしたときに、デッドロックが発生する可能性があります。errorChannel を使用してこれらのエラーを処理し、再起動を回避できますが、アプリケーションをデッドロックの可能性のある状況にさらさないことをお勧めします。可能な TaskExecutor 実装については、Spring Framework 参考マニュアルを参照してください。 |
Redis ストリーム送信チャネルアダプター
Spring Integration 5.4 は、メッセージペイロードを Redis ストリームに書き込むためのリアクティブ Redis ストリーム送信チャネルアダプターを導入しました。送信チャネルアダプターは、ReactiveStreamOperations.add(…)
を使用して Record
をストリームに追加します。次の例は、Redis ストリーム送信チャネルアダプターの Java 構成とサービスクラスを使用する方法を示しています。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
1 | ReactiveRedisConnectionFactory とストリーム名を使用して ReactiveRedisStreamMessageHandler のインスタンスを作成し、レコードを追加します。別のコンストラクターバリアントは、SpEL 式に基づいて、リクエストメッセージに対してストリームキーを評価します。 |
2 | ストリームに追加する前に、レコードのキーと値を直列化するために使用する RedisSerializationContext を設定します。 |
3 | Java 型と Redis ハッシュ / マップ間の契約を提供する HashMapper を設定します。 |
4 | 'true' の場合、チャネルアダプターは追加するストリームレコードのリクエストメッセージからペイロードを抽出します。または、メッセージ全体を値として使用します。デフォルトは true です。 |
Redis ストリーム受信チャネルアダプター
Spring Integration 5.4 は、Redis ストリームからメッセージを読み取るためのリアクティブストリーム受信チャネルアダプターを導入しました。受信・チャネルアダプターは、自動確認応答フラグに基づいて StreamReceiver.receive(…)
または StreamReceiver.receiveAutoAck()
を使用して、Redis ストリームからレコードを読み取ります。次の例は、Redis ストリーム受信チャネルアダプターの Java 構成を使用する方法を示しています。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
1 | ReactiveRedisConnectionFactory とストリームキーを使用してレコードを読み取ることにより、ReactiveRedisStreamMessageProducer のインスタンスを構築します。 |
2 | リアクティブインフラストラクチャを使用して redis ストリームを消費する StreamReceiver.StreamReceiverOptions 。 |
3 | アプリケーションコンテキストの開始後にこのエンドポイントを自動的に開始するかどうかを指定する SmartLifecycle 属性。デフォルトは true です。false の場合、RedisStreamMessageProducer は手動で messageProducer.start() を開始する必要があります。 |
4 | false の場合、受信したメッセージは自動確認応答されません。メッセージの確認応答は、クライアントがメッセージを消費するまで延期されます。デフォルトは true です。 |
5 | true の場合、コンシューマーグループが作成されます。コンシューマーグループの作成中に、ストリームも作成されます(まだ存在しない場合)。コンシューマーグループはメッセージ配信を追跡し、コンシューマーを区別します。デフォルトは false です。 |
6 | コンシューマーグループ名を設定します。デフォルトでは、定義された Bean 名になります。 |
7 | コンシューマー名を設定します。グループ my-group から my-consumer としてメッセージを読み取ります。 |
8 | このエンドポイントからメッセージを送信するメッセージチャネル。 |
9 | メッセージを読み取るためのオフセットを定義します。デフォルトは ReadOffset.latest() です。 |
10 | "true" の場合、チャネルアダプターは Record からペイロード値を抽出します。それ以外の場合、Record 全体がペイロードとして使用されます。デフォルトは true です。 |
autoAck
が false
に設定されている場合、Redis ストリームの Record
は Redis ドライバーによって自動的に確認されず、代わりに IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
ヘッダーがメッセージに追加され、SimpleAcknowledgment
インスタンスを値として生成します。ビジネスロジックがそのようなレコードに基づいてメッセージに対して実行されるたびに、その acknowledge()
コールバックを呼び出すのは、ターゲット統合フローの責任です。デシリアライゼーション中に例外が発生し、errorChannel
が構成されている場合でも、同様のロジックが必要です。そのため、ターゲットエラーハンドラーは、そのような失敗したメッセージに対して ack または nack を決定する必要があります。IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
とともに、ReactiveRedisStreamMessageProducer
はこれらのヘッダーをメッセージに入力して、RedisHeaders.STREAM_KEY
、RedisHeaders.STREAM_MESSAGE_ID
、RedisHeaders.CONSUMER_GROUP
、RedisHeaders.CONSUMER
を生成します。
バージョン 5.5 以降、ReactiveRedisStreamMessageProducer
で StreamReceiver.StreamReceiverOptionsBuilder
オプションを明示的に構成できます。これには、逆直列化エラーが発生したときに Redis ストリームコンシューマーがポーリングを続行する必要がある場合に必要な、新しく導入された onErrorResume
関数が含まれます。デフォルトの関数は、上記のように、失敗したメッセージの確認応答を含むメッセージをエラーチャネル(提供されている場合)に送信します。これらの StreamReceiver.StreamReceiverOptionsBuilder
はすべて、外部から提供された StreamReceiver.StreamReceiverOptions
と相互に排他的です。
Redis ロックレジストリ
Spring Integration 4.0 は RedisLockRegistry
を導入しました。特定のコンポーネント(アグリゲーターやリシーケンサーなど)は、LockRegistry
インスタンスから取得したロックを使用して、一度に 1 つのスレッドのみがグループを操作するようにします。DefaultLockRegistry
は、単一のコンポーネント内でこの機能を実行します。これで、これらのコンポーネントで外部ロックレジストリを構成できます。共有 MessageGroupStore
で使用する場合、RedisLockRegistry
を使用して複数のアプリケーションインスタンスにこの機能を提供でき、一度に 1 つのインスタンスのみがグループを操作できます。
ローカルスレッドによってロックが解除されると、通常、別のローカルスレッドがすぐにロックを取得できます。別のレジストリインスタンスを使用するスレッドによってロックが解放された場合、ロックを取得するのに最大 100 ミリ秒かかることがあります。
「ハング」ロック(サーバーの障害時)を回避するために、このレジストリのロックはデフォルトの 60 秒後に期限切れになりますが、レジストリでこの値を構成できます。ロックは通常、はるかに短い時間保持されます。
キーは期限切れになる可能性があるため、期限切れのロックをロック解除しようとすると、例外がスローされます。ただし、このようなロックによって保護されているリソースは危険にさらされている可能性があるため、このような例外は深刻であると見なされる必要があります。この状態を防ぐには、有効期限を十分に大きい値に設定する必要がありますが、サーバー障害が発生した後、妥当な時間内にロックを回復できるように十分に低く設定する必要があります。 |
バージョン 5.0 から、RedisLockRegistry
は ExpirableLockRegistry
を実装します。ExpirableLockRegistry
は、age
より前に最後に取得され、現在ロックされていないロックを削除します。
バージョン 5.5.6 以降、RedisLockRegistry
は、RedisLockRegistry.setCacheCapacity()
を介した RedisLockRegistry.locks
内の redisLocks のキャッシュの自動クリーンアップをサポートします。詳細については、JavaDocs を参照してください。
バージョン 5.5.13 以降、RedisLockRegistry
は、Redis ロックの取得をどのモードで行うかを決定する setRedisLockType(RedisLockType)
オプションを公開します。
RedisLockType.SPIN_LOCK
- ロックは、ロックを取得できるかどうかをチェックする定期的なループ(100ms)によって取得されます。デフォルト。RedisLockType.PUB_SUB_LOCK
- ロックは、redispub-sub サブスクリプションによって取得されます。
pub-sub が優先されるモード(クライアント Redis サーバー間のネットワークチャタリングが少なく、パフォーマンスが高い)は、他のプロセスでのロック解除についてサブスクリプションに通知されるとすぐにロックが取得されます。ただし、Redis はマスター / レプリカ接続(AWS ElastiCache 環境など)で pub-sub をサポートしていないため、レジストリをどの環境でも機能させるために、デフォルトとしてビジースピンモードが選択されています。
バージョン 6.4 以降では、ロックの所有権が期限切れの場合、RedisLockRegistry.RedisLock.unlock()
メソッドは IllegalStateException
をスローする代わりに ConcurrentModificationException
をスローします。
バージョン 6.4 以降では、ロックの定期的な更新のスケジューラを構成するための RedisLockRegistry.setRenewalTaskScheduler()
が追加されました。これを設定すると、ロックが正常に取得されてから、ロックが解除されるか、Redis キーが削除されるまで、有効期限の 1/3
ごとにロックが自動的に更新されます。