ファイルサポート
Spring Integration のファイルサポートは、Spring Integration コアを継承し、ファイルの読み取り、書き込み、変換を処理する専用の語彙を提供します。
この依存関係をプロジェクトに含める必要があります。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-file</artifactId>
<version>5.5.16</version>
</dependency>
compile "org.springframework.integration:spring-integration-file:5.5.16"
ファイル専用のチャネルアダプターを定義する要素を有効にする名前空間と、ファイルの内容を文字列またはバイト配列に読み込むことができるトランスフォーマーのサポートを提供します。
このセクションでは、FileReadingMessageSource
および FileWritingMessageHandler
の動作と、Bean として構成する方法について説明します。また、Transformer
のファイル固有の実装を介したファイル処理のサポートについても説明します。最後に、ファイル固有の名前空間について説明します。
ファイルを読む
FileReadingMessageSource
を使用して、ファイルシステムからファイルを消費できます。これは、ファイルシステムディレクトリからメッセージを作成する MessageSource
の実装です。次の例は、FileReadingMessageSource
を構成する方法を示しています。
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
特定のファイルのメッセージを作成しないようにするには、FileListFilter
を指定できます。デフォルトでは、次のフィルターを使用します。
IgnoreHiddenFileListFilter
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter
は、隠しファイルが処理されないようにします。hidden の正確な定義はシステムに依存することに注意してください。例: UNIX ベースのシステムでは、ピリオド文字で始まるファイルは非表示と見なされます。一方、Microsoft Windows には、隠しファイルを示す専用のファイル属性があります。
バージョン 4.2 は |
AcceptOnceFileListFilter
は、ファイルがディレクトリから 1 回だけピックアップされるようにします。
バージョン 4.0 以降、このフィルターには バージョン 4.1.5 以降、このフィルターには新しいプロパティ( |
永続ファイルリストフィルターにブールプロパティ forRecursion
が追加されました。このプロパティを true
に設定すると、alwaysAcceptDirectories
も設定されます。これは、送信ゲートウェイ(ls
および mget
)での再帰操作が常にディレクトリツリー全体を毎回トラバースすることを意味します。これは、ディレクトリツリーの奥深くで変更が検出されなかった問題を解決するためです。さらに、forRecursion=true
により、ファイルへのフルパスがメタデータストアキーとして使用されます。これにより、同じ名前のファイルが異なるディレクトリに複数回表示された場合にフィルターが正しく機能しなかった問題が解決されます。重要: これは、永続メタデータストア内の既存のキーが、最上位ディレクトリにあるファイルで見つからないことを意味します。このため、プロパティはデフォルトで false
です。これは将来のリリースで変更される可能性があります。
次の例では、フィルターを使用して FileReadingMessageSource
を構成します。
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
ファイルの読み取りに関する一般的な問題は、準備が整う前にファイルが検出される可能性があることです(つまり、他のプロセスがまだファイルを書き込んでいる可能性があります)。デフォルトの AcceptOnceFileListFilter
はこれを妨げません。ほとんどの場合、ファイル書き込みプロセスが読み取りの準備ができるとすぐに各ファイルの名前を変更すると、これを防ぐことができます。デフォルトの AcceptOnceFileListFilter
で構成された(おそらく既知のサフィックスに基づいた)準備ができているファイルのみを受け入れる filename-pattern
または filename-regex
フィルターは、この状況に対応します。次の例に示すように、CompositeFileListFilter
は構成を有効にします。
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
一時的な名前でファイルを作成し、最終的な名前に名前を変更することができない場合、Spring Integration は別の代替手段を提供します。バージョン 4.2 は LastModifiedFileListFilter
を追加しました。このフィルターは、この値より古いファイルのみがフィルターによって渡されるように、age
プロパティで構成できます。経過時間のデフォルトは 60 秒ですが、ファイルの早期取得を回避するのに十分な大きさの経過時間を選択する必要があります(ネットワークの不具合などによる)。次の例は、LastModifiedFileListFilter
を構成する方法を示しています。
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
バージョン 4.3.7 から、ChainFileListFilter
(CompositeFileListFilter
の拡張)が導入され、後続のフィルターが前のフィルターの結果のみを表示するシナリオを可能にしました。(CompositeFileListFilter
では、すべてのフィルターがすべてのファイルを表示しますが、すべてのフィルターを通過したファイルのみを通過させます)。新しい動作が必要な場所の例は、LastModifiedFileListFilter
と AcceptOnceFileListFilter
の組み合わせです。一定の時間が経過するまでファイルを受け入れたくない場合です。CompositeFileListFilter
では、AcceptOnceFileListFilter
は最初のパスですべてのファイルを見るため、後で他のフィルターがパスするときにファイルを渡しません。CompositeFileListFilter
アプローチは、パターンフィルターを、ファイル転送が完了したことを示すセカンダリファイルを探すカスタムフィルターと組み合わせる場合に役立ちます。パターンフィルターはプライマリファイル(something.txt
など)のみを渡す場合がありますが、「完了」フィルターは(たとえば) something.done
が存在するかどうかを確認する必要があります。
ファイル a.txt
、a.done
、b.txt
があるとします。
パターンフィルターは a.txt
と b.txt
のみを通過させますが、「完了」フィルターは 3 つのファイルすべてを認識し、a.txt
のみを通過させます。複合フィルターの最終結果は、a.txt
のみがリリースされることです。
ChainFileListFilter では、チェーンのいずれかのフィルターが空のリストを返す場合、残りのフィルターは呼び出されません。 |
バージョン 5.0 は、コンテキスト評価ルートオブジェクトとしてファイルに対して SpEL 式を実行する ExpressionFileListFilter
を導入しました。この目的のために、次の例に示すように、ファイル処理用のすべての XML コンポーネント(ローカルおよびリモート)と既存の filter
属性が filter-expression
オプションとともに提供されています。
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
バージョン 5.0.5 は、拒否されたファイルに関心がある DiscardAwareFileListFilter
実装を導入しました。この目的のために、そのようなフィルター実装には addDiscardCallback(Consumer<File>)
を介してコールバックを提供する必要があります。フレームワークでは、この機能は FileReadingMessageSource.WatchServiceDirectoryScanner
から LastModifiedFileListFilter
と組み合わせて使用されます。通常の DirectoryScanner
とは異なり、WatchService
はターゲットファイルシステムのイベントに応じて処理するファイルを提供します。これらのファイルで内部キューをポーリングする瞬間に、LastModifiedFileListFilter
は、構成された age
に比べて若すぎるため、破棄する場合があります。今後の考慮事項のためにファイルを失います。破棄コールバックフックを使用すると、内部キューにファイルを保持できるため、後続のポーリングで age
に対してファイルをチェックできます。CompositeFileListFilter
は DiscardAwareFileListFilter
も実装し、そのすべての DiscardAwareFileListFilter
デリゲートに廃棄コールバックを追加します。
CompositeFileListFilter はすべてのデリゲートに対してファイルを照合するため、同じファイルに対して discardCallback が複数回呼び出される場合があります。 |
バージョン 5.1 以降、FileReadingMessageSource
はディレクトリの存在を確認せず、start()
が呼び出されるまで(通常は SourcePollingChannelAdapter
をラップすることにより)ディレクトリを作成しません。以前は、テストからディレクトリを参照するとき、または後でアクセス許可が適用されるときに、オペレーティングシステムのアクセス許可エラーを防ぐ簡単な方法はありませんでした。
メッセージヘッダー
バージョン 5.0 以降、FileReadingMessageSource
は(ポーリングされた File
としての payload
に加えて)送信 Message
に以下のヘッダーを取り込みます。
FileHeaders.FILENAME
: 送信するファイルのFile.getName()
。後続の名前変更またはコピーロジックに使用できます。FileHeaders.ORIGINAL_FILE
:File
オブジェクト自体。通常、このヘッダーは、元のFile
オブジェクトを失うと、フレームワークコンポーネント(スプリッターやトランスフォーマーなど)によって自動的に入力されます。ただし、他のカスタムユースケースとの一貫性と利便性のために、このヘッダーは元のファイルへのアクセスに役立ちます。FileHeaders.RELATIVE_PATH
: スキャンのルートディレクトリに相対的なファイルパスの部分を表すために導入された新しいヘッダー。このヘッダーは、他の場所でソースディレクトリ階層を復元する必要がある場合に役立ちます。この目的のために、このヘッダーを使用するようにDefaultFileNameGenerator
( "ファイル名の生成" を参照)を構成できます。
ディレクトリのスキャンとポーリング
FileReadingMessageSource
は、ディレクトリからのファイルのメッセージをすぐには生成しません。scanner
によって返される「適格ファイル」に内部キューを使用します。scanEachPoll
オプションを使用して、内部キューが各ポーリングの最新の入力ディレクトリコンテンツでリフレッシュされるようにします。デフォルト(scanEachPoll = false
)では、FileReadingMessageSource
はディレクトリを再度スキャンする前にキューを空にします。このデフォルトの動作は、ディレクトリ内の多数のファイルのスキャンを減らすのに特に役立ちます。ただし、カスタムの順序付けが必要な場合は、このフラグを true
に設定する効果を考慮することが重要です。ファイルが処理される順序は予想どおりではない場合があります。デフォルトでは、キュー内のファイルは自然な(path
)順序で処理されます。スキャンによって追加された新しいファイルは、キューにすでにファイルがある場合でも、適切な位置に挿入され、その自然な順序が維持されます。順序をカスタマイズするために、FileReadingMessageSource
は Comparator<File>
をコンストラクター引数として受け入れることができます。内部(PriorityBlockingQueue
)によって使用され、ビジネス要件に従ってコンテンツを並べ替えます。特定の順序でファイルを処理するには、カスタム DirectoryScanner
によって作成されたリストを並べ替えるのではなく、FileReadingMessageSource
にコンパレーターを提供する必要があります。
バージョン 5.0 では、ファイルツリー訪問を実行するために RecursiveDirectoryScanner
を導入しました。実装は、Files.walk(Path start, int maxDepth, FileVisitOption… options)
機能に基づいています。ルートディレクトリ(DirectoryScanner.listFiles(File)
)引数は結果から除外されます。他のすべてのサブディレクトリの包含および除外は、ターゲット FileListFilter
実装に基づいています。例: SimplePatternFileListFilter
はデフォルトでディレクトリを除外します。詳細については、AbstractDirectoryAwareFileListFilter
(Javadoc) とその実装を参照してください。
バージョン 5.5 以降、Java DSL の FileInboundChannelAdapterSpec には、デフォルトの FileReadingMessageSource の代わりにターゲット FileReadingMessageSource で RecursiveDirectoryScanner を使用するための便利な recursive(boolean) オプションがあります。 |
名前空間サポート
ファイル固有の名前空間を使用すると、ファイル読み取りの構成を簡素化できます。そのためには、次のテンプレートを使用します。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
この名前空間内で、次のように FileReadingMessageSource
を減らして受信チャネルアダプターにラップできます。
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
最初のチャネルアダプターの例は、デフォルトの FileListFilter
実装に依存しています。
IgnoreHiddenFileListFilter
(隠しファイルを処理しません)AcceptOnceFileListFilter
(重複を防ぐ)
prevent-duplicates
および ignore-hidden
属性はデフォルトで true
であるため、省略することもできます。
Spring Integration 4.2 は |
2 番目のチャネルアダプターの例はカスタムフィルターを使用し、3 番目は filename-pattern
属性を使用して AntPathMatcher
ベースのフィルターを追加し、4 番目は filename-regex
属性を使用して正規表現パターンベースのフィルターを FileReadingMessageSource
に追加します。filename-pattern
および filename-regex
属性は、それぞれ通常の filter
参照属性と相互に排他的です。ただし、filter
属性を使用して、特定のニーズに合わせて 1 つ以上のパターンベースのフィルターを含む、任意の数のフィルターを組み合わせた CompositeFileListFilter
のインスタンスを参照できます。
複数のプロセスが同じディレクトリから読み取る場合、ファイルが同時に取得されないようにファイルをロックすることができます。そのためには、FileLocker
を使用できます。java.nio
ベースの実装が利用可能ですが、独自のロックスキームを実装することもできます。nio
ロッカーは、次のように挿入できます。
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
次のようにカスタムロッカーを設定できます。
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
ファイル受信アダプターがロッカーで構成されている場合、ファイルの受信を許可する前にロックを取得する責任があります。ファイルのロックを解除する責任は負いません。ファイルを処理し、ロックをぶら下げたままにしておくと、メモリリークが発生します。これが問題になる場合は、適切なタイミングで自分で FileLocker.unlock(File file) を呼び出す必要があります。 |
ファイルのフィルタリングとロックだけでは不十分な場合は、ファイルを完全にリストする方法を制御する必要があります。この型の要件を実装するには、DirectoryScanner
の実装を使用できます。このスキャナーを使用すると、各ポーリングでリストされるファイルを正確に判別できます。これは、Spring Integration が FileListFilter
インスタンスと FileLocker
を FileReadingMessageSource
にワイヤリングするために内部的に使用するインターフェースでもあります。次の例に示すように、scanner
属性の <int-file:inbound-channel-adapter/>
にカスタム DirectoryScanner
を挿入できます。
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
これにより、順序付け、リスト、ロックの戦略を自由に選択できます。
また、フィルター(patterns
、regex
、prevent-duplicates
などを含む)および locker
インスタンスが実際に scanner
によって使用されることを理解することも重要です。アダプターに設定されたこれらの属性のいずれかは、その後内部 scanner
に注入されます。外部 scanner
の場合、FileReadingMessageSource
ではすべてのフィルターおよびロッカー属性が禁止されています。カスタム DirectoryScanner
で指定する必要があります(必要な場合)。つまり、scanner
を FileReadingMessageSource
に挿入する場合、FileReadingMessageSource
ではなく、その scanner
で filter
および locker
を指定する必要があります。
デフォルトでは、DefaultDirectoryScanner は IgnoreHiddenFileListFilter と AcceptOnceFileListFilter を使用します。使用しないようにするには、独自のフィルター(AcceptAllFileListFilter など)を構成するか、null に設定することもできます。 |
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner
は、新しいファイルがディレクトリに追加されるとき、ファイルシステムイベントに依存します。初期化中に、ディレクトリはイベントを生成するために登録されます。初期ファイルリストも初期化中に作成されます。ディレクトリツリーをたどっていくと、発生したサブディレクトリも登録され、イベントが生成されます。最初のポーリングでは、ディレクトリ内を移動した最初のファイルリストが返されます。後続のポーリングでは、新規作成イベントからのファイルが返されます。新しいサブディレクトリが追加されると、その作成イベントを使用して新しいサブツリーをたどって既存のファイルを見つけ、見つかった新しいサブディレクトリを登録します。
WatchKey の内部イベント queue が、ディレクトリ変更イベントが発生するほど迅速にプログラムによって排出されない場合、WatchKey に課題があります。キューサイズを超えると、一部のファイルシステムイベントが失われる可能性があることを示す StandardWatchEventKinds.OVERFLOW が発行されます。この場合、ルートディレクトリは完全に再スキャンされます。重複を避けるため、適切な FileListFilter (AcceptOnceFileListFilter など)を使用するか、処理が完了したらファイルを削除することを検討してください。 |
WatchServiceDirectoryScanner
は、scanner
オプションと相互に排他的な FileReadingMessageSource.use-watch-service
オプションによって有効にできます。提供された directory
の内部 FileReadingMessageSource.WatchServiceDirectoryScanner
インスタンスが読み込まれます。
さらに、WatchService
ポーリングロジックは StandardWatchEventKinds.ENTRY_MODIFY
および StandardWatchEventKinds.ENTRY_DELETE
を追跡できるようになりました。
既存のファイルと新しいファイルの変更を追跡する必要がある場合は、FileListFilter
に ENTRY_MODIFY
イベントロジックを実装する必要があります。それ以外の場合、それらのイベントのファイルは同じように扱われます。
ResettableFileListFilter
実装は、ENTRY_DELETE
イベントを取得します。その結果、それらのファイルは remove()
操作用に提供されます。このイベントを有効にすると、AcceptOnceFileListFilter
などのフィルターによってファイルが削除されます。その結果、同じ名前のファイルが表示された場合、そのファイルはフィルターを通過し、メッセージとして送信されます。
この目的のために、watch-events
プロパティ(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)が導入されました。(WatchEventType
は FileReadingMessageSource
のパブリック内部列挙です)このようなオプションを使用すると、新しいファイルに 1 つのダウンストリームフローロジックを使用し、変更されたファイルに他のロジックを使用できます。次の例は、同じディレクトリでイベントを作成および変更するためのさまざまなロジックを構成する方法を示しています。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
メモリ消費の制限
HeadDirectoryScanner
を使用して、メモリに保持されるファイルの数を制限できます。これは、大きなディレクトリをスキャンするときに役立ちます。XML 構成では、受信チャネルアダプターの queue-size
プロパティを設定することでこれを有効にします。
バージョン 4.2 より前は、この設定は他のフィルターの使用と互換性がありませんでした。他のフィルター(prevent-duplicates="true"
を含む)は、サイズを制限するために使用されるフィルターを上書きしました。
通常、この場合は |
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で送信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL で送信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
"tail" ファイル
もう 1 つの一般的な使用例は、ファイルの最後 (または末尾) から「行」を取得し、新しい行が追加されたときにそれをキャプチャーすることです。2 つの実装が提供されています。最初の OSDelegatingFileTailingMessageProducer
は、ネイティブの tail
コマンドを使用します (コマンドがあるオペレーティングシステムの場合)。これは通常、これらのプラットフォームで最も効率的な実装です。tail
コマンドがないオペレーティングシステムの場合、2 番目の実装 ApacheCommonsFileTailingMessageProducer
は Apache commons-io
Tailer
クラスを使用します。
どちらの場合も、利用できないファイルやその他のイベントなどのファイルシステムイベントは、通常の Spring イベント発行メカニズムを使用して ApplicationEvent
インスタンスとして発行されます。このようなイベントの例には次のものがあります。
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
前の例に示されている一連のイベントは、たとえば、ファイルがローテーションされるときに発生する可能性があります。
バージョン 5.0 以降、idleEventInterval
の実行中にファイルにデータがない場合、FileTailingIdleEvent
が発行されます。次の例は、そのようなイベントがどのように見えるかを示しています。
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
tail コマンドをサポートするすべてのプラットフォームがこれらのステータスメッセージを提供するわけではありません。 |
これらのエンドポイントから発信されるメッセージには、次のヘッダーがあります。
FileHeaders.ORIGINAL_FILE
:File
オブジェクトFileHeaders.FILENAME
: ファイル名 (File.getName()
)
バージョン 5.0 より前のバージョンでは、FileHeaders.FILENAME ヘッダーにはファイルの絶対パスの文字列表現が含まれていました。これで、元のファイルヘッダーで getAbsolutePath() を呼び出して、その文字列表現を取得できます。 |
次の例では、デフォルトのオプション( "-F -n 0"、現在の末尾からファイル名を追跡することを意味する)でネイティブアダプターを作成します。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
次の例では、"-F -n +0" オプションを使用してネイティブアダプターを作成します(つまり、ファイル名に従い、既存のすべての行を出力します)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
tail
コマンドが失敗する場合(プラットフォームによっては、-F
が指定されている場合でもファイルの欠落により tail
が失敗する場合)、コマンドは 10 秒ごとに再試行されます。
デフォルトでは、ネイティブアダプターは標準出力からキャプチャーし、コンテンツをメッセージとして送信します。また、標準エラーからキャプチャーしてイベントを発生させます。バージョン 4.3.6 以降、次の例に示すように、enable-status-reader
を false
に設定することにより、標準エラーイベントを破棄できます。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
次の例では、IdleEventInterval
は 5000
に設定されています。つまり、5 秒間行が書き込まれない場合、FileTailingIdleEvent
は 5 秒ごとにトリガーされます。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
これは、アダプターを停止する必要がある場合に役立ちます。
次の例では、2 秒ごとにファイルの新しい行を調べ、10 秒ごとに不足しているファイルの存在を確認する Apache commons-io
Tailer
アダプターを作成します。
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | ファイルは、末尾(デフォルト)ではなく先頭(end="false" )から末尾に付けられます。 |
2 | ファイルはチャンクごとに再度開かれます(デフォルトではファイルを開いたままにします)。 |
delay 、end 、reopen 属性を指定すると、Apache commons-io アダプターの使用が強制され、native-options 属性は使用できなくなります。 |
不完全なデータの処理
ファイル転送シナリオの一般的な問題は、転送が完了したことを確認して、不完全なファイルの読み取りを開始しないようにする方法です。この問題を解決する一般的な方法は、一時的な名前でファイルを作成し、アトミックに名前を最終的な名前に変更することです。この技術は、一時ファイルがコンシューマーによって選択されないようにマスクするフィルターとともに、堅牢なソリューションを提供します。この手法は、ファイルを(ローカルまたはリモートで)書き込む Spring Integration コンポーネントによって使用されます。デフォルトでは、ファイル名に .writing
を追加し、転送が完了したら削除します。
もう 1 つの一般的な手法は、2 番目の「マーカー」ファイルを作成して、ファイル転送が完了したことを示すことです。このシナリオでは、somefile.txt.complete
も存在するまで、somefile.txt
(たとえば)が使用可能であると見なすべきではありません。Spring Integration バージョン 5.0 は、このメカニズムをサポートする新しいフィルターを導入しました。ファイルシステム(FileSystemMarkerFilePresentFileListFilter
)、FTP および SFTP の実装が提供されています。マーカーファイルには任意の名前を付けることができますが、通常は転送されるファイルに関連しています。詳細については、Javadoc を参照してください。
ファイルを書く
ファイルシステムにメッセージを書き込むには、FileWritingMessageHandler
(Javadoc) を使用できます。このクラスは、次のペイロード型を処理できます。
File
String
Byte 配列
InputStream
( バージョン 4.2 以降)
String ペイロードの場合、エンコードと文字セットを構成できます。
物事を簡単にするために、XML 名前空間を使用して、FileWritingMessageHandler
を送信チャネルアダプターまたは送信ゲートウェイの一部として構成できます。
バージョン 4.3 以降、ファイルの書き込み時に使用するバッファーサイズを指定できます。
バージョン 5.1 以降では、FileExistsMode.APPEND
または FileExistsMode.APPEND_NO_FLUSH
を使用し、新しいファイルを作成する必要がある場合にトリガーされる BiConsumer<File, Message<?>>
newFileCallback
を提供できます。このコールバックは、新しく作成されたファイルとそれをトリガーしたメッセージを受け取ります。このコールバックは、たとえば、メッセージヘッダーで定義された CSV ヘッダーを書き込むために使用できます。
ファイル名の生成
最も単純な形式では、FileWritingMessageHandler
はファイルを書き込むための宛先ディレクトリのみを必要とします。書き込まれるファイルの名前は、ハンドラーの FileNameGenerator
(Javadoc) によって決定されます。デフォルトの実装 (Javadoc) は、FileHeaders.FILENAME
(Javadoc) として定義された定数とキーが一致するメッセージヘッダーを探します。
あるいは、メッセージに対して評価される式を指定して、ファイル名を生成できます(例: headers['myCustomHeader'] + '.something'
)。式は String
に評価される必要があります。便宜上、DefaultFileNameGenerator
には setHeaderName
メソッドも用意されており、ファイル名として使用される値を持つメッセージヘッダーを明示的に指定できます。
一度設定すると、DefaultFileNameGenerator
は次の解決手順を使用して、特定のメッセージペイロードのファイル名を決定します。
メッセージに対して式を評価し、結果が空でない
String
である場合、それをファイル名として使用します。それ以外の場合、ペイロードが
java.io.File
の場合、File
オブジェクトのファイル名を使用します。それ以外の場合は、メッセージ ID にを追加して使用します。ファイル名として
msg
。
XML 名前空間のサポートを使用すると、ファイル送信チャネルアダプターとファイル送信ゲートウェイの両方が、以下の相互に排他的な構成属性をサポートします。
filename-generator
(FileNameGenerator
実装への参照)filename-generator-expression
(String
に評価される式)
ファイルの書き込み中に、一時ファイルのサフィックスが使用されます(デフォルトは .writing
です)。ファイルの書き込み中にファイル名に追加されます。サフィックスをカスタマイズするには、ファイル送信チャネルアダプターとファイル送信ゲートウェイの両方で temporary-file-suffix
属性を設定できます。
APPEND ファイル mode を使用する場合、データはファイルに直接追加されるため、temporary-file-suffix 属性は無視されます。 |
バージョン 4.2.5 以降、生成されたファイル名(filename-generator
または filename-generator-expression
の評価の結果)は、ターゲットファイル名とともに子パスを表すことができます。これは、以前と同様に File(File parent, String child)
の 2 番目のコンストラクター引数として使用されます。ただし、これまでは、ファイル名のみを想定して、子パス用のディレクトリ(mkdirs()
)を作成していませんでした。このアプローチは、ソースディレクトリに一致するようにファイルシステムツリーを復元する必要がある場合に役立ちます。たとえば、アーカイブを解凍し、ターゲットディレクトリ内のすべてのファイルを元の順序で保存する場合です。
出力ディレクトリの指定
ファイル送信チャネルアダプターとファイル送信ゲートウェイの両方は、出力ディレクトリを指定するための相互に排他的な 2 つの構成属性を提供します。
directory
directory-expression
Spring Integration 2.2 は directory-expression 属性を導入しました。 |
directory
属性の使用
directory
属性を使用すると、出力ディレクトリは固定値に設定されます。これは、FileWritingMessageHandler
の初期化時に設定されます。この属性を指定しない場合は、directory-expression
属性を使用する必要があります。
directory-expression
属性の使用
SpEL を完全にサポートしたい場合は、directory-expression
属性を使用できます。この属性は、処理されるメッセージごとに評価される SpEL 式を受け入れます。出力ファイルディレクトリを動的に指定すると、メッセージのペイロードとそのヘッダーにフルアクセスできます。
SpEL 式は、String
、java.io.File
、org.springframework.core.io.Resource
のいずれかに解決される必要があります。(後者はいずれにしても File
に評価されます)さらに、結果の String
または File
はディレクトリを指している必要があります。directory-expression
属性を指定しない場合、directory
属性を設定する必要があります。
auto-create-directory
属性の使用
デフォルトでは、宛先ディレクトリが存在しない場合、それぞれの宛先ディレクトリと存在しない親ディレクトリが自動的に作成されます。その動作を防ぐために、auto-create-directory
属性を false
に設定できます。この属性は、directory
属性と directory-expression
属性の両方に適用されます。
アダプターの初期化時に宛先ディレクトリの存在を確認する代わりに、処理中の各メッセージに対してこの確認が実行されるようになりました。 さらに、 |
既存の宛先ファイルの処理
ファイルを書き込み、宛先ファイルがすでに存在する場合、デフォルトの動作ではそのターゲットファイルが上書きされます。関連するファイル送信コンポーネントの mode
属性を設定することにより、この動作を変更できます。次のオプションがあります。
REPLACE
(デフォルト)REPLACE_IF_MODIFIED
APPEND
APPEND_NO_FLUSH
FAIL
IGNORE
Spring Integration 2.2 は、mode 属性と APPEND 、FAIL 、IGNORE オプションを導入しました。 |
REPLACE
ターゲットファイルがすでに存在する場合、上書きされます。
mode
属性が指定されていない場合、これはファイルを書き込むときのデフォルトの動作です。REPLACE_IF_MODIFIED
ターゲットファイルがすでに存在する場合、最後に変更されたタイムスタンプがソースファイルのタイムスタンプと異なる場合にのみ上書きされます。
File
ペイロードの場合、ペイロードlastModified
時間は既存のファイルと比較されます。他のペイロードの場合、FileHeaders.SET_MODIFIED
(file_setModified
)ヘッダーが既存のファイルと比較されます。ヘッダーがないか、値がNumber
でない場合、ファイルは常に置き換えられます。APPEND
このモードでは、毎回新しいファイルを作成する代わりに、既存のファイルにメッセージコンテンツを追加できます。この属性は
temporary-file-suffix
属性と相互に排他的であることに注意してください。これは、既存のファイルにコンテンツを追加するときに、アダプターが一時ファイルを使用しなくなるためです。ファイルは各メッセージの後に閉じられます。APPEND_NO_FLUSH
このオプションのセマンティクスは
APPEND
と同じですが、データはフラッシュされず、各メッセージの後にファイルは閉じられません。これにより、障害が発生した場合にデータ損失のリスクを伴う大きなパフォーマンスを提供できます。詳細については、APPEND_NO_FLUSH
使用時のファイルのフラッシュを参照してください。FAIL
ターゲットファイルが存在する場合、
MessageHandlingException
(Javadoc) がスローされます。IGNORE
ターゲットファイルが存在する場合、メッセージペイロードは警告なしに無視されます。
一時ファイルのサフィックスを使用する場合(デフォルトは .writing )、最終ファイル名または一時ファイル名のいずれかが存在する場合、IGNORE オプションが適用されます。 |
APPEND_NO_FLUSH
使用時のファイルのフラッシュ
APPEND_NO_FLUSH
モードは、バージョン 4.3 で追加されました。各メッセージの後にファイルが閉じられないため、これを使用するとパフォーマンスが向上します。ただし、これにより、障害発生時にデータが失われる可能性があります。
Spring Integration は、このデータ損失を軽減するためのいくつかのフラッシュ戦略を提供します。
flushInterval
を使用します。この期間ファイルが書き込まれない場合、自動的にフラッシュされます。これは概算であり、今回は1.33x
まで可能です(平均1.167x
)。正規表現を含むメッセージをメッセージハンドラーの
trigger
メソッドに送信します。パターンに一致する絶対パス名を持つファイルはフラッシュされます。ハンドラーにカスタム
MessageFlushPredicate
実装を提供して、メッセージがtrigger
メソッドに送信されたときに実行されるアクションを変更します。カスタム
FileWritingMessageHandler.FlushPredicate
またはFileWritingMessageHandler.MessageFlushPredicate
実装を渡すことにより、ハンドラーのflushIfNeeded
メソッドの 1 つを呼び出します。
述語は、開いているファイルごとに呼び出されます。詳細については、これらのインターフェースの Javadoc を参照してください。バージョン 5.0 以降、述語メソッドは別のパラメーターを提供することに注意してください。現在のファイルが新規または以前に閉じられた場合に最初に書き込まれた時間です。
flushInterval
を使用する場合、間隔は最後の書き込みから始まります。ファイルは、その間隔でアイドル状態の場合にのみフラッシュされます。バージョン 4.3.7 から、追加のプロパティ(flushWhenIdle
)を false
に設定できます。これは、以前にフラッシュされた(または新しい)ファイルへの最初の書き込みで間隔が始まることを意味します。
ファイルのタイムスタンプ
デフォルトでは、宛先ファイルの lastModified
タイムスタンプは、ファイルが作成された時刻です(ただし、インプレース名前変更では現在のタイムスタンプが保持されます)。バージョン 4.3 から、preserve-timestamp
(または Java 構成を使用する場合は setPreserveTimestamp(true)
)を構成できるようになりました。File
ペイロードの場合、これにより、タイムスタンプが受信ファイルから送信に転送されます(コピーが必要かどうかに関係なく)。他のペイロードでは、FileHeaders.SET_MODIFIED
ヘッダー(file_setModified
)が存在する場合、ヘッダーが Number
である限り、宛先ファイルの lastModified
タイムスタンプの設定に使用されます。
ファイル許可
バージョン 5.0 から、Posix 許可をサポートするファイルシステムにファイルを書き込むときに、送信チャネルアダプターまたはゲートウェイでそれらの許可を指定できます。このプロパティは整数であり、通常は使い慣れた 8 進数形式で提供されます。たとえば、0640
は、所有者に読み取り / 書き込み権限があり、グループに読み取り専用権限があり、その他にはアクセス権がないことを意味します。
ファイル送信チャネルアダプター
次の例では、ファイル送信チャネルアダプターを構成します。
<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>
名前空間ベースの構成は、delete-source-files
属性もサポートしています。true
に設定すると、宛先への書き込み後に元のソースファイルの削除がトリガーされます。そのフラグのデフォルト値は false
です。次の例は、true
に設定する方法を示しています。
<int-file:outbound-channel-adapter id="filesOut"
directory="${output.directory}"
delete-source-files="true"/>
delete-source-files 属性は、受信メッセージに File ペイロードがある場合、または FileHeaders.ORIGINAL_FILE ヘッダー値にソース File インスタンスまたは元のファイルパスを表す String が含まれている場合にのみ効果があります。 |
バージョン 4.2 以降、FileWritingMessageHandler
は append-new-line
オプションをサポートしています。true
に設定されている場合、メッセージが書き込まれた後、ファイルに新しい行が追加されます。デフォルトの属性値は false
です。次の例は、append-new-line
オプションの使用方法を示しています。
<int-file:outbound-channel-adapter id="newlineAdapter"
append-new-line="true"
directory="${output.directory}"/>
送信ゲートウェイ
書き込まれたファイルに基づいてメッセージの処理を続行する場合は、代わりに outbound-gateway
を使用できます。outbound-channel-adapter
と同様のロールを果たします。ただし、ファイルを書き込んだ後、メッセージのペイロードとして応答チャネルに送信します。
次の例では、送信ゲートウェイを構成します。
<int-file:outbound-gateway id="mover" request-channel="moveInput"
reply-channel="output"
directory="${output.directory}"
mode="REPLACE" delete-source-files="true"/>
前述のように、mode
属性を指定することもできます。これは、宛先ファイルがすでに存在する状況に対処する方法の動作を定義します。詳細については、既存の宛先ファイルの処理を参照してください。一般に、ファイル送信ゲートウェイを使用する場合、結果ファイルは応答チャネルのメッセージペイロードとして返されます。
これは、IGNORE
モードを指定する場合にも適用されます。その場合、既存の宛先ファイルが返されます。リクエストメッセージのペイロードがファイルの場合、メッセージヘッダーを介して元のファイルにアクセスできます。FileHeaders.ORIGINAL_FILE (Javadoc) を参照してください。
「送信ゲートウェイ」は、最初にファイルを移動してから処理パイプラインを介して送信する場合に適しています。このような場合、ファイル名前空間の inbound-channel-adapter 要素を outbound-gateway に接続してから、そのゲートウェイの reply-channel をパイプラインの先頭に接続できます。 |
より複雑な要件がある場合、またはファイルコンテンツに変換する入力として追加のペイロード型をサポートする必要がある場合は、FileWritingMessageHandler
を継承できますが、はるかに優れたオプションは Transformer
に依存することです。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
}
@Bean
@ServiceActivator(inputChannel = "writeToFileChannel")
public MessageHandler fileWritingMessageHandler() {
Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
handler.setFileExistsMode(FileExistsMode.APPEND);
return handler;
}
@MessagingGateway(defaultRequestChannel = "writeToFileChannel")
public interface MyGateway {
void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
@Header(FileHeaders.FILENAME) File directory, String data);
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FileWritingJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FileWritingJavaApplication.class)
.web(false)
.run(args);
MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
fileWritingInput.send(new GenericMessage<>("foo"));
}
@Bean
public IntegrationFlow fileWritingFlow() {
return IntegrationFlows.from("fileWritingInput")
.enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
.header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
.handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
.channel(MessageChannels.queue("fileWritingResultChannel"))
.get();
}
}
ファイル Transformers
ファイルシステムから読み取ったデータをオブジェクトに変換したり、その逆を行うには、何らかの作業を行う必要があります。FileReadingMessageSource
とは異なり、FileWritingMessageHandler
ほどではありませんが、ジョブを完了するためにはおそらく独自のメカニズムが必要です。このために、Transformer
インターフェースを実装できます。または、受信メッセージ用に AbstractFilePayloadTransformer
を継承できます。Spring Integration は、いくつかの明らかな実装を提供します。
Transformer
インターフェースの Javadoc を参照して、どの Spring Integration クラスがそれを実装しているかを確認してください。同様に、AbstractFilePayloadTransformer
クラスの Javadoc をチェックして、どの Spring Integration クラスがそれを継承しているかを確認できます。
FileToByteArrayTransformer
は AbstractFilePayloadTransformer
を継承し、Spring の FileCopyUtils
を使用して File
オブジェクトを byte[]
に変換します。多くの場合、すべての変換を単一のクラスに入れるよりも、一連のトランスフォーマーを使用する方が適切です。その場合、File
から byte[]
への変換は論理的な最初のステップかもしれません。
FileToStringTransformer
は AbstractFilePayloadTransformer
を継承し、File
オブジェクトを String
に変換します。それ以外の場合は、これはデバッグに役立ちます(ワイヤータップで使用することを検討してください)。
ファイル固有のトランスフォーマーを構成するには、次の例に示すように、ファイル名前空間の適切な要素を使用できます。
<int-file:file-to-bytes-transformer input-channel="input" output-channel="output"
delete-files="true"/>
<int-file:file-to-string-transformer input-channel="input" output-channel="output"
delete-files="true" charset="UTF-8"/>
delete-files
オプションは、変換の補完後に受信ファイルを削除する必要があることをトランスフォーマーに通知します。これは、FileReadingMessageSource
がマルチスレッド環境で使用されている場合(一般に Spring Integration を使用している場合など)に、AcceptOnceFileListFilter
を使用する代わりにはなりません。
ファイル分割
FileSplitter
はバージョン 4.1.2 で追加され、その名前空間サポートはバージョン 4.2 で追加されました。FileSplitter
は、BufferedReader.readLine()
に基づいてテキストファイルを個々の行に分割します。デフォルトでは、スプリッターは Iterator
を使用して、ファイルから読み取られる行を一度に 1 行ずつ出力します。iterator
プロパティを false
に設定すると、メッセージとして送信する前にすべての行がメモリに読み込まれます。この使用例の 1 つは、行を含むメッセージを送信する前にファイルの I/O エラーを検出する場合です。ただし、比較的短いファイルに対してのみ実用的です。
受信ペイロードは、File
、String
(File
パス)、InputStream
、Reader
です。他のペイロード型は変更されずに発行されます。
次のリストは、FileSplitter
を構成するための可能な方法を示しています。
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlows
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | スプリッターの Bean 名。 |
2 | true (デフォルト)に設定すると、イテレーターまたは false を使用して、行を送信する前にファイルをメモリにロードします。 |
3 | true に設定すると、ファイルデータの前後にファイル開始マーカーおよびファイル終了マーカーのメッセージが出力されます。マーカーは、FileSplitter.FileMarker ペイロードを持つメッセージです(mark プロパティに START および END 値が含まれます)。一部の行がフィルタリングされるダウンストリームフローでファイルを順次処理するときに、マーカーを使用できます。これにより、ファイルが完全に処理されたことをダウンストリーム処理で知ることができます。さらに、START または END を含む file_marker ヘッダーがこれらのメッセージに追加されます。END マーカーには行カウントが含まれます。ファイルが空の場合、START および END マーカーのみが lineCount として 0 で発行されます。デフォルトは false です。true の場合、apply-sequence はデフォルトで false です。markers-json (次の属性)も参照してください。 |
4 | markers が true の場合、これを true に設定して、FileMarker オブジェクトを JSON 文字列に変換します。(下に SimpleJsonSerializer を使用)。 |
5 | sequenceSize および sequenceNumber ヘッダーをメッセージに含めることを無効にするには、false に設定します。markers が true でない限り、デフォルトは true です。true および markers が true の場合、マーカーはシーケンスに含まれます。true および iterator が true の場合、サイズが不明であるため、sequenceSize ヘッダーは 0 に設定されます。 |
6 | true に設定すると、ファイルに行がない場合に RequiresReplyException がスローされます。デフォルトは false です。 |
7 | テキストデータを String ペイロードに読み込むときに使用する文字セット名を設定します。デフォルトはプラットフォームの文字セットです。 |
8 | 残りの行に対して発行されるメッセージのヘッダーとして搬送される最初の行のヘッダー名。バージョン 5.0 以降。 |
9 | メッセージをスプリッターに送信するために使用される入力チャネルを設定します。 |
10 | メッセージの送信先の出力チャネルを設定します。 |
11 | 送信タイムアウトを設定します。output-channel がブロックできる場合(フル QueueChannel など)にのみ適用されます。 |
12 | false に設定すると、コンテキストがリフレッシュされたときにスプリッターが自動的に開始されなくなります。デフォルトは true です。 |
13 | input-channel が <publish-subscribe-channel/> である場合、このエンドポイントの順序を設定します。 |
14 | スプリッターの起動フェーズを設定します(auto-startup が true の場合に使用)。 |
FileSplitter
はまた、テキストベースの InputStream
を行に分割します。バージョン 4.3 以降、FTP または SFTP ストリーミング受信チャネルアダプター、または stream
オプションを使用してファイルを取得する FTP または SFTP 送信ゲートウェイと併用すると、スプリッターはファイルが完全に終了したときにストリームをサポートするセッションを自動的に閉じます。消費これらの機能の詳細については、FTP ストリーミング受信チャネルアダプターおよび SFTP ストリーミング受信チャネルアダプターならびに FTP 送信ゲートウェイおよび SFTP 送信ゲートウェイを参照してください。
Java 構成を使用する場合、次の例に示すように、追加のコンストラクターを使用できます。
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
markersJson
が true の場合、マーカーは JSON 文字列として表されます(SimpleJsonSerializer
を使用)。
バージョン 5.0 では、コンテンツの最初の行がヘッダー(CSV ファイルの列名など)であることを指定する firstLineAsHeader
オプションが導入されました。このプロパティに渡される引数は、最初の行が残りの行に対して発行されるメッセージのヘッダーとして運ばれるヘッダー名です。この行は、シーケンスヘッダー(applySequence
が true の場合)にも、FileMarker.END
に関連付けられた lineCount
にも含まれていません。注: バージョン 5.5 以降、FileMarker
は JSON に直列化できるため、lineCount` も FileHeaders.LINE_COUNT
として FileMarker.END
メッセージのヘッダーに含まれています。ファイルにヘッダー行のみが含まれている場合、ファイルは空として扱われるため、分割中に FileMarker
インスタンスのみが発行されます(マーカーが有効になっている場合、それ以外の場合、メッセージは発行されません)。デフォルトでは(ヘッダー名が設定されていない場合)、最初の行はデータと見なされ、最初に発行されたメッセージのペイロードになります。
ファイルコンテンツからのヘッダー抽出に関するより複雑なロジック(最初の行ではなく、行のコンテンツ全体ではなく、特定のヘッダーなどではない)が必要な場合は、FileSplitter
の前にヘッダーエンリッチャーを使用することを検討してください。ヘッダーに移動された行は、通常のコンテンツプロセスの下流でフィルタリングされる場合があることに注意してください。
分割ファイルを処理するべき等べき下流
apply-sequence
が true の場合、スプリッターは SEQUENCE_NUMBER
ヘッダーに行番号を追加します(markers
が true の場合、マーカーは行としてカウントされます)。行番号をべき等レシーバーで使用すると、再起動後の行の再処理を回避できます。
例:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}
ファイルアグリゲーター
バージョン 5.5 以降、START/END マーカーが有効になっている場合に、FileSplitter
ユースケースの反対側をカバーするために FileAggregator
が導入されました。便宜上、FileAggregator
は 3 つのシーケンス詳細戦略すべてを実装しています。
FileHeaders.FILENAME
属性を持つHeaderAttributeCorrelationStrategy
は、相関キーの計算に使用されます。FileSplitter
でマーカーが有効になっている場合、START/END マーカーメッセージもシーケンスサイズに含まれるため、シーケンス詳細ヘッダーには入力されません。FileHeaders.FILENAME
は、START/END マーカーメッセージを含め、発行された各行に引き続き入力されます。FileMarkerReleaseStrategy
- グループ内のFileSplitter.FileMarker.Mark.END
メッセージをチェックしてから、FileHeaders.LINE_COUNT
ヘッダー値をグループサイズから2
-FileSplitter.FileMarker
インスタンスを引いた値と比較します。また、AbstractCorrelatingMessageHandler
で使用されるconditionSupplier
機能用の便利なGroupConditionProvider
接点も実装しています。詳細については、メッセージグループの状態を参照してください。FileAggregatingMessageGroupProcessor
は、グループからFileSplitter.FileMarker
メッセージを削除し、残りのメッセージをリストペイロードに収集して生成します。
次のリストは、FileAggregator
を構成するための可能な方法を示しています。
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
integrationFlow {
split(Files.splitter().markers().firstLineAsHeader("firstLine"))
channel { executor(taskExecutor) }
filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
transform(String::toUpperCase)
channel("aggregatorChannel")
aggregate(FileAggregator())
channel { queue("resultChannel") }
}
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
aggregator.setProcessorBean(new FileAggregator());
aggregator.setOutputChannel(outputChannel);
return aggregator;
}
<int:chain input-channel="input" output-channel="output">
<int-file:splitter markers="true"/>
<int:aggregator>
<bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
</int:aggregator>
</int:chain>
FileAggregator
のデフォルトの動作がターゲットロジックを満たさない場合は、個々の戦略でアグリゲーターエンドポイントを構成することをお勧めします。詳細については、FileAggregator
JavaDocs を参照してください。
リモート永続ファイルリストフィルター
受信およびストリーミング受信リモートファイルチャネルアダプター(FTP
、SFTP
、その他のテクノロジ)は、デフォルトで AbstractPersistentFileListFilter
の対応する実装で構成され、メモリ内 MetadataStore
で構成されます。クラスターで実行するには、共有 MetadataStore
を使用してこれらをフィルターに置き換えることができます(詳細については、メタデータストアを参照してください)。これらのフィルターは、同じファイルが複数回フェッチされるのを防ぐために使用されます(変更された時間の変更がない限り)。バージョン 5.2 以降、ファイルは、ファイルがフェッチされる直前にフィルターに追加されます(フェッチが失敗した場合は、逆になります)。
致命的な障害(停電など)が発生した場合、現在フェッチされているファイルがフィルターに残り、アプリケーションの再起動時に再フェッチされない可能性があります。この場合、このファイルを MetadataStore から手動で削除する必要があります。 |
以前のバージョンでは、ファイルは取得される前にフィルタリングされていたため、壊滅的な障害が発生した後、いくつかのファイルがこの状態になる可能性がありました。
この新しい動作を容易にするために、FileListFilter
に 2 つの新しいメソッドが追加されました。
boolean accept(F file);
boolean supportsSingleFileFiltering();
フィルターが supportsSingleFileFiltering
で true
を返す場合、accept()
を実装する必要があります。
リモートフィルターが単一ファイルのフィルター処理(AbstractMarkerFilePresentFileListFilter
など)をサポートしない場合、アダプターは以前の動作に戻ります。
(CompositeFileListFilter
または ChainFileListFilter
を使用して)複数のフィルターが使用されている場合、複合フィルターがそれをサポートするには、すべてのデリゲートフィルターが単一ファイルフィルタリングをサポートする必要があります。
永続ファイルリストフィルターにブールプロパティ forRecursion
が追加されました。このプロパティを true
に設定すると、alwaysAcceptDirectories
も設定されます。これは、送信ゲートウェイ(ls
および mget
)での再帰操作が常にディレクトリツリー全体を毎回トラバースすることを意味します。これは、ディレクトリツリーの奥深くで変更が検出されなかった問題を解決するためです。さらに、forRecursion=true
により、ファイルへのフルパスがメタデータストアキーとして使用されます。これにより、同じ名前のファイルが異なるディレクトリに複数回表示された場合にフィルターが正しく機能しなかった問題が解決されます。重要: これは、永続メタデータストア内の既存のキーが、最上位ディレクトリにあるファイルで見つからないことを意味します。このため、プロパティはデフォルトで false
です。これは将来のリリースで変更される可能性があります。