インターフェース StreamReceiver<K,V extends Record<K,?>>
- 型パラメーター:
K
- ストリームキーとストリームフィールド型。V
- ストリーム値型。
public interface StreamReceiver<K,V extends Record<K,?>>
リアクティブインフラストラクチャを使用して Redis ストリームを消費するレシーバー。
スタンドアロン
StreamReceiver
は作成されると、Redis ストリームをサブスクライブし、受信した records
を消費できます。Record
が無限大の Flux
の場合を考えてみましょう。Subscription
をキャンセルすると、バックグラウンドポーリングは最終的に終了します。レコードは、様々な直列化戦略をサポートするために key and value serializers
を使用して変換されます。
StreamReceiver
は、ストリーム消費の 3 つのモードをサポートしています。
- スタンドアロン
- 外部
acknowledge
でConsumer
を使用する - 自動確認機能付きの
Consumer
の使用
ReadOffset
に応じて、StreamReceiver
は個別の戦略を適用して次の ReadOffset
を取得します。スタンドアロン
- 特定のレコード ID を使用した
ReadOffset.from(String)
オフセット: 指定されたオフセットから開始し、最後に表示されたrecord Id
を使用します。 ReadOffset.lastConsumed()
最後に消費された: 最新のオフセット($
)から開始し、最後に表示されたrecord Id
を使用します。ReadOffset.latest()
最後に消費された: 最新のオフセット($
)から開始し、後続の読み取りに最新のオフセット($
)を使用します。
Consumer
の使用 - 特定のレコード ID を使用した
ReadOffset.from(String)
オフセット: 指定されたオフセットから開始し、最後に表示されたrecord Id
を使用します。 ReadOffset.lastConsumed()
最後に消費された: コンシューマーによって最後に消費されたレコード(>
)から開始し、コンシューマーによって最後に消費されたレコード(>
)を後続の読み取りに使用します。ReadOffset.latest()
最後に消費された: 最新のオフセット($
)から開始し、後続の読み取りに最新のオフセット($
)を使用します。
ReadOffset.latest()
を使用すると、ポーリングが中断されている間にレコードが到着する可能性があるため、レコードがドロップされる可能性があります。レコードが失われる可能性を最小限に抑えるには、オフセットとして recordId を使用するか、ReadOffset.lastConsumed()
を使用します。StreamReceiver
は、デフォルトで、ストリームの読み取りおよび逆直列化中にエラーをターミナルエラーシグナルとして伝播します。resume function
を構成すると、レコードをドロップするか、エラーを伝播してサブスクリプションを終了することにより、条件付きで再開できます。
StreamReceiver
の使用方法については、次のサンプルコードを参照してください。
ReactiveRedisConnectionFactory factory = …; StreamReceiver<String, String, String> receiver = StreamReceiver.create(factory); Flux<MapRecord<String, String, String>> records = receiver.receive(StreamOffset.fromStart("my-stream")); recordFlux.doOnNext(record -> …);
- 導入:
- 2.2
- 作成者:
- Mark Paluch, Eddie McDaniel
- 関連事項:
ネストされたクラスのサマリー
ネストされたクラス修飾子と型インターフェース説明static class
StreamReceiver.StreamReceiverOptions<K,
V extends Record<K, ?>> StreamReceiver
のオプション。static class
StreamReceiver.StreamReceiverOptionsBuilder<K,
V extends Record<K, ?>> メソッドのサマリー
修飾子と型メソッド説明create
(ReactiveRedisConnectionFactory connectionFactory) static <K,
V extends Record<K, ?>>
StreamReceiver<K,V> create
(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options) ReactiveRedisConnectionFactory
およびStreamReceiver.StreamReceiverOptions
を指定して、新しいStreamReceiver
を作成します。receive
(Consumer consumer, StreamOffset<K> streamOffset) receive
(StreamOffset<K> streamOffset) receiveAutoAck
(Consumer consumer, StreamOffset<K> streamOffset)
メソッドの詳細
create
static StreamReceiver<StringSE,MapRecord<StringSE, createStringSE, StringSE>> (ReactiveRedisConnectionFactory connectionFactory) - パラメーター:
connectionFactory
- null であってはなりません。- 戻り値:
- 新しい
StreamReceiver
create
static <K,V extends Record<K, StreamReceiver<K,?>> V> create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options) ReactiveRedisConnectionFactory
およびStreamReceiver.StreamReceiverOptions
を指定して、新しいStreamReceiver
を作成します。- パラメーター:
connectionFactory
- null であってはなりません。options
- null であってはなりません。- 戻り値:
- 新しい
StreamReceiver
receive
stream
からrecords
を消費する Redis ストリームコンシューマーを起動します。Flux でリクエストが行われると、レコードは Redis から消費され、返されたFlux
に配信されます。返されたFlux
が終了すると、レシーバーは閉じられます。すべてのレコードは、
ReactiveStreamCommands.xAck(ByteBuffer, String, String...)
を使用して確認する必要があります- パラメーター:
streamOffset
- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Record
の Flux。 - 関連事項:
receiveAutoAck
stream
からrecords
を消費する Redis ストリームコンシューマーを起動します。Flux でリクエストが行われると、レコードは Redis から消費され、返されたFlux
に配信されます。返されたFlux
が終了すると、レシーバーは閉じられます。すべてのレコードは、受信時に確認応答されます。
- パラメーター:
consumer
- コンシューマーグループは、null であってはなりません。streamOffset
- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Record
の Flux。 - 関連事項:
receive
stream
からrecords
を消費する Redis ストリームコンシューマーを起動します。Flux でリクエストが行われると、レコードは Redis から消費され、返されたFlux
に配信されます。返されたFlux
が終了すると、レシーバーは閉じられます。すべてのレコードは、処理後に
ReactiveStreamOperations.acknowledge(Object, String, String...)
を使用して確認応答する必要があります。- パラメーター:
consumer
- コンシューマーグループは、null であってはなりません。streamOffset
- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Record
の Flux。 - 関連事項: