アグリゲーターとリシーケンサー

Aggregator は概念的に Splitter の反対です。個々のメッセージのシーケンスを単一のメッセージに集約し、必然的に複雑になります。デフォルトでは、アグリゲーターは受信メッセージからのペイロードのコレクションを含むメッセージを返します。同じルールが Resequencer に適用されます。次の例は、スプリッターアグリゲーターパターンの標準的な例を示しています。

@Bean
public IntegrationFlow splitAggregateFlow() {
    return IntegrationFlow.from("splitAggregateInput")
            .split()
            .channel(MessageChannels.executor(this.taskExecutor()))
            .resequence()
            .aggregate()
            .get();
}

split() メソッドは、リストを個々のメッセージに分割し、ExecutorChannel に送信します。resequence() メソッドは、メッセージヘッダーにあるシーケンスの詳細でメッセージを並べ替えます。aggregate() メソッドは、これらのメッセージを収集します。

ただし、リリース戦略と相関戦略などを指定することにより、デフォルトの動作を変更できます。次の例を考えてみましょう。

.aggregate(a ->
        a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
            .releaseStrategy(g -> g.size() > 10)
            .messageStore(messageStore()))

上記の例は、myCorrelationKey ヘッダーを持つメッセージを関連付け、少なくとも 10 が蓄積されるとメッセージを解放します。

同様のラムダ構成が resequence() EIP メソッドに提供されています。