インターフェース StreamMessageListenerContainer<K,V extends Record<K,?>>
- 型パラメーター:
K
- ストリームキーとストリームフィールド型。V
- ストリーム値型。
- すべてのスーパーインターフェース:
Lifecycle
、Phased
、SmartLifecycle
作成されると、StreamMessageListenerContainer
は Redis ストリームにサブスクライブし、受信 messages
を消費できます。StreamMessageListenerContainer
は、複数のストリーム読み取りリクエストを許可し、読み取りリクエストごとに Subscription
ハンドルを返します。Subscription
をキャンセルすると、最終的にバックグラウンドポーリングが終了します。メッセージは key and value serializers
を使用して変換され、さまざまな直列化戦略をサポートします。
StreamMessageListenerContainer
は、複数のストリーム消費モードをサポートしています。
- スタンドアロン
- 外部
StreamOperations.acknowledge(Object, String, String...)
確認応答でConsumer
を使用する } - 自動確認機能付きの
Consumer
の使用
ReadOffset
に応じて、StreamMessageListenerContainer
は個別の戦略を適用して次の ReadOffset
を取得します。スタンドアロン
- 特定のメッセージ ID を使用した
ReadOffset.from(String)
オフセット: 指定されたオフセットから開始し、最後に表示されたmessage Id
を使用します。 ReadOffset.lastConsumed()
最後に消費された: 最新のオフセット($
)から開始し、最後に表示されたmessage Id
を使用します。ReadOffset.latest()
最後に消費された: 最新のオフセット($
)から開始し、後続の読み取りに最新のオフセット($
)を使用します。
Consumer
の使用 - 特定のメッセージ ID を使用した
ReadOffset.from(String)
オフセット: 指定されたオフセットから開始し、最後に表示されたmessage Id
を使用します。 ReadOffset.lastConsumed()
最後に消費された: コンシューマーによって最後に消費されたメッセージ(>
)から開始し、コンシューマーによって最後に消費されたメッセージ(>
)を後続の読み取りに使用します。ReadOffset.latest()
最後に消費された: 最新のオフセット($
)から開始し、後続の読み取りに最新のオフセット($
)を使用します。
ReadOffset.latest()
を使用すると、ポーリングが一時停止されている間にメッセージが到着する可能性があるため、メッセージがドロップされる可能性があります。messagedId をオフセットまたは ReadOffset.lastConsumed()
として使用して、メッセージが失われる可能性を最小限に抑えます。StreamMessageListenerContainer
では、別の Thread
SE で長時間実行されるポーリングタスクをフォークするために Executor
SE が必要です。このスレッドは、ストリームメッセージをポーリングして listener callback
を呼び出すイベントループとして使用されます。
StreamMessageListenerContainer
タスクは、ストリーム読み取り中のエラーと listener notification
を構成可能な ErrorHandler
に伝播します。エラーはデフォルトで Subscription
を停止します。StreamMessageListenerContainer.StreamReadRequest
用に Predicate
SE を構成すると、条件付きサブスクリプションをキャンセルしたり、すべてのエラーを続行したりできます。
StreamMessageListenerContainer
の使用方法については、次のサンプルコードを参照してください。
RedisConnectionFactory factory = …; StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(factory); Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), message -> …); container.start(); // later container.stop();
- 導入:
- 2.2
- 作成者:
- Mark Paluch, Christoph Strobl, Christian Rest
- 関連事項:
StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
StreamListener
StreamMessageListenerContainer.StreamReadRequest
StreamMessageListenerContainer.ConsumerStreamReadRequest
StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder.executor(Executor)
ErrorHandler
StreamOperations
RedisConnectionFactory
StreamReceiver
ネストされたクラスのサマリー
ネストされたクラス修飾子と型インターフェース説明static class
Consumer
を使用して Redis ストリームを読み取るようにリクエストします。static class
static class
StreamMessageListenerContainer
のオプション。static class
StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder<K,
V extends Record<K, ?>> static class
Redis ストリームの読み取りをリクエストします。static class
フィールドサマリー
インターフェース org.springframework.context.SmartLifecycle から継承されたフィールド
DEFAULT_PHASE
メソッドのサマリー
修飾子と型メソッド説明create
(RedisConnectionFactory connectionFactory) static <K,
V extends Record<K, ?>>
StreamMessageListenerContainer<K,V> create
(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) default Subscription
receive
(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) Redis ストリームの新しいサブスクリプションを登録します。default Subscription
receive
(StreamOffset<K> streamOffset, StreamListener<K, V> listener) Redis ストリームの新しいサブスクリプションを登録します。default Subscription
receiveAutoAck
(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) Redis ストリームの新しいサブスクリプションを登録します。register
(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) Redis ストリームの新しいサブスクリプションを登録します。void
remove
(Subscription subscription) 特定のSubscription
をコンテナーから登録解除します。インターフェース org.springframework.context.SmartLifecycle から継承されたメソッド
getPhase, isAutoStartup, stop
メソッドの詳細
create
static StreamMessageListenerContainer<StringSE,MapRecord<StringSE, createStringSE, StringSE>> (RedisConnectionFactory connectionFactory) - パラメーター:
connectionFactory
- null であってはなりません。- 戻り値:
- 新しい
StreamMessageListenerContainer
create
static <K,V extends Record<K, StreamMessageListenerContainer<K,?>> V> create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) RedisConnectionFactory
およびStreamMessageListenerContainer.StreamMessageListenerContainerOptions
を指定して、新しいStreamMessageListenerContainer
を作成します。- パラメーター:
connectionFactory
- null であってはなりません。options
- null であってはなりません。- 戻り値:
- 新しい
StreamMessageListenerContainer
receive
Redis ストリームの新しいサブスクリプションを登録します。is already running
の場合、Subscription
が追加されてすぐに実行されます。それ以外の場合は、コンテナーが実際にstarted
になると、スケジュールされて開始されます。Record
取得中のエラーは、基礎となるタスクのcancellation
につながります。Lifecycle.stop()
では、コンテナー自体をシャットダウンする前に、すべてのsubscriptions
がキャンセルされます。- パラメーター:
streamOffset
- そのオフセットに沿ったストリーム。listener
- null であってはなりません。- 戻り値:
- サブスクリプションハンドル。
- 関連事項:
receive
default Subscription receive(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) Redis ストリームの新しいサブスクリプションを登録します。is already running
の場合、Subscription
が追加されてすぐに実行されます。それ以外の場合は、コンテナーが実際にstarted
になると、スケジュールされて開始されます。すべてのメッセージは、処理後に
StreamOperations.acknowledge(Object, String, String...)
を使用して確認応答する必要があります。Record
取得中のエラーは、基礎となるタスクのcancellation
につながります。Lifecycle.stop()
では、コンテナー自体をシャットダウンする前に、すべてのsubscriptions
がキャンセルされます。- パラメーター:
consumer
- コンシューマーグループは、null であってはなりません。streamOffset
- そのオフセットに沿ったストリーム。listener
- null であってはなりません。- 戻り値:
- サブスクリプションハンドル。
- 関連事項:
receiveAutoAck
default Subscription receiveAutoAck(Consumer consumer, StreamOffset<K> streamOffset, StreamListener<K, V> listener) Redis ストリームの新しいサブスクリプションを登録します。is already running
の場合、Subscription
が追加されてすぐに実行されます。それ以外の場合は、コンテナーが実際にstarted
になると、スケジュールされて開始されます。すべてのメッセージは、受信時に確認されます。
Record
取得中のエラーは、基礎となるタスクのcancellation
につながります。Lifecycle.stop()
では、コンテナー自体をシャットダウンする前に、すべてのsubscriptions
がキャンセルされます。- パラメーター:
consumer
- コンシューマーグループは、null であってはなりません。streamOffset
- そのオフセットに沿ったストリーム。listener
- null であってはなりません。- 戻り値:
- サブスクリプションハンドル。
- 関連事項:
register
Subscription register(StreamMessageListenerContainer.StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) Redis ストリームの新しいサブスクリプションを登録します。is already running
の場合、Subscription
が追加されてすぐに実行されます。それ以外の場合は、コンテナーが実際にstarted
になると、スケジュールされて開始されます。Record
中のエラーは、基になるタスクをキャンセルするかどうかのテストcancellation predicate
に対してテストされます。Lifecycle.stop()
では、コンテナー自体をシャットダウンする前に、すべてのsubscriptions
がキャンセルされます。Record
取得中のエラーは、指定されたStreamMessageListenerContainer.StreamReadRequest.getErrorHandler()
に委譲されます。- パラメーター:
streamRequest
- null であってはなりません。listener
- null であってはなりません。- 戻り値:
- サブスクリプションハンドル。
- 関連事項:
remove
特定のSubscription
をコンテナーから登録解除します。これにより、潜在的なstop
/start
シナリオでSubscription
が再起動されるのを防ぎます。
active
subcription
は、削除前はcancelled
です。- パラメーター:
subscription
- null であってはなりません。