インターフェース StreamReceiver<K,V extends Record<K,?>>
- 型パラメーター:
K- ストリームキーとストリームフィールド型。V- ストリーム値型。
public interface StreamReceiver<K,V extends Record<K,?>>
リアクティブインフラストラクチャを使用して Redis ストリームを消費するレシーバー。
スタンドアロン
StreamReceiver は作成されると、Redis ストリームをサブスクライブし、受信した records を消費できます。Record が無限大の Flux の場合を考えてみましょう。Subscription をキャンセルすると、バックグラウンドポーリングは最終的に終了します。レコードは、様々な直列化戦略をサポートするために key and value serializers を使用して変換されます。
StreamReceiver は、ストリーム消費の 3 つのモードをサポートしています。
- スタンドアロン
- 外部
acknowledgeでConsumerを使用する - 自動確認機能付きの
Consumerの使用
ReadOffset に応じて、StreamReceiver は個別の戦略を適用して次の ReadOffset を取得します。スタンドアロン
- 特定のレコード ID を使用した
ReadOffset.from(String)オフセット: 指定されたオフセットから開始し、最後に表示されたrecord Idを使用します。 ReadOffset.lastConsumed()最後に消費された: 最新のオフセット($)から開始し、最後に表示されたrecord Idを使用します。ReadOffset.latest()最後に消費された: 最新のオフセット($)から開始し、後続の読み取りに最新のオフセット($)を使用します。
Consumer の使用 - 特定のレコード ID を使用した
ReadOffset.from(String)オフセット: 指定されたオフセットから開始し、最後に表示されたrecord Idを使用します。 ReadOffset.lastConsumed()最後に消費された: コンシューマーによって最後に消費されたレコード(>)から開始し、コンシューマーによって最後に消費されたレコード(>)を後続の読み取りに使用します。ReadOffset.latest()最後に消費された: 最新のオフセット($)から開始し、後続の読み取りに最新のオフセット($)を使用します。
ReadOffset.latest() を使用すると、ポーリングが中断されている間にレコードが到着する可能性があるため、レコードがドロップされる可能性があります。レコードが失われる可能性を最小限に抑えるには、オフセットとして recordId を使用するか、ReadOffset.lastConsumed() を使用します。StreamReceiver は、デフォルトで、ストリームの読み取りおよび逆直列化中にエラーをターミナルエラーシグナルとして伝播します。resume function を構成すると、レコードをドロップするか、エラーを伝播してサブスクリプションを終了することにより、条件付きで再開できます。
StreamReceiver の使用方法については、次のサンプルコードを参照してください。
ReactiveRedisConnectionFactory factory = …;
StreamReceiver<String, String, String> receiver = StreamReceiver.create(factory);
Flux<MapRecord<String, String, String>> records = receiver.receive(StreamOffset.fromStart("my-stream"));
recordFlux.doOnNext(record -> …);
- 導入:
- 2.2
- 作成者:
- Mark Paluch, Eddie McDaniel
- 関連事項:
ネストされたクラスの要約
ネストされたクラス修飾子と型インターフェース説明static classStreamReceiver.StreamReceiverOptions<K,V extends Record<K, ?>> StreamReceiverのオプション。static classStreamReceiver.StreamReceiverOptionsBuilder<K,V extends Record<K, ?>> メソッドのサマリー
修飾子と型メソッド説明create(ReactiveRedisConnectionFactory connectionFactory) static <K,V extends Record<K, ?>>
StreamReceiver<K,V> create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options) ReactiveRedisConnectionFactoryおよびStreamReceiver.StreamReceiverOptionsを指定して、新しいStreamReceiverを作成します。receive(Consumer consumer, StreamOffset<K> streamOffset) receive(StreamOffset<K> streamOffset) receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset)
メソッドの詳細
create
static StreamReceiver<StringSE,MapRecord<StringSE, createStringSE, StringSE>> (ReactiveRedisConnectionFactory connectionFactory) - パラメーター:
connectionFactory- null であってはなりません。- 戻り値:
- 新しい
StreamReceiver
create
static <K,V extends Record<K, StreamReceiver<K,?>> V> create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options) ReactiveRedisConnectionFactoryおよびStreamReceiver.StreamReceiverOptionsを指定して、新しいStreamReceiverを作成します。- パラメーター:
connectionFactory- null であってはなりません。options- null であってはなりません。- 戻り値:
- 新しい
StreamReceiver
receive
streamからrecordsを消費する Redis ストリームコンシューマーを起動します。Flux でリクエストが行われると、レコードは Redis から消費され、返されたFluxに配信されます。返されたFluxが終了すると、レシーバーは閉じられます。すべてのレコードは、
ReactiveStreamCommands.xAck(ByteBuffer, String, String...)を使用して確認する必要があります- パラメーター:
streamOffset- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Recordの Flux。 - 関連事項:
receiveAutoAck
streamからrecordsを消費する Redis ストリームコンシューマーを起動します。Flux でリクエストが行われると、レコードは Redis から消費され、返されたFluxに配信されます。返されたFluxが終了すると、レシーバーは閉じられます。すべてのレコードは、受信時に確認応答されます。
- パラメーター:
consumer- コンシューマーグループは、null であってはなりません。streamOffset- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Recordの Flux。 - 関連事項:
receive
streamからrecordsを消費する Redis ストリームコンシューマーを起動します。Flux でリクエストが行われると、レコードは Redis から消費され、返されたFluxに配信されます。返されたFluxが終了すると、レシーバーは閉じられます。すべてのレコードは、処理後に
ReactiveStreamOperations.acknowledge(Object, String, String...)を使用して確認応答する必要があります。- パラメーター:
consumer- コンシューマーグループは、null であってはなりません。streamOffset- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Recordの Flux。 - 関連事項: