ファイル分割

FileSplitter はバージョン 4.1.2 で追加され、その名前空間サポートはバージョン 4.2 で追加されました。FileSplitter は、BufferedReader.readLine() に基づいてテキストファイルを個々の行に分割します。デフォルトでは、スプリッターは Iterator を使用して、ファイルから読み取られる行を一度に 1 行ずつ出力します。iterator プロパティを false に設定すると、メッセージとして送信する前にすべての行がメモリに読み込まれます。この使用例の 1 つは、行を含むメッセージを送信する前にファイルの I/O エラーを検出する場合です。ただし、比較的短いファイルに対してのみ実用的です。

受信ペイロードは、FileString (File パス)、InputStreamReader です。他のペイロード型は変更されずに発行されます。

次のリストは、FileSplitter を構成するための可能な方法を示しています。

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 スプリッターの Bean 名。
2true (デフォルト)に設定すると、イテレーターまたは false を使用して、行を送信する前にファイルをメモリにロードします。
3true に設定すると、ファイルデータの前後にファイル開始マーカーおよびファイル終了マーカーのメッセージが出力されます。マーカーは、FileSplitter.FileMarker ペイロードを持つメッセージです(mark プロパティに START および END 値が含まれます)。一部の行がフィルタリングされるダウンストリームフローでファイルを順次処理するときに、マーカーを使用できます。これにより、ファイルが完全に処理されたことをダウンストリーム処理で知ることができます。さらに、START または END を含む file_marker ヘッダーがこれらのメッセージに追加されます。END マーカーには行カウントが含まれます。ファイルが空の場合、START および END マーカーのみが lineCount として 0 で発行されます。デフォルトは false です。true の場合、apply-sequence はデフォルトで false です。markers-json (次の属性)も参照してください。
4markers が true の場合、これを true に設定して、FileMarker オブジェクトを JSON 文字列に変換します。(下に SimpleJsonSerializer を使用)。
5sequenceSize および sequenceNumber ヘッダーをメッセージに含めることを無効にするには、false に設定します。markers が true でない限り、デフォルトは true です。true および markers が true の場合、マーカーはシーケンスに含まれます。true および iterator が true の場合、サイズが不明であるため、sequenceSize ヘッダーは 0 に設定されます。
6true に設定すると、ファイルに行がない場合に RequiresReplyException がスローされます。デフォルトは false です。
7 テキストデータを String ペイロードに読み込むときに使用する文字セット名を設定します。デフォルトはプラットフォームの文字セットです。
8 残りの行に対して発行されるメッセージのヘッダーとして搬送される最初の行のヘッダー名。バージョン 5.0 以降。
9 メッセージをスプリッターに送信するために使用される入力チャネルを設定します。
10 メッセージの送信先の出力チャネルを設定します。
11 送信タイムアウトを設定します。output-channel がブロックできる場合(フル QueueChannel など)にのみ適用されます。
12false に設定すると、コンテキストがリフレッシュされたときにスプリッターが自動的に開始されなくなります。デフォルトは true です。
13input-channel が <publish-subscribe-channel/> である場合、このエンドポイントの順序を設定します。
14 スプリッターの起動フェーズを設定します(auto-startup が true の場合に使用)。

FileSplitter はまた、テキストベースの InputStream を行に分割します。バージョン 4.3 以降、FTP または SFTP ストリーミング受信チャネルアダプター、または stream オプションを使用してファイルを取得する FTP または SFTP 送信ゲートウェイと併用すると、スプリッターはファイルが完全に終了したときにストリームをサポートするセッションを自動的に閉じます。消費これらの機能の詳細については、FTP ストリーミング受信チャネルアダプターおよび SFTP ストリーミング受信チャネルアダプターならびに FTP 送信ゲートウェイおよび SFTP 送信ゲートウェイを参照してください。

Java 構成を使用する場合、次の例に示すように、追加のコンストラクターを使用できます。

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

markersJson が true の場合、マーカーは JSON 文字列として表されます(SimpleJsonSerializer を使用)。

バージョン 5.0 では、コンテンツの最初の行がヘッダー(CSV ファイルの列名など)であることを指定する firstLineAsHeader オプションが導入されました。このプロパティに渡される引数は、最初の行が残りの行に対して発行されるメッセージのヘッダーとして運ばれるヘッダー名です。この行は、シーケンスヘッダー(applySequence が true の場合)にも、FileMarker.END に関連付けられた lineCount にも含まれていません。注: バージョン 5.5 以降、FileMarker は JSON に直列化できるため、lineCount` も FileHeaders.LINE_COUNT として FileMarker.END メッセージのヘッダーに含まれています。ファイルにヘッダー行のみが含まれている場合、ファイルは空として扱われるため、分割中に FileMarker インスタンスのみが発行されます(マーカーが有効になっている場合、それ以外の場合、メッセージは発行されません)。デフォルトでは(ヘッダー名が設定されていない場合)、最初の行はデータと見なされ、最初に発行されたメッセージのペイロードになります。

ファイルの内容 (最初の行ではない、行の内容全体ではない、特定の 1 つのヘッダーではないなど) からのヘッダー抽出に関してより複雑なロジックが必要な場合は、FileSplitter の前にヘッダーエンリッチャーを使用することを検討してください。ヘッダーに移動された行は、通常のコンテンツプロセスの下流でフィルターされる可能性があることに注意してください。

分割ファイルを処理するべき等べき下流

apply-sequence が true の場合、スプリッターは SEQUENCE_NUMBER ヘッダーに行番号を追加します(markers が true の場合、マーカーは行としてカウントされます)。行番号をべき等レシーバーで使用すると、再起動後の行の再処理を回避できます。

例:

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}