インターフェース StreamMessageListenerContainer<K,V extends Record<K,?>>

型パラメーター:
K - ストリームキーとストリームフィールド型。
V - ストリーム値型。
すべてのスーパーインターフェース:
LifecyclePhasedSmartLifecycle

public interface StreamMessageListenerContainer<K,V extends Record<K,?>> extends SmartLifecycle
メッセージリスナーコンテナーを表すフレームワークによって使用される抽象化。外部で実装することを意図したものではありません。

作成されると、StreamMessageListenerContainer は Redis ストリームにサブスクライブし、受信 messages を消費できます。StreamMessageListenerContainer は、複数のストリーム読み取りリクエストを許可し、読み取りリクエストごとに Subscription ハンドルを返します。Subscription をキャンセルすると、最終的にバックグラウンドポーリングが終了します。メッセージは key and value serializers を使用して変換され、さまざまな直列化戦略をサポートします。
StreamMessageListenerContainer は、複数のストリーム消費モードをサポートしています。

ストリームからの読み取りには、ポーリングと、ストリームオフセットを進めるための戦略が必要です。最初の ReadOffset に応じて、StreamMessageListenerContainer は個別の戦略を適用して次の ReadOffset を取得します。
スタンドアロン
  • 特定のメッセージ ID を使用した ReadOffset.from(String) オフセット: 指定されたオフセットから開始し、最後に表示された message Id を使用します。
  • ReadOffset.lastConsumed() 最後に消費された: 最新のオフセット($)から開始し、最後に表示された message Id を使用します。
  • ReadOffset.latest() 最後に消費された: 最新のオフセット($)から開始し、後続の読み取りに最新のオフセット($)を使用します。

Consumer の使用
  • 特定のメッセージ ID を使用した ReadOffset.from(String) オフセット: 指定されたオフセットから開始し、最後に表示された message Id を使用します。
  • ReadOffset.lastConsumed() 最後に消費された: コンシューマーによって最後に消費されたメッセージ(>)から開始し、コンシューマーによって最後に消費されたメッセージ(>)を後続の読み取りに使用します。
  • ReadOffset.latest() 最後に消費された: 最新のオフセット($)から開始し、後続の読み取りに最新のオフセット($)を使用します。
メモ: ReadOffset.latest() を使用すると、ポーリングが一時停止されている間にメッセージが到着する可能性があるため、メッセージがドロップされる可能性があります。messagedId をオフセットまたは ReadOffset.lastConsumed() として使用して、メッセージが失われる可能性を最小限に抑えます。

StreamMessageListenerContainer では、別の ThreadSE で長時間実行されるポーリングタスクをフォークするために ExecutorSE が必要です。このスレッドは、ストリームメッセージをポーリングして listener callback を呼び出すイベントループとして使用されます。

StreamMessageListenerContainer タスクは、ストリーム読み取り中のエラーと listener notification を構成可能な ErrorHandler に伝播します。エラーはデフォルトで Subscription を停止します。StreamMessageListenerContainer.StreamReadRequest 用に PredicateSE を構成すると、条件付きサブスクリプションをキャンセルしたり、すべてのエラーを続行したりできます。

StreamMessageListenerContainer の使用方法については、次のサンプルコードを参照してください。

 RedisConnectionFactory factory = …;

 StreamMessageListenerContainer<String, MapRecord<String, String, String>> container = StreamMessageListenerContainer.create(factory);
 Subscription subscription = container.receive(StreamOffset.fromStart("my-stream"), message -> …);

 container.start();

 // later
 container.stop();
 
導入:
2.2
作成者:
Mark Paluch, Christoph Strobl, Christian Rest
関連事項: