クラス FluxAggregatorMessageHandler
java.lang.ObjectSE
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.aggregator.FluxAggregatorMessageHandler
- 実装済みのインターフェース一覧:
org.reactivestreams.Subscriber<Message<?>>、Aware、BeanFactoryAware、BeanNameAware、DisposableBean、InitializingBean、ApplicationContextAware、Lifecycle、Ordered、ComponentSourceAware、ExpressionCapable、Orderable、MessageProducer、HeaderPropagationAware、IntegrationPattern、NamedComponent、IntegrationManagement、ManageableLifecycle、TrackableComponent、MessageHandler、reactor.core.CoreSubscriber<Message<?>>
public class FluxAggregatorMessageHandler
extends AbstractMessageProducingHandler
implements ManageableLifecycle
Reactor の
Flux.groupBy(java.util.function.Function<? super T, ? extends K>) および Flux.window(int) 演算子に基づく集約ロジックの AbstractMessageProducingHandler 実装。 受信メッセージは、コンストラクターで初期化された Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) によって提供される FluxSink に送信されます。
グループの結果のウィンドウは、ダウンストリーム消費のために Message にラップされます。
AbstractMessageProducingHandler.getOutputChannel() が ReactiveStreamsSubscribableChannel インスタンスでない場合は、集約 Flux 全体のサブスクリプションが start() メソッドで実行されます。
- 導入:
- 5.2
- 作成者:
- Artem Bilan, Glenn Renfro
ネストされたクラスの概要
インターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたネストクラス / インターフェース
IntegrationManagement.ManagementOverridesフィールド概要
クラス org.springframework.integration.handler.AbstractMessageProducingHandler から継承されたフィールド
messagingTemplateクラス org.springframework.integration.context.IntegrationObjectSupport から継承されたフィールド
EXPRESSION_PARSER, loggerインターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたフィールド
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEインターフェース org.springframework.core.Ordered から継承されたフィールド
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCEコンストラクター概要
コンストラクターコンストラクター説明Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>)でインスタンスを作成し、それにFlux.groupBy(java.util.function.Function<? super T, ? extends K>)およびFlux.window(int)変換を適用します。方法の概要
修飾子と型メソッド説明このコンポーネントが実装するパターン型を返します。protected voidhandleMessageInternal(Message<?> message) booleanvoidsetBoundaryTrigger(PredicateSE<Message<?>> boundaryTrigger) メッセージのPredicateSE を構成して、Flux.windowUntil(java.util.function.Predicate<T>)オペレーターのウィンドウ境界を決定します。voidsetCombineFunction(FunctionSE<reactor.core.publisher.Flux<Message<?>>, reactor.core.publisher.Mono<Message<?>>> combineFunction) 放出するFluxウィンドウに適用する変換FunctionSE を構成します。voidsetCorrelationStrategy(CorrelationStrategy correlationStrategy) 受信メッセージからグループキーを決定するようにCorrelationStrategyを構成します。voidsetWindowConfigurer(FunctionSE<reactor.core.publisher.Flux<Message<?>>, reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer) voidsetWindowSize(int windowSize) 閉じるウィンドウのサイズを指定します。voidsetWindowSizeFunction(FunctionSE<Message<?>, @Nullable IntegerSE> windowSizeFunction) FunctionSE を指定して、グループ内の最初のメッセージに対して閉じるウィンドウのサイズを決定します。voidsetWindowTimespan(DurationSE windowTimespan) ウィンドウを定期的に閉じるようにDurationSE を構成します。protected booleanサブクラスはこれをオーバーライドできます。voidstart()voidstop()クラス org.springframework.integration.handler.AbstractMessageProducingHandler から継承されたメソッド
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, onInit, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, setupMessageProcessor, shouldSplitOutput, updateNotPropagatedHeadersクラス org.springframework.integration.handler.AbstractMessageHandler から継承されたメソッド
handleMessage, onComplete, onError, onNext, onSubscribe, setObservationConventionクラス org.springframework.integration.handler.MessageHandlerSupport から継承されたメソッド
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, isObserved, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrackクラス org.springframework.integration.context.IntegrationObjectSupport から継承されたメソッド
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringクラス java.lang.ObjectSE から継承されたメソッド
clone, equalsSE, finalize, getClass, hashCode, notify, notifyAll, wait, waitSE, waitSEインターフェース reactor.core.CoreSubscriber から継承されたメソッド
currentContextインターフェース org.springframework.integration.support.management.IntegrationManagement から継承されたメソッド
getThisAsインターフェース org.springframework.integration.support.context.NamedComponent から継承されたメソッド
getBeanName, getComponentName
コンストラクターの詳細
FluxAggregatorMessageHandler
public FluxAggregatorMessageHandler()Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>)でインスタンスを作成し、それにFlux.groupBy(java.util.function.Function<? super T, ? extends K>)およびFlux.window(int)変換を適用します。
メソッドの詳細
setCorrelationStrategy
受信メッセージからグループキーを決定するには、CorrelationStrategyを設定します。デフォルトでは、IntegrationMessageHeaderAccessor.CORRELATION_IDヘッダー値に対してHeaderAttributeCorrelationStrategyが使用されます。- パラメーター:
correlationStrategy- 使用するCorrelationStrategy。
setCombineFunction
public void setCombineFunction(FunctionSE<reactor.core.publisher.Flux<Message<?>>, reactor.core.publisher.Mono<Message<?>>> combineFunction) Fluxウィンドウの出力に適用する変換FunctionSE を設定します。ウィンドウのFluxとの組み合わせ結果として、Messageを値とするMonoの結果が必要です。デフォルトでは、ウィンドウのFluxは、ウィンドウの最初のメッセージからコピーされたヘッダーを持つメッセージに完全にラップされます。ペイロード内のこのようなFluxは、サブスクライブして下流で消費する必要があります。- パラメーター:
combineFunction- 結果ウィンドウの変換に使用するFunctionSE
setBoundaryTrigger
メッセージのPredicateSE を構成して、Flux.windowUntil(java.util.function.Predicate<T>)オペレーターのウィンドウ境界を決定します。他のウィンドウ構成オプションよりも優先されます。- パラメーター:
boundaryTrigger- ウィンドウ境界に使用するPredicateSE- 関連事項:
setWindowSize
public void setWindowSize(int windowSize) 閉じるウィンドウのサイズを指定します。setWindowTimespan(Duration)と組み合わせることができます。- パラメーター:
windowSize- 使用するウィンドウのサイズ。- 関連事項:
setWindowSizeFunction
グループの最初のメッセージに対して閉じるウィンドウのサイズを決定するには、FunctionSE を指定します。この関数の戻り値はsetWindowTimespan(Duration)と組み合わせることができます。デフォルトでは、IntegrationMessageHeaderAccessor.SEQUENCE_SIZEヘッダーが参照されます。- パラメーター:
windowSizeFunction- グループ内の最初のメッセージに対してウィンドウサイズを決定するために使用するFunctionSE。- 関連事項:
setWindowTimespan
ウィンドウを定期的に閉じるようにDurationSE を構成します。setWindowSize(int)またはsetWindowSizeFunction(Function)と組み合わせることができます。- パラメーター:
windowTimespan- ウィンドウを定期的に閉じるために使用するDurationSE。- 関連事項:
setWindowConfigurer
public void setWindowConfigurer(FunctionSE<reactor.core.publisher.Flux<Message<?>>, reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer) 単純なオプションではカバーされない任意のFlux.window(int)オプションのグループ化Fluxに変換を適用するようにFunctionSE を構成します。他のウィンドウ構成オプションよりも優先されます。- パラメーター:
windowConfigurer-FunctionSE を使用して、カスタムウィンドウ変換を適用します。
getComponentType
- 次で指定:
- インターフェース
NamedComponentのgetComponentType - オーバーライド:
- クラス
MessageHandlerSupportのgetComponentType
getIntegrationPatternType
インターフェースからコピーされた説明:IntegrationPatternこのコンポーネントが実装するパターン型を返します。- 次で指定:
- インターフェース
IntegrationPatternのgetIntegrationPatternType - オーバーライド:
- クラス
MessageHandlerSupportのgetIntegrationPatternType - 戻り値:
- このコンポーネントが実装する
IntegrationPatternType
start
public void start()- 次で指定:
- インターフェース
Lifecycleのstart - 次で指定:
- インターフェース
ManageableLifecycleのstart
stop
public void stop()- 次で指定:
- インターフェース
Lifecycleのstop - 次で指定:
- インターフェース
ManageableLifecycleのstop
isRunning
public boolean isRunning()- 次で指定:
- インターフェース
LifecycleのisRunning - 次で指定:
- インターフェース
ManageableLifecycleのisRunning
handleMessageInternal
- 次で指定:
- クラス
AbstractMessageHandlerのhandleMessageInternal
shouldCopyRequestHeaders
protected boolean shouldCopyRequestHeaders()クラスからコピーされた説明:AbstractMessageProducingHandlerサブクラスはこれをオーバーライドできます。デフォルトでは true。- オーバーライド:
- クラス
AbstractMessageProducingHandlerのshouldCopyRequestHeaders - 戻り値:
- リクエストヘッダーをコピーする必要がある場合は true。