FTP ストリーミング受信チャネルアダプター
バージョン 4.3 は、ストリーミング受信チャネルアダプターを導入しました。このアダプターは、型 InputStream
のペイロードを持つメッセージを生成し、ローカルファイルシステムに書き込むことなくファイルを取得できるようにします。セッションは開いたままなので、ファイルを消費したときに消費側アプリケーションがセッションを閉じる必要があります。セッションは closeableResource
ヘッダー(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
)で提供されます。FileSplitter
や StreamTransformer
などの標準フレームワークコンポーネントは、セッションを自動的に閉じます。これらのコンポーネントの詳細については、ファイル分割およびストリームトランスを参照してください。次の例は、inbound-streaming-channel-adapter
を構成する方法を示しています。
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
filename-pattern
、filename-regex
、filter
または filter-expression
のいずれか 1 つのみが許可されます。
バージョン 5.0 以降、デフォルトでは、FtpStreamingMessageSource アダプターは、メモリ内 SimpleMetadataStore に基づいて FtpPersistentAcceptOnceFileListFilter を使用したリモートファイルの重複を防ぎます。デフォルトでは、このフィルターはファイル名パターン(または正規表現)でも適用されます。重複を許可する必要がある場合は、AcceptAllFileListFilter を使用できます。その他のユースケースは、CompositeFileListFilter (または ChainFileListFilter )で処理できます。Java 構成(このドキュメントで後述)には、重複を避けるために処理後にリモートファイルを削除する 1 つの手法が示されています。 |
FtpPersistentAcceptOnceFileListFilter
の詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。
max-fetch-size
属性を使用して、フェッチが必要なときに各ポーリングでフェッチされるファイルの数を制限します。1
に設定し、クラスター環境で実行する場合は永続フィルターを使用します。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。
アダプターは、リモートディレクトリとファイル名をそれぞれ FileHeaders.REMOTE_DIRECTORY
ヘッダーと FileHeaders.REMOTE_FILE
ヘッダーに挿入します。バージョン 5.0 以降、FileHeaders.REMOTE_FILE_INFO
ヘッダーは追加の リモートファイル情報を提供します (デフォルトでは JSON で表されます)。FtpStreamingMessageSource
の fileInfoJson
プロパティを false
に設定すると、ヘッダーには FtpFileInfo
オブジェクトが含まれます。基礎となる Apache Net ライブラリによって提供される FTPFile
オブジェクトには、FtpFileInfo.getFileInfo()
メソッドを使用してアクセスできます。fileInfoJson
プロパティは、XML 構成を使用する場合は使用できませんが、構成クラスの 1 つに FtpStreamingMessageSource
を挿入することで設定できます。リモートファイル情報も参照してください。
バージョン 5.1 以降、comparator
の汎用型は FTPFile
です。以前は、AbstractFileInfo<FTPFile>
でした。これは、maxFetch
をフィルタリングして適用する前に、処理の早い段階でソートが実行されるようになったためです。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
この例では、トランスフォーマーの下流にあるメッセージハンドラーに、処理後に リモートファイルを削除する advice
があることに注意してください。