SFTP 受信チャネルアダプター

SFTP 受信チャネルアダプターは、サーバーに接続し、リモートディレクトリイベント(作成中の新しいファイルなど)をリッスンする特別なリスナーです。この時点で、ファイル転送を開始します。次の例は、SFTP 受信チャネルアダプターを構成する方法を示しています。

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

上記の構成例は、以下を含むさまざまな属性の値を提供する方法を示しています。

  • local-directory: ファイルが転送される場所

  • remote-directory: ファイルの転送元のリモートソースディレクトリ

  • session-factory: 前に構成した Bean への参照

デフォルトでは、転送されたファイルには元のファイルと同じ名前が付けられます。この動作をオーバーライドする場合は、local-filename-generator-expression 属性を設定できます。これにより、ローカルファイルの名前を生成する SpEL 式を提供できます。SpEL 評価コンテキストのルートオブジェクトが Message である送信ゲートウェイおよびアダプターとは異なり、この受信アダプターは評価時にメッセージをまだ持っていません。これは、転送されたファイルをペイロードとして最終的に生成するためです。SpEL 評価コンテキストのルートオブジェクトは、リモートファイルの元の名前(String)です。

受信チャネルアダプターは、最初にファイルをローカルディレクトリに取得し、ポーラー構成に従って各ファイルを発行します。バージョン 5.0 以降、新しいファイルの取得が必要な場合に SFTP サーバーから取得するファイルの数を制限できます。これは、ターゲットファイルが大きい場合、またはこのセクションで後述する永続的なファイルリストフィルターを使用してクラスター化システムで実行する場合に役立ちます。この目的には max-fetch-size を使用してください。負の値(デフォルト)は制限がないことを意味し、一致するすべてのファイルが取得されます。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。バージョン 5.0 以降、scanner 属性を設定することにより、inbound-channel-adapter にカスタム DirectoryScanner 実装を提供することもできます。

Spring Integration 3.0 以降、preserve-timestamp 属性を指定できます(デフォルトは false です)。true の場合、ローカルファイルの変更されたタイムスタンプは、サーバーから取得した値に設定されます。それ以外の場合は、現在の時刻に設定されます。

バージョン 4.2 以降、remote-directory の代わりに remote-directory-expression を指定できます。これにより、各ポーリングでディレクトリを動的に決定できます(例: remote-directory-expression="@myBean.determineRemoteDir()")。

filename-pattern 属性で指定された単純なパターンに基づいたファイルフィルタリングでは不十分な場合があります。この場合、filename-regex 属性を使用して、正規表現(たとえば、filename-regex=".*\.test$")を指定できます。完全な制御が必要な場合は、filter 属性を使用して、ファイルのリストをフィルタリングするための戦略インターフェースである org.springframework.integration.file.filters.FileListFilter のカスタム実装への参照を提供できます。このフィルターは、どのリモートファイルを取得するかを決定します。CompositeFileListFilter を使用して、パターンベースのフィルターを他のフィルター(AcceptOnceFileListFilter など)と組み合わせて、以前にフェッチされたファイルの同期を回避することもできます。

AcceptOnceFileListFilter はその状態をメモリに保存します。システムの再起動後も状態を維持したい場合は、代わりに SftpPersistentAcceptOnceFileListFilter の使用を検討してください。このフィルターは、受け入れられたファイル名を MetadataStore ストラテジーのインスタンスに保存します(メタデータストアを参照)。このフィルターは、ファイル名とリモート変更時刻で一致します。

バージョン 4.0 以降、このフィルターには ConcurrentMetadataStore が必要です。共有データストア(Redis と RedisMetadataStore など)を併用すると、フィルターキーを複数のアプリケーションまたはサーバーインスタンス間で共有できます。

バージョン 5.0 以降、SftpInboundFileSynchronizer には、メモリ内 SimpleMetadataStore を持つ SftpPersistentAcceptOnceFileListFilter がデフォルトで適用されます。このフィルターは、XML 構成の regex または pattern オプションとともに、Java DSL の SftpInboundChannelAdapterSpec を通じても適用されます。CompositeFileListFilter (または ChainFileListFilter)を使用して、他のユースケースを処理できます。

上記の説明は、ファイルを取得する前にファイルをフィルタリングすることを示しています。ファイルが取得されると、ファイルシステム上のファイルに追加のフィルターが適用されます。デフォルトでは、これは `AcceptOnceFileListFilter` であり、このセクションで説明するように、状態をメモリに保持し、ファイルの変更時刻を考慮しません。アプリケーションが処理後にファイルを削除しない限り、アダプターはアプリケーションの再起動後にデフォルトでディスク上のファイルを再処理します。

また、SftpPersistentAcceptOnceFileListFilter を使用するように filter を構成し、リモートファイルのタイムスタンプが変更された場合(再取得されるため)、デフォルトのローカルフィルターはこの新しいファイルの処理を許可しません。

このフィルターの詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。

local-filter 属性を使用して、ローカルファイルシステムフィルターの動作を構成できます。バージョン 4.3.8 以降、FileSystemPersistentAcceptOnceFileListFilter はデフォルトで構成されています。このフィルターは、受け入れられたファイル名と変更されたタイムスタンプを MetadataStore 戦略のインスタンス(メタデータストアを参照)に保存し、ローカルファイルの変更時刻の変更を検出します。デフォルトの MetadataStore は、状態をメモリに保存する SimpleMetadataStore です。

バージョン 4.1.5 以降、これらのフィルターには flushOnUpdate と呼ばれる新しいプロパティがあり、更新ごとにメタデータストアをフラッシュします(ストアが Flushable を実装している場合)。

さらに、分散 MetadataStore ( Redis メタデータストアなど) を使用する場合は、同じアダプターまたはアプリケーションの複数のインスタンスを使用して、1 つのインスタンスのみがファイルを処理するようにすることができます。

実際のローカルフィルターは、指定されたフィルターと、ダウンロード中のファイルの処理を防ぐパターンフィルターを含む CompositeFileListFilter です(temporary-file-suffix に基づく)。ファイルはこの接尾辞(デフォルトは .writing)でダウンロードされ、ファイルは転送が完了すると最終的な名前に変更され、フィルターから「見える」ようになります。

これらの属性の詳細については、スキーマ [GitHub] (英語) を参照してください。

SFTP 受信チャネルアダプターは、ポーリングコンシューマーです。ポーラー(グローバルデフォルトまたはローカル要素)を構成する必要があります。ファイルがローカルディレクトリに転送されると、ペイロード型として java.io.File を持つメッセージが生成され、channel 属性によって識別されるチャネルに送信されます。

バージョン 6.2 以降では、SftpLastModifiedFileListFilter を使用して、最終変更戦略に基づいて SFTP ファイルをフィルタリングできます。このフィルターは age プロパティを使用して構成でき、この値よりも古いファイルのみがフィルターを通過するようになります。経過時間のデフォルトは 60 秒ですが、(ネットワークの不具合などにより)ファイルが早期に取得されることを避けるために、十分な長さを選択する必要があります。詳細については、Javadoc を参照してください。

ファイルフィルタリングと大きなファイルの詳細

監視対象 (リモート) ディレクトリに表示されたばかりのファイルが完全でない場合があります。通常、このようなファイルは一時的な拡張子 ( something.txt.writing という名前のファイルの .writing など) で書き込まれ、書き込みプロセスの補完後に名前が変更されます。ほとんどの場合、開発者は完全なファイルのみに関心があり、それらのファイルのみをフィルタリングしたいと考えています。これらのシナリオを処理するには、filename-patternfilename-regexfilter 属性によって提供されるフィルタリングサポートを使用できます。カスタムフィルターの実装が必要な場合は、filter 属性を設定してアダプターに参照を含めることができます。次の例は、その方法を示しています。

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

障害からの回復

アダプターのアーキテクチャを理解する必要があります。ファイルシンクロナイザーがファイルを取得し、FileReadingMessageSource が各同期ファイルのメッセージを送信します。前に説明したように、2 つのフィルターが関係しています。filter 属性(およびパターン)は、リモート(SFTP)ファイルリストを参照して、すでに取得されたファイルの取得を回避します。FileReadingMessageSource は local-filter を使用して、メッセージとして送信されるファイルを決定します。

シンクロナイザーはリモートファイルをリストし、そのフィルターを調べます。その後、ファイルが転送されます。ファイル転送中に IO エラーが発生した場合、フィルターにすでに追加されているファイルはすべて削除され、次のポーリングで再取得できるようになります。これは、フィルターが ReversibleFileListFilter (AcceptOnceFileListFilter など)を実装する場合にのみ適用されます。

ファイルを同期した後、ファイルを処理するダウンストリームフローでエラーが発生した場合、フィルターの自動ロールバックは発生しないため、失敗したファイルはデフォルトで再処理されません。

失敗後にそのようなファイルを再処理する場合は、次のような構成を使用して、失敗したファイルをフィルターから簡単に削除できます。

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

前述の構成は、すべての ResettableFileListFilter で機能します。

バージョン 5.0 以降、受信チャネルアダプターは、生成されたローカルファイル名に従って、サブディレクトリをローカルに構築できます。リモートサブパスでもあります。階層サポートに従って変更するためにローカルディレクトリを再帰的に読み取ることができるように、Files.walk() アルゴリズムに基づいて新しい RecursiveDirectoryScanner を内部 FileReadingMessageSource に提供できるようになりました。詳細については、AbstractInboundFileSynchronizingMessageSource.setScanner() (Javadoc) を参照してください。また、setUseWatchService() オプションを使用して、AbstractInboundFileSynchronizingMessageSource を WatchService ベースの DirectoryScanner に切り替えることができるようになりました。また、すべての WatchEventType インスタンスがローカルディレクトリの変更に反応するように設定されています。前に示した再処理のサンプルは、ファイルがローカルディレクトリから削除された(StandardWatchEventKinds.ENTRY_DELETE)ときに ResettableFileListFilter.remove() を使用する FileReadingMessageSource.WatchServiceDirectoryScanner の組み込み機能に基づいています。詳細については、WatchServiceDirectoryScanner を参照してください。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

Java DSL を使用した構成

次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

不完全なデータの処理

不完全なデータの処理を参照してください。

SftpSystemMarkerFilePresentFileListFilter は、リモートシステムに対応するマーカーファイルがないリモートファイルをフィルタリングするために提供されています。構成情報については、Javadoc を参照してください。