Redis サポート
Spring Integration 2.1 は、Redis (英語) のサポートを導入しました: 「オープンソースの高度な Key-Value ストア」。このサポートは、Redis ベースの MessageStore と、Redis が PUBLISH、SUBSCRIBE、UNSUBSCRIBE (英語) コマンドを介してサポートするパブリッシュ / サブスクライブメッセージングアダプターの形式で提供されます。
この依存関係はプロジェクトに必要です:
Maven
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>7.0.0</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:7.0.0"
The Redis client dependency must be included, e.g. Lettuce (英語) .
Redis をダウンロード、インストール、実行するには、Redis ドキュメント (英語) を参照してください。
Redis への接続
To begin interacting with Redis, a connection must be obtained first. Spring Integration uses support provided by another Spring project, Spring Data Redis [GitHub] (英語) , which provides typical Spring constructs: ConnectionFactory and Template. Those abstractions simplify integration with several Redis client Java API. Currently, Spring Data Redis supports Jedis [GitHub] (英語) and Lettuce (英語) .
RedisConnectionFactory を使用する
The RedisConnectionFactory from Spring Data Redis is a high-level abstraction for managing connections with Redis. The following listing shows the interface definition:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
} 次の例は、Java で LettuceConnectionFactory を作成する方法を示しています。
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet(); 次の例は、Spring の XML 構成で LettuceConnectionFactory を作成する方法を示しています。
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>The implementations of RedisConnectionFactory provide a set of properties, such as port and host. Once an instance of RedisConnectionFactory exists, the RedisTemplate can be created.
RedisTemplate を使用する
Spring の他のテンプレートクラス(JdbcTemplate や JmsTemplate など)と同様に、RedisTemplate は Redis データアクセスコードを簡素化するヘルパークラスです。RedisTemplate およびそのバリエーション(StringRedisTemplate など)の詳細については、Spring Data Redis ドキュメントを参照してください。
次の例は、Java で RedisTemplate のインスタンスを作成する方法を示しています。
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory); 次の例は、Spring の XML 構成で RedisTemplate のインスタンスを作成する方法を示しています。
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>Redis でメッセージング
で記述されていたように導入、Redis はその PUBLISH、SUBSCRIBE、UNSUBSCRIBE コマンドによってメッセージング、パブリッシュサブスクライブのサポートを提供します。JMS および AMQP と同様に、Spring Integration は、Redis を介してメッセージを送受信するためのメッセージチャネルとアダプターを提供します。
Redis パブリッシュ / サブスクライブチャネル
Similarly to JMS, there are cases where both the producer and consumer are intended to be part of the same application, running within the same process. This can be accomplished with a pair of inbound and outbound channel adapters. However, as with Spring Integration’s JMS support, there is a simpler way to address this use case. Instead, a publish-subscribe channel can be used, as the following example shows:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>A publish-subscribe-channel behaves much like a normal <publish-subscribe-channel/> element from the main Spring Integration namespace. It can be referenced by both the input-channel and the output-channel attributes of any endpoint. The difference is that this channel is backed by a Redis topic name: a String value specified by the topic-name attribute. However, unlike JMS, this topic does not have to be created in advance or even auto-created by Redis. In Redis, topics are simple String values that play the role of an address. The producer and consumer can communicate by using the same String value as their topic name. A simple subscription to this channel means that asynchronous publish-subscribe messaging is possible between the producing and consuming endpoints. However, unlike the asynchronous message channels created by adding a <queue/> element within a simple Spring Integration <channel/> element, the messages are not stored in an in-memory queue. Instead, those messages are passed through Redis, which lets rely on its support for persistence and clustering as well as its interoperability with other non-Java platforms.
Redis 受信チャネルアダプター
Redis 受信チャネルアダプター(RedisInboundChannelAdapter)は、他の受信アダプターと同じ方法で、受信 Redis メッセージを Spring メッセージに適合させます。プラットフォーム固有のメッセージ(この場合は Redis)を受信し、MessageConverter 戦略を使用して Spring メッセージに変換します。次の例は、Redis 受信チャネルアダプターを構成する方法を示しています。
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />The preceding example shows a simple but complete configuration of a Redis inbound channel adapter. Note that the preceding configuration relies on the familiar Spring paradigm of auto-discovering certain beans. In this case, the redisConnectionFactory is implicitly injected into the adapter. Alternatively, a custom RedisConnectionFactory can be injected via connection-factory attribute.
また、上記の構成では、アダプターにカスタム MessageConverter が挿入されることに注意してください。このアプローチは、MessageConverter インスタンスを使用して Redis メッセージと Spring Integration メッセージペイロードを変換する JMS に似ています。デフォルトは SimpleMessageConverter です。
受信アダプターは、複数のトピック名をサブスクライブできます。topics 属性のコンマ区切り値のセットです。
バージョン 3.0 以降、受信アダプターは、既存の topics 属性に加えて、topic-patterns 属性を持つようになりました。この属性には、Redis トピックパターンのコンマ区切りセットが含まれています。Redis のパブリッシュ / サブスクライブに関する詳細については、Redis Pub/Sub (英語) を参照してください。
受信アダプターは、RedisSerializer を使用して Redis メッセージの本文を逆直列化できます。<int-redis:inbound-channel-adapter> の serializer 属性を空の文字列に設定すると、RedisSerializer プロパティの null 値が得られます。この場合、Redis メッセージの未加工の byte[] 本体は、メッセージペイロードとして提供されます。
Since version 5.0, an Executor instance can be injected into the inbound adapter by using the task-executor attribute of the <int-redis:inbound-channel-adapter>. Also, the received Spring Integration messages now have the RedisHeaders.MESSAGE_SOURCE header to indicate the source of the published message: topic or pattern. This can be used downstream for routing logic.
Redis 送信チャネルアダプター
Redis 送信チャネルアダプターは、他の送信アダプターと同じ方法で、発信 Spring Integration メッセージを Redis メッセージに適合させます。Spring Integration メッセージを受信し、MessageConverter 戦略を使用してプラットフォーム固有のメッセージ(この場合は Redis)に変換します。次の例は、Redis 送信チャネルアダプターを構成する方法を示しています。
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" /> この構成は、Redis 受信チャネルアダプターに対応しています。アダプターには、RedisConnectionFactory が暗黙的に挿入されます。これは、Bean 名として redisConnectionFactory で定義されています。この例には、オプションの(およびカスタムの) MessageConverter (testConverter Bean)も含まれています。
Since Spring Integration 3.0, the <int-redis:outbound-channel-adapter> offers an alternative to the topic attribute: a topic-expression attribute is present to determine the Redis topic for the message at runtime. These attributes are mutually exclusive.
Redis キュー受信チャネルアダプター
Spring Integration 3.0 introduced a queue inbound channel adapter to “pop” messages from a Redis list. By default, it uses “right pop”, but can be configured to use “left pop” instead. The adapter is message-driven. It uses an internal listener thread and does not use a poller.
次のリストは、queue-inbound-channel-adapter で使用可能なすべての属性を示しています。
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)| 1 | The component bean name. If channel attribute not provided, a DirectChannel is created and registered in the application context with this id attribute as the bean name. In this case, the endpoint itself is registered with the bean name id plus .adapter. (Bean 名が thing1 であった場合、エンドポイントは thing1.adapter として登録されます。) |
| 2 | このエンドポイントから Message インスタンスを送信する MessageChannel。 |
| 3 | アプリケーションコンテキストの起動後にこのエンドポイントを自動的に起動するかどうかを指定するための SmartLifecycle 属性。デフォルトは true です。 |
| 4 | このエンドポイントが開始されるフェーズを指定する SmartLifecycle 属性。デフォルトは 0 です。 |
| 5 | RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。 |
| 6 | Redis メッセージを取得するためにキューベースの「ポップ」操作が実行される Redis リストの名前。 |
| 7 | エンドポイントのリスニングタスクから例外を受信したときに ErrorMessage インスタンスを送信する MessageChannel。デフォルトでは、基礎となる MessagePublishingErrorHandler はアプリケーションコンテキストのデフォルト errorChannel を使用します。 |
| 8 | RedisSerializer Bean リファレンス。これは、「シリアライザーなし」を意味する空の文字列にすることができます。この場合、受信 Redis メッセージからの生の byte[] が Message ペイロードとして channel に送信されます。デフォルトでは JdkSerializationRedisSerializer です。 |
| 9 | キューからの Redis メッセージを待機する「ポップ」操作のタイムアウト(ミリ秒)。デフォルトは 1 秒です。 |
| 10 | "pop" 操作の例外の後、リスナータスクを再起動するまでにリスナータスクがスリープする時間(ミリ秒)。 |
| 11 | このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。デフォルトは false です。 |
| 12 | Spring TaskExecutor (または標準 JDK 1.5 + Executor)Bean への参照。これは、基礎となるリスニングタスクに使用されます。デフォルトは SimpleAsyncTaskExecutor です。 |
| 13 | このエンドポイントが Redis リストからメッセージを読み取るために「右ポップ」(true の場合)または「左ポップ」(false の場合)を使用するかどうかを指定します。true の場合、Redis リストは、デフォルトの Redis キュー送信チャネルアダプターと共に使用されると、FIFO キューとして機能します。false に設定すると、「右プッシュ」でリストに書き込むソフトウェアで使用したり、スタックのようなメッセージの順序を実現したりできます。デフォルトは true です。バージョン 4.3 以降。 |
The task-executor has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the RedisQueueMessageDrivenEndpoint tries to restart the listener task after an error. The errorChannel can be used to process those errors, to avoid restarts, but it is preferable to not expose the application to the possible deadlock situation. See Spring Framework 参考マニュアル for possible TaskExecutor implementations. |
Redis キュー送信チャネルアダプター
Spring Integration 3.0 introduced a queue outbound channel adapter to “push” to a Redis list from Spring Integration messages. By default, it uses “left push”, but “right push” can be configured instead. The following listing shows all the available attributes for a Redis queue-outbound-channel-adapter:
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)| 1 | The component bean name. If the channel attribute is not provided, a DirectChannel is created and registered in the application context with this id attribute as the bean name. In this case, the endpoint is registered with a bean name of id plus .adapter. (Bean 名が thing1 であった場合、エンドポイントは thing1.adapter として登録されます。) |
| 2 | このエンドポイントが Message インスタンスを受信する MessageChannel。 |
| 3 | RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。 |
| 4 | Redis メッセージを送信するためにキューベースの「プッシュ」操作が実行される Redis リストの名前。この属性は queue-expression と相互に排他的です。 |
| 5 | Redis リストの名前を決定する SpEL Expression。実行時に受信 Message を #root 変数として使用します。この属性は queue と相互に排他的です。 |
| 6 | RedisSerializer Bean リファレンス。デフォルトは JdkSerializationRedisSerializer です。ただし、String ペイロードの場合、serializer 参照が提供されていない場合は、StringRedisSerializer が使用されます。 |
| 7 | このエンドポイントがペイロードのみを送信するか、Message 全体を Redis キューに送信するかを指定します。デフォルトは true です。 |
| 8 | このエンドポイントが Redis リストにメッセージを書き込むために「左プッシュ」(true の場合)または「右プッシュ」(false の場合)を使用するかどうかを指定します。true の場合、Redis リストは、デフォルトの Redis キュー受信チャネルアダプターと共に使用されると、FIFO キューとして機能します。false に設定すると、「左ポップ」でリストから読み取るソフトウェアで使用したり、スタックのようなメッセージの順序を実現したりできます。デフォルトは true です。バージョン 4.3 以降。 |
Redis アプリケーションイベント
Spring Integration 3.0 以降、Redis モジュールは IntegrationEvent の実装を提供しており、これは org.springframework.context.ApplicationEvent です。RedisExceptionEvent は、Redis 操作からの例外をカプセル化します(エンドポイントがイベントの「ソース」となります)。たとえば、<int-redis:queue-inbound-channel-adapter/> は BoundListOperations.rightPop 操作からの例外をキャッチした後、これらのイベントを発行します。例外は、汎用の org.springframework.data.redis.RedisSystemException または org.springframework.data.redis.RedisConnectionFailureException のいずれかです。これらのイベントを <int-event:inbound-channel-adapter/> で処理することは、バックグラウンドの Redis タスクの問題を特定し、管理アクションを実行できます。
Redis メッセージストア
As described in the Enterprise Integration Patterns (EIP) book, a message store (英語) lets persist messages. This can be useful when dealing with components that have a capability to buffer messages (aggregator, resequencer, and others) when reliability is a concern. In Spring Integration, the MessageStore strategy also provides the foundation for the claim check (英語) pattern, which is described in EIP as well.
Spring Integration の Redis モジュールは RedisMessageStore を提供します。次の例は、アグリゲーターでそれを使用する方法を示しています。
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/> 上記の例は Bean 構成であり、コンストラクター引数として RedisConnectionFactory を想定しています。
By default, the RedisMessageStore uses Java serialization to serialize the message. However, if a different serialization technique (such as JSON) is required, a custom serializer can be set into the valueSerializer property of the RedisMessageStore.
The Framework provides Jackson serializer and deserializer implementations for Message instances and MessageHeaders instances — MessageJsonDeserializer and MessageHeadersJsonSerializer, respectively. They have to be configured with the SimpleModule options for the ObjectMapper. In addition, the enableDefaultTyping should be set on the ObjectMapper to add type information for each serialized complex object. That type information is then used during deserialization. The framework provides a utility method called JacksonMessagingUtils.messagingAwareMapper(), which is already supplied with all the previously mentioned properties and serializers. This utility method comes with the trustedPackages argument to limit Java packages for deserialization to avoid security vulnerabilities. The default trusted packages: java.util、java.lang、org.springframework.messaging.support、org.springframework.integration.support、org.springframework.integration.message、org.springframework.integration.store. To manage JSON serialization in the RedisMessageStore, a configuration like following must be applied:
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonMessagingUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson3JsonRedisSerializer(mapper);
store.setValueSerializer(serializer); バージョン 4.3.12 から、RedisMessageStore は prefix オプションをサポートし、同じ Redis サーバー上のストアのインスタンスを区別できるようにします。
Redis チャネルメッセージストア
The RedisMessageStore shown earlier maintains each group as a value under a single key (the group ID). While a QueueChannel can be used for persistence, a specialized RedisChannelMessageStore is provided for that purpose (since version 4.0). This store uses a LIST for each channel, LPUSH when sending messages, and RPOP when receiving messages. By default, this store also uses JDK serialization, but it can be modified for the value serializer, as described earlier.
It is recommended to use a store-backing channel, instead of using the general RedisMessageStore. The following example defines a Redis message store and uses it in a channel with a queue:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel> データの保存に使用されるキーの形式は、<storeBeanName>:<channelId> (前の例では redisMessageStore:somePersistentQueueChannel)です。
In addition, a subclass RedisChannelPriorityMessageStore is also provided. When this is used with a QueueChannel, the messages are received in (FIFO) priority order. It uses the standard IntegrationMessageHeaderAccessor.PRIORITY header and supports priority values (0 - 9). Messages with other priorities (and messages with no priority) are retrieved in FIFO order after any messages with priority.
これらのストアは BasicMessageGroupStore のみを実装し、MessageGroupStore は実装しません。それらは、QueueChannel のバックアップなどの状況にのみ使用できます。 |
Redis メタデータストア
Spring Integration 3.0 introduced a new Redis-based MetadataStore (Javadoc) (see メタデータストア ) implementation. The RedisMetadataStore can be used to maintain the state of a MetadataStore across application restarts. Such a MetadataStore implementation can be used with adapters such as:
これらのアダプターに新しい RedisMetadataStore を使用するよう指示するには、metadataStore という名前の Spring Bean を宣言します。フィード受信チャネルアダプターとフィード受信チャネルアダプターの両方が、宣言された RedisMetadataStore を自動的に取得して使用します。次の例は、そのような Bean を宣言する方法を示しています。
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>RedisMetadataStore は RedisProperties (Javadoc) によってサポートされています。それとの相互作用は BoundHashOperations (Javadoc) を使用します。これは、Properties ストア全体に対して key を必要とします。MetadataStore の場合、この key はリージョンのロールを果たします。これは、いくつかのアプリケーションが同じ Redis サーバーを使用する分散環境で役立ちます。デフォルトでは、この key の値は MetaData です。
バージョン 4.0 以降、このストアは ConcurrentMetadataStore を実装し、キーの値を保存または変更できるインスタンスが 1 つだけである複数のアプリケーションインスタンス間で確実に共有できるようにします。
The RedisMetadataStore.replace() cannot be used (for example, in the AbstractPersistentAcceptOnceFileListFilter) with a Redis cluster, since the WATCH command for atomicity is not currently supported. |
Redis ストア受信チャネルアダプター
Redis ストア受信チャネルアダプターは、Redis コレクションからデータを読み取り、Message ペイロードとして送信するポーリングコンシューマーです。次の例は、Redis ストア受信チャネルアダプターを構成する方法を示しています。
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter> 上記の例は、store-inbound-channel-adapter 要素を使用して Redis ストア受信チャネルアダプターを構成し、次のようなさまざまな属性の値を提供する方法を示しています。
keyまたはkey-expression: 使用されているコレクションのキーの名前。collection-type: このアダプターでサポートされているコレクション型の列挙。サポートされているコレクションはLIST、SET、ZSET、PROPERTIES、MAPです。connection-factory:o.s.data.redis.connection.RedisConnectionFactoryのインスタンスへの参照。redis-template:o.s.data.redis.core.RedisTemplateのインスタンスへの参照。すべての受信 アダプターに共通するその他の属性 (「チャネル」など)。
The redis-template and connection-factory are mutually exclusive. |
By default, the adapter uses a
|
Because it has a literal value for the key, the preceding example is relatively simple and static. Sometimes, the value of the key must be changed at runtime based on some condition. To do so, use key-expression instead, where the provided expression can be any valid SpEL expression.
Also, some post-processing could be done on the successfully processed data that was read from the Redis collection. For example, the value may be moved or removed after it has been processed. The transaction synchronization feature could be used for such a logic. The following example uses key-expression and transaction synchronization:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>The poller can be transactional by using a transactional element. This element can reference a real transaction manager, for example, if some other part of the flow invokes JDBC. If no “real” transaction, a o.s.i.transaction.PseudoTransactionManager can be used instead, which is an implementation of Spring’s PlatformTransactionManager and enables the use of the transaction synchronization features of the Redis adapter when there is no actual transaction.
| これにより、Redis アクティビティ自体がトランザクションになりません。成功(コミット)または失敗(ロールバック)の前後にアクションの同期を取ることができます。 |
Once the poller is transactional, an instance of the o.s.i.transaction.TransactionSynchronizationFactory can be added on the transactional element. TransactionSynchronizationFactory creates an instance of the TransactionSynchronization. For convenience, a default SpEL-based TransactionSynchronizationFactory is exposed, which lets configure SpEL expressions, with their execution being coordinated (synchronized) with a transaction. Expressions for before-commit, after-commit, and after-rollback are supported, together with channels (one for each kind of event) where the evaluation results (if any) is sent. For each child element, an expression and channel attributes can be specified. If only the channel attribute is present, the received message is sent there as part of the particular synchronization scenario. If only the expression attribute is present and the result of an expression is a non-null value, a message with the result as the payload is generated and sent to a default channel (NullChannel) and appears in the logs (at the DEBUG level). If the result of an expression is null or void, no message is generated.
RedisStoreMessageSource は、TransactionSynchronizationProcessor 実装からアクセスできるトランザクション IntegrationResourceHolder にバインドされた RedisStore インスタンスを持つ store 属性を追加します。
トランザクション同期の詳細については、トランザクションの同期を参照してください。
RedisStore 送信チャネルアダプター
The RedisStore outbound channel adapter lets write a message payload to a Redis collection, as the following example shows:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" /> 上記の構成では、store-inbound-channel-adapter 要素を使用して Redis ストア送信チャネルアダプターを保存します。次のようなさまざまな属性の値を提供します。
keyまたはkey-expression: 使用されているコレクションのキーの名前。extract-payload-elements:true(デフォルト) に設定され、ペイロードが「複数値」オブジェクト (つまり、CollectionまたはMap) のインスタンスである場合は、"addAll" および "putAll" セマンティクスを使用して格納されます。それ以外の場合、falseに設定すると、ペイロードはその型に関係なく単一のエントリとして格納されます。ペイロードが「複数値」オブジェクトのインスタンスでない場合、この属性の値は無視され、ペイロードは常に単一のエントリとして格納されます。collection-type: このアダプターでサポートされているCollection型の列挙。サポートされているコレクションはLIST、SET、ZSET、PROPERTIES、MAPです。map-key-expression: 格納されているエントリのキーの名前を返す SpEL 式。collection-typeがMAPまたはPROPERTIESであり、"extract-payload-elements" が false の場合にのみ適用されます。connection-factory:o.s.data.redis.connection.RedisConnectionFactoryのインスタンスへの参照。redis-template:o.s.data.redis.core.RedisTemplateのインスタンスへの参照。すべての受信 アダプターに共通するその他の属性 (「チャネル」など)。
The redis-template and connection-factory are mutually exclusive. |
By default, the adapter uses a StringRedisTemplate. This uses StringRedisSerializer instances for keys, values, hash keys, and hash values. However, if extract-payload-elements is set to false, a RedisTemplate that has StringRedisSerializer instances for keys and hash keys and JdkSerializationRedisSerializer instances s for values and hash values will be used. With the JDK serializer, it is important to understand that Java serialization is used for all values, regardless of whether the value is actually a collection or not. In case of more control over the serialization of values, a custom RedisTemplate could be provided rather than relying upon these defaults. |
Because it has literal values for the key and other attributes, the preceding example is relatively simple and static. Sometimes, the values may be changed dynamically at runtime based on some condition. To do so, their -expression equivalents (key-expression, map-key-expression, and so on) are provided, where an expression can be any valid SpEL expression.
Redis 送信コマンドゲートウェイ
Spring Integration 4.0 introduced the Redis command gateway to let perform any standard Redis command by using the generic RedisConnection#execute method. The following listing shows the available attributes for the Redis outbound gateway:
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)| 1 | このエンドポイントが Message インスタンスを受信する MessageChannel。 |
| 2 | このエンドポイントが応答 Message インスタンスを送信する MessageChannel。 |
| 3 | この送信ゲートウェイが null 以外の値を返す必要があるかどうかを指定します。デフォルトは true です。Redis が null 値を返すと、ReplyRequiredException がスローされます。 |
| 4 | 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。 |
| 5 | A reference to a RedisConnectionFactory bean. It defaults to redisConnectionFactory. It is mutually exclusive with redis-template attribute. |
| 6 | A reference to a RedisTemplate bean. It is mutually exclusive with connection-factory attribute. |
| 7 | A reference to an instance of org.springframework.data.redis.serializer.RedisSerializer. It is used to serialize each command argument to byte[], if necessary. |
| 8 | コマンドキーを返す SpEL 式。デフォルトは redis_command メッセージヘッダーです。null に評価してはなりません。 |
| 9 | Comma-separated SpEL expressions that are evaluated as command arguments. Mutually exclusive with the arguments-strategy attribute. If neither attribute is provided, the payload is used as the command arguments. The argument expressions can evaluate to 'null' to support a variable number of arguments. |
| 10 | argument-expressions が構成されている場合に、評価された Redis コマンド文字列を o.s.i.redis.outbound.ExpressionArgumentsStrategy の式評価コンテキストで #cmd 変数として使用できるようにするかどうかを指定する boolean フラグ。それ以外の場合、この属性は無視されます。 |
| 11 | Reference to an instance of o.s.i.redis.outbound.ArgumentsStrategy. It is mutually exclusive with argument-expressions attribute. If neither attribute is provided, the payload is used as the command arguments. |
The <int-redis:outbound-gateway> can be used as a common component to perform any desired Redis operation. The following example shows how to get incremented values from Redis atomic number:
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>Message ペイロードの名前は redisCounter である必要があり、これは org.springframework.data.redis.support.atomic.RedisAtomicInteger Bean 定義によって提供される場合があります。
The RedisConnection#execute method has a generic Object as its return type. The real result depends on the command type. For example, MGET returns a List<byte[]>. For more information about commands, their arguments, and result type, see Redis 仕様 (英語) 。
Redis キュー送信ゲートウェイ
Spring Integration では、リクエストと応答のシナリオを実行するために Redis キュー送信ゲートウェイが導入されました。会話 UUID を指定された queue にプッシュし、その UUID をキーとして持つ値を Redis リストにプッシュし、UUID と .reply のキーを持つ Redis リストからの応答を待ちます。インタラクションごとに異なる UUID が使用されます。次のリストは、Redis 送信ゲートウェイで使用可能な属性を示しています。
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)| 1 | このエンドポイントが Message インスタンスを受信する MessageChannel。 |
| 2 | このエンドポイントが応答 Message インスタンスを送信する MessageChannel。 |
| 3 | この送信ゲートウェイが非 null 値を返す必要があるかどうかを指定します。この値は、デフォルトでは false です。そうでない場合、Redis が null 値を返すと、ReplyRequiredException がスローされます。 |
| 4 | 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。 |
| 5 | RedisConnectionFactory Bean への参照。デフォルトは redisConnectionFactory です。'redis-template' 属性と相互に排他的です。 |
| 6 | 送信ゲートウェイが会話 UUID を送信する Redis リストの名前。 |
| 7 | 複数のゲートウェイが登録されている場合のこの送信ゲートウェイの順序。 |
| 8 | RedisSerializer Bean リファレンス。空の文字列にすることもできます。これは、「シリアライザなし」を意味します。この場合、受信 Redis メッセージからの生の byte[] は、Message ペイロードとして channel に送信されます。デフォルトでは、JdkSerializationRedisSerializer です。 |
| 9 | このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。 |
Redis キュー受信ゲートウェイ
Spring Integration 4.1 は、リクエストと応答のシナリオを実行するために Redis キューの受信ゲートウェイを導入しました。提供された queue から会話 UUID をポップし、その UUID をキーとする値を Redis リストからポップし、UUID と .reply のキーを使用して Redis リストへの応答をプッシュします。次のリストは、Redis キューの受信ゲートウェイで使用可能な属性を示しています。
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)| 1 | このエンドポイントが Redis データから作成された Message インスタンスを送信する MessageChannel。 |
| 2 | このエンドポイントが応答 Message インスタンスを待機する MessageChannel。オプション - replyChannel ヘッダーはまだ使用されています。 |
| 3 | Spring TaskExecutor (または標準の JDK Executor)Bean への参照。基礎となるリスニングタスクに使用されます。デフォルトは SimpleAsyncTaskExecutor です。 |
| 4 | 応答メッセージが送信されるまで待機するタイムアウト(ミリ秒単位)。通常、キューベースの制限された応答チャネルに適用されます。 |
| 5 | A reference to a RedisConnectionFactory bean. It defaults to redisConnectionFactory. It is mutually exclusive with redis-template attribute. |
| 6 | 会話 UUID の Redis リストの名前。 |
| 7 | 複数のゲートウェイが登録されている場合のこの受信ゲートウェイの順序。 |
| 8 | RedisSerializer Bean リファレンス。これは、「シリアライザーなし」を意味する空の文字列にすることができます。この場合、受信 Redis メッセージからの生の byte[] が Message ペイロードとして channel に送信されます。デフォルトは JdkSerializationRedisSerializer です。(バージョン 4.3 より前のリリースでは、デフォルトで StringRedisSerializer であったことに注意してください。その動作を復元するには、StringRedisSerializer への参照を提供します)。 |
| 9 | The timeout (in milliseconds) to wait until the received message is fetched. It is typically applied for queue-based limited request-channels. |
| 10 | このエンドポイントが Redis キューからのデータに Message インスタンス全体が含まれることを期待するかどうかを指定します。この属性が true に設定されている場合、メッセージは何らかの形式のデシリアライゼーション(デフォルトでは JDK シリアライゼーション)を必要とするため、serializer を空の文字列にすることはできません。 |
| 11 | 「右ポップ」操作の例外の後、リスナタスクを再起動する前にリスナタスクがスリープする時間(ミリ秒単位)。 |
The task-executor has to be configured with more than one thread for processing; otherwise there is a possible deadlock when the RedisQueueMessageDrivenEndpoint tries to restart the listener task after an error. The errorChannel can be used to process those errors, to avoid restarts, but it is preferable to not expose the application to the possible deadlock situation. See Spring Framework 参考マニュアル for possible TaskExecutor implementations. |
Redis ストリーム送信チャネルアダプター
Spring Integration 5.4 は、メッセージペイロードを Redis ストリームに書き込むためのリアクティブ Redis ストリーム送信チャネルアダプターを導入しました。送信チャネルアダプターは、ReactiveStreamOperations.add(…) を使用して Record をストリームに追加します。次の例は、Redis ストリーム送信チャネルアダプターの Java 構成とサービスクラスを使用する方法を示しています。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}| 1 | ReactiveRedisConnectionFactory とストリーム名を使用して ReactiveRedisStreamMessageHandler のインスタンスを作成し、レコードを追加します。別のコンストラクターバリアントは、SpEL 式に基づいて、リクエストメッセージに対してストリームキーを評価します。 |
| 2 | ストリームに追加する前に、レコードのキーと値を直列化するために使用する RedisSerializationContext を設定します。 |
| 3 | Java 型と Redis ハッシュ / マップ間の契約を提供する HashMapper を設定します。 |
| 4 | If 'true', the channel adapter will extract the payload from a request message for a stream record to add. Or use the whole message as a value. It defaults to true. |
バージョン 6.5 以降、ReactiveRedisStreamMessageHandler は、内部 ReactiveStreamOperations.add(Record<K, ?> record, XAddOptions xAddOptions) 呼び出しのリクエストメッセージに基づいて RedisStreamCommands.XAddOptions を構築するための setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction) を提供します。
Redis ストリーム受信チャネルアダプター
Spring Integration 5.4 introduced the Reactive Stream inbound channel adapter for reading messages from a Redis Stream. Inbound channel adapter uses StreamReceiver.receive(…) or StreamReceiver.receiveAutoAck() based on an auto acknowledgement flag to read record from Redis stream. The following example shows how to use Java configuration for Redis Stream Inbound Channel Adapter.
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}| 1 | Construct an instance of ReactiveRedisStreamMessageProducer using ReactiveRedisConnectionFactory and a stream key to read records. |
| 2 | リアクティブインフラストラクチャを使用して redis ストリームを消費する StreamReceiver.StreamReceiverOptions。 |
| 3 | A SmartLifecycle attribute to specify whether this endpoint should start automatically after the application context starts or not. It defaults to true. If false, RedisStreamMessageProducer should be started manually messageProducer.start(). |
| 4 | If false, received messages are not auto acknowledged. The acknowledgement of the message will be deferred to the client-consuming message. It defaults to true. |
| 5 | If true, a consumer group will be created. During creation of consumer group stream will be created (if not exists yet), too. Consumer group tracks message delivery and distinguishes between consumers. It defaults to false. |
| 6 | Set the Consumer Group name. It defaults to the defined bean name. |
| 7 | Set Consumer name. Reads a message as my-consumer from group my-group. |
| 8 | このエンドポイントからメッセージを送信するメッセージチャネル。 |
| 9 | Define the offset to read a message. It defaults to ReadOffset.latest(). |
| 10 | "true" の場合、チャネルアダプターは Record からペイロード値を抽出します。それ以外の場合、Record 全体がペイロードとして使用されます。デフォルトは true です。 |
If the autoAck is set to false, the Record in Redis Stream is not acknowledged automatically by the Redis driver, instead an IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK header is added into a message to produce with a SimpleAcknowledgment instance as a value. It is a target integration flow responsibility to call its acknowledge() callback whenever the business logic is done for the message based on such a record. Similar logic is required even when an exception happens during deserialization and errorChannel is configured. So, the target error handler must decide to ack or nack such a failed message. Alongside with IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK, the ReactiveRedisStreamMessageProducer also populates these headers into the message to produce: RedisHeaders.STREAM_KEY、RedisHeaders.STREAM_MESSAGE_ID、RedisHeaders.CONSUMER_GROUP、RedisHeaders.CONSUMER.
Starting with version 5.5, StreamReceiver.StreamReceiverOptionsBuilder options can be configured explicitly on the ReactiveRedisStreamMessageProducer, including the newly introduced onErrorResume function, which is required if the Redis Stream consumer should continue polling when deserialization errors occur. The default function sends a message to the error channel (if provided) with possible acknowledgement for the failed message as it is described above. All these StreamReceiver.StreamReceiverOptionsBuilder are mutually exclusive with an externally provided StreamReceiver.StreamReceiverOptions.
Redis ロックレジストリ
Spring Integration 4.0 introduced the RedisLockRegistry. Certain components, (for example, aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread manipulates a group at a time. The DefaultLockRegistry performs this function within a single component. An external lock registry can be configured on these components. When it is used with a shared MessageGroupStore, the RedisLockRegistry can be set to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
ローカルスレッドによってロックが解除されると、通常、別のローカルスレッドがすぐにロックを取得できます。別のレジストリインスタンスを使用するスレッドによってロックが解放された場合、ロックを取得するのに最大 100 ミリ秒かかることがあります。
To avoid “hung” locks (when a server fails), the locks in this registry are expired after default 60 seconds, but it can be configured on the registry. Locks are normally held for a much smaller time.
| Because the keys can expire, an attempt to unlock an expired lock results in an exception being thrown. However, the resources protected by such a lock may have been compromised, so such exceptions should be considered to be severe. The expiry should be set at a large enough value to prevent this condition, but set it low enough that the lock can be recovered after a server failure in a reasonable amount of time. |
バージョン 5.0 から、RedisLockRegistry は ExpirableLockRegistry を実装します。ExpirableLockRegistry は、age より前に最後に取得され、現在ロックされていないロックを削除します。
バージョン 5.5.6 以降、RedisLockRegistry は、RedisLockRegistry.setCacheCapacity() を介した RedisLockRegistry.locks 内の redisLocks のキャッシュの自動クリーンアップをサポートします。詳細については、JavaDocs を参照してください。
バージョン 5.5.13 以降、RedisLockRegistry は、Redis ロックの取得をどのモードで行うかを決定する setRedisLockType(RedisLockType) オプションを公開します。
RedisLockType.SPIN_LOCK- ロックは、ロックを取得できるかどうかをチェックする定期的なループ(100ms)によって取得されます。デフォルト。RedisLockType.PUB_SUB_LOCK- ロックは、redispub-sub サブスクリプションによって取得されます。
The pub-sub is the preferred mode - less network chatter between client Redis server, and more performant - the lock is acquired immediately when subscription is notified about unlocking in the other process. However, the Redis does not support pub-sub in the Master/Replica connections, (for example, in AWS ElastiCache environment), therefore, a busy-spin mode is chosen as a default to make the registry working in any environment.
バージョン 6.4 以降では、ロックの所有権が期限切れの場合、RedisLockRegistry.RedisLock.unlock() メソッドは IllegalStateException をスローする代わりに ConcurrentModificationException をスローします。
バージョン 6.4 以降では、ロックの定期的な更新のスケジューラを構成するための RedisLockRegistry.setRenewalTaskScheduler() が追加されました。これを設定すると、ロックが正常に取得されてから、ロックが解除されるか、Redis キーが削除されるまで、有効期限の 1/3 ごとにロックが自動的に更新されます。
Starting with version 7.0, the RedisLock implements DistributedLock interface to support the feature of customized time-to-live (TTL) for the lock status data. A RedisLock can now be acquired using the lock(Duration ttl) or tryLock(long time, TimeUnit unit, Duration ttl) method, with a specified time-to-live (TTL) value. The RedisLockRegistry now provides new renewLock(Object lockKey, Duration ttl) method, allowing to renew the lock with a custom time-to-live value.
AWS ElastiCache for Valkey Support in cluster mode
Starting with version 6.4.9/6.5.4/7.0.0, RedisLockRegistry supports AWS Elasticache for Valkey in cluster mode. In this version of valkey (a redis drop-in replacement), all PubSub operations (PUBLISH, SUBSCRIBE, etc.) use their sharded variants (SPUBLISH, SSUBSCRIBE, etc.) internally. If there are any errors in the form of:
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Script attempted to access keys that do not hash to the same slot script: b2dedc0ab01c17f9f20e3e6ddb62dcb6afbed0bd, on @user_script:3."in the unlock step of the RedisLockRegistry, a lock key that includes a hashtag {…} must be supplied to ensure all operations in the unlock script are hashed to the same cluster slot/shard, e.g.:
RedisLockRegistry lockRegistry = new RedisLockRegistry("my-lock-key{choose_your_tag}");
lockRegistry.lock();
# critical section
lockRegistry.unlock();