FTP 受信チャネルアダプター
FTP 受信チャネルアダプターは、FTP サーバーに接続し、ファイル転送を開始するリモートディレクトリイベント(たとえば、作成された新しいファイル)をリッスンする特別なリスナーです。次の例は、inbound-channel-adapter
を構成する方法を示しています。
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
上記の構成が示すように、inbound-channel-adapter
要素を使用して FTP 受信チャネルアダプターを構成すると同時に、local-directory
、filename-pattern
(正規表現ではなく単純なパターンマッチングに基づく)、および session-factory
デフォルトでは、転送されたファイルには元のファイルと同じ名前が付けられます。この動作をオーバーライドする場合は、local-filename-generator-expression
属性を設定できます。これにより、ローカルファイルの名前を生成する SpEL 式を提供できます。SpEL 評価コンテキストのルートオブジェクトが Message
である送信ゲートウェイおよびアダプターとは異なり、この受信アダプターは、評価時にメッセージをまだ持っていません。これは、転送されたファイルをペイロードとして最終的に生成するためです。SpEL 評価コンテキストのルートオブジェクトは、リモートファイルの元の名前(String
)です。
受信チャネルアダプターは、最初にローカルディレクトリの File
オブジェクトを取得し、ポーラー設定に従って各ファイルを発行します。バージョン 5.0 以降、新しいファイルの取得が必要な場合に FTP サーバーから取得するファイルの数を制限できるようになりました。これは、ターゲットファイルが非常に大きい場合や、後述する永続的なファイルリストフィルターを使用してクラスター化システムで実行する場合に役立ちます。この目的のために max-fetch-size
を使用します。負の値(デフォルト)は制限がないことを意味し、一致するすべてのファイルが取得されます。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。バージョン 5.0 以降、scanner
属性を設定することにより、inbound-channel-adapter
にカスタム DirectoryScanner
実装を提供することもできます。
Spring Integration 3.0 以降、preserve-timestamp
属性を指定できます(デフォルトは false
です)。true
の場合、ローカルファイルの変更されたタイムスタンプは、サーバーから取得した値に設定されます。それ以外の場合は、現在の時刻に設定されます。
バージョン 4.2 以降、remote-directory
の代わりに remote-directory-expression
を指定して、各ポーリングでディレクトリを動的に決定できます(例: remote-directory-expression="@myBean.determineRemoteDir()"
)。
バージョン 4.3 から、remote-directory
および remote-directory-expression
属性を省略できます。デフォルトは null
です。この場合、FTP プロトコルに従って、クライアントの作業ディレクトリがデフォルトのリモートディレクトリとして使用されます。
filename-pattern
属性で指定された単純なパターンに基づくファイルフィルタリングでは不十分な場合があります。この場合、filename-regex
属性を使用して、正規表現(filename-regex=".*\.test$"
など)を指定できます。また、完全な制御が必要な場合は、filter
属性を使用して、ファイルのリストをフィルタリングするための戦略インターフェースである o.s.i.file.filters.FileListFilter
のカスタム実装への参照を提供できます。このフィルターは、どのリモートファイルを取得するかを決定します。CompositeFileListFilter
を使用して、パターンベースのフィルターを他のフィルター(以前にフェッチされたファイルの同期を避けるための AcceptOnceFileListFilter
など)と組み合わせることもできます。
AcceptOnceFileListFilter
はその状態をメモリに保存します。システムの再起動後も状態を維持したい場合は、代わりに FtpPersistentAcceptOnceFileListFilter
の使用を検討してください。このフィルターは、受け入れられたファイル名を MetadataStore
ストラテジーのインスタンスに保存します(メタデータストアを参照)。このフィルターは、ファイル名とリモート変更時刻で一致します。
バージョン 4.0 以降、このフィルターには ConcurrentMetadataStore
が必要です。共有データストア(Redis
と RedisMetadataStore
など)で使用すると、フィルターキーを複数のアプリケーションまたはサーバーインスタンス間で共有できます。
バージョン 5.0 以降、FtpInboundFileSynchronizer
には、メモリ内 SimpleMetadataStore
を含む FtpPersistentAcceptOnceFileListFilter
がデフォルトで適用されます。このフィルターは、XML 構成の regex
または pattern
オプション、および Java DSL の FtpInboundChannelAdapterSpec
にも適用されます。その他のユースケースは、CompositeFileListFilter
(または ChainFileListFilter
)で管理できます。
これまでの説明では、ファイルを取得する前にフィルタリングすることについて言及しました。ファイルが取得されると、ファイルシステム上のファイルに追加のフィルターが適用されます。デフォルトでは、これは AcceptOnceFileListFilter
であり、前述のように、メモリ内の状態を保持し、ファイルの変更時刻は考慮しません。アプリケーションが処理後にファイルを削除しない限り、アダプターはアプリケーションの再起動後にデフォルトでディスク上のファイルを再処理します。
また、FtpPersistentAcceptOnceFileListFilter
を使用するように filter
を構成し、リモートファイルのタイムスタンプが変更された場合(再取得されるため)、デフォルトのローカルフィルターはこの新しいファイルの処理を許可しません。
このフィルターの詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。
local-filter
属性を使用して、ローカルファイルシステムフィルターの動作を構成できます。バージョン 4.3.8 以降、FileSystemPersistentAcceptOnceFileListFilter
はデフォルトで構成されています。このフィルターは、受け入れられたファイル名と変更されたタイムスタンプを MetadataStore
戦略のインスタンス(メタデータストアを参照)に保存し、ローカルファイルの変更時刻の変更を検出します。デフォルトの MetadataStore
は SimpleMetadataStore
であり、メモリに状態を保存します。
バージョン 4.1.5 以降、これらのフィルターには、更新ごとにメタデータストアをフラッシュする新しいプロパティ(flushOnUpdate
)があります(ストアが Flushable
を実装している場合)。
さらに、分散 MetadataStore ( Redis など) を使用する場合は、同じアダプターまたはアプリケーションの複数のインスタンスを持つことができ、各ファイルが 1 回だけ処理されるようにすることができます。 |
実際のローカルフィルターは、指定されたフィルターと、ダウンロード中のファイルの処理を防ぐパターンフィルターを含む CompositeFileListFilter
です(temporary-file-suffix
に基づく)。ファイルはこの接尾辞(デフォルトは .writing
)でダウンロードされ、転送が完了するとファイルの名前が最終的な名前に変更され、フィルターから「見える」ようになります。
remote-file-separator
属性を使用すると、デフォルトの "/" が特定の環境に適用できない場合に使用するファイル区切り文字を構成できます。
これらの属性の詳細については、スキーマ [GitHub] (英語) を参照してください。
また、FTP 受信チャネルアダプターはポーリングコンシューマーであることも理解する必要があります。ポーラーを構成する必要があります(グローバルデフォルトまたはローカルサブ要素を使用して)。ファイルが転送されると、java.io.File
をペイロードとして持つメッセージが生成され、channel
属性によって識別されるチャネルに送信されます。
バージョン 6.2 以降では、FtpLastModifiedFileListFilter
を使用して、最終変更戦略に基づいて FTP ファイルをフィルタリングできます。このフィルターは age
プロパティを使用して構成でき、この値よりも古いファイルのみがフィルターを通過するようになります。経過時間のデフォルトは 60 秒ですが、(ネットワークの不具合などにより)ファイルが早期に取得されることを避けるために、十分な長さを選択する必要があります。詳細については、Javadoc を参照してください。
ファイルフィルタリングと不完全なファイルの詳細
モニター対象(リモート)ディレクトリに表示されたばかりのファイルが完全でない場合があります。通常、このようなファイルは一時的な拡張子(somefile.txt.writing
など)で書き込まれ、書き込みプロセスが終了すると名前が変更されます。ほとんどの場合、完全なファイルのみに関心があり、完全なファイルのみをフィルタリングしたいと考えています。これらのシナリオを処理するために、filename-pattern
、filename-regex
、filter
属性によって提供されるフィルタリングサポートを使用できます。次の例では、カスタムフィルターの実装を使用しています。
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
受信 FTP アダプターのポーラー構成に関する注意
受信 FTP アダプターのジョブは、2 つのタスクで構成されています。
リモートディレクトリからローカルディレクトリにファイルを転送するために、リモートサーバーと通信します。
転送されたファイルごとに、そのファイルをペイロードとしてメッセージを生成し、'channel' 属性によって識別されたチャネルに送信します。そのため、単に「アダプター」ではなく「チャネルアダプター」と呼ばれています。このようなアダプターの主なジョブは、メッセージを生成してメッセージチャネルに送信することです。基本的に、2 番目のタスクは、ローカルディレクトリにすでに 1 つ以上のファイルがある場合、最初にそれらからメッセージを生成するようなメソッドで優先されます。すべてのローカルファイルが処理された場合にのみ、リモート通信を開始してさらにファイルを取得します。
また、ポーラーでトリガーを構成するときは、max-messages-per-poll
属性に細心の注意を払う必要があります。デフォルト値は、すべての SourcePollingChannelAdapter
インスタンス(FTP を含む)の 1
です。これは、1 つのファイルが処理されるとすぐに、トリガー構成で決定された次の実行時間まで待機することを意味します。local-directory
に 1 つ以上のファイルが存在する場合、リモート FTP サーバーとの通信を開始する前にそれらのファイルを処理します。また、max-messages-per-poll
が 1
(デフォルト)に設定されている場合、トリガーで定義された間隔で、一度に 1 つのファイルのみを処理し、"one-poll === one-file" として機能します。
典型的なファイル転送のユースケースでは、逆の動作が必要になる可能性が高くなります。各ポーリングで可能なすべてのファイルを処理し、次のポーリングを待つだけです。その場合は、max-messages-per-poll
を -1 に設定します。次に、各ポーリングで、アダプターは可能な限り多くのメッセージを生成しようとします。つまり、ローカルディレクトリ内のすべてを処理し、リモートディレクトリに接続して、そこで使用可能なすべてを転送してローカルで処理します。その場合にのみ、ポーリング操作は完了したと見なされ、ポーラーは次の実行時間まで待機します。
あるいは、「ポーリングごとの最大メッセージ数」の値を、各ポーリングでファイルから作成されるメッセージの上限を示す正の値に設定することもできます。例: 10
の値は、各ポーリングで、10 個以下のファイルを処理しようとすることを意味します。
障害からの回復
アダプターのアーキテクチャーを理解することが重要です。ファイルを取得するファイルシンクロナイザーと、同期されたファイルごとにメッセージを送信する FileReadingMessageSource
があります。前述のように、2 つのフィルターが関係しています。filter
属性(およびパターン)は、リモート(FTP)ファイルリストを参照して、すでに取得されたファイルの取得を回避します。local-filter
は、FileReadingMessageSource
がメッセージとして送信するファイルを決定するために使用されます。
シンクロナイザーはリモートファイルをリストし、そのフィルターを調べます。その後、ファイルが転送されます。ファイル転送中に IO エラーが発生した場合、フィルターにすでに追加されているファイルはすべて削除され、次のポーリングで再取得できるようになります。これは、フィルターが ReversibleFileListFilter
(AcceptOnceFileListFilter
など)を実装する場合にのみ適用されます。
ファイルを同期した後、ファイルを処理するダウンストリームフローでエラーが発生した場合、フィルターの自動ロールバックは発生しないため、失敗したファイルはデフォルトで再処理されません。
失敗後にそのようなファイルを再処理する場合は、次のような構成を使用して、フィルターから失敗したファイルを簡単に削除できます。
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
前述の構成は、すべての ResettableFileListFilter
で機能します。
バージョン 5.0 以降、受信チャネルアダプターは、生成されたローカルファイル名に対応するサブディレクトリをローカルに構築できます。リモートサブパスでもあります。階層サポートに従って変更するためにローカルディレクトリを再帰的に読み取ることができるように、Files.walk()
アルゴリズムに基づいて新しい RecursiveDirectoryScanner
を内部 FileReadingMessageSource
に提供できるようになりました。詳細については、AbstractInboundFileSynchronizingMessageSource.setScanner()
(Javadoc) を参照してください。また、setUseWatchService()
オプションを使用して、AbstractInboundFileSynchronizingMessageSource
を WatchService
ベースの DirectoryScanner
に切り替えることができるようになりました。また、すべての WatchEventType
インスタンスがローカルディレクトリの変更に反応するように構成されています。前に示した再処理サンプルは、FileReadingMessageSource.WatchServiceDirectoryScanner
の組み込み機能に基づいており、ローカルディレクトリからファイルが削除された(StandardWatchEventKinds.ENTRY_DELETE
)ときに ResettableFileListFilter.remove()
を実行します。詳細については、WatchServiceDirectoryScanner
を参照してください。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
不完全なデータの処理
不完全なデータの処理を参照してください。
FtpSystemMarkerFilePresentFileListFilter
は、リモートシステム上に対応するマーカーファイルを持たないリモートファイルをフィルタリングするために提供されています。構成情報については、Javadoc を参照(および親クラスを参照)してください。