インターフェース StreamReceiver<K,V extends Record<K,?>>
- 型パラメーター:
K
- ストリームキーとストリームフィールド型。V
- ストリーム値型。
public interface StreamReceiver<K,V extends Record<K,?>>
リアクティブインフラストラクチャを使用して Redis ストリームを消費するレシーバー。
スタンドアロン
作成されると、StreamReceiver
は Redis ストリームにサブスクライブし、受信 records
を消費できます。Record
無限の Flux
を考えてみましょう。Subscription
をキャンセルすると、最終的にバックグラウンドポーリングが終了します。レコードは key and value serializers
を使用して変換され、さまざまなシリアライゼーション戦略をサポートします。
StreamReceiver
は、次の 3 つのストリーム消費モードをサポートしています。
- スタンドアロン
- 外部
acknowledge
でConsumer
を使用する - 自動確認機能付きの
Consumer
の使用
ReadOffset
に応じて、StreamReceiver
は個別の戦略を適用して次の ReadOffset
を取得します。スタンドアロン
- 特定のレコード ID を使用した
ReadOffset.from(String)
オフセット: 指定されたオフセットから開始し、最後に表示されたrecord Id
を使用します。 ReadOffset.lastConsumed()
最後に消費された: 最新のオフセット($
)から開始し、最後に表示されたrecord Id
を使用します。ReadOffset.latest()
最後に消費された: 最新のオフセット($
)から開始し、後続の読み取りに最新のオフセット($
)を使用します。
Consumer
の使用 - 特定のレコード ID を使用した
ReadOffset.from(String)
オフセット: 指定されたオフセットから開始し、最後に表示されたrecord Id
を使用します。 ReadOffset.lastConsumed()
最後に消費された: コンシューマーによって最後に消費されたレコード(>
)から開始し、コンシューマーによって最後に消費されたレコード(>
)を後続の読み取りに使用します。ReadOffset.latest()
最後に消費された: 最新のオフセット($
)から開始し、後続の読み取りに最新のオフセット($
)を使用します。
ReadOffset.latest()
を使用すると、ポーリングが中断されている間にレコードが到着する可能性があるため、レコードがドロップされる可能性があります。recordId をオフセットまたは ReadOffset.lastConsumed()
として使用して、レコードが失われる可能性を最小限に抑えます。StreamReceiver
は、デフォルトで、ストリームの読み取りおよび逆直列化中にエラーをターミナルエラー信号として伝播します。resume function
を構成すると、レコードをドロップするか、エラーを伝播してサブスクリプションを終了することにより、条件付きで再開できます。
StreamReceiver
の使用方法については、次のサンプルコードを参照してください。
ReactiveRedisConnectionFactory factory = …; StreamReceiver<String, String, String> receiver = StreamReceiver.create(factory); Flux<MapRecord<String, String, String>> records = receiver.receive(StreamOffset.fromStart("my-stream")); recordFlux.doOnNext(record -> …);
- 導入:
- 2.2
- 作成者:
- Mark Paluch, Eddie McDaniel
- 関連事項:
ネストされたクラスのサマリー
ネストされたクラス修飾子と型インターフェース説明static class
StreamReceiver.StreamReceiverOptions<K,
V extends Record<K, ?>> StreamReceiver
のオプション。static class
StreamReceiver.StreamReceiverOptionsBuilder<K,
V extends Record<K, ?>> メソッドのサマリー
修飾子と型メソッド説明create
(ReactiveRedisConnectionFactory connectionFactory) static <K,
V extends Record<K, ?>>
StreamReceiver<K,V> create
(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options) ReactiveRedisConnectionFactory
およびStreamReceiver.StreamReceiverOptions
を指定して、新しいStreamReceiver
を作成します。reactor.core.publisher.Flux<V>
receive
(Consumer consumer, StreamOffset<K> streamOffset) reactor.core.publisher.Flux<V>
receive
(StreamOffset<K> streamOffset) reactor.core.publisher.Flux<V>
receiveAutoAck
(Consumer consumer, StreamOffset<K> streamOffset)
メソッドの詳細
create
static StreamReceiver<StringSE,MapRecord<StringSE, createStringSE, StringSE>> (ReactiveRedisConnectionFactory connectionFactory) - パラメーター:
connectionFactory
- null であってはなりません。- 戻り値:
- 新しい
StreamReceiver
create
static <K,V extends Record<K, StreamReceiver<K,?>> V> create(ReactiveRedisConnectionFactory connectionFactory, StreamReceiver.StreamReceiverOptions<K, V> options) ReactiveRedisConnectionFactory
およびStreamReceiver.StreamReceiverOptions
を指定して、新しいStreamReceiver
を作成します。- パラメーター:
connectionFactory
- null であってはなりません。options
- null であってはなりません。- 戻り値:
- 新しい
StreamReceiver
receive
stream
からrecords
を消費する Redis ストリームコンシューマーを開始します。レコードは Redis から消費され、Flux でリクエストが行われると、返されたFlux
で配信されます。返されたFlux
が終了すると、レシーバーは閉じられます。すべてのレコードは、
ReactiveStreamCommands.xAck(ByteBuffer, String, String...)
を使用して確認する必要があります- パラメーター:
streamOffset
- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Record
のフラックス。 - 関連事項:
receiveAutoAck
stream
からrecords
を消費する Redis ストリームコンシューマーを開始します。レコードは Redis から消費され、Flux でリクエストが行われると、返されたFlux
で配信されます。返されたFlux
が終了すると、レシーバーは閉じられます。すべてのレコードは、受信時に確認応答されます。
- パラメーター:
consumer
- コンシューマーグループは、null であってはなりません。streamOffset
- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Record
のフラックス。 - 関連事項:
receive
stream
からrecords
を消費する Redis ストリームコンシューマーを開始します。レコードは Redis から消費され、Flux でリクエストが行われると、返されたFlux
で配信されます。返されたFlux
が終了すると、レシーバーは閉じられます。すべてのレコードは、処理後に
ReactiveStreamOperations.acknowledge(Object, String, String...)
を使用して確認応答する必要があります。- パラメーター:
consumer
- コンシューマーグループは、null であってはなりません。streamOffset
- そのオフセットに沿ったストリーム。- 戻り値:
- 受信
Record
のフラックス。 - 関連事項: