アグリゲーター

基本的にスプリッターの鏡像であるアグリゲーターは、複数のメッセージを受け取り、単一のメッセージに結合するメッセージハンドラーの一種です。実際、アグリゲーターは多くの場合、スプリッターを含むパイプラインのダウンストリームコンシューマーです。

技術的には、アグリゲーターはステートフルであるため、スプリッターよりも複雑です。集約するメッセージを保持し、メッセージの完全なグループをいつ集約する準備ができたかを判断する必要があります。そのためには、MessageStore が必要です。

機能性

アグリゲーターは、グループが完了したと見なされるまで、関連するメッセージのグループを関連付けて保管することにより、グループを結合します。その時点で、アグリゲーターはグループ全体を処理して単一のメッセージを作成し、集約されたメッセージを出力として送信します。

アグリゲーターを実装するには、集約を実行するロジックを提供する必要があります(つまり、多数からの単一メッセージの作成)。関連する 2 つの概念は、相関とリリースです。

相関により、メッセージを集約用にグループ化する方法が決まります。Spring Integration では、IntegrationMessageHeaderAccessor.CORRELATION_ID メッセージヘッダーに基づいて、デフォルトで相関が行われます。同じ IntegrationMessageHeaderAccessor.CORRELATION_ID を持つメッセージはグループ化されます。ただし、相関ストラテジをカスタマイズして、メッセージをグループ化する方法を指定する他の方法を許可できます。そのためには、CorrelationStrategy (この章で後述)を実装できます。

メッセージのグループを処理する準備が整った時点を判断するために、ReleaseStrategy が参照されます。アグリゲーターのデフォルトのリリース戦略は、IntegrationMessageHeaderAccessor.SEQUENCE_SIZE ヘッダーに基づいて、シーケンスに含まれるすべてのメッセージが存在するときにグループをリリースします。カスタム ReleaseStrategy 実装への参照を提供することにより、このデフォルト戦略をオーバーライドできます。

プログラミングモデル

Aggregation API は、いくつかのクラスで構成されています。

  • インターフェース MessageGroupProcessor およびそのサブクラス: MethodInvokingAggregatingMessageGroupProcessor および ExpressionEvaluatingMessageGroupProcessor

  • ReleaseStrategy インターフェースとそのデフォルト実装: SimpleSequenceSizeReleaseStrategy

  • CorrelationStrategy インターフェースとそのデフォルト実装: HeaderAttributeCorrelationStrategy

AggregatingMessageHandler

AggregatingMessageHandler (AbstractCorrelatingMessageHandler のサブクラス)は MessageHandler 実装であり、アグリゲーター(および他の相関するユースケース)の共通機能をカプセル化します。

  • 集約するグループにメッセージを関連付ける

  • グループが解放されるまで MessageStore でこれらのメッセージを維持する

  • グループをいつリリースできるかを決定する

  • リリースされたグループを単一のメッセージに集約する

  • 期限切れのグループの認識と対応

メッセージをグループ化する方法を決定する責任は、CorrelationStrategy インスタンスに委譲されます。メッセージグループを解放できるかどうかを決定する責任は、ReleaseStrategy インスタンスに委譲されます。

次のリストは、ベース AbstractAggregatingMessageGroupProcessor の簡単なハイライトを示しています(aggregatePayloads メソッドを実装する責任は開発者に任されています)。

public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {

    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }

    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);

}

AbstractAggregatingMessageGroupProcessor のすぐに使用可能な実装として DefaultAggregatingMessageGroupProcessorExpressionEvaluatingMessageGroupProcessorMethodInvokingMessageGroupProcessor を参照してください。

バージョン 5.2 以降、Function<MessageGroup, Map<String, Object>> 戦略は、AbstractAggregatingMessageGroupProcessor が出力メッセージのヘッダーをマージおよび計算(集約)するために使用可能です。DefaultAggregateHeadersFunction 実装は、グループ間で競合のないすべてのヘッダーを返すロジックで使用できます。グループ内の 1 つ以上のメッセージにヘッダーがない場合、競合とは見なされません。競合するヘッダーは省略されます。新しく導入された DelegatingMessageGroupProcessor とともに、この関数は任意の(AbstractAggregatingMessageGroupProcessor 以外の) MessageGroupProcessor 実装に使用されます。基本的に、フレームワークは提供された関数を AbstractAggregatingMessageGroupProcessor インスタンスに注入し、他のすべての実装を DelegatingMessageGroupProcessor にラップします。AbstractAggregatingMessageGroupProcessor と DelegatingMessageGroupProcessor のロジックの違い。後者は、デリゲート戦略を呼び出す前にヘッダーを事前に計算せず、デリゲートが Message または AbstractIntegrationMessageBuilder を返す場合、関数を呼び出しません。その場合、フレームワークは、ターゲット実装が返された結果に含まれる適切なヘッダーセットを生成することを考慮したと想定します。Function<MessageGroup, Map<String, Object>> ストラテジーは、XML 構成の headers-function 参照属性、Java DSL の AggregatorSpec.headersFunction() オプション、プレーン Java 構成の AggregatorFactoryBean.setHeadersFunction() として利用可能です。

CorrelationStrategy は AbstractCorrelatingMessageHandler が所有し、次の例に示すように、IntegrationMessageHeaderAccessor.CORRELATION_ID メッセージヘッダーに基づいたデフォルト値を持っています。

public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}

メッセージグループの実際の処理に関しては、デフォルトの実装は DefaultAggregatingMessageGroupProcessor です。ペイロードが特定のグループに対して受信されたペイロードの List である単一の Message を作成します。これは、スプリッター、パブリッシュ / サブスクライブチャネル、アップストリームの受信者リストルーターを使用した単純なスキャッターギャザーの実装に適しています。

この型のシナリオでパブリッシュ / サブスクライブチャネルまたは受信者リストルーターを使用する場合は、必ず apply-sequence フラグを有効にしてください。そうすることで、必要なヘッダー CORRELATION_IDSEQUENCE_NUMBERSEQUENCE_SIZE が追加されます。この動作は、Spring Integration のスプリッターではデフォルトで有効になっていますが、パブリッシュ / サブスクライブチャネルや受信者リストルーターでは有効になっていません。これらのコンポーネントは、これらのヘッダーが不要なさまざまなコンテキストで使用される可能性があるためです。

アプリケーションに特定のアグリゲーター戦略を実装する場合、AbstractAggregatingMessageGroupProcessor を継承して aggregatePayloads メソッドを実装できます。ただし、XML またはアノテーションを使用して構成できる集約ロジックを実装するための、API とはあまり関係のない、より優れたソリューションがあります。

一般に、POJO は、単一の java.util.List を引数として受け入れるメソッドを提供する場合、集約アルゴリズムを実装できます(パラメーター化されたリストもサポートされます)。このメソッドは、次のようにメッセージを集約するために呼び出されます。

  • 引数が java.util.Collection<T> であり、パラメーター型 T が Message に割り当て可能な場合、集約のために蓄積されたメッセージのリスト全体が集約器に送信されます。

  • 引数がパラメーター化されていない java.util.Collection であるか、パラメーター型が Message に割り当て可能でない場合、メソッドは蓄積されたメッセージのペイロードを受け取ります。

  • 戻り値の型が Message に割り当て可能でない場合、フレームワークによって自動的に作成される Message のペイロードとして扱われます。

コードをシンプルにし、低カップリング、テスト容易性などのベストプラクティスを促進するために、集約ロジックを実装する推奨する方法は、POJO を使用し、XML またはアノテーションサポートを使用してアプリケーションで構成することです。

バージョン 5.3 以降、メッセージグループの処理後、AbstractCorrelatingMessageHandler は、いくつかのネストされたレベルを持つ適切なスプリッターアグリゲーターシナリオの MessageBuilder.popSequenceDetails() メッセージヘッダー変更を実行します。メッセージグループの解放結果がメッセージのコレクションでない場合にのみ行われます。その場合、ターゲット MessageGroupProcessor は、それらのメッセージの作成中に MessageBuilder.popSequenceDetails() 呼び出しを担当します。

MessageGroupProcessor が Message を返す場合、sequenceDetails がグループ内の最初のメッセージと一致する場合にのみ、出力メッセージに対して MessageBuilder.popSequenceDetails() が実行されます。(以前は、プレーンペイロードまたは AbstractIntegrationMessageBuilder が MessageGroupProcessor から返された場合にのみ、これが行われていました。)

この機能は、新しい popSequence boolean プロパティによって制御できるため、相関の詳細が標準スプリッターによって入力されていないシナリオでは、MessageBuilder.popSequenceDetails() を無効にすることができます。このプロパティは、基本的に、AbstractMessageSplitter 内の最も近いアップストリーム applySequence = true によって実行されたことを元に戻します。詳細については、スプリッターを参照してください。

SimpleMessageGroup.getMessages() メソッドは unmodifiableCollection を返します。集約 POJO メソッドに Collection<Message> パラメーターがある場合、渡される引数は正確にその Collection インスタンスであり、アグリゲーターに SimpleMessageStore を使用すると、元の Collection<Message> はグループの解放後にクリアされます。アグリゲーターから渡された場合、POJO の Collection<Message> 変数もクリアされます。さらなる処理のためにそのコレクションをそのままリリースしたい場合は、新しい Collection (たとえば new ArrayList<Message>(messages))を構築する必要があります。バージョン 4.3 以降、フレームワークは、不要な余分なオブジェクトの作成を避けるために、メッセージを新しいコレクションにコピーしなくなりました。

MessageGroupProcessor の processMessageGroup メソッドがコレクションを返す場合、Message<?> オブジェクトのコレクションでなければなりません。この場合、メッセージは個別にリリースされます。バージョン 4.2 より前は、XML 構成を使用して MessageGroupProcessor を提供することはできませんでした。集約には POJO メソッドのみを使用できます。ここで、参照された(または内部の)Bean が MessageProcessor を実装していることをフレームワークが検出すると、アグリゲーターの出力プロセッサーとして使用されます。

カスタム MessageGroupProcessor からオブジェクトのコレクションをメッセージのペイロードとしてリリースする場合、クラスは AbstractAggregatingMessageGroupProcessor を継承し、aggregatePayloads() を実装する必要があります。

また、バージョン 4.2 以降、SimpleMessageGroupProcessor が提供されています。グループからメッセージのコレクションを返します。これにより、前述のように、リリースされたメッセージが個別に送信されます。

これにより、アグリゲーターはメッセージバリアとして機能し、リリース戦略が実行され、グループが個々のメッセージのシーケンスとしてリリースされるまで、到着したメッセージが保持されます。

ReleaseStrategy

ReleaseStrategy インターフェースは次のように定義されます。

public interface ReleaseStrategy {

  boolean canRelease(MessageGroup group);

}

一般に、POJO は、単一の java.util.List を引数として受け入れ(パラメーター化されたリストもサポートされている)、ブール値を返すメソッドを提供する場合、完了決定ロジックを実装できます。このメソッドは、新しいメッセージが到着するたびに呼び出され、次のようにグループが完了したかどうかを判断します。

  • 引数が java.util.List<T> であり、パラメーター型 T が Message に割り当て可能な場合、グループに蓄積されたメッセージのリスト全体がメソッドに送信されます。

  • 引数がパラメーター化されていない java.util.List であるか、パラメーター型が Message に割り当て可能でない場合、メソッドは蓄積されたメッセージのペイロードを受け取ります。

  • このメソッドは、メッセージグループが集約の準備ができている場合は true を返し、それ以外の場合は false を返す必要があります。

次の例は、型 Message の List に @ReleaseStrategy アノテーションを使用する方法を示しています。

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>>) {...}
}

次の例は、型 String の List に @ReleaseStrategy アノテーションを使用する方法を示しています。

public class MyReleaseStrategy {

    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String>) {...}
}

前述の 2 つの例の署名に基づいて、POJO ベースのリリース戦略には、まだリリースされていないメッセージの Collection (Message 全体へのアクセスが必要な場合)またはペイロードオブジェクトの Collection が渡されます(type パラメーターが何かの場合 Message 以外)。これにより、ほとんどのユースケースが満たされます。ただし、何らかの理由で MessageGroup 全体にアクセスする必要がある場合は、ReleaseStrategy インターフェースの実装を提供する必要があります。

グループがリリースされる前にリリース戦略が複数回呼び出される可能性があるため、潜在的に大きなグループを処理する場合、これらのメソッドがどのように呼び出されるかを理解する必要があります。最も効率的なのは、アグリゲーターが直接呼び出すことができるため、ReleaseStrategy の実装です。2 番目に効率的なのは、Collection<Message<?>> パラメーター型の POJO メソッドです。最も効率が悪いのは、Collection<Something> 型の POJO メソッドです。フレームワークは、リリース戦略が呼び出されるたびに、グループ内のメッセージからペイロードを新しいコレクションにコピーする必要があります(ペイロードを Something に変換しようとする可能性があります)。Collection<?> を使用すると、変換は回避されますが、新しい Collection を作成する必要があります。

これらの理由から、大規模なグループでは、ReleaseStrategy を実装することをお勧めします。

グループが集約のためにリリースされると、まだリリースされていないすべてのメッセージが処理され、グループから削除されます。グループも完了している場合(つまり、シーケンスからのすべてのメッセージが到着した場合、またはシーケンスが定義されていない場合)、グループは完了としてマークされます。このグループの新しいメッセージは、破棄チャネル(定義されている場合)に送信されます。expire-groups-upon-completion を true (デフォルトは false)に設定すると、グループ全体が削除され、新しいメッセージ(削除されたグループと同じ相関 ID を持つ)は新しいグループを形成します。MessageGroupStoreReaper と true に設定されている send-partial-result-on-expiry を使用して、部分シーケンスを解放できます。

遅延到着メッセージの廃棄を容易にするために、アグリゲーターはグループが解放された後、グループに関する状態を維持する必要があります。これにより、最終的にメモリ不足状態が発生する可能性があります。このような状況を回避するには、MessageGroupStoreReaper を構成してグループメタデータを削除することを検討する必要があります。有効期限パラメーターは、ポイントに到達した後にグループが期限切れになるように設定する必要があります。それ以降は、遅延メッセージが到着することはありません。リーパーの構成については、アグリゲーターでの状態の管理: MessageGroupStore を参照してください。

Spring Integration は、ReleaseStrategy の実装を提供します: SimpleSequenceSizeReleaseStrategy。この実装は、到着する各メッセージの SEQUENCE_NUMBER および SEQUENCE_SIZE ヘッダーを調べて、メッセージグループが完了し、集約の準備ができたときを判断します。前に示したように、これはデフォルトの戦略でもあります。

バージョン 5.0 より前のデフォルトのリリース戦略は SequenceSizeReleaseStrategy でしたが、これは大規模なグループではうまく機能しません。この戦略では、重複したシーケンス番号が検出され、拒否されます。この操作は高負荷になる可能性があります。

大きなグループを集約する場合、部分的なグループを解放する必要はなく、重複したシーケンスを検出 / 拒否する必要はありません。代わりに SimpleSequenceSizeReleaseStrategy を使用することを検討してください。これらのユースケースでははるかに効率的で、デフォルトです部分的なグループリリースが指定されていない場合、バージョン 5.0 以降。

大規模グループの集約

4.3 リリースは、SimpleMessageGroup のメッセージのデフォルト Collection を HashSet に変更しました(以前は BlockingQueue でした)。これは、大きなグループから個々のメッセージを削除するときにコストがかかりました(O(n)線形スキャンが必要でした)。ハッシュセットは通常、削除がはるかに高速ですが、挿入と削除の両方でハッシュを計算する必要があるため、大きなメッセージの場合はコストが高くなる可能性があります。ハッシュが高負荷なメッセージがある場合は、他のコレクション型の使用を検討してください。MessageGroupFactory を使用するで説明したように、SimpleMessageGroupFactory が提供されているため、ニーズに最適な Collection を選択できます。独自のファクトリ実装を提供して、他の Collection<Message<?>> を作成することもできます。

次の例は、以前の実装と SimpleSequenceSizeReleaseStrategy でアグリゲーターを構成する方法を示しています。

<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />

<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>

<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
フィルターエンドポイントがアグリゲータの上流のフローに関与している場合、シーケンスからの一部のメッセージがフィルターによって破棄される可能性があるため、シーケンスサイズリリース戦略(固定または sequenceSize ヘッダーに基づく)はその目的を果たしません。この場合、別の ReleaseStrategy を選択するか、コンテンツに一部の情報を含む破棄サブフローから送信された補正メッセージを使用して、カスタム完全グループ関数でスキップすることをお勧めします。詳細については、フィルターを参照してください。
相関戦略

CorrelationStrategy インターフェースは次のように定義されます。

public interface CorrelationStrategy {

  Object getCorrelationKey(Message<?> message);

}

このメソッドは、メッセージをメッセージグループに関連付けるために使用される相関キーを表す Object を返します。キーは、equals() および hashCode() の実装に関して Map のキーに使用される条件を満たさなければなりません。

一般に、どの POJO も相関ロジックを実装でき、メッセージをメソッドの引数にマッピングするルールは ServiceActivator と同じです(@Header アノテーションのサポートを含む)。メソッドは値を返す必要があり、値は null であってはなりません。

Spring Integration は、CorrelationStrategy の実装を提供します: HeaderAttributeCorrelationStrategy。この実装は、メッセージヘッダーの 1 つ(コンストラクター引数で名前が指定されている)の値を相関キーとして返します。デフォルトでは、相関戦略は CORRELATION_ID ヘッダー属性の値を返す HeaderAttributeCorrelationStrategy です。相関に使用するカスタムヘッダー名がある場合は、HeaderAttributeCorrelationStrategy のインスタンスで構成して、アグリゲーターの相関戦略の参照として提供できます。

レジストリをロック

グループへの変更はスレッドセーフです。同じ相関 ID のメッセージを同時に送信すると、そのうちの 1 つのみがアグリゲーターで処理され、メッセージグループごとにシングルスレッドとして効果的に処理されます。LockRegistry は、解決された相関 ID のロックを取得するために使用されます。DefaultLockRegistry はデフォルトで使用されます(メモリ内)。共有 MessageGroupStore が使用されているサーバー間で更新を同期するには、共有ロックレジストリを構成する必要があります。

デッドロックの回避

上記で説明したように、メッセージグループが変更されると(メッセージが追加または解放される)、ロックが保持されます。

次のフローを検討してください。

...->aggregator1-> ... ->aggregator2-> ...

複数のスレッドがあり、アグリゲーターが共通のロックレジストリを共有している場合、デッドロックが発生する可能性があります。これにより、スレッドがハングし、jstack <pid> は次のような結果を表示する場合があります。

Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"

この問題を回避するには、いくつかの方法があります。

  • 各アグリゲーターに独自のロックレジストリがあることを確認します (これは、アプリケーションインスタンス間で共有レジストリにすることができますが、フロー内の 2 つ以上のアグリゲーターにはそれぞれ個別のレジストリが必要です)

  • アグリゲータの出力チャネルとして ExecutorChannel または QueueChannel を使用して、ダウンストリームフローが新しいスレッドで実行されるようにします

  • バージョン 5.1.1 以降、releaseLockBeforeSend アグリゲータープロパティを true に設定します。

この問題は、何らかの理由で単一のアグリゲーターの出力が最終的に同じアグリゲーターにルーティングされる場合にも発生する可能性があります。もちろん、この場合、上記の最初の解決策は適用されません。

Java DSL でのアグリゲーターの構成

Java DSL でアグリゲーターを構成する方法については、アグリゲーターとリシーケンサーを参照してください。

XML を使用したアグリゲーターの構成

Spring Integration は、<aggregator/> 要素を介した XML を使用したアグリゲーターの構成をサポートします。次の例は、アグリゲーターの例を示しています。

<channel id="inputChannel"/>

<int:aggregator id="myAggregator"                          (1)
        auto-startup="true"                                (2)
        input-channel="inputChannel"                       (3)
        output-channel="outputChannel"                     (4)
        discard-channel="throwAwayChannel"                 (5)
        message-store="persistentMessageStore"             (6)
        order="1"                                          (7)
        send-partial-result-on-expiry="false"              (8)
        send-timeout="1000"                                (9)

        correlation-strategy="correlationStrategyBean"     (10)
        correlation-strategy-method="correlate"            (11)
        correlation-strategy-expression="headers['foo']"   (12)

        ref="aggregatorBean"                               (13)
        method="aggregate"                                 (14)

        release-strategy="releaseStrategyBean"             (15)
        release-strategy-method="release"                  (16)
        release-strategy-expression="size() == 5"          (17)

        expire-groups-upon-completion="false"              (18)
        empty-group-min-timeout="60000"                    (19)

        lock-registry="lockRegistry"                       (20)

        group-timeout="60000"                              (21)
        group-timeout-expression="size() ge 2 ? 100 : -1"  (22)
        expire-groups-upon-timeout="true"                  (23)

        scheduler="taskScheduler" >                        (24)
            <expire-transactional/>                        (25)
            <expire-advice-chain/>                         (26)
</aggregator>

<int:channel id="outputChannel"/>

<int:channel id="throwAwayChannel"/>

<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>

<bean id="aggregatorBean" class="sample.PojoAggregator"/>

<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>

<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
1 アグリゲーターの ID はオプションです。
2 アプリケーションコンテキストの起動中にアグリゲーターを起動する必要があるかどうかを示すライフサイクル属性。オプション(デフォルトは "true" )。
3 アグリゲーターがメッセージを受信するチャンネル。必須。
4 アグリゲータが集約結果を送信するチャネル。オプション (受信メッセージ自体が 'replyChannel' メッセージヘッダーで応答チャネルを指定できるため)。
5 アグリゲーターがタイムアウトしたメッセージを送信するチャネル(send-partial-result-on-expiry が false の場合)。オプション。
6 完了するまで相関キーにメッセージのグループを保管するために使用される MessageGroupStore への参照。オプション。デフォルトでは、揮発性のメモリ内ストアです。詳細については、メッセージストアを参照してください。
7 複数のハンドルが同じ DirectChannel にサブスクライブされるときのこのアグリゲーターの順序(負荷分散の目的で使用)。オプション。
8 期限切れのメッセージは、それを含む MessageGroup の有効期限が切れたら ( MessageGroupStore.expireMessageGroups(long) (Javadoc) を参照)、集約されて「出力チャネル」または "replyChannel" に送信されることを示します。MessageGroup を期限切れにする 1 つの方法は、MessageGroupStoreReaper を構成することです。ただし、代わりに MessageGroupStore.expireMessageGroups(timeout) を呼び出して MessageGroup を期限切れにすることもできます。これは、コントロールバス操作を通じて、または MessageGroupStore インスタンスへの参照がある場合は expireMessageGroups(timeout) を呼び出すことによって実行できます。それ以外の場合、この属性自体は何も行いません。期限切れになりそうな MessageGroup に残っているメッセージを破棄するか、出力チャネルまたは応答チャネルに送信するかを示すインジケータとしてのみ機能します。オプション (デフォルトは false)。注: expire-groups-upon-timeout が false に設定されている場合、グループは実際には期限切れにならない可能性があるため、この属性は send-partial-result-on-timeout と呼ぶ方が適切です。
9 応答 Message を output-channel または discard-channel に送信するときに待機するタイムアウト間隔。デフォルトは -1 で、これにより無期限にブロックされます。固定の「容量」を持つ QueueChannel など、出力チャネルに「送信」制限がある場合にのみ適用されます。この場合、MessageDeliveryException がスローされます。AbstractSubscribableChannel 実装の場合、send-timeout は無視されます。group-timeout(-expression) の場合、スケジュールされた期限切れタスクの MessageDeliveryException により、このタスクが再スケジュールされます。オプション。
10 メッセージ相関(グループ化)アルゴリズムを実装する Bean への参照。Bean は、CorrelationStrategy インターフェースまたは POJO の実装にすることができます。後者の場合、correlation-strategy-method 属性も定義する必要があります。オプション(デフォルトでは、アグリゲーターは IntegrationMessageHeaderAccessor.CORRELATION_ID ヘッダーを使用します)。
11correlation-strategy によって参照される Bean で定義されたメソッド。相関決定アルゴリズムを実装します。オプション、制限付き(correlation-strategy が存在する必要があります)。
12 相関戦略を表す SpEL 式。例: "headers['something']" correlation-strategy または correlation-strategy-expression のいずれかのみが許可されます。
13 アプリケーションコンテキストで定義された Bean への参照。Bean は、前述のように集約ロジックを実装する必要があります。オプション(デフォルトでは、集約されたメッセージのリストは出力メッセージのペイロードになります)。
14ref 属性によって参照される Bean で定義されたメソッド。メッセージ集約アルゴリズムを実装します。オプション(定義される ref 属性に依存)。
15 リリース戦略を実装する Bean への参照。Bean は、ReleaseStrategy インターフェースまたは POJO の実装にすることができます。後者の場合、release-strategy-method 属性も定義する必要があります。オプション(デフォルトでは、アグリゲーターは IntegrationMessageHeaderAccessor.SEQUENCE_SIZE ヘッダー属性を使用します)。
16release-strategy 属性によって参照される Bean で定義されたメソッド。完了決定アルゴリズムを実装します。オプション、制限付き(release-strategy が存在する必要があります)。
17 リリース戦略を表す SpEL 式。式のルートオブジェクトは MessageGroup です。例: "size() == 5" release-strategy または release-strategy-expression のいずれかのみが許可されます。
18true (デフォルトは false)に設定すると、完了したグループがメッセージストアから削除され、同じ相関を持つ後続のメッセージが新しいグループを形成します。デフォルトの動作では、完了したグループと同じ相関関係を持つメッセージを discard-channel に送信します。
19MessageGroupStoreReaper が <aggregator> の MessageStore 用に構成されている場合にのみ適用されます。デフォルトでは、MessageGroupStoreReaper が部分グループを期限切れにするように構成されている場合、空のグループも削除されます。グループが通常解放された後、空のグループが存在します。空のグループにより、後着メッセージの検出と破棄が可能になります。部分的なグループの期限切れよりも長いスケジュールで空のグループを期限切れにする場合は、このプロパティを設定します。空のグループは、少なくともこのミリ秒数の間変更されない限り、MessageStore から削除されません。空のグループの有効期限が切れる実際の時間は、リーパーの timeout プロパティの影響も受けます。この値にタイムアウトを加えた値になる可能性があることに注意してください。
20org.springframework.integration.util.LockRegistry Bean への参照。MessageGroup での同時操作のために、groupId に基づいて Lock を取得していました。デフォルトでは、内部 DefaultLockRegistry が使用されます。ZookeeperLockRegistry などの分散 LockRegistry を使用すると、アグリゲーターの 1 つのインスタンスのみがグループで同時に操作できるようになります。詳細については、Redis ロックレジストリGemfire Lock レジストリZookeeper ロックレジストリを参照してください。
21ReleaseStrategy が現在のメッセージが到着したときにグループを解放しない場合に、MessageGroup を強制的に完了するタイムアウト(ミリ秒単位)。この属性は、時間からカウントされるタイムアウト内で MessageGroup に新しいメッセージが到着しない場合、部分的な結果を発行する(またはグループを破棄する)必要がある場合に、アグリゲーターに組み込みの時間ベースのリリース戦略を提供します。最後のメッセージが到着しました。MessageGroup が作成された時間からカウントするタイムアウトを設定するには、group-timeout-expression 情報を参照してください。新しいメッセージがアグリゲーターに到着すると、その MessageGroup の既存の ScheduledFuture<?> は取り消されます。ReleaseStrategy が false (解放しないことを意味する)および groupTimeout > 0 を返す場合、新しいタスクがグループを期限切れにするようにスケジュールされます。この属性をゼロ(または負の値)に設定することはお勧めしません。これを行うと、すべてのメッセージグループがすぐに完了するため、アグリゲーターが効果的に無効になります。ただし、式を使用して条件付きでゼロ(または負の値)に設定できます。詳細については、group-timeout-expression を参照してください。完了時に実行されるアクションは、ReleaseStrategy および send-partial-group-on-expiry 属性によって異なります。詳細については、アグリゲーターとグループのタイムアウトを参照してください。'group-timeout-expression' 属性と相互に排他的です。
22MessageGroup を #root 評価コンテキストオブジェクトとする groupTimeout に評価される SpEL 式。MessageGroup の強制完了のスケジューリングに使用します。式が null と評価される場合、補完はスケジュールされません。評価が 0 の場合、グループは現在のスレッドでただちに完了します。これにより、動的な group-timeout プロパティが提供されます。例として、グループが作成されてから 10 秒が経過した後に MessageGroup を強制的に完了させたい場合は、以下の SpEL 式を使用することを検討してください。timestamp + 10000 - T(System).currentTimeMillis() ここで、timestamp は MessageGroup として MessageGroup.getTimestamp() から提供されます。ここでは #root 評価コンテキストオブジェクトです。ただし、他のグループ有効期限プロパティの構成によっては、グループ作成時刻が最初に到着したメッセージの時刻と異なる場合があることに注意してください。詳細については、group-timeout を参照してください。'group-timeout' 属性とは相互に排他的です。
23 タイムアウトにより(または MessageGroupStoreReaper により)グループが完了すると、グループはデフォルトで期限切れ(完全に削除)になります。遅れて到着するメッセージは新しいグループを開始します。これを false に設定してグループを完成させることができますが、そのメタデータを残しておくと、遅れて到着するメッセージが破棄されます。空のグループは、empty-group-min-timeout 属性と一緒に MessageGroupStoreReaper を使用して、後で期限切れにすることができます。デフォルトは "true" です。
24groupTimeout 内の MessageGroup に新しいメッセージが到着しない場合、MessageGroup を強制的に完了するようにスケジュールする TaskScheduler Bean 参照。指定しない場合、ApplicationContext (ThreadPoolTaskScheduler)に登録されているデフォルトのスケジューラー(taskScheduler)が使用されます。group-timeout または group-timeout-expression が指定されていない場合、この属性は適用されません。
25 バージョン 4.1 以降。これにより、forceComplete 操作のトランザクションを開始できます。これは、group-timeout(-expression) または MessageGroupStoreReaper によって開始され、通常の addreleasediscard 操作には適用されません。このサブエレメントまたは <expire-advice-chain/> のみが許可されます。
26 バージョン 4.1 以降。これにより、forceComplete 操作用に任意の Advice を構成できます。これは、group-timeout(-expression) または MessageGroupStoreReaper によって開始され、通常の addreleasediscard 操作には適用されません。このサブエレメントまたは <expire-transactional/> のみが許可されます。トランザクション Advice は、Spring tx 名前空間を使用してここで構成することもできます。
期限切れのグループ

期限切れ(完全に削除)のグループに関連する 2 つの属性があります。グループの有効期限が切れると、そのレコードはありません。また、同じ相関で新しいメッセージが到着すると、新しいグループが開始されます。グループが(有効期限なしで)完了すると、空のグループが残り、遅れて到着したメッセージは破棄されます。空のグループは、MessageGroupStoreReaper を empty-group-min-timeout 属性と組み合わせて使用することにより、後で削除できます。

expire-groups-upon-completion は、ReleaseStrategy がグループを解放するときの「正常な」完了に関連しています。デフォルトは false です。

グループが正常に完了せず、タイムアウトのために解放または破棄された場合、グループは通常有効期限が切れます。バージョン 4.1 以降、expire-groups-upon-timeout を使用してこの動作を制御できます。下位互換性のため、デフォルトでは true になっています。

グループがタイムアウトすると、ReleaseStrategy にはグループを解放するもう 1 つの機会が与えられます。expire-groups-upon-timeout が false である場合、有効期限は expire-groups-upon-completion によって制御されます。タイムアウト時にグループがリリース戦略によってリリースされない場合、有効期限は expire-groups-upon-timeout によって制御されます。タイムアウトしたグループは破棄されるか、部分的なリリースが発生します(send-partial-result-on-expiry に基づく)。

バージョン 5.0 以降、空のグループも empty-group-min-timeout の後に削除される予定です。expireGroupsUponCompletion == false および minimumTimeoutForEmptyGroups > 0 の場合、グループを削除するタスクは、通常または部分シーケンスのリリースが発生したときにスケジュールされます。

バージョン 5.4 以降、アグリゲーター(およびリシーケンサー)は、孤立したグループ(他の方法では解放されない可能性のある永続メッセージストア内のグループ)を期限切れにするように構成できます。expireTimeout (0 より大きい場合)は、ストア内のこの値より古いグループをパージする必要があることを示します。purgeOrphanedGroups() メソッドは起動時に呼び出され、提供された expireDuration とともに、スケジュールされたタスク内で定期的に呼び出されます。このメソッドは、いつでも外部から呼び出すことができます。有効期限ロジックは、上記の提供された有効期限オプションに従って、forceComplete(MessageGroup) 機能に完全に委譲されます。このような定期的なパージ機能は、定期的なメッセージ到着ロジックでこれ以上解放されない古いグループからメッセージストアをクリーンアップする必要がある場合に役立ちます。ほとんどの場合、これは、永続的なメッセージグループストアを使用している場合、アプリケーションの再起動後に発生します。機能は、スケジュールされたタスクを備えた MessageGroupStoreReaper に似ていますが、リーパーの代わりにグループタイムアウトを使用する場合に、特定のコンポーネント内の古いグループを処理する便利な方法を提供します。MessageGroupStore は、現在の相関エンドポイント専用に提供する必要があります。そうしないと、あるアグリゲーターが別のアグリゲーターからグループをパージする可能性があります。アグリゲーターを使用すると、この手法を使用して期限切れになったグループは、expireGroupsUponCompletion プロパティに応じて、破棄されるか、部分的なグループとして解放されます。

通常、カスタムアグリゲーターハンドラー実装が他の <aggregator> 定義で参照される可能性がある場合は、ref 属性を使用することをお勧めします。ただし、カスタムアグリゲーターの実装が <aggregator> の単一の定義でのみ使用されている場合、次の例に示すように、<aggregator> エレメント内の集約 POJO を構成するために、内部 Bean 定義(バージョン 1.0.3 以降)を使用できます。

<aggregator input-channel="input" method="sum" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
同じ <aggregator> 構成で ref 属性と内部 Bean 定義の両方を使用することは、あいまいな条件を作成するため許可されません。そのような場合、例外がスローされます。

次の例は、アグリゲーター Bean の実装を示しています。

public class PojoAggregator {

  public Long add(List<Long> results) {
    long total = 0l;
    for (long partialResult: results) {
      total += partialResult;
    }
    return total;
  }
}

前述の例の完了戦略 Bean の実装は、次のとおりです。

public class PojoReleaseStrategy {
...
  public boolean canRelease(List<Long> numbers) {
    int sum = 0;
    for (long number: numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}
意味がある場合はいつでも、リリース戦略メソッドとアグリゲーターメソッドを単一の Bean に組み合わせることができます。

上記の例の相関戦略 Bean の実装は、次のようになります。

public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}

前の例のアグリゲーターは、ある基準(この場合は 10 で割った後の残り)によって番号をグループ化し、ペイロードによって提供される番号の合計が特定の値を超えるまでグループを保持します。

そうすることが理にかなっている場合はいつでも、リリース戦略メソッド、相関戦略メソッド、アグリゲーターメソッドを単一の Bean に組み合わせることができます。(実際、それらのすべてまたは任意の 2 つを組み合わせることができます。)
アグリゲーターと Spring 式言語 (SpEL)

Spring Integration 2.0 以降、さまざまな戦略(相関、リリース、集約)を SpEL で処理できるようになりました。これは、そのようなリリース戦略の背後にあるロジックが比較的単純な場合に推奨されます。オブジェクトの配列を受け取るように設計されたレガシーコンポーネントがあるとします。デフォルトのリリース戦略では、List 内のすべての集約メッセージが組み立てられることがわかっています。ここで 2 つの問題があります。まず、リストから個々のメッセージを抽出する必要があります。次に、各メッセージのペイロードを抽出し、オブジェクトの配列を組み立てる必要があります。次の例は、両方の問題を解決します。

public String[] processRelease(List<Message<String>> messages){
    List<String> stringList = new ArrayList<String>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[]{});
}

ただし、SpEL では、このような要件は実際には 1 行の式で比較的簡単に処理できるため、カスタムクラスを記述して Bean として構成する必要はありません。次の例は、その方法を示しています。

<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>

前述の構成では、コレクション射影式を使用して、リスト内のすべてのメッセージのペイロードから新しいコレクションを組み立て、それを配列に変換します。これにより、以前の Java コードと同じ結果が得られます。

カスタムリリースおよび相関戦略を扱うときに、同じ式ベースのアプローチを適用できます。

次の例に示すように、correlation-strategy 属性でカスタム CorrelationStrategy の Bean を定義する代わりに、単純な相関ロジックを SpEL 式として実装し、correlation-strategy-expression 属性で構成できます。

correlation-strategy-expression="payload.person.id"

前述の例では、ペイロードに person 属性と id があり、メッセージの関連付けに使用されると想定しています。

同様に、ReleaseStrategy の場合、リリースロジックを SpEL 式として実装し、release-strategy-expression 属性で構成できます。評価コンテキストのルートオブジェクトは MessageGroup 自体です。メッセージの List は、式内のグループの message プロパティを使用して参照できます。

前の例が示すように、バージョン 5.0 より前のリリースでは、ルートオブジェクトは Message<?> のコレクションでした。
release-strategy-expression="!messages.?[payload==5].empty"

上記の例では、SpEL 評価コンテキストのルートオブジェクトは MessageGroup 自体であり、このグループに 5 のペイロードを持つメッセージがあるとすぐに、グループを解放する必要があると述べています。

アグリゲーターとグループのタイムアウト

バージョン 4.0 以降、相互に排他的な 2 つの新しい属性 group-timeout と group-timeout-expression が導入されました。XML を使用したアグリゲーターの構成を参照してください。場合によっては、現在のメッセージが到着したときに ReleaseStrategy が解放されない場合、タイムアウト後にアグリゲーターの結果を出力する(またはグループを破棄する)必要があります。この目的のために、次の例に示すように、groupTimeout オプションを使用すると、MessageGroup のスケジューリングを強制的に完了することができます。

<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>

この例では、アグリゲーターが release-strategy-expression で定義された最後のメッセージを順番に受信した場合、通常のリリースが可能です。その特定のメッセージが到着しない場合、groupTimeout は、グループに少なくとも 2 つのメッセージが含まれている限り、10 秒後にグループを強制的に完了させます。

グループを強制的に完了する結果は、ReleaseStrategy と send-partial-result-on-expiry に依存します。まず、通常のリリースが行われるかどうかを確認するために、リリース戦略が再度参照されます。グループは変更されていませんが、ReleaseStrategy はこの時点でグループをリリースすることを決定できます。それでもリリース戦略がグループをリリースしない場合、期限切れになります。send-partial-result-on-expiry が true の場合、(部分的な) MessageGroup 内の既存のメッセージは、output-channel への通常のアグリゲーター応答メッセージとしてリリースされます。それ以外の場合は、破棄されます。

groupTimeout の動作と MessageGroupStoreReaper には違いがあります(XML を使用したアグリゲーターの構成を参照)。リーパーは、MessageGroupStore 内のすべての MessageGroup に対して定期的に強制完了を開始します。groupTimeout は、groupTimeout の間に新しいメッセージが到着しない場合、各 MessageGroup に対して個別にそれを行います。また、リーパーを使用して空のグループを削除できます(expire-groups-upon-completion が false の場合、遅延メッセージを破棄するために空のグループが保持されます)。

バージョン 5.5 以降、groupTimeoutExpression は java.util.Date インスタンスに対して評価できます。これは、groupTimeoutExpression が long に評価されるときに計算されるため、現在のメッセージ到着ではなく、グループ作成時間(MessageGroup.getTimestamp())に基づいてスケジュールされたタスクの瞬間を決定する場合に役立ちます。

group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"
アノテーション付きのアグリゲーターの構成

次の例は、アノテーションを使用して構成されたアグリゲーターを示しています。

public class Waiter {
  ...

  @Aggregator  (1)
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }

  @ReleaseStrategy  (2)
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }

  @CorrelationStrategy  (3)
  public String correlateBy(OrderItem item) {
    ...
  }
}
1 このメソッドをアグリゲーターとして使用する必要があることを示すアノテーション。このクラスをアグリゲーターとして使用する場合は指定する必要があります。
2 このメソッドがアグリゲーターのリリース戦略として使用されることを示すアノテーション。どのメソッドにも存在しない場合、アグリゲーターは SimpleSequenceSizeReleaseStrategy を使用します。
3 このメソッドをアグリゲーターの相関戦略として使用する必要があることを示すアノテーション。相関戦略が示されていない場合、アグリゲーターは CORRELATION_ID に基づいて HeaderAttributeCorrelationStrategy を使用します。

XML 要素によって提供されるすべての構成オプションは、@Aggregator アノテーションでも使用できます。

アグリゲーターは、XML から明示的に参照するか、クラスで @MessageEndpoint が定義されている場合、クラスパススキャンによって自動的に検出されます。

Aggregator コンポーネントのアノテーション構成(@Aggregator など)は、ほとんどのデフォルトオプションで十分な単純なユースケースのみを対象としています。アノテーション構成を使用するときにこれらのオプションをさらに制御する必要がある場合は、次の例に示すように、AggregatingMessageHandler の @Bean 定義の使用を検討し、@Bean メソッドを @ServiceActivator でマークします。

@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}

詳細については、プログラミングモデルおよび @Bean メソッドのアノテーションを参照してください。

バージョン 4.2 以降、AggregatorFactoryBean は、AggregatingMessageHandler の Java 構成を簡素化するために使用可能です。

アグリゲーターでの状態の管理: MessageGroupStore

アグリゲーター(および Spring Integration のその他のパターン)は、すべて同じ相関キーを持つ一定期間に到着したメッセージのグループに基づいて決定を下す必要があるステートフルパターンです。ステートフルパターン(ReleaseStrategy など)のインターフェースの設計は、コンポーネント(フレームワークまたはユーザーによって定義されているかどうか)がステートレスのままである必要があるという原則に基づいています。すべての状態は MessageGroup によって運ばれ、その管理は MessageGroupStore に委譲されます。MessageGroupStore インターフェースは次のように定義されます。

public interface MessageGroupStore {

    int getMessageCountForAllMessageGroups();

    int getMarkedMessageCountForAllMessageGroups();

    int getMessageGroupCount();

    MessageGroup getMessageGroup(Object groupId);

    MessageGroup addMessageToGroup(Object groupId, Message<?> message);

    MessageGroup markMessageGroup(MessageGroup group);

    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);

    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);

    void removeMessageGroup(Object groupId);

    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);

    int expireMessageGroups(long timeout);
}

詳しくは、Javadoc を参照してください。

MessageGroupStore は、リリース戦略がトリガーされるのを待っている間、MessageGroups に状態情報を蓄積しますが、そのイベントは発生しません。そのため、古いメッセージが残るのを防ぎ、揮発性ストアがアプリケーションのシャットダウン時にクリーンアップのフックを提供するために、MessageGroupStore では、MessageGroups が期限切れになったときに適用するコールバックを登録できます。次のように、インターフェースは非常に簡単です。

public interface MessageGroupCallback {

    void execute(MessageGroupStore messageGroupStore, MessageGroup group);

}

コールバックはストアとメッセージグループに直接アクセスできるため、永続的な状態を管理できます(たとえば、グループをストアから完全に削除するなど)。

MessageGroupStore は、これらのコールバックのリストを維持します。これは、タイムスタンプがパラメーターとして指定された時刻よりも前のすべてのメッセージにオンデマンドで適用されます(前述の registerMessageGroupExpiryCallback(..) および expireMessageGroups(..) メソッドを参照)。

expireMessageGroups 機能に依存する場合、異なるアグリゲーターコンポーネントで同じ MessageGroupStore インスタンスを使用しないことが重要です。すべての AbstractCorrelatingMessageHandler は、forceComplete() コールバックに基づいて独自の MessageGroupCallback を登録します。この方法では、有効期限の各グループが間違ったアグリゲーターによって完了または破棄される可能性があります。バージョン 5.0.10 から、MessageGroupStore の登録コールバックに UniqueExpiryCallback が AbstractCorrelatingMessageHandler から使用されます。MessageGroupStore は、このクラスのインスタンスが存在するかどうかを確認し、コールバックセットにすでに存在する場合は適切なメッセージでエラーを記録します。このように、フレームワークは、異なるアグリゲーター / リシーケンサーでの MessageGroupStore インスタンスの使用を禁止して、特定の相関ハンドラーによって作成されていないグループの期限切れの前述の副作用を回避します。

タイムアウト値を指定して expireMessageGroups メソッドを呼び出すことができます。現在の時刻からこの値を引いたものより古いメッセージは期限切れになり、コールバックが適用されます。メッセージグループの「有効期限」の意味を定義するのはストアのユーザーです。

次の例に示すように、ユーザーの利便性のために、Spring Integration は MessageGroupStoreReaper の形式でメッセージの有効期限のラッパーを提供します。

<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>

<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>

リーパーは Runnable です。上記の例では、メッセージグループストアの expire メソッドは 10 秒ごとに呼び出されます。タイムアウト自体は 30 秒です。

MessageGroupStoreReaper の "timeout" プロパティは概算値であり、タスクスケジューラのレートの影響を受けることを理解することが重要です。このプロパティは、次にスケジュールされた MessageGroupStoreReaper タスクの実行時にのみチェックされるためです。例: タイムアウトが 10 分間に設定されているが、MessageGroupStoreReaper タスクが 1 時間ごとに実行されるようにスケジュールされ、MessageGroupStoreReaper タスクの最後の実行がタイムアウトの 1 分前に行われた場合、MessageGroup は次の 59 分間期限切れになりません。レートを少なくともタイムアウトの値以下に設定することをお勧めします。

リーパーに加えて、アプリケーションが AbstractCorrelatingMessageHandler のライフサイクルコールバックを介してシャットダウンすると、有効期限コールバックが呼び出されます。

AbstractCorrelatingMessageHandler は独自の有効期限コールバックを登録します。これは、アグリゲーターの XML 構成内のブールフラグ send-partial-result-on-expiry とのリンクです。フラグが true に設定されている場合、有効期限コールバックが呼び出されると、まだリリースされていないグループ内のマークされていないメッセージを出力チャネルに送信できます。

MessageGroupStoreReaper はスケジュールされたタスクから呼び出され、ダウンストリーム統合フローへのメッセージ(sendPartialResultOnExpiry オプションによって異なります)が生成される可能性があるため、errorChannel を介して例外を処理するために MessagePublishingErrorHandler を含むカスタム TaskScheduler を提供することをお勧めします。通常のアグリゲーターリリース機能で期待される場合があります。同じロジックが、TaskScheduler にも依存するグループタイムアウト機能にも適用されます。詳細については、エラー処理を参照してください。

共有 MessageStore が異なる相関エンドポイントに使用される場合、適切な CorrelationStrategy を構成して、グループ ID の一意性を確保する必要があります。そうしないと、ある相関エンドポイントが他の相関エンドポイントからメッセージを解放または期限切れにしたときに、予期しない動作が発生する可能性があります。同じ相関キーを持つメッセージは、同じメッセージグループに保存されます。

一部の MessageStore 実装では、データを分割することにより、同じ物理リソースを使用できます。例: JdbcMessageStore には region プロパティがあり、MongoDbMessageStore には collectionName プロパティがあります。

MessageStore インターフェースとその実装の詳細については、メッセージストアを参照してください。

Flux Aggregator

バージョン 5.2 では、FluxAggregatorMessageHandler コンポーネントが導入されました。Project Reactor Flux.groupBy() および Flux.window() オペレーターに基づいています。受信メッセージは、このコンポーネントのコンストラクターで Flux.create() によって開始された FluxSink に送信されます。outputChannel が提供されないか、ReactiveStreamsSubscribableChannel のインスタンスではない場合、メイン Flux へのサブスクリプションは Lifecycle.start() 実装から行われます。それ以外の場合は、ReactiveStreamsSubscribableChannel 実装によって行われたサブスクリプションに延期されます。メッセージは、グループキーに CorrelationStrategy を使用して Flux.groupBy() によってグループ化されます。デフォルトでは、メッセージの IntegrationMessageHeaderAccessor.CORRELATION_ID ヘッダーが調べられます。

デフォルトでは、閉じられたウィンドウはすべて、生成するメッセージのペイロードの Flux として解放されます。このメッセージには、ウィンドウの最初のメッセージのすべてのヘッダーが含まれています。出力メッセージペイロードのこの Flux は、ダウンストリームでサブスクライブおよび処理する必要があります。このようなロジックは、FluxAggregatorMessageHandler の setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) 構成オプションによってカスタマイズ(または置き換え)できます。例: 最終メッセージに List のペイロードを含める場合、次のように Flux.collectList() を構成できます。

fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));

FluxAggregatorMessageHandler には、適切なウィンドウ戦略を選択するためのいくつかのオプションがあります。

  • setBoundaryTrigger(Predicate<Message<?>>) - Flux.windowUntil() オペレーターに伝搬されます。詳細については、JavaDocs を参照してください。他のすべてのウィンドウオプションよりも優先されます。

  • setWindowSize(int) および setWindowSizeFunction(Function<Message<?>, Integer>) - Flux.window(int) または windowTimeout(int, Duration) に伝搬されます。デフォルトでは、ウィンドウサイズはグループの最初のメッセージとその IntegrationMessageHeaderAccessor.SEQUENCE_SIZE ヘッダーから計算されます。

  • setWindowTimespan(Duration) - ウィンドウサイズの構成に応じて、Flux.window(Duration) または windowTimeout(int, Duration) に伝達されます。

  • setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>) - 公開されたオプションでカバーされていないカスタムウィンドウ操作に対して、グループ化されたフラックスに変換を適用する関数。

このコンポーネントは MessageHandler 実装であるため、@ServiceActivator メッセージングアノテーションとともに @Bean 定義として単純に使用できます。Java DSL を使用すると、.handle() EIP メソッドから使用できます。以下のサンプルは、実行時に IntegrationFlow を登録する方法と、FluxAggregatorMessageHandler をアップストリームのスプリッターと相関させる方法を示しています。

IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());

IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();

@SuppressWarnings("unchecked")
Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);

メッセージグループの条件

バージョン 5.5 以降、AbstractCorrelatingMessageHandler (Java および XML DSL を含む)は、BiFunction<Message<?>, String, String> 実装の groupConditionSupplier オプションを公開します。この関数は、グループに追加された各メッセージで使用され、結果条件文は、将来の検討のためにグループに保存されます。ReleaseStrategy は、グループ内のすべてのメッセージを繰り返す代わりに、この条件を参照する場合があります。詳細については、GroupConditionProvider JavaDocs およびメッセージグループの状態を参照してください。

ファイルアグリゲーターも参照してください。