インターフェース StreamReceiver<K,V extends Record<K,?>>

型パラメーター:
K - ストリームキーとストリームフィールド型。
V - ストリーム値型。

public interface StreamReceiver<K,V extends Record<K,?>>
リアクティブインフラストラクチャを使用して Redis ストリームを消費するレシーバー。

作成されると、StreamReceiver は Redis ストリームにサブスクライブし、受信 records を消費できます。Record 無限の Flux を考えてみましょう。Subscription をキャンセルすると、最終的にバックグラウンドポーリングが終了します。レコードは key and value serializers を使用して変換され、さまざまなシリアライゼーション戦略をサポートします。
StreamReceiver は、次の 3 つのストリーム消費モードをサポートしています。

ストリームからの読み取りには、ポーリングと、ストリームオフセットを進めるための戦略が必要です。最初の ReadOffset に応じて、StreamReceiver は個別の戦略を適用して次の ReadOffset を取得します。
スタンドアロン
  • 特定のレコード ID を使用した ReadOffset.from(String) オフセット: 指定されたオフセットから開始し、最後に表示された record Id を使用します。
  • ReadOffset.lastConsumed() 最後に消費された: 最新のオフセット($)から開始し、最後に表示された record Id を使用します。
  • ReadOffset.latest() 最後に消費された: 最新のオフセット($)から開始し、後続の読み取りに最新のオフセット($)を使用します。

Consumer の使用
  • 特定のレコード ID を使用した ReadOffset.from(String) オフセット: 指定されたオフセットから開始し、最後に表示された record Id を使用します。
  • ReadOffset.lastConsumed() 最後に消費された: コンシューマーによって最後に消費されたレコード(>)から開始し、コンシューマーによって最後に消費されたレコード(>)を後続の読み取りに使用します。
  • ReadOffset.latest() 最後に消費された: 最新のオフセット($)から開始し、後続の読み取りに最新のオフセット($)を使用します。
メモ: ReadOffset.latest() を使用すると、ポーリングが中断されている間にレコードが到着する可能性があるため、レコードがドロップされる可能性があります。recordId をオフセットまたは ReadOffset.lastConsumed() として使用して、レコードが失われる可能性を最小限に抑えます。

StreamReceiver は、デフォルトで、ストリームの読み取りおよび逆直列化中にエラーをターミナルエラー信号として伝播します。resume function を構成すると、レコードをドロップするか、エラーを伝播してサブスクリプションを終了することにより、条件付きで再開できます。

StreamReceiver の使用方法については、次のサンプルコードを参照してください。

 ReactiveRedisConnectionFactory factory = …;

 StreamReceiver<String, String, String> receiver = StreamReceiver.create(factory);
 Flux<MapRecord<String, String, String>> records = receiver.receive(StreamOffset.fromStart("my-stream"));

 recordFlux.doOnNext(record -> …);
 
導入:
2.2
作成者:
Mark Paluch, Eddie McDaniel
関連事項: