SFTP ストリーミング受信チャネルアダプター

バージョン 4.3 は、ストリーミング受信チャネルアダプターを導入しました。このアダプターは、型 InputStream のペイロードを持つメッセージを生成し、ローカルファイルシステムに書き込むことなくファイルをフェッチできるようにします。セッションは開いたままなので、ファイルを消費したときに消費側アプリケーションがセッションを閉じる必要があります。セッションは closeableResource ヘッダー(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)で提供されます。FileSplitter や StreamTransformer などの標準フレームワークコンポーネントは、セッションを自動的に閉じます。これらのコンポーネントの詳細については、ファイル分割およびストリームトランスを参照してください。次の例は、SFTP ストリーミング受信チャネルアダプターを構成する方法を示しています。

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

filename-patternfilename-regexfilter または filter-expression のいずれか 1 つのみを使用できます。

バージョン 5.0 以降、デフォルトでは、SftpStreamingMessageSource アダプターは、メモリ内 SimpleMetadataStore に基づいて SftpPersistentAcceptOnceFileListFilter を使用することにより、リモートファイルの重複を防ぎます。デフォルトでは、このフィルターもファイル名パターン(または正規表現)とともに適用されます。重複を許可する必要がある場合は、AcceptAllFileListFilter を使用できます。CompositeFileListFilter (または ChainFileListFilter)を使用して、他のユースケースを処理できます。で示す Java 構成は、処理後にリモートファイルを削除して重複を回避する 1 つの手法を示しています。

SftpPersistentAcceptOnceFileListFilter の詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。

max-fetch-size 属性を使用して、フェッチが必要な場合に各ポーリングでフェッチされるファイルの数を制限できます。1 に設定し、クラスター環境で実行する場合は永続フィルターを使用します。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。

アダプターは、リモートディレクトリとファイル名をヘッダーに入れます (それぞれ FileHeaders.REMOTE_DIRECTORY と FileHeaders.REMOTE_FILE)。バージョン 5.0 以降、FileHeaders.REMOTE_FILE_INFO ヘッダーは追加の リモートファイル情報を (JSON で) 提供します。SftpStreamingMessageSource の fileInfoJson プロパティを false に設定すると、ヘッダーに SftpFileInfo オブジェクトが含まれます。SftpFileInfo.getFileInfo() メソッドを使用して、基になる SftpClient によって提供される SftpClient.DirEntry オブジェクトにアクセスできます。XML 構成を使用する場合、fileInfoJson プロパティは使用できませんが、構成クラスの 1 つに SftpStreamingMessageSource を挿入することによって設定できます。リモートファイル情報も参照してください。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + '/' +  headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

この例では、トランスフォーマーの下流にあるメッセージハンドラーに、処理後に リモートファイルを削除する advice があることに注意してください。