FTP 受信チャネルアダプター
FTP 受信チャネルアダプターは、FTP サーバーに接続し、リモートディレクトリイベント(たとえば、新規ファイルの作成)をリッスンする特別なリスナーです。イベントが発生すると、ファイル転送を開始します。次の例は、inbound-channel-adapter の設定方法を示しています。
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter> 上記の構成が示すように、inbound-channel-adapter 要素を使用して FTP 受信チャネルアダプターを構成すると同時に、local-directory、filename-pattern (正規表現ではなく単純なパターンマッチングに基づく)、および session-factory
デフォルトでは、転送されたファイルには元のファイルと同じ名前が付けられます。この動作をオーバーライドする場合は、local-filename-generator-expression 属性を設定できます。これにより、ローカルファイルの名前を生成する SpEL 式を提供できます。SpEL 評価コンテキストのルートオブジェクトが Message である送信ゲートウェイおよびアダプターとは異なり、この受信アダプターは、評価時にメッセージをまだ持っていません。これは、転送されたファイルをペイロードとして最終的に生成するためです。SpEL 評価コンテキストのルートオブジェクトは、リモートファイルの元の名前(String)です。
受信チャネルアダプターは、最初にローカルディレクトリの File オブジェクトを取得し、ポーラー設定に従って各ファイルを発行します。バージョン 5.0 以降、新しいファイルの取得が必要な場合に FTP サーバーから取得するファイルの数を制限できるようになりました。これは、ターゲットファイルが非常に大きい場合や、後述する永続的なファイルリストフィルターを使用してクラスター化システムで実行する場合に役立ちます。この目的のために max-fetch-size を使用します。負の値(デフォルト)は制限がないことを意味し、一致するすべてのファイルが取得されます。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。バージョン 5.0 以降、scanner 属性を設定することにより、inbound-channel-adapter にカスタム DirectoryScanner 実装を提供することもできます。
Spring Integration 3.0 以降、preserve-timestamp 属性を指定できます(デフォルトは false です)。true の場合、ローカルファイルの変更されたタイムスタンプは、サーバーから取得した値に設定されます。それ以外の場合は、現在の時刻に設定されます。
バージョン 4.2 以降、remote-directory の代わりに remote-directory-expression を指定して、各ポーリングでディレクトリを動的に決定できます(例: remote-directory-expression="@myBean.determineRemoteDir()")。
バージョン 4.3 から、remote-directory および remote-directory-expression 属性を省略できます。デフォルトは null です。この場合、FTP プロトコルに従って、クライアントの作業ディレクトリがデフォルトのリモートディレクトリとして使用されます。
filename-pattern 属性で指定された単純なパターンに基づくファイルフィルタリングでは不十分な場合があります。この場合、filename-regex 属性を使用して、正規表現(filename-regex=".*\.test$" など)を指定できます。また、完全な制御が必要な場合は、filter 属性を使用して、ファイルのリストをフィルタリングするための戦略インターフェースである o.s.i.file.filters.FileListFilter のカスタム実装への参照を提供できます。このフィルターは、どのリモートファイルを取得するかを決定します。CompositeFileListFilter を使用して、パターンベースのフィルターを他のフィルター(以前にフェッチされたファイルの同期を避けるための AcceptOnceFileListFilter など)と組み合わせることもできます。
AcceptOnceFileListFilter はその状態をメモリに保存します。システムの再起動後も状態を維持したい場合は、代わりに FtpPersistentAcceptOnceFileListFilter の使用を検討してください。このフィルターは、受け入れられたファイル名を MetadataStore ストラテジーのインスタンスに保存します(メタデータストアを参照)。このフィルターは、ファイル名とリモート変更時刻で一致します。
バージョン 4.0 以降、このフィルターには ConcurrentMetadataStore が必要です。共有データストア(Redis と RedisMetadataStore など)で使用すると、フィルターキーを複数のアプリケーションまたはサーバーインスタンス間で共有できます。
バージョン 5.0 以降、FtpInboundFileSynchronizer には、メモリ内 SimpleMetadataStore を含む FtpPersistentAcceptOnceFileListFilter がデフォルトで適用されます。このフィルターは、XML 構成の regex または pattern オプション、および Java DSL の FtpInboundChannelAdapterSpec にも適用されます。その他のユースケースは、CompositeFileListFilter (または ChainFileListFilter)で管理できます。
これまでの説明では、ファイルを取得する前にフィルタリングすることについて言及しました。ファイルが取得されると、ファイルシステム上のファイルに追加のフィルターが適用されます。デフォルトでは、これは AcceptOnceFileListFilter であり、前述のように、メモリ内の状態を保持し、ファイルの変更時刻は考慮しません。アプリケーションが処理後にファイルを削除しない限り、アダプターはアプリケーションの再起動後にデフォルトでディスク上のファイルを再処理します。
また、FtpPersistentAcceptOnceFileListFilter を使用するように filter を構成し、リモートファイルのタイムスタンプが変更された場合(再取得されるため)、デフォルトのローカルフィルターはこの新しいファイルの処理を許可しません。
このフィルターとその使用方法の詳細については、リモート永続ファイルリストフィルターを参照してください。
local-filter 属性を使用して、ローカルファイルシステムフィルターの動作を構成できます。バージョン 4.3.8 以降、FileSystemPersistentAcceptOnceFileListFilter はデフォルトで構成されています。このフィルターは、受け入れられたファイル名と変更されたタイムスタンプを MetadataStore 戦略のインスタンス(メタデータストアを参照)に保存し、ローカルファイルの変更時刻の変更を検出します。デフォルトの MetadataStore は SimpleMetadataStore であり、メモリに状態を保存します。
バージョン 4.1.5 以降、これらのフィルターには、更新ごとにメタデータストアをフラッシュする新しいプロパティ(flushOnUpdate)があります(ストアが Flushable を実装している場合)。
さらに、分散 MetadataStore ( Redis など) を使用する場合は、同じアダプターまたはアプリケーションの複数のインスタンスを持つことができ、各ファイルが 1 回だけ処理されるようにすることができます。 |
実際のローカルフィルターは ChainFileListFilter であり、ダウンロード中のファイル(temporary-file-suffix に基づく)と指定されたフィルターの処理を阻止するパターンフィルターが含まれています。ファイルはこのサフィックス(デフォルトは .writing)付きでダウンロードされ、転送が完了すると最終的な名前に変更され、フィルターに「見える」ようになります。
remote-file-separator 属性を使用すると、デフォルトの "/" が特定の環境に適用できない場合に使用するファイル区切り文字を構成できます。
これらの属性の詳細については、スキーマ [GitHub] (英語) を参照してください。
FTP 受信チャネルアダプターはポーリングコンシューマーであることも理解しておく必要があります。そのため、ポーラーを設定する必要があります(グローバルデフォルトまたはローカルサブ要素のいずれかを使用)。ファイルが転送されると、ペイロードとして java.io.File を持つメッセージが生成され、channel 属性で識別されるチャネルに送信されます。
バージョン 6.2 以降では、FtpLastModifiedFileListFilter を使用して、最終変更戦略に基づいて FTP ファイルをフィルタリングできます。このフィルターは age プロパティを使用して構成でき、この値よりも古いファイルのみがフィルターを通過するようになります。経過時間のデフォルトは 60 秒ですが、(ネットワークの不具合などにより)ファイルが早期に取得されることを避けるために、十分な長さを選択する必要があります。詳細については、Javadoc を参照してください。
対照的に、バージョン 6.5 以降では、提供された age よりも古くないファイルのみを受け入れるために FtpRecentFileListFilter が導入されました。
ファイルフィルタリングと不完全なファイルの詳細
モニター対象(リモート)ディレクトリに表示されたばかりのファイルが完全でない場合があります。通常、このようなファイルは一時的な拡張子(somefile.txt.writing など)で書き込まれ、書き込みプロセスが終了すると名前が変更されます。ほとんどの場合、完全なファイルのみに関心があり、完全なファイルのみをフィルタリングしたいと考えています。これらのシナリオを処理するために、filename-pattern、filename-regex、filter 属性によって提供されるフィルタリングサポートを使用できます。次の例では、カスタムフィルターの実装を使用しています。
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>受信 FTP アダプターのポーラー構成に関する注意
受信 FTP アダプターのジョブは、2 つのタスクで構成されています。
リモートディレクトリからローカルディレクトリにファイルを転送するために、リモートサーバーと通信します。
転送されたファイルごとに、そのファイルをペイロードとしてメッセージを生成し、'channel' 属性によって識別されたチャネルに送信します。そのため、単に「アダプター」ではなく「チャネルアダプター」と呼ばれています。このようなアダプターの主なジョブは、メッセージを生成してメッセージチャネルに送信することです。基本的に、2 番目のタスクは、ローカルディレクトリにすでに 1 つ以上のファイルがある場合、最初にそれらからメッセージを生成するようなメソッドで優先されます。すべてのローカルファイルが処理された場合にのみ、リモート通信を開始してさらにファイルを取得します。
また、ポーラーでトリガーを構成するときは、max-messages-per-poll 属性に細心の注意を払う必要があります。デフォルト値は、すべての SourcePollingChannelAdapter インスタンス(FTP を含む)の 1 です。これは、1 つのファイルが処理されるとすぐに、トリガー構成で決定された次の実行時間まで待機することを意味します。local-directory に 1 つ以上のファイルが存在する場合、リモート FTP サーバーとの通信を開始する前にそれらのファイルを処理します。また、max-messages-per-poll が 1 (デフォルト)に設定されている場合、トリガーで定義された間隔で、一度に 1 つのファイルのみを処理し、"one-poll === one-file" として機能します。
典型的なファイル転送のユースケースでは、逆の動作が必要になる可能性が高くなります。各ポーリングで可能なすべてのファイルを処理し、次のポーリングを待つだけです。その場合は、max-messages-per-poll を -1 に設定します。次に、各ポーリングで、アダプターは可能な限り多くのメッセージを生成しようとします。つまり、ローカルディレクトリ内のすべてを処理し、リモートディレクトリに接続して、そこで使用可能なすべてを転送してローカルで処理します。その場合にのみ、ポーリング操作は完了したと見なされ、ポーラーは次の実行時間まで待機します。
あるいは、「ポーリングごとの最大メッセージ数」の値を、各ポーリングでファイルから作成されるメッセージの上限を示す正の値に設定することもできます。例: 10 の値は、各ポーリングで、10 個以下のファイルを処理しようとすることを意味します。
障害からの回復
アダプターのアーキテクチャーを理解することが重要です。ファイルを取得するファイルシンクロナイザーと、同期されたファイルごとにメッセージを送信する FileReadingMessageSource があります。前述のように、2 つのフィルターが関係しています。filter 属性(およびパターン)は、リモート(FTP)ファイルリストを参照して、すでに取得されたファイルの取得を回避します。local-filter は、FileReadingMessageSource がメッセージとして送信するファイルを決定するために使用されます。
シンクロナイザーはリモートファイルをリストし、そのフィルターを調べます。その後、ファイルが転送されます。ファイル転送中に IO エラーが発生した場合、フィルターにすでに追加されているファイルはすべて削除され、次のポーリングで再取得できるようになります。これは、フィルターが ReversibleFileListFilter (AcceptOnceFileListFilter など)を実装する場合にのみ適用されます。
ファイルを同期した後、ファイルを処理するダウンストリームフローでエラーが発生した場合、フィルターの自動ロールバックは発生しないため、失敗したファイルはデフォルトで再処理されません。
失敗後にそのようなファイルを再処理する場合は、次のような構成を使用して、フィルターから失敗したファイルを簡単に削除できます。
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" /> 前述の構成は、すべての ResettableFileListFilter で機能します。
Starting with version 5.0, the inbound channel adapter can build subdirectories locally that correspond to the generated local file name. That can be a remote sub-path as well. To be able to read a local directory recursively for modification according to the hierarchy support, you can now supply an internal FileReadingMessageSource with a new RecursiveDirectoryScanner based on the Files.walk() algorithm. See AbstractInboundFileSynchronizingMessageSource.setScanner() (Javadoc) for more information. Also, you can now switch the AbstractInboundFileSynchronizingMessageSource to the WatchService based DirectoryScanner by using setUseWatchService() option. It is also configured for all the WatchEventType instances to react to any modifications in local directory. The reprocessing sample shown earlier is based on the built-in functionality of the FileReadingMessageSource.WatchServiceDirectoryScanner to perform ResettableFileListFilter.remove() when the file is deleted (StandardWatchEventKinds.ENTRY_DELETE) from the local directory. See WatchServiceDirectoryScanner for more information.
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}不完全なデータの処理
不完全なデータの処理を参照してください。
FtpSystemMarkerFilePresentFileListFilter は、リモートシステム上に対応するマーカーファイルを持たないリモートファイルをフィルタリングするために提供されています。構成情報については、Javadoc を参照(および親クラスを参照)してください。