ファイルアグリゲーター

バージョン 5.5 以降、START/END マーカーが有効になっている場合に、FileSplitter ユースケースの反対側をカバーするために FileAggregator が導入されました。便宜上、FileAggregator は 3 つのシーケンス詳細戦略すべてを実装しています。

  • FileHeaders.FILENAME 属性を持つ HeaderAttributeCorrelationStrategy は、相関キーの計算に使用されます。FileSplitter でマーカーが有効になっている場合、START/END マーカーメッセージもシーケンスサイズに含まれるため、シーケンス詳細ヘッダーには入力されません。FileHeaders.FILENAME は、START/END マーカーメッセージを含め、発行された各行に引き続き入力されます。

  • FileMarkerReleaseStrategy - グループ内の FileSplitter.FileMarker.Mark.END メッセージをチェックしてから、FileHeaders.LINE_COUNT ヘッダー値をグループサイズから 2 - FileSplitter.FileMarker インスタンスを引いた値と比較します。また、AbstractCorrelatingMessageHandler で使用される conditionSupplier 機能用の便利な GroupConditionProvider 接点も実装しています。詳細については、メッセージグループの状態を参照してください。

  • FileAggregatingMessageGroupProcessor は、グループから FileSplitter.FileMarker メッセージを削除し、残りのメッセージをリストペイロードに収集して生成します。

次のリストは、FileAggregator を構成するための可能な方法を示しています。

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

FileAggregator のデフォルトの動作がターゲットロジックを満たさない場合は、個々の戦略でアグリゲーターエンドポイントを構成することをお勧めします。詳細については、FileAggregator JavaDocs を参照してください。