クラス FluxAggregatorMessageHandler

実装されたすべてのインターフェース:
org.reactivestreams.Subscriber<Message<?>>AwareBeanFactoryAwareBeanNameAwareDisposableBeanInitializingBeanApplicationContextAwareLifecycleOrderedExpressionCapableOrderableMessageProducerHeaderPropagationAwareIntegrationPatternNamedComponentIntegrationManagementManageableLifecycleTrackableComponentMessageHandlerreactor.core.CoreSubscriber<Message<?>>

public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler implements ManageableLifecycle
Reactor の Flux.groupBy(java.util.function.Function<? super T, ? extends K>) および Flux.window(int) 演算子に基づく集約ロジックの AbstractMessageProducingHandler 実装。

受信メッセージは、コンストラクターで初期化された Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) によって提供される FluxSink に送信されます。

グループの結果のウィンドウは、ダウンストリーム消費のために Message にラップされます。

AbstractMessageProducingHandler.getOutputChannel()ReactiveStreamsSubscribableChannel インスタンスでない場合は、集約 Flux 全体のサブスクリプションが start() メソッドで実行されます。

導入:
5.2
作成者:
Artem Bilan
  • コンストラクターの詳細

    • FluxAggregatorMessageHandler

      public FluxAggregatorMessageHandler()
      Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) でインスタンスを作成し、それに Flux.groupBy(java.util.function.Function<? super T, ? extends K>) および Flux.window(int) 変換を適用します。
  • メソッドの詳細

    • setCorrelationStrategy

      public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
      受信メッセージからグループキーを決定するように CorrelationStrategy を構成します。デフォルトでは、IntegrationMessageHeaderAccessor.CORRELATION_ID ヘッダー値に対して HeaderAttributeCorrelationStrategy が使用されます。
      パラメーター:
      correlationStrategy - 使用する CorrelationStrategy
    • setCombineFunction

      public void setCombineFunction(FunctionSE<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)
      放出する Flux ウィンドウに適用する変換 FunctionSE を構成します。ウィンドウの受信 Flux の組み合わせ結果として、Message を値として持つ Mono 結果が必要です。デフォルトでは、ウィンドウの Flux は、ウィンドウの最初のメッセージからコピーされたヘッダーを持つメッセージに完全にラップされます。ペイロード内のこのような Flux は、サブスクライブしてダウンストリームで使用する必要があります。
      パラメーター:
      combineFunction - 結果ウィンドウの変換に使用する FunctionSE
    • setBoundaryTrigger

      public void setBoundaryTrigger(PredicateSE<Message<?>> boundaryTrigger)
      メッセージの PredicateSE を構成して、Flux.windowUntil(java.util.function.Predicate<T>) オペレーターのウィンドウ境界を決定します。他のウィンドウ構成オプションよりも優先されます。
      パラメーター:
      boundaryTrigger - ウィンドウ境界に使用する PredicateSE
      関連事項:
      • Flux.windowUntil(Predicate)
    • setWindowSize

      public void setWindowSize(int windowSize)
      閉じるウィンドウのサイズを指定します。setWindowTimespan(Duration) と組み合わせることができます。
      パラメーター:
      windowSize - 使用するウィンドウのサイズ。
      関連事項:
      • Flux.window(int)
      • Flux.windowTimeout(int, Duration)
    • setWindowSizeFunction

      public void setWindowSizeFunction(FunctionSE<Message<?>,IntegerSE> windowSizeFunction)
      FunctionSE を指定して、グループ内の最初のメッセージに対して閉じるウィンドウのサイズを決定します。関数の結果は setWindowTimespan(Duration) と組み合わせることができます。デフォルトでは、IntegrationMessageHeaderAccessor.SEQUENCE_SIZE ヘッダーが参照されます。
      パラメーター:
      windowSizeFunction - グループ内の最初のメッセージに対してウィンドウサイズを決定するために使用する FunctionSE
      関連事項:
      • Flux.window(int)
      • Flux.windowTimeout(int, Duration)
    • setWindowTimespan

      public void setWindowTimespan(DurationSE windowTimespan)
      ウィンドウを定期的に閉じるように DurationSE を構成します。setWindowSize(int) または setWindowSizeFunction(Function) と組み合わせることができます。
      パラメーター:
      windowTimespan - ウィンドウを定期的に閉じるために使用する DurationSE
      関連事項:
      • Flux.window(Duration)
      • Flux.windowTimeout(int, Duration)
    • setWindowConfigurer

      public void setWindowConfigurer(FunctionSE<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)
      単純なオプションではカバーされない任意の Flux.window(int) オプションのグループ化 Flux に変換を適用するように FunctionSE を構成します。他のウィンドウ構成オプションよりも優先されます。
      パラメーター:
      windowConfigurer - FunctionSE を使用して、カスタムウィンドウ変換を適用します。
    • getComponentType

      public StringSE getComponentType()
      クラスからコピーされた説明: IntegrationObjectSupport
      サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。
      次で指定:
      インターフェース NamedComponentgetComponentType 
      オーバーライド:
      クラス MessageHandlerSupportgetComponentType 
    • getIntegrationPatternType

      public IntegrationPatternType getIntegrationPatternType()
      インターフェースからコピーされた説明: IntegrationPattern
      このコンポーネントが実装するパターン型を返します。
      次で指定:
      インターフェース IntegrationPatterngetIntegrationPatternType 
      オーバーライド:
      クラス MessageHandlerSupportgetIntegrationPatternType 
      戻り値:
      このコンポーネントが実装する IntegrationPatternType
    • start

      public void start()
      次で指定:
      インターフェース Lifecyclestart 
      次で指定:
      インターフェース ManageableLifecyclestart 
    • stop

      public void stop()
      次で指定:
      インターフェース Lifecyclestop 
      次で指定:
      インターフェース ManageableLifecyclestop 
    • isRunning

      public boolean isRunning()
      次で指定:
      インターフェース LifecycleisRunning 
      次で指定:
      インターフェース ManageableLifecycleisRunning 
    • handleMessageInternal

      protected void handleMessageInternal(Message<?> message)
      次で指定:
      クラス AbstractMessageHandlerhandleMessageInternal 
    • shouldCopyRequestHeaders

      protected boolean shouldCopyRequestHeaders()
      クラスからコピーされた説明: AbstractMessageProducingHandler
      サブクラスはこれをオーバーライドできます。デフォルトでは true。
      オーバーライド:
      クラス AbstractMessageProducingHandlershouldCopyRequestHeaders 
      戻り値:
      リクエストヘッダーをコピーする必要がある場合は true。