Pub/ サブメッセージング

Spring Data は、Spring Framework の JMS 統合と機能および命名が類似した、Redis 専用のメッセージング統合を提供します。

Redis メッセージングは、機能の 2 つの領域に大きく分けることができます。

  • メッセージの公開または作成

  • メッセージのサブスクリプションまたは消費

これは、パブリッシュ / サブスクライブ(略して Pub/ サブ)と呼ばれることが多いパターンの例です。RedisTemplate クラスは、メッセージの生成に使用されます。Java EE のメッセージ駆動型 Bean スタイルと同様の非同期受信の場合、Spring Data は、メッセージ駆動型 POJO(MDP)の作成に使用される専用のメッセージリスナーコンテナーを提供し、同期受信の場合は RedisConnection 契約を提供します。

org.springframework.data.redis.connection および org.springframework.data.redis.listener パッケージは、Redis メッセージングのコア機能を提供します。

公開 (メッセージ送信)

メッセージを公開するには、他の操作と同様に、低レベルの [Reactive]RedisConnection または高レベルの [Reactive]RedisOperations のいずれかを使用できます。どちらのエンティティも、メッセージと宛先チャネルを引数として受け入れる publish メソッドを提供します。RedisConnection には生データ(バイトの配列)が必要ですが、[Reactive]RedisOperations では、次の例に示すように、任意のオブジェクトをメッセージとして渡すことができます。

  • 命令的

  • リアクティブ

// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");
// send message through connection
ReactiveRedisConnection con = …
ByteBuffer[] msg = …
ByteBuffer[] channel = …
con.pubSubCommands().publish(msg, channel);

// send message through ReactiveRedisOperations
ReactiveRedisOperations operations = …
Mono<Long> numberOfClients = operations.convertAndSend("hello!", "world");

購読する (メッセージの受信)

受信側では、チャネルに直接名前を付けるか、パターンマッチングを使用して、1 つまたは複数のチャネルにサブスクライブできます。後者のアプローチは、1 つのコマンドで複数のサブスクリプションを作成できるだけでなく、サブスクリプション時にまだ作成されていないチャネルをリッスンできるため(パターンに一致する限り)非常に便利です。

低レベルでは、RedisConnection は、チャネルまたはパターンでそれぞれサブスクライブするために Redis コマンドをマップする subscribe メソッドと pSubscribe メソッドを提供します。複数のチャネルまたはパターンを引数として使用できることに注意してください。接続のサブスクリプションを変更したり、接続がリッスンしているかどうかを照会したりするために、RedisConnection は getSubscription メソッドと isSubscribed メソッドを提供します。

Spring Data のサブスクリプションコマンド Redis がブロックしています。つまり、接続で subscribe を呼び出すと、現在のスレッドがメッセージの待機を開始するときにブロックされます。スレッドは、サブスクリプションがキャンセルされた場合にのみ解放されます。これは、別のスレッドが同じ接続で unsubscribe または pUnsubscribe を呼び出したときに発生します。この問題の解決策については、"メッセージリスナコンテナー" (このドキュメントの後半)を参照してください。

前述のように、サブスクライブすると、接続はメッセージの待機を開始します。新しいサブスクリプションの追加、既存のサブスクリプションの変更、既存のサブスクリプションのキャンセルを行うコマンドのみが許可されます。subscribepSubscribeunsubscribe または pUnsubscribe 以外のものを呼び出すと、例外がスローされます。

メッセージをサブスクライブするには、MessageListener コールバックを実装する必要があります。新しいメッセージが到着するたびに、コールバックが呼び出され、ユーザーコードが onMessage メソッドによって実行されます。インターフェースは、実際のメッセージだけでなく、メッセージが受信されたチャネル、チャネルと一致するためにサブスクリプションによって使用されるパターン(存在する場合)へのアクセスを提供します。この情報により、受信者はコンテンツだけでなく、追加の詳細を調べることによって、さまざまなメッセージを区別できます。

メッセージリスナコンテナー

低レベルのサブスクリプションは、そのブロッキング特性により、リスナーごとに接続とスレッド管理が必要になるため、魅力的ではありません。この問題を軽減するために、Spring Data は、面倒な作業をすべて実行する RedisMessageListenerContainer (Javadoc) を提供しています。EJB と JMS に精通している場合は、Spring Framework とそのメッセージ駆動型 POJO (MDP) のサポートに可能な限り近いように設計されているため、概念に馴染みがあるはずです。

RedisMessageListenerContainer (Javadoc) はメッセージリスナーコンテナーとして機能します。これは、Redis チャネルからメッセージを受信し、そこに挿入された MessageListener (Javadoc) インスタンスを駆動するために使用されます。リスナーコンテナーは、メッセージ受信のすべてのスレッド化を担当し、リスナーにディスパッチして処理します。メッセージリスナーコンテナーは、MDP とメッセージングプロバイダー間の仲介役であり、メッセージの受信登録、リソースの取得と解放、例外の変換などを行います。これにより、アプリケーション開発者は、メッセージの受信 (およびそれに対する反応) に関連する (場合によっては複雑な) ビジネスロジックを作成し、定型的な Redis インフラストラクチャの問題をフレームワークに委譲できます。

MessageListener (Javadoc) は、サブスクリプション / サブスクリプション解除の確認時に通知を受信するために、SubscriptionListener (Javadoc) を追加で実装できます。サブスクリプション通知をリッスンすることは、呼び出しを同期するときに役立ちます。

さらに、アプリケーションのフットプリントを最小限に抑えるために、RedisMessageListenerContainer (Javadoc) では、サブスクリプションを共有していなくても、1 つの接続と 1 つのスレッドを複数のリスナーで共有できます。アプリケーションが追跡するリスナーやチャネルの数に関係なく、ランタイムコストは存続期間中同じままです。さらに、コンテナーではランタイム構成の変更が許可されているため、アプリケーションの実行中にリスナーを追加または削除しても、再起動する必要はありません。さらに、コンテナーでは遅延サブスクリプションアプローチが採用されており、必要な場合にのみ RedisConnection が使用されます。すべてのリスナーがサブスクライブ解除されると、クリーンアップが自動的に実行され、スレッドが解放されます。

メッセージの非同期性を支援するために、コンテナーにはメッセージをディスパッチするための java.util.concurrent.Executor (または Spring の TaskExecutor)が必要です。負荷、リスナーの数、ランタイム環境に応じて、ニーズにより適切に対応するようにエグゼキュータを変更または微調整する必要があります。特に、管理された環境(アプリサーバーなど)では、ランタイムを利用するために適切な TaskExecutor を選択することを強くお勧めします。

MessageListenerAdapter

MessageListenerAdapter (Javadoc) クラスは、Spring の非同期メッセージングサポートの最後のコンポーネントです。簡単に言えば、ほぼすべてのクラスを MDP として公開できます (ただし、いくつかの制約があります)。

次のインターフェース定義を検討してください。

public interface MessageDelegate {
  void handleMessage(String message);
  void handleMessage(Map message);
  void handleMessage(byte[] message);
  void handleMessage(Serializable message);
  // pass the channel/pattern as well
  void handleMessage(Serializable message, String channel);
 }

このインターフェースは MessageListener インターフェースを継承していませんが、MessageListenerAdapter (Javadoc) クラスを使用することで MDP として使用できることに注意してください。また、さまざまなメッセージ処理メソッドが、受信して処理できるさまざまな Message 型の内容に応じて厳密に型指定されていることにも注意してください。さらに、メッセージが送信されるチャネルまたはパターンは、String 型の 2 番目の引数としてメソッドに渡すことができます。

public class DefaultMessageDelegate implements MessageDelegate {
  // implementation elided for clarity...
}
Notice how the above implementation of the `MessageDelegate` interface (the above `DefaultMessageDelegate` class) has *no* Redis dependencies at all. It truly is a POJO that we make into an MDP with the following configuration:
  • Java

  • XML

@Configuration
class MyConfig {

  // …

  @Bean
  DefaultMessageDelegate listener() {
    return new DefaultMessageDelegate();
  }

  @Bean
  MessageListenerAdapter messageListenerAdapter(DefaultMessageDelegate listener) {
    return new MessageListenerAdapter(listener, "handleMessage");
  }

  @Bean
  RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory, MessageListenerAdapter listener) {

    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(listener, ChannelTopic.of("chatroom"));
    return container;
  }
}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:redis="http://www.springframework.org/schema/redis"
   xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd
   http://www.springframework.org/schema/redis https://www.springframework.org/schema/redis/spring-redis.xsd">

<!-- the default ConnectionFactory -->
<redis:listener-container>
  <!-- the method attribute can be skipped as the default method name is "handleMessage" -->
  <redis:listener ref="listener" method="handleMessage" topic="chatroom" />
</redis:listener-container>

<bean id="listener" class="redisexample.DefaultMessageDelegate"/>
 ...
</beans>
リスナートピックは、チャネル(topic="chatroom" など)またはパターンのいずれかです。(たとえば、topic="*room")

前の例では、Redis 名前空間を使用してメッセージリスナーコンテナーを宣言し、POJO をリスナーとして自動的に登録します。本格的な Bean の定義は次のとおりです。

<bean id="messageListener" class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
  <constructor-arg>
    <bean class="redisexample.DefaultMessageDelegate"/>
  </constructor-arg>
</bean>

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="messageListeners">
    <map>
      <entry key-ref="messageListener">
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="chatroom"/>
        </bean>
      </entry>
    </map>
  </property>
</bean>

メッセージが受信されるたびに、アダプターは、低レベルのフォーマットと必要なオブジェクト・型の間で(構成された RedisSerializer を使用して)自動的かつ透過的に変換を実行します。メソッドの呼び出しによって発生した例外はすべて、コンテナーによってキャッチおよび処理されます(デフォルトでは、例外はログに記録されます)。

リアクティブメッセージリスナーコンテナー

Spring Data は、ユーザーに代わって変換とサブスクリプション状態管理の面倒な作業をすべて実行する ReactiveRedisMessageListenerContainer (Javadoc) を提供します。

メッセージリスナーコンテナー自体は、外部スレッドリソースを必要としません。ドライバースレッドを使用してメッセージを公開します。

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Flux<ChannelMessage<String, String>> stream = container.receive(ChannelTopic.of("my-channel"));

適切なサブスクリプションを待って確認するには、Mono<Flux<ChannelMessage>> を返す receiveLater メソッドを使用できます。結果の Mono は、指定されたトピックへのサブスクリプションを完了した結果として、内部パブリッシャーで完了します。onNext シグナルをインターセプトすることで、サーバー側のサブスクリプションを同期できます。

ReactiveRedisConnectionFactory factory = …
ReactiveRedisMessageListenerContainer container = new ReactiveRedisMessageListenerContainer(factory);

Mono<Flux<ChannelMessage<String, String>>> stream = container.receiveLater(ChannelTopic.of("my-channel"));

stream.doOnNext(inner -> // notification hook when Redis subscriptions are synchronized with the server)
    .flatMapMany(Function.identity())
    .…;

テンプレート API を介したサブスクライブ

前述のように、ReactiveRedisTemplate (Javadoc) を直接使用してチャネル / パターンをサブスクライブできます。このアプローチは、最初のサブスクリプションの後にサブスクリプションを追加するオプションがなくなるため、制限のあるソリューションではありますが、簡単です。それでも、返された Flux (例: take(Duration)) を介してメッセージストリームを制御することはできます。読み取りが完了すると、エラーまたはキャンセル時に、バインドされたすべてのリソースが再び解放されます。

redisTemplate.listenToChannel("channel1", "channel2").doOnNext(msg -> {
    // message processing ...
}).subscribe();