アグリゲーターとリシーケンサー
Aggregator
は概念的に Splitter
の反対です。個々のメッセージのシーケンスを単一のメッセージに集約し、必然的に複雑になります。デフォルトでは、アグリゲーターは受信メッセージからのペイロードのコレクションを含むメッセージを返します。同じルールが Resequencer
に適用されます。次の例は、スプリッターアグリゲーターパターンの標準的な例を示しています。
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
split()
メソッドは、リストを個々のメッセージに分割し、ExecutorChannel
に送信します。resequence()
メソッドは、メッセージヘッダーにあるシーケンスの詳細でメッセージを並べ替えます。aggregate()
メソッドは、これらのメッセージを収集します。
ただし、リリース戦略と相関戦略などを指定することにより、デフォルトの動作を変更できます。次の例を考えてみましょう。
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
上記の例は、myCorrelationKey
ヘッダーを持つメッセージを関連付け、少なくとも 10 が蓄積されるとメッセージを解放します。
同様のラムダ構成が resequence()
EIP メソッドに提供されています。