FTP ストリーミング受信チャネルアダプター

バージョン 4.3 は、ストリーミング受信チャネルアダプターを導入しました。このアダプターは、型 InputStream のペイロードを持つメッセージを生成し、ローカルファイルシステムに書き込むことなくファイルを取得できるようにします。セッションは開いたままなので、ファイルを消費したときに消費側アプリケーションがセッションを閉じる必要があります。セッションは closeableResource ヘッダー(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)で提供されます。FileSplitter や StreamTransformer などの標準フレームワークコンポーネントは、セッションを自動的に閉じます。これらのコンポーネントの詳細については、ファイル分割およびストリームトランスを参照してください。次の例は、inbound-streaming-channel-adapter を構成する方法を示しています。

<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

filename-patternfilename-regexfilter または filter-expression のいずれか 1 つのみが許可されます。

バージョン 5.0 以降、デフォルトでは、FtpStreamingMessageSource アダプターは、メモリ内 SimpleMetadataStore に基づいて FtpPersistentAcceptOnceFileListFilter を使用したリモートファイルの重複を防ぎます。デフォルトでは、このフィルターはファイル名パターン(または正規表現)でも適用されます。重複を許可する必要がある場合は、AcceptAllFileListFilter を使用できます。その他のユースケースは、CompositeFileListFilter (または ChainFileListFilter)で処理できます。Java 構成(このドキュメントで後述)には、重複を避けるために処理後にリモートファイルを削除する 1 つの手法が示されています

FtpPersistentAcceptOnceFileListFilter の詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。

max-fetch-size 属性を使用して、フェッチが必要なときに各ポーリングでフェッチされるファイルの数を制限します。1 に設定し、クラスター環境で実行する場合は永続フィルターを使用します。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。

アダプターは、リモートディレクトリとファイル名をそれぞれ FileHeaders.REMOTE_DIRECTORY ヘッダーと FileHeaders.REMOTE_FILE ヘッダーに挿入します。バージョン 5.0 以降、FileHeaders.REMOTE_FILE_INFO ヘッダーは追加の リモートファイル情報を提供します (デフォルトでは JSON で表されます)。FtpStreamingMessageSource の fileInfoJson プロパティを false に設定すると、ヘッダーには FtpFileInfo オブジェクトが含まれます。基礎となる Apache Net ライブラリによって提供される FTPFile オブジェクトには、FtpFileInfo.getFileInfo() メソッドを使用してアクセスできます。fileInfoJson プロパティは、XML 構成を使用する場合は使用できませんが、構成クラスの 1 つに FtpStreamingMessageSource を挿入することで設定できます。リモートファイル情報も参照してください。

バージョン 5.1 以降、comparator の汎用型は FTPFile です。以前は、AbstractFileInfo<FTPFile> でした。これは、maxFetch をフィルタリングして適用する前に、処理の早い段階でソートが実行されるようになったためです。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class FtpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

この例では、トランスフォーマーの下流にあるメッセージハンドラーに、処理後に リモートファイルを削除する advice があることに注意してください。