Redis ストリーム

Redis Streams は、抽象的なアプローチでログデータ構造をモデル化します。通常、ログは追加専用のデータ構造であり、最初から、ランダムな位置で、新しいメッセージをストリーミングすることによって消費されます。

Redis リファレンスドキュメント (英語) の Redis ストリームの詳細を参照してください。

Redis ストリームは、大きく 2 つの機能領域に分けることができます。

  • レコードの追加

  • レコードの消費

このパターンは Pub/Sub と類似していますが、主な違いはメッセージの永続性とメッセージの消費メソッドにあります。

Pub/Sub は一時的なメッセージのブロードキャストに依存していますが(つまり、聞いていない場合はメッセージを見逃します)、Redis ストリームは、ストリームがトリミングされるまでメッセージを保持する永続的な追加専用データ型を使用します。消費のもう 1 つの違いは、Pub/Sub がサーバー側のサブスクリプションを登録することです。Redis は到着メッセージをクライアントにプッシュしますが、Redis ストリームはアクティブなポーリングを必要とします。

org.springframework.data.redis.connection および org.springframework.data.redis.stream パッケージは、Redis ストリームのコア機能を提供します。

追加

レコードを送信するには、他の操作と同様に、低レベルの RedisConnection または高レベルの StreamOperations のいずれかを使用できます。どちらのエンティティも、レコードと宛先ストリームを引数として受け入れる add (xAdd)メソッドを提供します。RedisConnection には生データ(バイトの配列)が必要ですが、StreamOperations では、次の例に示すように、任意のオブジェクトをレコードとして渡すことができます。

// append message through connection
RedisConnection con = …
byte[] stream = …
ByteRecord record = StreamRecords.rawBytes(…).withStreamKey(stream);
con.xAdd(record);

// append message through RedisTemplate
RedisTemplate template = …
StringRecord record = StreamRecords.string(…).withStreamKey("my-stream");
template.opsForStream().add(record);

ストリームレコードは、ペイロードとして Map、Key-Value タプルを運びます。レコードをストリームに追加すると、さらなる参照として使用できる RecordId が返されます。

消費する

消費側では、1 つまたは複数のストリームを消費できます。Redis ストリームは、既知のストリームコンテンツ内およびストリームの終わりを超えて任意の位置(ランダムアクセス)からストリームを消費して新しいストリームレコードを消費できるようにする読み取りコマンドを提供します。

低レベルでは、RedisConnection は、Redis コマンドをそれぞれコンシューマーグループ内で読み取るためにマップする xRead メソッドと xReadGroup メソッドを提供します。複数のストリームを引数として使用できることに注意してください。

Redis のサブスクリプションコマンドがブロックしている可能性があります。つまり、接続で xRead を呼び出すと、現在のスレッドがメッセージの待機を開始するときにブロックされます。スレッドは、読み取りコマンドがタイムアウトするか、メッセージを受信した場合にのみ解放されます。

ストリームメッセージを消費するには、アプリケーションコード内のメッセージをポーリングするか、2 つのメッセージリスナーコンテナーを介した非同期受信のいずれか(命令型またはリアクティブ型)を使用できます。新しいレコードが到着するたびに、コンテナーはアプリケーションコードを通知します。

同期受信

ストリームの消費は通常、非同期処理に関連付けられていますが、メッセージを同期的に消費することもできます。オーバーロードされた StreamOperations.read(…) メソッドは、この機能を提供します。同期受信中、呼び出し元のスレッドは、メッセージが使用可能になるまでブロックする可能性があります。プロパティ StreamReadOptions.block は、受信者がメッセージの待機をあきらめる前に待機する時間を指定します。

// Read message through RedisTemplate
RedisTemplate template = …

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(StreamReadOptions.empty().count(2),
				StreamOffset.latest("my-stream"));

List<MapRecord<K, HK, HV>> messages = template.opsForStream().read(Consumer.from("my-group", "my-consumer"),
				StreamReadOptions.empty().count(2),
				StreamOffset.create("my-stream", ReadOffset.lastConsumed()))

メッセージリスナーコンテナーを介した非同期受信

低レベルのポーリングはブロッキングの性質があるため、すべてのコンシューマーに対して接続とスレッドの管理が必要になるため、魅力的ではありません。この問題を軽減するために、Spring Data はメッセージリスナーを提供します。メッセージリスナーはすべての面倒な作業を行います。EJB と JMS に精通している場合は、Spring Framework とそのメッセージ駆動型 POJO(MDP)のサポートに可能な限り近いように設計されているため、概念に精通している必要があります。

Spring Data には、使用されるプログラミングモデルに合わせて調整された 2 つの実装が付属しています。

  • StreamMessageListenerContainer (Javadoc) は、命令型プログラミングモデルのメッセージリスナーコンテナーとして機能します。Redis ストリームからレコードを消費し、そこに挿入される StreamListener (Javadoc) インスタンスを駆動するために使用されます。

  • StreamReceiver (Javadoc) は、メッセージリスナーのリアクティブバリアントを提供します。これは、潜在的に無限のストリームとして Redis ストリームからのメッセージを消費し、Flux を介してストリームメッセージを発行するために使用されます。

StreamMessageListenerContainer と StreamReceiver は、メッセージ受信のすべてのスレッド化と、処理のためのリスナーへのディスパッチを担当します。メッセージリスナーコンテナー / レシーバーは、MDP とメッセージングプロバイダー間の仲介者であり、メッセージを受信するための登録、リソースの取得と解放、例外変換などを処理します。これにより、アプリケーション開発者は、メッセージの受信(およびメッセージへの応答)に関連する(おそらく複雑な)ビジネスロジックを記述し、定型的な Redis インフラストラクチャの関心事をフレームワークに委譲できます。

どちらのコンテナーでもランタイム構成の変更が可能であるため、アプリケーションの実行中に再起動せずにサブスクリプションを追加または削除できます。さらに、コンテナーは、必要な場合にのみ RedisConnection を使用する、遅延サブスクリプションアプローチを使用します。すべてのリスナーがサブスクライブ解除されると、自動的にクリーンアップが実行され、スレッドが解放されます。

命令型 StreamMessageListenerContainer

EJB の世界のメッセージ駆動型 Bean (MDB) と同様に、ストリーム駆動型 POJO (SDP) はストリームメッセージの受信側として機能します。SDP の唯一の制限は、StreamListener (Javadoc) インターフェースを実装する必要があることです。また、POJO が複数のスレッドでメッセージを受信する場合は、実装がスレッドセーフであることを確認することが重要であることにも注意してください。

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

	@Override
	public void onMessage(MapRecord<String, String, String> message) {

		System.out.println("MessageId: " + message.getId());
		System.out.println("Stream: " + message.getStream());
		System.out.println("Body: " + message.getValue());
	}
}

StreamListener は関数インターフェースを表すため、Lambda 形式を使用して実装を書き直すことができます。

message -> {

    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
};

StreamListener を実装したら、メッセージリスナーコンテナーを作成してサブスクリプションを登録します。

RedisConnectionFactory connectionFactory = …
StreamListener<String, MapRecord<String, String, String>> streamListener = …

StreamMessageListenerContainerOptions<String, MapRecord<String, String, String>> containerOptions = StreamMessageListenerContainerOptions
			.builder().pollTimeout(Duration.ofMillis(100)).build();

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(connectionFactory,
				containerOptions);

Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), streamListener);

各実装でサポートされている機能の詳細については、さまざまなメッセージリスナーコンテナーの Javadoc を参照してください。

リアクティブ StreamReceiver

ストリーミングデータソースのリアクティブな消費は、通常、イベントまたはメッセージの Flux を介して発生します。リアクティブレシーバーの実装には、StreamReceiver とそのオーバーロードされた receive(…) メッセージが付属しています。リアクティブアプローチは、ドライバーによって提供されるスレッドリソースを活用しているため、StreamMessageListenerContainer と比較して、スレッドなどのインフラストラクチャリソースが少なくて済みます。受信ストリームは、StreamMessage の需要主導型パブリッシャーです。

Flux<MapRecord<String, String, String>> messages = …

return messages.doOnNext(it -> {
    System.out.println("MessageId: " + message.getId());
    System.out.println("Stream: " + message.getStream());
    System.out.println("Body: " + message.getValue());
});

次に、StreamReceiver を作成し、サブスクリプションを登録してストリームメッセージを消費する必要があります。

ReactiveRedisConnectionFactory connectionFactory = …

StreamReceiverOptions<String, MapRecord<String, String, String>> options = StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100))
				.build();
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);

Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));

各実装でサポートされている機能の詳細については、さまざまなメッセージリスナーコンテナーの Javadoc を参照してください。

需要主導型の消費は、バックプレッシャーシグナルを使用してポーリングをアクティブ化および非アクティブ化します。StreamReceiver サブスクリプションは、サブスクライバーがさらに要求を通知するまで要求が満たされると、ポーリングを一時停止します。ReadOffset 戦略によっては、これによりメッセージがスキップされる可能性があります。

Acknowledge 戦略

Consumer Group を介してメッセージを読み取ると、サーバーは特定のメッセージが配信されたことを記憶し、それを保留中のエントリリスト(PEL)に追加します。配信されましたがまだ確認されていないメッセージのリスト。
以下のスニペットに示すように、保留中のエントリリストからメッセージを削除するには、StreamOperations.acknowledge を介してメッセージを確認する必要があります。

StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = ...

container.receive(Consumer.from("my-group", "my-consumer"), (1)
	StreamOffset.create("my-stream", ReadOffset.lastConsumed()),
    msg -> {

	    // ...
	    redisTemplate.opsForStream().acknowledge("my-group", msg); (2)
    });
1 グループ my-group から my-consumer として読み取ります。受信したメッセージは確認されません。
2 処理後にメッセージを確認しました。
受信時にメッセージを自動確認するには、receive の代わりに receiveAutoAck を使用します。

ReadOffset 戦略

ストリーム読み取り操作は、読み取りオフセット指定を受け入れて、で指定されたオフセットからのメッセージを消費します。ReadOffset は、読み取りオフセット仕様を表します。Redis は、ストリームをスタンドアロンで消費するか、コンシューマーグループ内で消費するかに応じて、オフセットの 3 つのバリアントをサポートします。

  • ReadOffset.latest() –最新のメッセージを読みます。

  • ReadOffset.from(…) –特定のメッセージ ID の後に読み取ります。

  • ReadOffset.lastConsumed() –最後に消費されたメッセージ ID の後に読み取ります(コンシューマーグループのみ)。

メッセージコンテナーベースの消費のコンテキストでは、メッセージを消費するときに読み取りオフセットを進める(またはインクリメントする)必要があります。前進は、リクエストされた ReadOffset と消費モード(コンシューマーグループあり / なし)によって異なります。次のマトリックスは、コンテナーが ReadOffset をどのように進めるかを説明しています。

表 1: ReadOffset 前進
オフセットの読み取り スタンドアロン コンシューマーグループ

最新

最新のメッセージを読む

最新のメッセージを読む

特定のメッセージ ID

最後に見たメッセージを次の MessageId として使用する

最後に見たメッセージを次の MessageId として使用する

最後に消費された

最後に見たメッセージを次の MessageId として使用する

コンシューマーグループごとに最後に消費されたメッセージ

特定のメッセージ ID と最後に消費されたメッセージからの読み取りは、ストリームに追加されたすべてのメッセージの消費を保証する安全な操作と見なすことができます。最新のメッセージを読み取りに使用すると、ポーリング操作がデッドタイムの状態にあるときにストリームに追加されたメッセージをスキップできます。ポーリングにより、個々のポーリングコマンド間にメッセージが到着するデッドタイムが発生します。ストリームの消費は、線形の連続した読み取りではなく、繰り返しの XREAD 呼び出しに分割されます。

直列化

ストリームに送信されるすべてのレコードは、バイナリ形式に直列化する必要があります。ストリームがハッシュデータ構造に近いため、ストリームキー、フィールド名、値は、RedisTemplate で構成された対応するシリアライザーを使用します。

表 2: ストリームの直列化
ストリームプロパティ シリアライザー 説明

key

keySerializer

Record#getStream() に使用

field

hashKeySerializer

ペイロードの各マップキーに使用されます

value

hashValueSerializer

ペイロードの各マップ値に使用されます

使用中の RedisSerializer を確認し、シリアライザーを使用しない場合は、それらの値がすでにバイナリであることを確認する必要があることに注意してください。

オブジェクトマッピング

シンプル値

StreamOperations を使用すると、ObjectRecord を介して単純な値をストリームに直接追加できます。これらの値を Map 構造体に配置する必要はありません。次に、値はペイロードフィールドに割り当てられ、値を読み戻すときに抽出できます。

ObjectRecord<String, String> record = StreamRecords.newRecord()
    .in("my-stream")
    .ofObject("my-value");

redisTemplate()
    .opsForStream()
    .add(record); (1)

List<ObjectRecord<String, String>> records = redisTemplate()
    .opsForStream()
    .read(String.class, StreamOffset.fromStart("my-stream"));
1XADD my-stream * "_class" "java.lang.String" "_raw" "my-value"

ObjectRecord は、他のすべてのレコードとまったく同じ直列化プロセスを通過するため、MapRecord を返す型なし読み取り操作を使用してレコードを取得することもできます。

複素数値

ストリームに複雑な値を追加するには、次の 3 つの方法があります。

  • e を使用して単純な値に変換します。g. 文字列の JSON 表現。

  • 適切な RedisSerializer を使用して値を直列化します。

  • HashMapper を使用して、値を直列化に適した Map に変換します。

最初のバリアントは最も単純なものですが、ストリーム構造によって提供されるフィールド値機能を無視します。それでも、ストリーム内の値は他のコンシューマーが読み取ることができます。2 番目のオプションには、最初のオプションと同じ利点がありますが、すべてのコンシューマーがまったく同じ直列化メカニズムを実装する必要があるため、非常に具体的なコンシューマーの制限につながる可能性があります。HashMapper アプローチは、スチームハッシュ構造を利用するが、ソースをフラット化する、もう少し複雑なアプローチです。さらに他のコンシューマーは、適切なシリアライザーの組み合わせが選択されている限り、レコードを読み取ることができます。

HashMappers は、ペイロードを特定の型の Map に変換します。ハッシュを(逆)直列化できるハッシュキーおよびハッシュ値シリアライザーを必ず使用してください。
ObjectRecord<String, User> record = StreamRecords.newRecord()
    .in("user-logon")
    .ofObject(new User("night", "angel"));

redisTemplate()
    .opsForStream()
    .add(record); (1)

List<ObjectRecord<String, User>> records = redisTemplate()
    .opsForStream()
    .read(User.class, StreamOffset.fromStart("user-logon"));
1XADD ユーザーログオン * "_ class" "com.example.User" "firstname" "night" "lastname" "angel"

StreamOperations はデフォルトで ObjectHashMapper を使用します。StreamOperations を入手する際には、要件に適した HashMapper を提供できます。

redisTemplate()
    .opsForStream(new Jackson2HashMapper(true))
    .add(record); (1)
1XADD ユーザーログオン *「名」「夜」 "@class" "com.example.User" 「姓」「天使」

StreamMessageListenerContainer は、ドメイン型で使用されている @TypeAlias を認識しない場合があります。これは、それらが MappingContext を介して解決される必要があるためです。RedisMappingContext は必ず initialEntitySet で初期化してください。

@Bean
RedisMappingContext redisMappingContext() {
    RedisMappingContext ctx = new RedisMappingContext();
    ctx.setInitialEntitySet(Collections.singleton(Person.class));
    return ctx;
}

@Bean
RedisConverter redisConverter(RedisMappingContext mappingContext) {
    return new MappingRedisConverter(mappingContext);
}

@Bean
ObjectHashMapper hashMapper(RedisConverter converter) {
    return new ObjectHashMapper(converter);
}

@Bean
StreamMessageListenerContainer streamMessageListenerContainer(RedisConnectionFactory connectionFactory, ObjectHashMapper hashMapper) {
    StreamMessageListenerContainerOptions<String, ObjectRecord<String, Object>> options = StreamMessageListenerContainerOptions.builder()
            .objectMapper(hashMapper)
            .build();

    return StreamMessageListenerContainer.create(connectionFactory, options);
}