ファイルサポート

Spring Integration のファイルサポートは、Spring Integration コアを継承し、ファイルの読み取り、書き込み、変換を処理する専用の語彙を提供します。

この依存関係をプロジェクトに含める必要があります。

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>5.5.16</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-file:5.5.16"

ファイル専用のチャネルアダプターを定義する要素を有効にする名前空間と、ファイルの内容を文字列またはバイト配列に読み込むことができるトランスフォーマーのサポートを提供します。

このセクションでは、FileReadingMessageSource および FileWritingMessageHandler の動作と、Bean として構成する方法について説明します。また、Transformer のファイル固有の実装を介したファイル処理のサポートについても説明します。最後に、ファイル固有の名前空間について説明します。

ファイルを読む

FileReadingMessageSource を使用して、ファイルシステムからファイルを消費できます。これは、ファイルシステムディレクトリからメッセージを作成する MessageSource の実装です。次の例は、FileReadingMessageSource を構成する方法を示しています。

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"/>

特定のファイルのメッセージを作成しないようにするには、FileListFilter を指定できます。デフォルトでは、次のフィルターを使用します。

  • IgnoreHiddenFileListFilter

  • AcceptOnceFileListFilter

IgnoreHiddenFileListFilter は、隠しファイルが処理されないようにします。hidden の正確な定義はシステムに依存することに注意してください。例: UNIX ベースのシステムでは、ピリオド文字で始まるファイルは非表示と見なされます。一方、Microsoft Windows には、隠しファイルを示す専用のファイル属性があります。

バージョン 4.2 は IgnoreHiddenFileListFilter を導入しました。以前のバージョンでは、隠しファイルが含まれていました。デフォルト構成では、IgnoreHiddenFileListFilter が最初にトリガーされ、次に AcceptOnceFileListFilter がトリガーされます。

AcceptOnceFileListFilter は、ファイルがディレクトリから 1 回だけピックアップされるようにします。

AcceptOnceFileListFilter はその状態をメモリに保存します。システムの再起動後も状態を維持したい場合は、FileSystemPersistentAcceptOnceFileListFilter を使用できます。このフィルターは、受け入れられたファイル名を MetadataStore 実装に保管します(メタデータストアを参照)。このフィルターは、ファイル名と変更時刻で一致します。

バージョン 4.0 以降、このフィルターには ConcurrentMetadataStore が必要です。共有データストア(Redis と RedisMetadataStore など)で使用すると、フィルターキーを複数のアプリケーションインスタンス間または複数のサーバーで使用されているネットワークファイル共有間で共有できます。

バージョン 4.1.5 以降、このフィルターには新しいプロパティ(flushOnUpdate)があり、更新ごとにメタデータストアをフラッシュします(ストアが Flushable を実装している場合)。

永続ファイルリストフィルターにブールプロパティ forRecursion が追加されました。このプロパティを true に設定すると、alwaysAcceptDirectories も設定されます。これは、送信ゲートウェイ(ls および mget)での再帰操作が常にディレクトリツリー全体を毎回トラバースすることを意味します。これは、ディレクトリツリーの奥深くで変更が検出されなかった問題を解決するためです。さらに、forRecursion=true により、ファイルへのフルパスがメタデータストアキーとして使用されます。これにより、同じ名前のファイルが異なるディレクトリに複数回表示された場合にフィルターが正しく機能しなかった問題が解決されます。重要: これは、永続メタデータストア内の既存のキーが、最上位ディレクトリにあるファイルで見つからないことを意味します。このため、プロパティはデフォルトで false です。これは将来のリリースで変更される可能性があります。

次の例では、フィルターを使用して FileReadingMessageSource を構成します。

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="customFilterBean"/>

ファイルの読み取りに関する一般的な問題は、準備が整う前にファイルが検出される可能性があることです(つまり、他のプロセスがまだファイルを書き込んでいる可能性があります)。デフォルトの AcceptOnceFileListFilter はこれを妨げません。ほとんどの場合、ファイル書き込みプロセスが読み取りの準備ができるとすぐに各ファイルの名前を変更すると、これを防ぐことができます。デフォルトの AcceptOnceFileListFilter で構成された(おそらく既知のサフィックスに基づいた)準備ができているファイルのみを受け入れる filename-pattern または filename-regex フィルターは、この状況に対応します。次の例に示すように、CompositeFileListFilter は構成を有効にします。

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
            <bean class="o.s.i.file.filters.RegexPatternFileListFilter">
                <constructor-arg value="^test.*$"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

一時的な名前でファイルを作成し、最終的な名前に名前を変更することができない場合、Spring Integration は別の代替手段を提供します。バージョン 4.2 は LastModifiedFileListFilter を追加しました。このフィルターは、この値より古いファイルのみがフィルターによって渡されるように、age プロパティで構成できます。経過時間のデフォルトは 60 秒ですが、ファイルの早期取得を回避するのに十分な大きさの経過時間を選択する必要があります(ネットワークの不具合などによる)。次の例は、LastModifiedFileListFilter を構成する方法を示しています。

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
    <property name="age" value="120" />
</bean>

バージョン 4.3.7 から、ChainFileListFilter (CompositeFileListFilter の拡張)が導入され、後続のフィルターが前のフィルターの結果のみを表示するシナリオを可能にしました。(CompositeFileListFilter では、すべてのフィルターがすべてのファイルを表示しますが、すべてのフィルターを通過したファイルのみを通過させます)。新しい動作が必要な場所の例は、LastModifiedFileListFilter と AcceptOnceFileListFilter の組み合わせです。一定の時間が経過するまでファイルを受け入れたくない場合です。CompositeFileListFilter では、AcceptOnceFileListFilter は最初のパスですべてのファイルを見るため、後で他のフィルターがパスするときにファイルを渡しません。CompositeFileListFilter アプローチは、パターンフィルターを、ファイル転送が完了したことを示すセカンダリファイルを探すカスタムフィルターと組み合わせる場合に役立ちます。パターンフィルターはプライマリファイル(something.txt など)のみを渡す場合がありますが、「完了」フィルターは(たとえば) something.done が存在するかどうかを確認する必要があります。

ファイル a.txta.doneb.txt があるとします。

パターンフィルターは a.txt と b.txt のみを通過させますが、「完了」フィルターは 3 つのファイルすべてを認識し、a.txt のみを通過させます。複合フィルターの最終結果は、a.txt のみがリリースされることです。

ChainFileListFilter では、チェーンのいずれかのフィルターが空のリストを返す場合、残りのフィルターは呼び出されません。

バージョン 5.0 は、コンテキスト評価ルートオブジェクトとしてファイルに対して SpEL 式を実行する ExpressionFileListFilter を導入しました。この目的のために、次の例に示すように、ファイル処理用のすべての XML コンポーネント(ローカルおよびリモート)と既存の filter 属性が filter-expression オプションとともに提供されています。

<int-file:inbound-channel-adapter
        directory="${inputdir}"
        filter-expression="name matches '.text'"
        auto-startup="false"/>

バージョン 5.0.5 は、拒否されたファイルに関心がある DiscardAwareFileListFilter 実装を導入しました。この目的のために、そのようなフィルター実装には addDiscardCallback(Consumer<File>) を介してコールバックを提供する必要があります。フレームワークでは、この機能は FileReadingMessageSource.WatchServiceDirectoryScanner から LastModifiedFileListFilter と組み合わせて使用されます。通常の DirectoryScanner とは異なり、WatchService はターゲットファイルシステムのイベントに応じて処理するファイルを提供します。これらのファイルで内部キューをポーリングする瞬間に、LastModifiedFileListFilter は、構成された age に比べて若すぎるため、破棄する場合があります。今後の考慮事項のためにファイルを失います。破棄コールバックフックを使用すると、内部キューにファイルを保持できるため、後続のポーリングで age に対してファイルをチェックできます。CompositeFileListFilter は DiscardAwareFileListFilter も実装し、そのすべての DiscardAwareFileListFilter デリゲートに廃棄コールバックを追加します。

CompositeFileListFilter はすべてのデリゲートに対してファイルを照合するため、同じファイルに対して discardCallback が複数回呼び出される場合があります。

バージョン 5.1 以降、FileReadingMessageSource はディレクトリの存在を確認せず、start() が呼び出されるまで(通常は SourcePollingChannelAdapter をラップすることにより)ディレクトリを作成しません。以前は、テストからディレクトリを参照するとき、または後でアクセス許可が適用されるときに、オペレーティングシステムのアクセス許可エラーを防ぐ簡単な方法はありませんでした。

メッセージヘッダー

バージョン 5.0 以降、FileReadingMessageSource は(ポーリングされた File としての payload に加えて)送信 Message に以下のヘッダーを取り込みます。

  • FileHeaders.FILENAME: 送信するファイルの File.getName()。後続の名前変更またはコピーロジックに使用できます。

  • FileHeaders.ORIGINAL_FILEFile オブジェクト自体。通常、このヘッダーは、元の File オブジェクトを失うと、フレームワークコンポーネント(スプリッタートランスフォーマーなど)によって自動的に入力されます。ただし、他のカスタムユースケースとの一貫性と利便性のために、このヘッダーは元のファイルへのアクセスに役立ちます。

  • FileHeaders.RELATIVE_PATH: スキャンのルートディレクトリに相対的なファイルパスの部分を表すために導入された新しいヘッダー。このヘッダーは、他の場所でソースディレクトリ階層を復元する必要がある場合に役立ちます。この目的のために、このヘッダーを使用するように DefaultFileNameGenerator ( "ファイル名の生成" を参照)を構成できます。

ディレクトリのスキャンとポーリング

FileReadingMessageSource は、ディレクトリからのファイルのメッセージをすぐには生成しません。scanner によって返される「適格ファイル」に内部キューを使用します。scanEachPoll オプションを使用して、内部キューが各ポーリングの最新の入力ディレクトリコンテンツでリフレッシュされるようにします。デフォルト(scanEachPoll = false)では、FileReadingMessageSource はディレクトリを再度スキャンする前にキューを空にします。このデフォルトの動作は、ディレクトリ内の多数のファイルのスキャンを減らすのに特に役立ちます。ただし、カスタムの順序付けが必要な場合は、このフラグを true に設定する効果を考慮することが重要です。ファイルが処理される順序は予想どおりではない場合があります。デフォルトでは、キュー内のファイルは自然な(path)順序で処理されます。スキャンによって追加された新しいファイルは、キューにすでにファイルがある場合でも、適切な位置に挿入され、その自然な順序が維持されます。順序をカスタマイズするために、FileReadingMessageSource は Comparator<File> をコンストラクター引数として受け入れることができます。内部(PriorityBlockingQueue)によって使用され、ビジネス要件に従ってコンテンツを並べ替えます。特定の順序でファイルを処理するには、カスタム DirectoryScanner によって作成されたリストを並べ替えるのではなく、FileReadingMessageSource にコンパレーターを提供する必要があります。

バージョン 5.0 では、ファイルツリー訪問を実行するために RecursiveDirectoryScanner を導入しました。実装は、Files.walk(Path start, int maxDepth, FileVisitOption…​ options) 機能に基づいています。ルートディレクトリ(DirectoryScanner.listFiles(File))引数は結果から除外されます。他のすべてのサブディレクトリの包含および除外は、ターゲット FileListFilter 実装に基づいています。例: SimplePatternFileListFilter はデフォルトでディレクトリを除外します。詳細については、AbstractDirectoryAwareFileListFilter (Javadoc) とその実装を参照してください。

バージョン 5.5 以降、Java DSL の FileInboundChannelAdapterSpec には、デフォルトの FileReadingMessageSource の代わりにターゲット FileReadingMessageSource で RecursiveDirectoryScanner を使用するための便利な recursive(boolean) オプションがあります。

名前空間サポート

ファイル固有の名前空間を使用すると、ファイル読み取りの構成を簡素化できます。そのためには、次のテンプレートを使用します。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

この名前空間内で、次のように FileReadingMessageSource を減らして受信チャネルアダプターにラップできます。

<int-file:inbound-channel-adapter id="filesIn1"
    directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
    directory="file:${input.directory}"
    filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
    directory="file:${input.directory}"
    filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
    directory="file:${input.directory}"
    filename-regex="test[0-9]+\.txt" />

最初のチャネルアダプターの例は、デフォルトの FileListFilter 実装に依存しています。

  • IgnoreHiddenFileListFilter (隠しファイルを処理しません)

  • AcceptOnceFileListFilter (重複を防ぐ)

prevent-duplicates および ignore-hidden 属性はデフォルトで true であるため、省略することもできます。

Spring Integration 4.2 は ignore-hidden 属性を導入しました。以前のバージョンでは、隠しファイルが含まれていました。

2 番目のチャネルアダプターの例はカスタムフィルターを使用し、3 番目は filename-pattern 属性を使用して AntPathMatcher ベースのフィルターを追加し、4 番目は filename-regex 属性を使用して正規表現パターンベースのフィルターを FileReadingMessageSource に追加します。filename-pattern および filename-regex 属性は、それぞれ通常の filter 参照属性と相互に排他的です。ただし、filter 属性を使用して、特定のニーズに合わせて 1 つ以上のパターンベースのフィルターを含む、任意の数のフィルターを組み合わせた CompositeFileListFilter のインスタンスを参照できます。

複数のプロセスが同じディレクトリから読み取る場合、ファイルが同時に取得されないようにファイルをロックすることができます。そのためには、FileLocker を使用できます。java.nio ベースの実装が利用可能ですが、独自のロックスキームを実装することもできます。nio ロッカーは、次のように挿入できます。

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:nio-locker/>
</int-file:inbound-channel-adapter>

次のようにカスタムロッカーを設定できます。

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
ファイル受信アダプターがロッカーで構成されている場合、ファイルの受信を許可する前にロックを取得する責任があります。ファイルのロックを解除する責任は負いません。ファイルを処理し、ロックをぶら下げたままにしておくと、メモリリークが発生します。これが問題になる場合は、適切なタイミングで自分で FileLocker.unlock(File file) を呼び出す必要があります。

ファイルのフィルタリングとロックだけでは不十分な場合は、ファイルを完全にリストする方法を制御する必要があります。この型の要件を実装するには、DirectoryScanner の実装を使用できます。このスキャナーを使用すると、各ポーリングでリストされるファイルを正確に判別できます。これは、Spring Integration が FileListFilter インスタンスと FileLocker を FileReadingMessageSource にワイヤリングするために内部的に使用するインターフェースでもあります。次の例に示すように、scanner 属性の <int-file:inbound-channel-adapter/> にカスタム DirectoryScanner を挿入できます。

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
     scanner="customDirectoryScanner"/>

これにより、順序付け、リスト、ロックの戦略を自由に選択できます。

また、フィルター(patternsregexprevent-duplicates などを含む)および locker インスタンスが実際に scanner によって使用されることを理解することも重要です。アダプターに設定されたこれらの属性のいずれかは、その後内部 scanner に注入されます。外部 scanner の場合、FileReadingMessageSource ではすべてのフィルターおよびロッカー属性が禁止されています。カスタム DirectoryScanner で指定する必要があります(必要な場合)。つまり、scanner を FileReadingMessageSource に挿入する場合、FileReadingMessageSource ではなく、その scanner で filter および locker を指定する必要があります。

デフォルトでは、DefaultDirectoryScanner は IgnoreHiddenFileListFilter と AcceptOnceFileListFilter を使用します。使用しないようにするには、独自のフィルター(AcceptAllFileListFilter など)を構成するか、null に設定することもできます。

WatchServiceDirectoryScanner

FileReadingMessageSource.WatchServiceDirectoryScanner は、新しいファイルがディレクトリに追加されるとき、ファイルシステムイベントに依存します。初期化中に、ディレクトリはイベントを生成するために登録されます。初期ファイルリストも初期化中に作成されます。ディレクトリツリーをたどっていくと、発生したサブディレクトリも登録され、イベントが生成されます。最初のポーリングでは、ディレクトリ内を移動した最初のファイルリストが返されます。後続のポーリングでは、新規作成イベントからのファイルが返されます。新しいサブディレクトリが追加されると、その作成イベントを使用して新しいサブツリーをたどって既存のファイルを見つけ、見つかった新しいサブディレクトリを登録します。

WatchKey の内部イベント queue が、ディレクトリ変更イベントが発生するほど迅速にプログラムによって排出されない場合、WatchKey に課題があります。キューサイズを超えると、一部のファイルシステムイベントが失われる可能性があることを示す StandardWatchEventKinds.OVERFLOW が発行されます。この場合、ルートディレクトリは完全に再スキャンされます。重複を避けるため、適切な FileListFilter (AcceptOnceFileListFilter など)を使用するか、処理が完了したらファイルを削除することを検討してください。

WatchServiceDirectoryScanner は、scanner オプションと相互に排他的な FileReadingMessageSource.use-watch-service オプションによって有効にできます。提供された directory の内部 FileReadingMessageSource.WatchServiceDirectoryScanner インスタンスが読み込まれます。

さらに、WatchService ポーリングロジックは StandardWatchEventKinds.ENTRY_MODIFY および StandardWatchEventKinds.ENTRY_DELETE を追跡できるようになりました。

既存のファイルと新しいファイルの変更を追跡する必要がある場合は、FileListFilter に ENTRY_MODIFY イベントロジックを実装する必要があります。それ以外の場合、それらのイベントのファイルは同じように扱われます。

ResettableFileListFilter 実装は、ENTRY_DELETE イベントを取得します。その結果、それらのファイルは remove() 操作用に提供されます。このイベントを有効にすると、AcceptOnceFileListFilter などのフィルターによってファイルが削除されます。その結果、同じ名前のファイルが表示された場合、そのファイルはフィルターを通過し、メッセージとして送信されます。

この目的のために、watch-events プロパティ(FileReadingMessageSource.setWatchEvents(WatchEventType…​ watchEvents))が導入されました。(WatchEventType は FileReadingMessageSource のパブリック内部列挙です)このようなオプションを使用すると、新しいファイルに 1 つのダウンストリームフローロジックを使用し、変更されたファイルに他のロジックを使用できます。次の例は、同じディレクトリでイベントを作成および変更するためのさまざまなロジックを構成する方法を示しています。

<int-file:inbound-channel-adapter id="newFiles"
     directory="${input.directory}"
     use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
     directory="${input.directory}"
     use-watch-service="true"
     filter="acceptAllFilter"
     watch-events="MODIFY"/> <!-- The default is CREATE. -->

メモリ消費の制限

HeadDirectoryScanner を使用して、メモリに保持されるファイルの数を制限できます。これは、大きなディレクトリをスキャンするときに役立ちます。XML 構成では、受信チャネルアダプターの queue-size プロパティを設定することでこれを有効にします。

バージョン 4.2 より前は、この設定は他のフィルターの使用と互換性がありませんでした。他のフィルター(prevent-duplicates="true" を含む)は、サイズを制限するために使用されるフィルターを上書きしました。

HeadDirectoryScanner の使用は AcceptOnceFileListFilter と互換性がありません。ポーリングの決定中にすべてのフィルターが調べられるため、AcceptOnceFileListFilter は他のフィルターが一時的にファイルをフィルターしている可能性があることを知りません。以前に HeadDirectoryScanner.HeadFilter によってフィルタリングされたファイルが利用可能になった場合でも、AcceptOnceFileListFilter はそれらをフィルタリングします。

通常、この場合は AcceptOnceFileListFilter を使用する代わりに、処理済みのファイルを削除して、以前にフィルタリングされたファイルが今後の投票で利用できるようにする必要があります。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java 構成で送信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
         FileReadingMessageSource source = new FileReadingMessageSource();
         source.setDirectory(new File(INBOUND_PATH));
         source.setFilter(new SimplePatternFileListFilter("*.txt"));
         return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}

Java DSL を使用した構成

次の Spring Boot アプリケーションは、Java DSL で送信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlows
                  .from(Files.inboundAdapter(new File(INBOUND_PATH))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Files.toStringTransformer())
                  .channel("processFileChannel")
                  .get();
    }

}

"tail" ファイル

もう 1 つの一般的な使用例は、ファイルの最後 (または末尾) から「行」を取得し、新しい行が追加されたときにそれをキャプチャーすることです。2 つの実装が提供されています。最初の OSDelegatingFileTailingMessageProducer は、ネイティブの tail コマンドを使用します (コマンドがあるオペレーティングシステムの場合)。これは通常、これらのプラットフォームで最も効率的な実装です。tail コマンドがないオペレーティングシステムの場合、2 番目の実装 ApacheCommonsFileTailingMessageProducer は Apache commons-ioTailer クラスを使用します。

どちらの場合も、利用できないファイルやその他のイベントなどのファイルシステムイベントは、通常の Spring イベント発行メカニズムを使用して ApplicationEvent インスタンスとして発行されます。このようなイベントの例には次のものがあります。

[message=tail: cannot open '/tmp/somefile' for reading:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
               following end of new file, file=/tmp/somefile]

前の例に示されている一連のイベントは、たとえば、ファイルがローテーションされるときに発生する可能性があります。

バージョン 5.0 以降、idleEventInterval の実行中にファイルにデータがない場合、FileTailingIdleEvent が発行されます。次の例は、そのようなイベントがどのように見えるかを示しています。

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
tail コマンドをサポートするすべてのプラットフォームがこれらのステータスメッセージを提供するわけではありません。

これらのエンドポイントから発信されるメッセージには、次のヘッダーがあります。

  • FileHeaders.ORIGINAL_FILEFile オブジェクト

  • FileHeaders.FILENAME: ファイル名 (File.getName())

バージョン 5.0 より前のバージョンでは、FileHeaders.FILENAME ヘッダーにはファイルの絶対パスの文字列表現が含まれていました。これで、元のファイルヘッダーで getAbsolutePath() を呼び出して、その文字列表現を取得できます。

次の例では、デフォルトのオプション( "-F -n 0"、現在の末尾からファイル名を追跡することを意味する)でネイティブアダプターを作成します。

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	task-executor="exec"
	file="/tmp/foo"/>

次の例では、"-F -n +0" オプションを使用してネイティブアダプターを作成します(つまり、ファイル名に従い、既存のすべての行を出力します)。

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	native-options="-F -n +0"
	task-executor="exec"
	file-delay=10000
	file="/tmp/foo"/>

tail コマンドが失敗する場合(プラットフォームによっては、-F が指定されている場合でもファイルの欠落により tail が失敗する場合)、コマンドは 10 秒ごとに再試行されます。

デフォルトでは、ネイティブアダプターは標準出力からキャプチャーし、コンテンツをメッセージとして送信します。また、標準エラーからキャプチャーしてイベントを発生させます。バージョン 4.3.6 以降、次の例に示すように、enable-status-reader を false に設定することにより、標準エラーイベントを破棄できます。

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	enable-status-reader="false"
	task-executor="exec"
	file="/tmp/foo"/>

次の例では、IdleEventInterval は 5000 に設定されています。つまり、5 秒間行が書き込まれない場合、FileTailingIdleEvent は 5 秒ごとにトリガーされます。

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	idle-event-interval="5000"
	task-executor="exec"
	file="/tmp/somefile"/>

これは、アダプターを停止する必要がある場合に役立ちます。

次の例では、2 秒ごとにファイルの新しい行を調べ、10 秒ごとに不足しているファイルの存在を確認する Apache commons-ioTailer アダプターを作成します。

<int-file:tail-inbound-channel-adapter id="apache"
	channel="input"
	task-executor="exec"
	file="/tmp/bar"
	delay="2000"
	end="false"             (1)
	reopen="true"           (2)
	file-delay="10000"/>
1 ファイルは、末尾(デフォルト)ではなく先頭(end="false")から末尾に付けられます。
2 ファイルはチャンクごとに再度開かれます(デフォルトではファイルを開いたままにします)。
delayendreopen 属性を指定すると、Apache commons-io アダプターの使用が強制され、native-options 属性は使用できなくなります。

不完全なデータの処理

ファイル転送シナリオの一般的な問題は、転送が完了したことを確認して、不完全なファイルの読み取りを開始しないようにする方法です。この問題を解決する一般的な方法は、一時的な名前でファイルを作成し、アトミックに名前を最終的な名前に変更することです。この技術は、一時ファイルがコンシューマーによって選択されないようにマスクするフィルターとともに、堅牢なソリューションを提供します。この手法は、ファイルを(ローカルまたはリモートで)書き込む Spring Integration コンポーネントによって使用されます。デフォルトでは、ファイル名に .writing を追加し、転送が完了したら削除します。

もう 1 つの一般的な手法は、2 番目の「マーカー」ファイルを作成して、ファイル転送が完了したことを示すことです。このシナリオでは、somefile.txt.complete も存在するまで、somefile.txt (たとえば)が使用可能であると見なすべきではありません。Spring Integration バージョン 5.0 は、このメカニズムをサポートする新しいフィルターを導入しました。ファイルシステム(FileSystemMarkerFilePresentFileListFilter)、FTP および SFTP の実装が提供されています。マーカーファイルには任意の名前を付けることができますが、通常は転送されるファイルに関連しています。詳細については、Javadoc を参照してください。

ファイルを書く

ファイルシステムにメッセージを書き込むには、FileWritingMessageHandler (Javadoc) を使用できます。このクラスは、次のペイロード型を処理できます。

  • File

  • String

  • Byte 配列

  • InputStream ( バージョン 4.2 以降)

String ペイロードの場合、エンコードと文字セットを構成できます。

物事を簡単にするために、XML 名前空間を使用して、FileWritingMessageHandler を送信チャネルアダプターまたは送信ゲートウェイの一部として構成できます。

バージョン 4.3 以降、ファイルの書き込み時に使用するバッファーサイズを指定できます。

バージョン 5.1 以降では、FileExistsMode.APPEND または FileExistsMode.APPEND_NO_FLUSH を使用し、新しいファイルを作成する必要がある場合にトリガーされる BiConsumer<File, Message<?>>newFileCallback を提供できます。このコールバックは、新しく作成されたファイルとそれをトリガーしたメッセージを受け取ります。このコールバックは、たとえば、メッセージヘッダーで定義された CSV ヘッダーを書き込むために使用できます。

ファイル名の生成

最も単純な形式では、FileWritingMessageHandler はファイルを書き込むための宛先ディレクトリのみを必要とします。書き込まれるファイルの名前は、ハンドラーの FileNameGenerator (Javadoc) によって決定されます。デフォルトの実装 (Javadoc) は、FileHeaders.FILENAME (Javadoc) として定義された定数とキーが一致するメッセージヘッダーを探します。

あるいは、メッセージに対して評価される式を指定して、ファイル名を生成できます(例: headers['myCustomHeader'] + '.something')。式は String に評価される必要があります。便宜上、DefaultFileNameGenerator には setHeaderName メソッドも用意されており、ファイル名として使用される値を持つメッセージヘッダーを明示的に指定できます。

一度設定すると、DefaultFileNameGenerator は次の解決手順を使用して、特定のメッセージペイロードのファイル名を決定します。

  1. メッセージに対して式を評価し、結果が空でない String である場合、それをファイル名として使用します。

  2. それ以外の場合、ペイロードが java.io.File の場合、File オブジェクトのファイル名を使用します。

  3. それ以外の場合は、メッセージ ID にを追加して使用します。ファイル名として msg

XML 名前空間のサポートを使用すると、ファイル送信チャネルアダプターとファイル送信ゲートウェイの両方が、以下の相互に排他的な構成属性をサポートします。

  • filename-generator (FileNameGenerator 実装への参照)

  • filename-generator-expression (String に評価される式)

ファイルの書き込み中に、一時ファイルのサフィックスが使用されます(デフォルトは .writing です)。ファイルの書き込み中にファイル名に追加されます。サフィックスをカスタマイズするには、ファイル送信チャネルアダプターとファイル送信ゲートウェイの両方で temporary-file-suffix 属性を設定できます。

APPEND ファイル mode を使用する場合、データはファイルに直接追加されるため、temporary-file-suffix 属性は無視されます。

バージョン 4.2.5 以降、生成されたファイル名(filename-generator または filename-generator-expression の評価の結果)は、ターゲットファイル名とともに子パスを表すことができます。これは、以前と同様に File(File parent, String child) の 2 番目のコンストラクター引数として使用されます。ただし、これまでは、ファイル名のみを想定して、子パス用のディレクトリ(mkdirs())を作成していませんでした。このアプローチは、ソースディレクトリに一致するようにファイルシステムツリーを復元する必要がある場合に役立ちます。たとえば、アーカイブを解凍し、ターゲットディレクトリ内のすべてのファイルを元の順序で保存する場合です。

出力ディレクトリの指定

ファイル送信チャネルアダプターとファイル送信ゲートウェイの両方は、出力ディレクトリを指定するための相互に排他的な 2 つの構成属性を提供します。

  • directory

  • directory-expression

Spring Integration 2.2 は directory-expression 属性を導入しました。
directory 属性の使用

directory 属性を使用すると、出力ディレクトリは固定値に設定されます。これは、FileWritingMessageHandler の初期化時に設定されます。この属性を指定しない場合は、directory-expression 属性を使用する必要があります。

directory-expression 属性の使用

SpEL を完全にサポートしたい場合は、directory-expression 属性を使用できます。この属性は、処理されるメッセージごとに評価される SpEL 式を受け入れます。出力ファイルディレクトリを動的に指定すると、メッセージのペイロードとそのヘッダーにフルアクセスできます。

SpEL 式は、Stringjava.io.Fileorg.springframework.core.io.Resource のいずれかに解決される必要があります。(後者はいずれにしても File に評価されます)さらに、結果の String または File はディレクトリを指している必要があります。directory-expression 属性を指定しない場合、directory 属性を設定する必要があります。

auto-create-directory 属性の使用

デフォルトでは、宛先ディレクトリが存在しない場合、それぞれの宛先ディレクトリと存在しない親ディレクトリが自動的に作成されます。その動作を防ぐために、auto-create-directory 属性を false に設定できます。この属性は、directory 属性と directory-expression 属性の両方に適用されます。

directory 属性を使用していて、auto-create-directory が false である場合、Spring Integration 2.2 以降、次の変更が行われました。

アダプターの初期化時に宛先ディレクトリの存在を確認する代わりに、処理中の各メッセージに対してこの確認が実行されるようになりました。

さらに、auto-create-directory が true であり、ディレクトリがメッセージの処理の間に削除された場合、ディレクトリは処理されるメッセージごとに再作成されます。

既存の宛先ファイルの処理

ファイルを書き込み、宛先ファイルがすでに存在する場合、デフォルトの動作ではそのターゲットファイルが上書きされます。関連するファイル送信コンポーネントの mode 属性を設定することにより、この動作を変更できます。次のオプションがあります。

  • REPLACE (デフォルト)

  • REPLACE_IF_MODIFIED

  • APPEND

  • APPEND_NO_FLUSH

  • FAIL

  • IGNORE

Spring Integration 2.2 は、mode 属性と APPENDFAILIGNORE オプションを導入しました。
REPLACE

ターゲットファイルがすでに存在する場合、上書きされます。mode 属性が指定されていない場合、これはファイルを書き込むときのデフォルトの動作です。

REPLACE_IF_MODIFIED

ターゲットファイルがすでに存在する場合、最後に変更されたタイムスタンプがソースファイルのタイムスタンプと異なる場合にのみ上書きされます。File ペイロードの場合、ペイロード lastModified 時間は既存のファイルと比較されます。他のペイロードの場合、FileHeaders.SET_MODIFIED (file_setModified)ヘッダーが既存のファイルと比較されます。ヘッダーがないか、値が Number でない場合、ファイルは常に置き換えられます。

APPEND

このモードでは、毎回新しいファイルを作成する代わりに、既存のファイルにメッセージコンテンツを追加できます。この属性は temporary-file-suffix 属性と相互に排他的であることに注意してください。これは、既存のファイルにコンテンツを追加するときに、アダプターが一時ファイルを使用しなくなるためです。ファイルは各メッセージの後に閉じられます。

APPEND_NO_FLUSH

このオプションのセマンティクスは APPEND と同じですが、データはフラッシュされず、各メッセージの後にファイルは閉じられません。これにより、障害が発生した場合にデータ損失のリスクを伴う大きなパフォーマンスを提供できます。詳細については、APPEND_NO_FLUSH 使用時のファイルのフラッシュを参照してください。

FAIL

ターゲットファイルが存在する場合、MessageHandlingException (Javadoc) がスローされます。

IGNORE

ターゲットファイルが存在する場合、メッセージペイロードは警告なしに無視されます。

一時ファイルのサフィックスを使用する場合(デフォルトは .writing)、最終ファイル名または一時ファイル名のいずれかが存在する場合、IGNORE オプションが適用されます。

APPEND_NO_FLUSH 使用時のファイルのフラッシュ

APPEND_NO_FLUSH モードは、バージョン 4.3 で追加されました。各メッセージの後にファイルが閉じられないため、これを使用するとパフォーマンスが向上します。ただし、これにより、障害発生時にデータが失われる可能性があります。

Spring Integration は、このデータ損失を軽減するためのいくつかのフラッシュ戦略を提供します。

  • flushInterval を使用します。この期間ファイルが書き込まれない場合、自動的にフラッシュされます。これは概算であり、今回は 1.33x まで可能です(平均 1.167x)。

  • 正規表現を含むメッセージをメッセージハンドラーの trigger メソッドに送信します。パターンに一致する絶対パス名を持つファイルはフラッシュされます。

  • ハンドラーにカスタム MessageFlushPredicate 実装を提供して、メッセージが trigger メソッドに送信されたときに実行されるアクションを変更します。

  • カスタム FileWritingMessageHandler.FlushPredicate または FileWritingMessageHandler.MessageFlushPredicate 実装を渡すことにより、ハンドラーの flushIfNeeded メソッドの 1 つを呼び出します。

述語は、開いているファイルごとに呼び出されます。詳細については、これらのインターフェースの Javadoc を参照してください。バージョン 5.0 以降、述語メソッドは別のパラメーターを提供することに注意してください。現在のファイルが新規または以前に閉じられた場合に最初に書き込まれた時間です。

flushInterval を使用する場合、間隔は最後の書き込みから始まります。ファイルは、その間隔でアイドル状態の場合にのみフラッシュされます。バージョン 4.3.7 から、追加のプロパティ(flushWhenIdle)を false に設定できます。これは、以前にフラッシュされた(または新しい)ファイルへの最初の書き込みで間隔が始まることを意味します。

ファイルのタイムスタンプ

デフォルトでは、宛先ファイルの lastModified タイムスタンプは、ファイルが作成された時刻です(ただし、インプレース名前変更では現在のタイムスタンプが保持されます)。バージョン 4.3 から、preserve-timestamp (または Java 構成を使用する場合は setPreserveTimestamp(true))を構成できるようになりました。File ペイロードの場合、これにより、タイムスタンプが受信ファイルから送信に転送されます(コピーが必要かどうかに関係なく)。他のペイロードでは、FileHeaders.SET_MODIFIED ヘッダー(file_setModified)が存在する場合、ヘッダーが Number である限り、宛先ファイルの lastModified タイムスタンプの設定に使用されます。

ファイル許可

バージョン 5.0 から、Posix 許可をサポートするファイルシステムにファイルを書き込むときに、送信チャネルアダプターまたはゲートウェイでそれらの許可を指定できます。このプロパティは整数であり、通常は使い慣れた 8 進数形式で提供されます。たとえば、0640 は、所有者に読み取り / 書き込み権限があり、グループに読み取り専用権限があり、その他にはアクセス権がないことを意味します。

ファイル送信チャネルアダプター

次の例では、ファイル送信チャネルアダプターを構成します。

<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>

名前空間ベースの構成は、delete-source-files 属性もサポートしています。true に設定すると、宛先への書き込み後に元のソースファイルの削除がトリガーされます。そのフラグのデフォルト値は false です。次の例は、true に設定する方法を示しています。

<int-file:outbound-channel-adapter id="filesOut"
    directory="${output.directory}"
    delete-source-files="true"/>
delete-source-files 属性は、受信メッセージに File ペイロードがある場合、または FileHeaders.ORIGINAL_FILE ヘッダー値にソース File インスタンスまたは元のファイルパスを表す String が含まれている場合にのみ効果があります。

バージョン 4.2 以降、FileWritingMessageHandler は append-new-line オプションをサポートしています。true に設定されている場合、メッセージが書き込まれた後、ファイルに新しい行が追加されます。デフォルトの属性値は false です。次の例は、append-new-line オプションの使用方法を示しています。

<int-file:outbound-channel-adapter id="newlineAdapter"
	append-new-line="true"
    directory="${output.directory}"/>

送信ゲートウェイ

書き込まれたファイルに基づいてメッセージの処理を続行する場合は、代わりに outbound-gateway を使用できます。outbound-channel-adapter と同様のロールを果たします。ただし、ファイルを書き込んだ後、メッセージのペイロードとして応答チャネルに送信します。

次の例では、送信ゲートウェイを構成します。

<int-file:outbound-gateway id="mover" request-channel="moveInput"
    reply-channel="output"
    directory="${output.directory}"
    mode="REPLACE" delete-source-files="true"/>

前述のように、mode 属性を指定することもできます。これは、宛先ファイルがすでに存在する状況に対処する方法の動作を定義します。詳細については、既存の宛先ファイルの処理を参照してください。一般に、ファイル送信ゲートウェイを使用する場合、結果ファイルは応答チャネルのメッセージペイロードとして返されます。

これは、IGNORE モードを指定する場合にも適用されます。その場合、既存の宛先ファイルが返されます。リクエストメッセージのペイロードがファイルの場合、メッセージヘッダーを介して元のファイルにアクセスできます。FileHeaders.ORIGINAL_FILE (Javadoc) を参照してください。

「送信ゲートウェイ」は、最初にファイルを移動してから処理パイプラインを介して送信する場合に適しています。このような場合、ファイル名前空間の inbound-channel-adapter 要素を outbound-gateway に接続してから、そのゲートウェイの reply-channel をパイプラインの先頭に接続できます。

より複雑な要件がある場合、またはファイルコンテンツに変換する入力として追加のペイロード型をサポートする必要がある場合は、FileWritingMessageHandler を継承できますが、はるかに優れたオプションは Transformer に依存することです。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                      new SpringApplicationBuilder(FileWritingJavaApplication.class)
                              .web(false)
                              .run(args);
             MyGateway gateway = context.getBean(MyGateway.class);
             gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "writeToFileChannel")
    public MessageHandler fileWritingMessageHandler() {
         Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
         FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
         handler.setFileExistsMode(FileExistsMode.APPEND);
         return handler;
    }

    @MessagingGateway(defaultRequestChannel = "writeToFileChannel")
    public interface MyGateway {

        void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
                       @Header(FileHeaders.FILENAME) File directory, String data);

    }
}

Java DSL を使用した構成

次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                 new SpringApplicationBuilder(FileWritingJavaApplication.class)
                         .web(false)
                         .run(args);
        MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
        fileWritingInput.send(new GenericMessage<>("foo"));
    }

    @Bean
   	public IntegrationFlow fileWritingFlow() {
   	    return IntegrationFlows.from("fileWritingInput")
   		        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
   		                  .header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
   	            .handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
   	            .channel(MessageChannels.queue("fileWritingResultChannel"))
   	            .get();
    }

}

ファイル Transformers

ファイルシステムから読み取ったデータをオブジェクトに変換したり、その逆を行うには、何らかの作業を行う必要があります。FileReadingMessageSource とは異なり、FileWritingMessageHandler ほどではありませんが、ジョブを完了するためにはおそらく独自のメカニズムが必要です。このために、Transformer インターフェースを実装できます。または、受信メッセージ用に AbstractFilePayloadTransformer を継承できます。Spring Integration は、いくつかの明らかな実装を提供します。

Transformer インターフェースの Javadoc を参照して、どの Spring Integration クラスがそれを実装しているかを確認してください。同様に、AbstractFilePayloadTransformer クラスの Javadoc をチェックして、どの Spring Integration クラスがそれを継承しているかを確認できます。

FileToByteArrayTransformer は AbstractFilePayloadTransformer を継承し、Spring の FileCopyUtils を使用して File オブジェクトを byte[] に変換します。多くの場合、すべての変換を単一のクラスに入れるよりも、一連のトランスフォーマーを使用する方が適切です。その場合、File から byte[] への変換は論理的な最初のステップかもしれません。

FileToStringTransformer は AbstractFilePayloadTransformer を継承し、File オブジェクトを String に変換します。それ以外の場合は、これはデバッグに役立ちます(ワイヤータップで使用することを検討してください)。

ファイル固有のトランスフォーマーを構成するには、次の例に示すように、ファイル名前空間の適切な要素を使用できます。

<int-file:file-to-bytes-transformer  input-channel="input" output-channel="output"
    delete-files="true"/>

<int-file:file-to-string-transformer input-channel="input" output-channel="output"
    delete-files="true" charset="UTF-8"/>

delete-files オプションは、変換の補完後に受信ファイルを削除する必要があることをトランスフォーマーに通知します。これは、FileReadingMessageSource がマルチスレッド環境で使用されている場合(一般に Spring Integration を使用している場合など)に、AcceptOnceFileListFilter を使用する代わりにはなりません。

ファイル分割

FileSplitter はバージョン 4.1.2 で追加され、その名前空間サポートはバージョン 4.2 で追加されました。FileSplitter は、BufferedReader.readLine() に基づいてテキストファイルを個々の行に分割します。デフォルトでは、スプリッターは Iterator を使用して、ファイルから読み取られる行を一度に 1 行ずつ出力します。iterator プロパティを false に設定すると、メッセージとして送信する前にすべての行がメモリに読み込まれます。この使用例の 1 つは、行を含むメッセージを送信する前にファイルの I/O エラーを検出する場合です。ただし、比較的短いファイルに対してのみ実用的です。

受信ペイロードは、FileString (File パス)、InputStreamReader です。他のペイロード型は変更されずに発行されます。

次のリストは、FileSplitter を構成するための可能な方法を示しています。

Java DSL
@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlows
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
Kotlin DSL
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
Java
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
XML
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 スプリッターの Bean 名。
2true (デフォルト)に設定すると、イテレーターまたは false を使用して、行を送信する前にファイルをメモリにロードします。
3true に設定すると、ファイルデータの前後にファイル開始マーカーおよびファイル終了マーカーのメッセージが出力されます。マーカーは、FileSplitter.FileMarker ペイロードを持つメッセージです(mark プロパティに START および END 値が含まれます)。一部の行がフィルタリングされるダウンストリームフローでファイルを順次処理するときに、マーカーを使用できます。これにより、ファイルが完全に処理されたことをダウンストリーム処理で知ることができます。さらに、START または END を含む file_marker ヘッダーがこれらのメッセージに追加されます。END マーカーには行カウントが含まれます。ファイルが空の場合、START および END マーカーのみが lineCount として 0 で発行されます。デフォルトは false です。true の場合、apply-sequence はデフォルトで false です。markers-json (次の属性)も参照してください。
4markers が true の場合、これを true に設定して、FileMarker オブジェクトを JSON 文字列に変換します。(下に SimpleJsonSerializer を使用)。
5sequenceSize および sequenceNumber ヘッダーをメッセージに含めることを無効にするには、false に設定します。markers が true でない限り、デフォルトは true です。true および markers が true の場合、マーカーはシーケンスに含まれます。true および iterator が true の場合、サイズが不明であるため、sequenceSize ヘッダーは 0 に設定されます。
6true に設定すると、ファイルに行がない場合に RequiresReplyException がスローされます。デフォルトは false です。
7 テキストデータを String ペイロードに読み込むときに使用する文字セット名を設定します。デフォルトはプラットフォームの文字セットです。
8 残りの行に対して発行されるメッセージのヘッダーとして搬送される最初の行のヘッダー名。バージョン 5.0 以降。
9 メッセージをスプリッターに送信するために使用される入力チャネルを設定します。
10 メッセージの送信先の出力チャネルを設定します。
11 送信タイムアウトを設定します。output-channel がブロックできる場合(フル QueueChannel など)にのみ適用されます。
12false に設定すると、コンテキストがリフレッシュされたときにスプリッターが自動的に開始されなくなります。デフォルトは true です。
13input-channel が <publish-subscribe-channel/> である場合、このエンドポイントの順序を設定します。
14 スプリッターの起動フェーズを設定します(auto-startup が true の場合に使用)。

FileSplitter はまた、テキストベースの InputStream を行に分割します。バージョン 4.3 以降、FTP または SFTP ストリーミング受信チャネルアダプター、または stream オプションを使用してファイルを取得する FTP または SFTP 送信ゲートウェイと併用すると、スプリッターはファイルが完全に終了したときにストリームをサポートするセッションを自動的に閉じます。消費これらの機能の詳細については、FTP ストリーミング受信チャネルアダプターおよび SFTP ストリーミング受信チャネルアダプターならびに FTP 送信ゲートウェイおよび SFTP 送信ゲートウェイを参照してください。

Java 構成を使用する場合、次の例に示すように、追加のコンストラクターを使用できます。

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

markersJson が true の場合、マーカーは JSON 文字列として表されます(SimpleJsonSerializer を使用)。

バージョン 5.0 では、コンテンツの最初の行がヘッダー(CSV ファイルの列名など)であることを指定する firstLineAsHeader オプションが導入されました。このプロパティに渡される引数は、最初の行が残りの行に対して発行されるメッセージのヘッダーとして運ばれるヘッダー名です。この行は、シーケンスヘッダー(applySequence が true の場合)にも、FileMarker.END に関連付けられた lineCount にも含まれていません。注: バージョン 5.5 以降、FileMarker は JSON に直列化できるため、lineCount` も FileHeaders.LINE_COUNT として FileMarker.END メッセージのヘッダーに含まれています。ファイルにヘッダー行のみが含まれている場合、ファイルは空として扱われるため、分割中に FileMarker インスタンスのみが発行されます(マーカーが有効になっている場合、それ以外の場合、メッセージは発行されません)。デフォルトでは(ヘッダー名が設定されていない場合)、最初の行はデータと見なされ、最初に発行されたメッセージのペイロードになります。

ファイルコンテンツからのヘッダー抽出に関するより複雑なロジック(最初の行ではなく、行のコンテンツ全体ではなく、特定のヘッダーなどではない)が必要な場合は、FileSplitter の前にヘッダーエンリッチャーを使用することを検討してください。ヘッダーに移動された行は、通常のコンテンツプロセスの下流でフィルタリングされる場合があることに注意してください。

分割ファイルを処理するべき等べき下流

apply-sequence が true の場合、スプリッターは SEQUENCE_NUMBER ヘッダーに行番号を追加します(markers が true の場合、マーカーは行としてカウントされます)。行番号をべき等レシーバーで使用すると、再起動後の行の再処理を回避できます。

例:

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

ファイルアグリゲーター

バージョン 5.5 以降、START/END マーカーが有効になっている場合に、FileSplitter ユースケースの反対側をカバーするために FileAggregator が導入されました。便宜上、FileAggregator は 3 つのシーケンス詳細戦略すべてを実装しています。

  • FileHeaders.FILENAME 属性を持つ HeaderAttributeCorrelationStrategy は、相関キーの計算に使用されます。FileSplitter でマーカーが有効になっている場合、START/END マーカーメッセージもシーケンスサイズに含まれるため、シーケンス詳細ヘッダーには入力されません。FileHeaders.FILENAME は、START/END マーカーメッセージを含め、発行された各行に引き続き入力されます。

  • FileMarkerReleaseStrategy - グループ内の FileSplitter.FileMarker.Mark.END メッセージをチェックしてから、FileHeaders.LINE_COUNT ヘッダー値をグループサイズから 2 - FileSplitter.FileMarker インスタンスを引いた値と比較します。また、AbstractCorrelatingMessageHandler で使用される conditionSupplier 機能用の便利な GroupConditionProvider 接点も実装しています。詳細については、メッセージグループの状態を参照してください。

  • FileAggregatingMessageGroupProcessor は、グループから FileSplitter.FileMarker メッセージを削除し、残りのメッセージをリストペイロードに収集して生成します。

次のリストは、FileAggregator を構成するための可能な方法を示しています。

Java DSL
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
Kotlin DSL
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
Java
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
XML
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

FileAggregator のデフォルトの動作がターゲットロジックを満たさない場合は、個々の戦略でアグリゲーターエンドポイントを構成することをお勧めします。詳細については、FileAggregator JavaDocs を参照してください。

リモート永続ファイルリストフィルター

受信およびストリーミング受信リモートファイルチャネルアダプター(FTPSFTP、その他のテクノロジ)は、デフォルトで AbstractPersistentFileListFilter の対応する実装で構成され、メモリ内 MetadataStore で構成されます。クラスターで実行するには、共有 MetadataStore を使用してこれらをフィルターに置き換えることができます(詳細については、メタデータストアを参照してください)。これらのフィルターは、同じファイルが複数回フェッチされるのを防ぐために使用されます(変更された時間の変更がない限り)。バージョン 5.2 以降、ファイルは、ファイルがフェッチされる直前にフィルターに追加されます(フェッチが失敗した場合は、逆になります)。

致命的な障害(停電など)が発生した場合、現在フェッチされているファイルがフィルターに残り、アプリケーションの再起動時に再フェッチされない可能性があります。この場合、このファイルを MetadataStore から手動で削除する必要があります。

以前のバージョンでは、ファイルは取得される前にフィルタリングされていたため、壊滅的な障害が発生した後、いくつかのファイルがこの状態になる可能性がありました。

この新しい動作を容易にするために、FileListFilter に 2 つの新しいメソッドが追加されました。

boolean accept(F file);

boolean supportsSingleFileFiltering();

フィルターが supportsSingleFileFiltering で true を返す場合、accept() を実装する必要があります。

リモートフィルターが単一ファイルのフィルター処理(AbstractMarkerFilePresentFileListFilter など)をサポートしない場合、アダプターは以前の動作に戻ります。

CompositeFileListFilter または ChainFileListFilter を使用して)複数のフィルターが使用されている場合、複合フィルターがそれをサポートするには、すべてのデリゲートフィルターが単一ファイルフィルタリングをサポートする必要があります。

永続ファイルリストフィルターにブールプロパティ forRecursion が追加されました。このプロパティを true に設定すると、alwaysAcceptDirectories も設定されます。これは、送信ゲートウェイ(ls および mget)での再帰操作が常にディレクトリツリー全体を毎回トラバースすることを意味します。これは、ディレクトリツリーの奥深くで変更が検出されなかった問題を解決するためです。さらに、forRecursion=true により、ファイルへのフルパスがメタデータストアキーとして使用されます。これにより、同じ名前のファイルが異なるディレクトリに複数回表示された場合にフィルターが正しく機能しなかった問題が解決されます。重要: これは、永続メタデータストア内の既存のキーが、最上位ディレクトリにあるファイルで見つからないことを意味します。このため、プロパティはデフォルトで false です。これは将来のリリースで変更される可能性があります。