クラス AbstractCorrelatingMessageHandler

実装されたすべてのインターフェース:
org.reactivestreams.Subscriber<Message<?>>AwareBeanFactoryAwareBeanNameAwareDisposableBeanInitializingBeanApplicationContextAwareApplicationEventPublisherAwareLifecycleOrderedExpressionCapableOrderableMessageProducerDiscardingMessageHandlerHeaderPropagationAwareIntegrationPatternNamedComponentIntegrationManagementManageableLifecycleTrackableComponentMessageHandlerreactor.core.CoreSubscriber<Message<?>>
既知の直属サブクラス
AggregatingMessageHandlerResequencingMessageHandler

public abstract class AbstractCorrelatingMessageHandler extends AbstractMessageProducingHandler implements DiscardingMessageHandler, ApplicationEventPublisherAware, ManageableLifecycle
MessageStore 内の相関メッセージのバッファを保持する抽象メッセージハンドラー。このクラスは、バッチで完了することができるメッセージの相関グループを処理します。これは、相関を必要とする MessageHandlers のカスタム実装に役立ち、Aggregator-AggregatingMessageHandler および Resequencer-ResequencingMessageHandler の基本クラスとして、または相関を必要とするカスタム実装に使用されます。

このハンドラーをカスタマイズするには、必要に応じて CorrelationStrategyReleaseStrategyMessageGroupProcessor 実装を挿入します。

デフォルトでは、CorrelationStrategyHeaderAttributeCorrelationStrategy になり、ReleaseStrategySequenceSizeReleaseStrategy になります。

複数のハンドラーで同じ MessageStore が使用されている場合は、適切な CorrelationStrategy を使用して、ハンドラー全体でメッセージグループの一意性を確保します。

expireTimeout が 0 より大きい場合、このタイムアウトより古いグループは、起動時(または purgeOrphanedGroups() が呼び出されたとき)にストアからパージされます。expireDuration が提供されている場合、タスクは purgeOrphanedGroups() を定期的に実行するようにスケジュールされます。

導入:
2.0
作成者:
Iwein Fuld, Dave Syer, Oleg Zhurakousky, Gary Russell, Artem Bilan, David Liu, Enrique Rodriguez, Meherzad Lahewala, Jayadev Sirimamilla
  • コンストラクターの詳細

  • メソッドの詳細

    • setLockRegistry

      public void setLockRegistry(LockRegistry lockRegistry)
    • setMessageStore

      public final void setMessageStore(MessageGroupStore store)
    • setCorrelationStrategy

      public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
    • setReleaseStrategy

      public void setReleaseStrategy(ReleaseStrategy releaseStrategy)
    • setGroupTimeoutExpression

      public void setGroupTimeoutExpression(Expression groupTimeoutExpression)
    • setForceReleaseAdviceChain

      public void setForceReleaseAdviceChain(ListSE<Advice> forceReleaseAdviceChain)
    • setOutputProcessor

      public void setOutputProcessor(MessageGroupProcessor outputProcessor)
      出力関数に MessageGroupProcessor を指定します。
      パラメーター:
      outputProcessor - 使用する MessageGroupProcessor
      導入:
      5.0
    • getOutputProcessor

      public MessageGroupProcessor getOutputProcessor()
      構成済みの MessageGroupProcessor を返します。
      戻り値:
      設定された MessageGroupProcessor
      導入:
      5.2
    • setDiscardChannel

      public void setDiscardChannel(MessageChannel discardChannel)
    • setDiscardChannelName

      public void setDiscardChannelName(StringSE discardChannelName)
    • setSendPartialResultOnExpiry

      public void setSendPartialResultOnExpiry(boolean sendPartialResultOnExpiry)
    • setMinimumTimeoutForEmptyGroups

      public void setMinimumTimeoutForEmptyGroups(long minimumTimeoutForEmptyGroups)
      デフォルトでは、MessageGroupStoreReaper が部分グループを期限切れにするように構成されている場合、空のグループも削除されます。グループが正常に解放された後、空のグループが存在します。これは、遅れて到着するメッセージの検出と破棄を可能にするためです。空のグループを部分的なグループの期限切れよりも長いスケジュールで期限切れにする場合は、このプロパティを設定します。空のグループは、少なくともこのミリ秒の間変更されない限り、MessageStore から削除されません。
      パラメーター:
      minimumTimeoutForEmptyGroups - 最小タイムアウト。
    • setReleasePartialSequences

      public void setReleasePartialSequences(boolean releasePartialSequences)
      基礎となるデフォルトの SequenceSizeReleaseStrategy に releasePartialSequences を設定します。他のリリース戦略では無視されます。
      パラメーター:
      releasePartialSequences - リリースを許可する場合は true。
    • setExpireGroupsUponTimeout

      public void setExpireGroupsUponTimeout(boolean expireGroupsUponTimeout)
      タイムアウトのためにグループが完了した場合、グループを期限切れ(完全に削除)します。デフォルトは true
      パラメーター:
      expireGroupsUponTimeout - 設定する expireGroupsUponTimeout
      導入:
      4.1
    • setPopSequence

      public void setPopSequence(boolean popSequence)
      出力メッセージに対して MessageBuilder.popSequenceDetails() を実行するかどうか。デフォルトは true です。このオプションは、applySequence=true を使用して最も近いアップストリームコンポーネント(スプリッターなど)によって追加されたシーケンス情報を削除します。
      パラメーター:
      popSequence - 使用するブールフラグ。
      導入:
      5.1
    • isReleaseLockBeforeSend

      protected boolean isReleaseLockBeforeSend()
    • setReleaseLockBeforeSend

      public void setReleaseLockBeforeSend(boolean releaseLockBeforeSend)
      true に設定すると、出力を送信する前にメッセージグループロックが解除されます。これが必要になる理由の詳細については、リファレンスマニュアルの「アグリゲーター」セクションの「デッドロックの回避」を参照してください。
      パラメーター:
      releaseLockBeforeSend - ロックを解除するには true。
      導入:
      5.1.1
    • setExpireTimeout

      public void setExpireTimeout(long expireTimeout)
      古い孤立したグループをストアから削除するためのタイムアウトをミリ秒単位で構成します。起動時に使用され、expireDuration が提供されると、purgeOrphanedGroups() を実行するためのタスクがその期間でスケジュールされます。forceReleaseProcessor は、「強制完了」オプションに従って、これらの期限切れグループを処理するために使用されます。永続メッセージグループストアが使用され、再起動後にそのグループに新しいメッセージが到着しない場合、グループは孤立する可能性があります。
      パラメーター:
      expireTimeout - パージするストア内の古い孤立したグループを判別するためのミリ秒数。
      導入:
      5.4
      関連事項:
    • setExpireDurationMillis

      public void setExpireDurationMillis(long expireDuration)
      ストアから古い孤立したグループをクリーンアップする頻度を DurationSE(ミリ単位)で構成します。
      パラメーター:
      expireDuration - purgeOrphanedGroups() を呼び出す頻度の遅延。
      導入:
      5.4
      関連事項:
    • setExpireDuration

      public void setExpireDuration(@Nullable DurationSE expireDuration)
      ストアから古い孤立したグループをクリーンアップする頻度を DurationSE に構成します。
      パラメーター:
      expireDuration - purgeOrphanedGroups() を呼び出す頻度の遅延。
      導入:
      5.4
      関連事項:
    • setGroupConditionSupplier

      public void setGroupConditionSupplier(BiFunctionSE<Message<?>,StringSE,StringSE> conditionSupplier)
      グループに追加されるメッセージからグループ条件を提供するように BiFunctionSE を構成します。関数からの null の結果は、以前に設定された条件をリセットします。
      パラメーター:
      conditionSupplier - グループに追加するメッセージからグループ条件を提供する機能。
      導入:
      5.5
      関連事項:
    • setApplicationEventPublisher

      public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher)
      次で指定:
      インターフェース ApplicationEventPublisherAwaresetApplicationEventPublisher 
    • onInit

      protected void onInit()
      クラスからコピーされた説明: IntegrationObjectSupport
      サブクラスは、初期化ロジック用にこれを実装できます。
      オーバーライド:
      クラス AbstractMessageProducingHandleronInit 
    • getComponentType

      public StringSE getComponentType()
      クラスからコピーされた説明: IntegrationObjectSupport
      サブクラスはこのメソッドを実装して、コンポーネント型情報を提供できます。
      次で指定:
      インターフェース NamedComponentgetComponentType 
      オーバーライド:
      クラス MessageHandlerSupportgetComponentType 
    • getMessageStore

      public MessageGroupStore getMessageStore()
    • getExpireGroupScheduledFutures

      protected MapSE<UUIDSE,ScheduledFutureSE<?>> getExpireGroupScheduledFutures()
    • getCorrelationStrategy

      protected CorrelationStrategy getCorrelationStrategy()
    • getReleaseStrategy

      protected ReleaseStrategy getReleaseStrategy()
    • getGroupConditionSupplier

      @Nullable protected BiFunctionSE<Message<?>,StringSE,StringSE> getGroupConditionSupplier()
    • getDiscardChannel

      public MessageChannel getDiscardChannel()
      インターフェースからコピーされた説明: DiscardingMessageHandler
      破棄チャネルを返します。
      次で指定:
      インターフェース DiscardingMessageHandlergetDiscardChannel 
      戻り値:
      チャンネル。
    • getDiscardChannelName

      protected StringSE getDiscardChannelName()
    • isSendPartialResultOnExpiry

      protected boolean isSendPartialResultOnExpiry()
    • isSequenceAware

      protected boolean isSequenceAware()
    • getLockRegistry

      protected LockRegistry getLockRegistry()
    • isLockRegistrySet

      protected boolean isLockRegistrySet()
    • getMinimumTimeoutForEmptyGroups

      protected long getMinimumTimeoutForEmptyGroups()
    • isReleasePartialSequences

      protected boolean isReleasePartialSequences()
    • getGroupTimeoutExpression

      protected Expression getGroupTimeoutExpression()
    • getEvaluationContext

      protected EvaluationContext getEvaluationContext()
    • handleMessageInternal

      protected void handleMessageInternal(Message<?> message)
      次で指定:
      クラス AbstractMessageHandlerhandleMessageInternal 
    • isExpireGroupsUponCompletion

      protected boolean isExpireGroupsUponCompletion()
    • afterRelease

      protected abstract void afterRelease(MessageGroup group, CollectionSE<Message<?>> completedMessages)
      MessageGroup のリリース後に実行する必要がある追加のロジックを提供できます。
      パラメーター:
      group - グループ。
      completedMessages - 完了したメッセージ。
    • afterRelease

      protected void afterRelease(MessageGroup group, CollectionSE<Message<?>> completedMessages, boolean timeout)
      タイムアウトのためにグループが解放または破棄されたために特別なアクションが必要な場合、サブクラスがオーバーライドすることがあります。デフォルトでは、afterRelease(MessageGroup, Collection) が呼び出されます。
      パラメーター:
      group - グループ。
      completedMessages - 完了したメッセージ。
      timeout - リリースまたは破棄がタイムアウトによるものである場合は true。
    • forceComplete

      protected void forceComplete(MessageGroup group)
    • remove

      protected void remove(MessageGroup group)
    • findLastReleasedSequenceNumber

      protected int findLastReleasedSequenceNumber(ObjectSE groupId, CollectionSE<Message<?>> partialSequence)
    • store

      protected MessageGroup store(ObjectSE correlationKey, Message<?> message)
    • expireGroup

      protected void expireGroup(ObjectSE correlationKey, MessageGroup group, LockSE lock)
    • completeGroup

      protected void completeGroup(ObjectSE correlationKey, MessageGroup group, LockSE lock)
    • completeGroup

      protected CollectionSE<Message<?>> completeGroup(Message<?> message, ObjectSE correlationKey, MessageGroup group, LockSE lock)
    • verifyResultCollectionConsistsOfMessages

      protected void verifyResultCollectionConsistsOfMessages(CollectionSE<?> elements)
    • obtainGroupTimeout

      protected ObjectSE obtainGroupTimeout(MessageGroup group)
    • destroy

      public void destroy()
      次で指定:
      インターフェース DisposableBeandestroy 
      次で指定:
      インターフェース IntegrationManagementdestroy 
      オーバーライド:
      クラス MessageHandlerSupportdestroy 
    • start

      public void start()
      次で指定:
      インターフェース Lifecyclestart 
      次で指定:
      インターフェース ManageableLifecyclestart 
    • stop

      public void stop()
      次で指定:
      インターフェース Lifecyclestop 
      次で指定:
      インターフェース ManageableLifecyclestop 
    • isRunning

      public boolean isRunning()
      次で指定:
      インターフェース LifecycleisRunning 
      次で指定:
      インターフェース ManageableLifecycleisRunning 
    • purgeOrphanedGroups

      public void purgeOrphanedGroups()
      付属の expireTimeout を使用して MessageGroupStore.expireMessageGroups(long) を実行します。いつでも外部から呼び出すことができます。内部的には、構成された expireDuration を使用してスケジュールされたタスクから呼び出されます。
      導入:
      5.4