SFTP アダプター

Spring Integration は、SFTP を介したファイル転送操作をサポートします。

セキュアファイル転送プロトコル(SFTP)は、信頼できるストリームを介してインターネット上の 2 台のコンピューター間でファイルを転送できるネットワークプロトコルです。

SFTP プロトコルには、SSH などの安全なチャネルと、SFTP セッション全体でのクライアントの ID の可視性が必要です。

Spring Integration は、受信チャネルアダプター、送信チャネルアダプター、送信ゲートウェイの 3 つのクライアント側エンドポイントを提供することにより、SFTP を介したファイルの送受信をサポートしています。また、これらのクライアントコンポーネントを定義するための便利なネームスペース構成も提供します。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-sftp</artifactId>
    <version>5.5.8</version>
</dependency>
compile "org.springframework.integration:spring-integration-sftp:5.5.8"

xml 構成に SFTP 名前空間を含めるには、ルート要素に次の属性を含めます。

xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
    https://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd"

SFTP セッションファクトリ

バージョン 3.0 の時点で、セッションはデフォルトでキャッシュされなくなりました。SFTP セッションキャッシングを参照してください。

SFTP アダプターを構成する前に、SFTP セッションファクトリを構成する必要があります。次の例に示すように、SFTP セッションファクトリを通常の Bean 定義で構成できます。

<beans:bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <beans:property name="host" value="localhost"/>
    <beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
    <beans:property name="privateKeyPassphrase" value="springIntegration"/>
    <beans:property name="port" value="22"/>
    <beans:property name="user" value="kermit"/>
</beans:bean>

アダプターが SessionFactory からセッションオブジェクトをリクエストするたびに、新しい SFTP セッションが作成されます。裏では、SFTP Session Factory は JSch (英語) ライブラリに依存して SFTP 機能を提供します。

ただし、Spring Integration は SFTP セッションのキャッシュもサポートしています。詳細については、SFTP セッションキャッシングを参照してください。

JSch は、サーバーへの接続を介して複数のチャネル(操作)をサポートします。デフォルトでは、Spring Integration セッションファクトリは、チャネルごとに個別の物理接続を使用します。Spring Integration 3.0 以降、サーバーへの単一の接続を使用し、その単一の接続上に複数の JSch チャネルを作成するように(ブールコンストラクター arg- デフォルト false を使用して)セッションファクトリを構成できます。

この機能を使用する場合、後述するように、セッションファクトリをキャッシングセッションファクトリでラップして、操作が完了したときに接続が物理的に閉じられないようにする必要があります。

キャッシュがリセットされると、最後のチャネルが閉じられたときにのみセッションが切断されます。

新しい操作がセッションを取得したときに接続が切断されていることが判明した場合、接続はリフレッシュされます。

接続の問題が発生し、セッションの作成をトレースして、どのセッションがポーリングされているかを確認したい場合は、ロガーを TRACE レベル(たとえば、log4j.category.org.springframework.integration.sftp=TRACE)に設定してトレースを有効にできます。SFTP/JSCH ロギングを参照してください。

あとは、この SFTP セッションファクトリをアダプターに挿入するだけです。

SFTP セッションファクトリに値を提供するより実用的な方法は、Spring のプロパティプレースホルダーサポートを使用することです。

プロパティの構成

次のリストは、DefaultSftpSessionFactory (Javadoc) によって公開されるすべてのプロパティを説明しています。

isSharedSession (コンストラクター引数): : true の場合、単一の接続が使用され、JSch Channels は多重化されます。デフォルトは false です。

clientVersion:: クライアントバージョンプロパティを設定できます。デフォルトは基礎となる JSch バージョンに依存しますが、SSH-2.0-JSCH-0.1.45 のようになります。

enableDaemonThread:: true の場合、すべてのスレッドはデーモンスレッドです。false に設定すると、代わりに通常の非デーモンスレッドが使用されます。このプロパティは、基礎となるセッション (英語) で設定されます。そこでは、このプロパティのデフォルトは false です。

host:: 接続するホストの URL。必須。

hostKeyAlias:: ホストキーを既知のホストリストと比較するときに使用されるホストキーエイリアスを設定します。

knownHostsResource:: ホストキーリポジトリに使用するファイルリソースを指定します。このファイルは OpenSSH の known_hosts ファイルと同じ形式であり、allowUnknownKeys が false の場合は必須であり、事前に入力する必要があります。

password:: リモートホストに対して認証するパスワード。パスワードが提供されない場合、privateKey プロパティが必要です。userInfo を設定した場合は許可されません。パスワードはそのオブジェクトから取得されます。

port::SFTP 接続が確立されるポート。指定しない場合、この値はデフォルトで 22 になります。指定する場合、このプロパティは正数でなければなりません。

privateKey:: リモートホストに対する認証に使用される秘密鍵の場所を表すリソース (Javadoc) を設定できます。privateKey が提供されない場合、password プロパティが必要です。

privateKeyPassphrase:: 秘密鍵のパスワード。userInfo を設定すると、privateKeyPassphrase は許可されません。パスフレーズはそのオブジェクトから取得されます。オプション。

proxy::JSch ベースのプロキシ (英語) を指定できます。設定すると、プロキシオブジェクトを使用して、プロキシを介したリモートホストへの接続が作成されます。プロキシを設定する便利な方法については、プロキシファクトリ Bean を参照してください。

serverAliveCountMax:: 切断前にサーバーからの応答なしで送信されるサーバー生存メッセージの数を指定します。設定しない場合、このプロパティのデフォルトは 1 になります。

serverAliveInterval:: サーバーからメッセージが受信されない場合に、サーバーアライブメッセージが送信されるまでのタイムアウト間隔(ミリ秒単位)を設定します。

sessionConfig:: Properties を使用すると、基礎となる JSch セッションに追加の構成設定を設定できます。

socketFactory::SocketFactory (英語) を渡します。ソケットファクトリは、ターゲットホストへのソケットを作成するために使用されます。プロキシを使用すると、ソケットファクトリがプロキシに渡されます。デフォルトでは、プレーン TCP ソケットが使用されます。

timeout:: タイムアウトプロパティは、デフォルトの接続タイムアウトと同様に、ソケットタイムアウトパラメーターとして使用されます。デフォルトは 0 です。つまり、タイムアウトは発生しません。

user:: 使用するリモートユーザー。必須。

allowUnknownKeys:: true に設定して、不明な(または変更された)キーを持つホストへの接続を許可します。デフォルトは "false" です。userInfo が提供されていない場合にのみ適用されます。false の場合、事前入力された knownHosts ファイルが必要です。

userInfo:: 認証中に使用されるカスタム UserInfo を設定します。特に、promptYesNo() は、不明な(または変更された)ホストキーを受信したときに呼び出されます。allowUnknownKeys も参照してください。UserInfo を指定すると、そこから password および秘密鍵 passphrase が取得され、個別の password および privateKeyPassphrase プロパティを設定できません。

プロキシファクトリ Bean

Jsch は、HTTP または SOCKS プロキシを介してサーバーに接続するメカニズムを提供します。この機能を使用するには、前述のように、Proxy を構成し、DefaultSftpSessionFactory への参照を提供します。3 つの実装が Jsch によって提供されます: HTTPSOCKS4SOCKS5. 次の例に示すように、Spring Integration 4.3 は FactoryBean を導入し、プロパティインジェクションを許可することでこれらのプロキシの構成を容易にしました。

<bean id="proxySocks5" class="org.springframework.integration.sftp.session.JschProxyFactoryBean">
    <constructor-arg value="SOCKS5" />
    <constructor-arg value="${sftp.proxy.address}" />
    <constructor-arg value="${sftp.proxy.port}" />
    <constructor-arg value="${sftp.proxy.user}" />
    <constructor-arg value="${sftp.proxy.pw}" />
</bean>

<bean id="sessionFactory"
          class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory" >
    ...
    <property name="proxy" ref="proxySocks5" />
    ...
</bean>

セッションファクトリの委譲

バージョン 4.2 では、実行時に実際のセッションファクトリを選択できる DelegatingSessionFactory が導入されました。SFTP エンドポイントを呼び出す前に、ファクトリで setThreadKey() を呼び出して、キーを現在のスレッドに関連付けることができます。次に、そのキーを使用して、使用する実際のセッションファクトリを検索します。使用後に clearThreadKey() を呼び出すことにより、キーをクリアできます。

次の例に示すように、メッセージフローから簡単に行えるように便利なメソッドを追加しました。

<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
    <constructor-arg>
        <bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
            <!-- delegate factories here -->
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel="in" output-channel="c1"
        expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />

<int-sftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />

<int:service-activator input-channel="c2" output-channel="out"
        expression="@dsf.clearThreadKey(#root)" />
セッションキャッシングを使用する場合(SFTP セッションキャッシングを参照)、各デリゲートをキャッシュする必要があります。DelegatingSessionFactory 自体をキャッシュすることはできません。

バージョン 5.0.7 以降、DelegatingSessionFactory を RotatingServerAdvice と組み合わせて使用して複数のサーバーをポーリングできます。受信チャネルアダプター: 複数のサーバーとディレクトリのポーリングを参照してください。

SFTP セッションキャッシング

Spring Integration バージョン 3.0 以降、セッションはデフォルトでキャッシュされなくなりました。cache-sessions 属性はエンドポイントでサポートされなくなりました。セッションをキャッシュする場合は、CachingSessionFactory を使用する必要があります(次の例を参照)。

3.0 より前のバージョンでは、セッションはデフォルトで自動的にキャッシュされていました。cache-sessions 属性は自動キャッシュを無効にするために使用できましたが、そのソリューションは他のセッションキャッシュ属性を構成する方法を提供していませんでした。例: 作成されるセッションの数を制限できませんでした。その要件と他の構成オプションをサポートするために、CachingSessionFactory を追加しました。sessionCacheSize および sessionWaitTimeout プロパティを提供します。その名前が示すように、sessionCacheSize プロパティは、ファクトリがキャッシュに保持するアクティブセッションの数を制御します(デフォルトは無制限です)。sessionCacheSize しきい値に達した場合、キャッシュされたセッションのいずれかが使用可能になるか、セッションの待機時間が経過するまで(デフォルトの待機時間は Integer.MAX_VALUE)、別のセッションを取得しようとするとブロックされます。sessionWaitTimeout プロパティにより、待機時間の構成が可能になります。

セッションをキャッシュする場合は、デフォルトのセッションファクトリを設定し(前述のとおり)、CachingSessionFactory のインスタンスにラップして、追加のプロパティを提供します。次の例は、その方法を示しています。

<bean id="sftpSessionFactory"
    class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
    <property name="host" value="localhost"/>
</bean>

<bean id="cachingSessionFactory"
    class="org.springframework.integration.file.remote.session.CachingSessionFactory">
    <constructor-arg ref="sftpSessionFactory"/>
    <constructor-arg value="10"/>
    <property name="sessionWaitTimeout" value="1000"/>
</bean>

上記の例では、sessionCacheSize を 10 に設定し、sessionWaitTimeout を 1 秒(1000 ミリ秒)に設定して CachingSessionFactory を作成します。

Spring Integration バージョン 3.0 から、CachingConnectionFactory は resetCache() メソッドを提供します。呼び出されると、すべてのアイドルセッションはすぐに閉じられ、使用中のセッションはキャッシュに返されると閉じられます。isSharedSession=true を使用すると、チャネルが閉じられ、共有セッションは最後のチャネルが閉じられたときにのみ閉じられます。セッションの新しいリクエストは、必要に応じて新しいセッションを確立します。

バージョン 5.1 以降、CachingSessionFactory には新しいプロパティ testSession があります。true の場合、stat(getHome()) コマンドを実行してセッションがテストされ、セッションがまだアクティブであることを確認します。そうでない場合、キャッシュから削除されます。アクティブなセッションがキャッシュにない場合、新しいセッションが作成されます。

RemoteFileTemplate を使用する

Spring Integration バージョン 3.0 は、SftpSession オブジェクトの新しい抽象化を提供します。このテンプレートは、ファイルを送信、取得(InputStream として)、削除、名前変更するためのメソッドを提供します。さらに、呼び出し側がセッションで複数の操作を実行できるようにする execute メソッドを提供します。すべての場合において、テンプレートはセッションを確実に閉じるように配慮します。詳細については、RemoteFileTemplate の Javadoc を参照してください。SFTP にはサブクラス SftpRemoteFileTemplate (Javadoc) があります。

getClientInstance() を含む追加のメソッドをバージョン 4.1 に追加しました。下位の ChannelSftp へのアクセスを提供し、低レベル API へのアクセスを可能にします。

バージョン 5.0 は RemoteFileOperations.invoke(OperationsCallback<F, T> action) メソッドを導入しました。このメソッドを使用すると、同じスレッドでバインドされた Session のスコープ内でいくつかの RemoteFileOperations 呼び出しを呼び出すことができます。これは、RemoteFileTemplate の複数の高レベル操作を 1 つの作業単位として実行する必要がある場合に役立ちます。例: AbstractRemoteFileOutboundGateway は mput コマンドの実装でそれを使用し、提供されたディレクトリ内の各ファイルに対して、およびそのサブディレクトリに対して再帰的に put 操作を実行します。詳細については、Javadoc を参照してください。

SFTP 受信チャネルアダプター

SFTP 受信チャネルアダプターは、サーバーに接続し、リモートディレクトリイベント(作成中の新しいファイルなど)をリッスンする特別なリスナーです。この時点で、ファイル転送を開始します。次の例は、SFTP 受信チャネルアダプターを構成する方法を示しています。

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
              session-factory="sftpSessionFactory"
            channel="requestChannel"
            filename-pattern="*.txt"
            remote-directory="/foo/bar"
            preserve-timestamp="true"
            local-directory="file:target/foo"
            auto-create-local-directory="true"
            local-filename-generator-expression="#this.toUpperCase() + '.a'"
            scanner="myDirScanner"
            local-filter="myFilter"
            temporary-file-suffix=".writing"
            max-fetch-size="-1"
            delete-remote-files="false">
        <int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

上記の構成例は、以下を含むさまざまな属性の値を提供する方法を示しています。

  • local-directory: ファイルが転送される場所

  • remote-directory: ファイルの転送元のリモートソースディレクトリ

  • session-factory: 前に構成した Bean への参照

デフォルトでは、転送されたファイルには元のファイルと同じ名前が付けられます。この動作をオーバーライドする場合は、local-filename-generator-expression 属性を設定できます。これにより、ローカルファイルの名前を生成する SpEL 式を提供できます。SpEL 評価コンテキストのルートオブジェクトが Message である送信ゲートウェイおよびアダプターとは異なり、この受信アダプターは評価時にメッセージをまだ持っていません。これは、転送されたファイルをペイロードとして最終的に生成するためです。SpEL 評価コンテキストのルートオブジェクトは、リモートファイルの元の名前(String)です。

受信チャネルアダプターは、最初にファイルをローカルディレクトリに取得し、ポーラー構成に従って各ファイルを発行します。バージョン 5.0 以降、新しいファイルの取得が必要な場合に SFTP サーバーから取得するファイルの数を制限できます。これは、ターゲットファイルが大きい場合、またはこのセクションで後述する永続的なファイルリストフィルターを使用してクラスター化システムで実行する場合に役立ちます。この目的には max-fetch-size を使用してください。負の値(デフォルト)は制限がないことを意味し、一致するすべてのファイルが取得されます。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。バージョン 5.0 以降、scanner 属性を設定することにより、inbound-channel-adapter にカスタム DirectoryScanner 実装を提供することもできます。

Spring Integration 3.0 以降、preserve-timestamp 属性を指定できます(デフォルトは false です)。true の場合、ローカルファイルの変更されたタイムスタンプは、サーバーから取得した値に設定されます。それ以外の場合は、現在の時刻に設定されます。

バージョン 4.2 以降、remote-directory の代わりに remote-directory-expression を指定できます。これにより、各ポーリングでディレクトリを動的に決定できます(例: remote-directory-expression="@myBean.determineRemoteDir()")。

filename-pattern 属性で指定された単純なパターンに基づいたファイルフィルタリングでは不十分な場合があります。この場合、filename-regex 属性を使用して、正規表現(たとえば、filename-regex=".*\.test$")を指定できます。完全な制御が必要な場合は、filter 属性を使用して、ファイルのリストをフィルタリングするための戦略インターフェースである org.springframework.integration.file.filters.FileListFilter のカスタム実装への参照を提供できます。このフィルターは、どのリモートファイルを取得するかを決定します。CompositeFileListFilter を使用して、パターンベースのフィルターを他のフィルター(AcceptOnceFileListFilter など)と組み合わせて、以前にフェッチされたファイルの同期を回避することもできます。

AcceptOnceFileListFilter はその状態をメモリに保存します。システムの再起動後も状態を維持したい場合は、代わりに SftpPersistentAcceptOnceFileListFilter の使用を検討してください。このフィルターは、受け入れられたファイル名を MetadataStore ストラテジーのインスタンスに保存します(メタデータストアを参照)。このフィルターは、ファイル名とリモート変更時刻で一致します。

バージョン 4.0 以降、このフィルターには ConcurrentMetadataStore が必要です。共有データストア(Redis と RedisMetadataStore など)を併用すると、フィルターキーを複数のアプリケーションまたはサーバーインスタンス間で共有できます。

バージョン 5.0 以降、SftpInboundFileSynchronizer には、メモリ内 SimpleMetadataStore を持つ SftpPersistentAcceptOnceFileListFilter がデフォルトで適用されます。このフィルターは、XML 構成の regex または pattern オプションとともに、Java DSL の SftpInboundChannelAdapterSpec を通じても適用されます。CompositeFileListFilter (または ChainFileListFilter)を使用して、他のユースケースを処理できます。

上記の説明は、ファイルを取得する前にファイルをフィルタリングすることを示しています。ファイルが取得されると、ファイルシステム上のファイルに追加のフィルターが適用されます。デフォルトでは、これは `AcceptOnceFileListFilter` であり、このセクションで説明するように、状態をメモリに保持し、ファイルの変更時刻を考慮しません。アプリケーションが処理後にファイルを削除しない限り、アダプターはアプリケーションの再起動後にデフォルトでディスク上のファイルを再処理します。

また、SftpPersistentAcceptOnceFileListFilter を使用するように filter を構成し、リモートファイルのタイムスタンプが変更された場合(再取得されるため)、デフォルトのローカルフィルターはこの新しいファイルの処理を許可しません。

このフィルターの詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。

local-filter 属性を使用して、ローカルファイルシステムフィルターの動作を構成できます。バージョン 4.3.8 以降、FileSystemPersistentAcceptOnceFileListFilter はデフォルトで構成されています。このフィルターは、受け入れられたファイル名と変更されたタイムスタンプを MetadataStore 戦略のインスタンス(メタデータストアを参照)に保存し、ローカルファイルの変更時刻の変更を検出します。デフォルトの MetadataStore は、状態をメモリに保存する SimpleMetadataStore です。

バージョン 4.1.5 以降、これらのフィルターには flushOnUpdate と呼ばれる新しいプロパティがあり、更新ごとにメタデータストアをフラッシュします(ストアが Flushable を実装している場合)。

さらに、分散 MetadataStore (Redis メタデータストアGemfire メタデータストアなど)を使用する場合、同じアダプターまたはアプリケーションの複数のインスタンスを作成し、1 つのインスタンスのみがファイルを処理するようにすることができます。

実際のローカルフィルターは、指定されたフィルターと、ダウンロード中のファイルの処理を防ぐパターンフィルターを含む CompositeFileListFilter です(temporary-file-suffix に基づく)。ファイルはこの接尾辞(デフォルトは .writing)でダウンロードされ、ファイルは転送が完了すると最終的な名前に変更され、フィルターから「見える」ようになります。

これらの属性の詳細については、スキーマ [GitHub] (英語) を参照してください。

SFTP 受信チャネルアダプターは、ポーリングコンシューマーです。ポーラー(グローバルデフォルトまたはローカル要素)を構成する必要があります。ファイルがローカルディレクトリに転送されると、ペイロード型として java.io.File を持つメッセージが生成され、channel 属性によって識別されるチャネルに送信されます。

ファイルフィルタリングと大きなファイルの詳細

監視対象(リモート)ディレクトリに表示されたばかりのファイルが完全でない場合があります。通常、このようなファイルは一時的な拡張子(something.txt.writing という名前のファイルの .writing など)で書き込まれ、書き込みプロセスの補完後に名前が変更されます。ほとんどの場合、開発者は完全なファイルのみに関心があり、それらのファイルのみをフィルタリングしたいと考えています。これらのシナリオを処理するために、filename-patternfilename-regexfilter 属性によって提供されるフィルタリングサポートを使用できます。カスタムフィルターの実装が必要な場合は、filter 属性を設定することにより、アダプターに参照を含めることができます。次の例は、その方法を示しています。

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
            channel="receiveChannel"
            session-factory="sftpSessionFactory"
            filter="customFilter"
            local-directory="file:/local-test-dir"
            remote-directory="/remote-test-dir">
        <int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

障害からの回復

アダプターのアーキテクチャを理解する必要があります。ファイルシンクロナイザーがファイルを取得し、FileReadingMessageSource が各同期ファイルのメッセージを送信します。前に説明したように、2 つのフィルターが関係しています。filter 属性(およびパターン)は、リモート(SFTP)ファイルリストを参照して、すでに取得されたファイルの取得を回避します。FileReadingMessageSource は local-filter を使用して、メッセージとして送信されるファイルを決定します。

シンクロナイザーはリモートファイルをリストし、そのフィルターを調べます。その後、ファイルが転送されます。ファイル転送中に IO エラーが発生した場合、フィルターにすでに追加されているファイルはすべて削除され、次のポーリングで再取得できるようになります。これは、フィルターが ReversibleFileListFilter (AcceptOnceFileListFilter など)を実装する場合にのみ適用されます。

ファイルを同期した後、ファイルを処理するダウンストリームフローでエラーが発生した場合、フィルターの自動ロールバックは発生しないため、失敗したファイルはデフォルトで再処理されません。

失敗後にそのようなファイルを再処理する場合は、次のような構成を使用して、失敗したファイルをフィルターから簡単に削除できます。

<int-sftp:inbound-channel-adapter id="sftpAdapter"
        session-factory="sftpSessionFactory"
        channel="requestChannel"
        remote-directory-expression="'/sftpSource'"
        local-directory="file:myLocalDir"
        auto-create-local-directory="true"
        filename-pattern="*.txt">
    <int:poller fixed-rate="1000">
        <int:transactional synchronization-factory="syncFactory" />
    </int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
    class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
    class="org.springframework.integration.transaction.PseudoTransactionManager" />

前述の構成は、すべての ResettableFileListFilter で機能します。

バージョン 5.0 以降、受信チャネルアダプターは、生成されたローカルファイル名に従って、サブディレクトリをローカルに構築できます。リモートサブパスでもあります。階層サポートに従って変更するためにローカルディレクトリを再帰的に読み取ることができるように、Files.walk() アルゴリズムに基づいて新しい RecursiveDirectoryScanner を内部 FileReadingMessageSource に提供できるようになりました。詳細については、AbstractInboundFileSynchronizingMessageSource.setScanner() (Javadoc) を参照してください。また、setUseWatchService() オプションを使用して、AbstractInboundFileSynchronizingMessageSource を WatchService ベースの DirectoryScanner に切り替えることができるようになりました。また、すべての WatchEventType インスタンスがローカルディレクトリの変更に反応するように設定されています。前に示した再処理のサンプルは、ファイルがローカルディレクトリから削除された(StandardWatchEventKinds.ENTRY_DELETE)ときに ResettableFileListFilter.remove() を使用する FileReadingMessageSource.WatchServiceDirectoryScanner の組み込み機能に基づいています。詳細については、WatchServiceDirectoryScanner を参照してください。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
        fileSynchronizer.setDeleteRemoteFiles(false);
        fileSynchronizer.setRemoteDirectory("foo");
        fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
        return fileSynchronizer;
    }

    @Bean
    @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
    public MessageSource<File> sftpMessageSource() {
        SftpInboundFileSynchronizingMessageSource source =
                new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
        source.setLocalDirectory(new File("sftp-inbound"));
        source.setAutoCreateLocalDirectory(true);
        source.setLocalFilter(new AcceptOnceFileListFilter<File>());
        source.setMaxFetchSize(1);
        return source;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

Java DSL を使用した構成

次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlows
            .from(Sftp.inboundAdapter(this.sftpSessionFactory)
                    .preserveTimestamp(true)
                    .remoteDirectory("foo")
                    .regexFilter(".*\\.txt$")
                    .localFilenameExpression("#this.toUpperCase() + '.a'")
                    .localDirectory(new File("sftp-inbound")),
                 e -> e.id("sftpInboundAdapter")
                    .autoStartup(true)
                    .poller(Pollers.fixedDelay(5000)))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
    }
}

不完全なデータの処理

不完全なデータの処理を参照してください。

SftpSystemMarkerFilePresentFileListFilter は、リモートシステムに対応するマーカーファイルがないリモートファイルをフィルタリングするために提供されています。構成情報については、Javadoc を参照してください。

SFTP ストリーミング受信チャネルアダプター

バージョン 4.3 は、ストリーミング受信チャネルアダプターを導入しました。このアダプターは、型 InputStream のペイロードを持つメッセージを生成し、ローカルファイルシステムに書き込むことなくファイルをフェッチできるようにします。セッションは開いたままなので、ファイルを消費したときに消費側アプリケーションがセッションを閉じる必要があります。セッションは closeableResource ヘッダー(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)で提供されます。FileSplitter や StreamTransformer などの標準フレームワークコンポーネントは、セッションを自動的に閉じます。これらのコンポーネントの詳細については、ファイル分割およびストリームトランスを参照してください。次の例は、SFTP ストリーミング受信チャネルアダプターを構成する方法を示しています。

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

filename-patternfilename-regexfilter または filter-expression のいずれか 1 つのみを使用できます。

バージョン 5.0 以降、デフォルトでは、SftpStreamingMessageSource アダプターは、メモリ内 SimpleMetadataStore に基づいて SftpPersistentAcceptOnceFileListFilter を使用することにより、リモートファイルの重複を防ぎます。デフォルトでは、このフィルターもファイル名パターン(または正規表現)とともに適用されます。重複を許可する必要がある場合は、AcceptAllFileListFilter を使用できます。CompositeFileListFilter (または ChainFileListFilter)を使用して、他のユースケースを処理できます。で示す Java 構成は、処理後にリモートファイルを削除して重複を回避する 1 つの手法を示しています。

SftpPersistentAcceptOnceFileListFilter の詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。

max-fetch-size 属性を使用して、フェッチが必要な場合に各ポーリングでフェッチされるファイルの数を制限できます。1 に設定し、クラスター環境で実行する場合は永続フィルターを使用します。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。

アダプターは、リモートディレクトリとファイル名をヘッダーに入れます(それぞれ FileHeaders.REMOTE_DIRECTORY および FileHeaders.REMOTE_FILE)。バージョン 5.0 以降、FileHeaders.REMOTE_FILE_INFO ヘッダーは追加のリモートファイル情報(JSON 形式)を提供します。SftpStreamingMessageSource の fileInfoJson プロパティを false に設定すると、ヘッダーには SftpFileInfo オブジェクトが含まれます。SftpFileInfo.getFileInfo() メソッドを使用して、基礎となる Jsch ライブラリが提供する LsEntry オブジェクトにアクセスできます。fileInfoJson プロパティは、XML 構成を使用する場合は使用できませんが、SftpStreamingMessageSource を構成クラスの 1 つに挿入することで設定できます。リモートファイル情報も参照してください。

バージョン 5.1 以降、comparator の汎用型は LsEntry です。以前は、AbstractFileInfo<LsEntry> でした。これは、maxFetch をフィルタリングして適用する前に、処理の早い段階でソートが実行されるようになったためです。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

この例では、トランスフォーマーの下流のメッセージハンドラーに、処理後にリモートファイルを削除するアドバイスがあることに注意してください。

受信チャネルアダプター: 複数のサーバーとディレクトリのポーリング

バージョン 5.0.7 以降、RotatingServerAdvice が利用可能になりました。ポーリングアドバイスとして構成されている場合、受信アダプターは複数のサーバーとディレクトリをポーリングできます。通常どおり、アドバイスを構成し、ポーラーのアドバイスチェーンに追加します。DelegatingSessionFactory を使用してサーバーを選択します。詳細については、セッションファクトリの委譲を参照してください。アドバイス構成は、RotationPolicy.KeyDirectory オブジェクトのリストで構成されています。

サンプル
@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

このアドバイスは、新しいファイルが存在しなくなるまでサーバー one 上のディレクトリ foo をポーリングし、その後、サーバー two 上のディレクトリ bar、次にディレクトリ baz に移動します。

このデフォルトの動作は、fair コンストラクター arg を使用して変更できます。

公平
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

この場合、前回のポーリングでファイルが返されたかどうかに関係なく、アドバイスは次のサーバー / ディレクトリに移動します。

または、独自の RotationPolicy を提供して、必要に応じてメッセージソースを再構成できます。

ポリシー
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}

および

カスタム
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

local-filename-generator-expression 属性(シンクロナイザーの localFilenameGeneratorExpression)に #remoteDirectory 変数を含めることができるようになりました。これにより、異なるディレクトリから取得したファイルを同様のディレクトリにローカルにダウンロードできます。

@Bean
public IntegrationFlow flow() {
    return IntegrationFlows.from(Sftp.inboundAdapter(sf())
                    .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}
このアドバイスを使用するときは、ポーラーで TaskExecutor を構成しないでください。詳細については、メッセージソースの条件付きポーラーを参照してください。

受信チャネルアダプター: リモートファイルフェッチの制御

受信チャネルアダプターを構成するときは、2 つのプロパティを考慮する必要があります。max-messages-per-poll は、すべてのポーラーと同様に、各ポーリングで送信されるメッセージの数を制限するために使用できます(構成された値を超える準備ができている場合)。max-fetch-size (バージョン 5.0 以降)は、一度にリモートサーバーから取得するファイルの数を制限できます。

次のシナリオでは、開始状態が空のローカルディレクトリであると想定しています。

  • max-messages-per-poll=2 および max-fetch-size=1: アダプターは、1 つのファイルを取り出して発行し、次のファイルを取り出して発行します。その後、次のポーリングまでスリープします。

  • max-messages-per-poll=2 および max-fetch-size=2): アダプターは両方のファイルをフェッチしてから、それぞれを発行します。

  • max-messages-per-poll=2 および max-fetch-size=4: アダプターは、最大 4 つのファイル(使用可能な場合)をフェッチし、最初の 2 つを発行します(少なくとも 2 つある場合)。次の 2 つのファイルは、次のポーリングで発行されます。

  • max-messages-per-poll=2 および max-fetch-size が指定されていない: アダプターは、すべてのリモートファイルをフェッチし、最初の 2 つを発行します(少なくとも 2 つある場合)。後続のファイルは、後続のポーリングで発行されます(一度に 2 つ)。すべてが消費されると、リモートフェッチが再度試行され、新しいファイルが取得されます。

アプリケーションの複数のインスタンスをデプロイするときは、小さな max-fetch-size を設定して、1 つのインスタンスがすべてのファイルを「取得」し、他のインスタンスを枯渇させないようにすることをお勧めします。

max-fetch-size のもう 1 つの用途は、リモートファイルのフェッチを停止するが、すでにフェッチされたファイルの処理を続行する場合です。MessageSource で maxFetchSize プロパティを設定すると(プログラム、JMX、コントロールバスを介して)、アダプターはさらにファイルを取得できなくなりますが、ポーラーは以前に取得したファイルのメッセージを引き続き送信できます。プロパティが変更されたときにポーラーがアクティブな場合、変更は次のポーリングで有効になります。

バージョン 5.1 以降、シンクロナイザーに Comparator<LsEntry> を提供できます。これは、maxFetchSize でフェッチされるファイルの数を制限するときに役立ちます。

SFTP 送信チャネルアダプター

SFTP 発信チャネルアダプターは、リモートディレクトリに接続し、受信 Message のペイロードとして受信するすべてのファイルに対してファイル転送を開始する特別な MessageHandler です。また、ファイルのいくつかの表現もサポートしているため、File オブジェクトに限定されません。FTP 送信アダプターと同様に、SFTP 送信チャネルアダプターは次のペイロードをサポートしています。

  • java.io.File: 実際のファイルオブジェクト

  • byte[]: ファイルの内容を表すバイト配列

  • java.lang.String: ファイルの内容を表すテキスト

  • java.io.InputStream: リモートファイルに転送するデータのストリーム

  • org.springframework.core.io.Resource: リモートファイルに転送するデータのリソース

次の例は、SFTP 発信チャネルアダプターを構成する方法を示しています。

<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
    session-factory="sftpSessionFactory"
    channel="inputChannel"
    charset="UTF-8"
    remote-file-separator="/"
    remote-directory="foo/bar"
    remote-filename-generator-expression="payload.getName() + '-mysuffix'"
    filename-generator="fileNameGenerator"
    use-temporary-filename="true"
    chmod="600"
    mode="REPLACE"/>

これらの属性の詳細については、スキーマ [GitHub] (英語) を参照してください。

SpEL と SFTP 送信アダプター

Spring Integration の他の多くのコンポーネントと同様に、2 つの属性 remote-directory-expression および remote-filename-generator-expression (前述)を指定して SFTP 発信チャネルアダプターを構成するときに、Spring Expression Language(SpEL)を使用できます。式評価コンテキストにはメッセージがルートオブジェクトとして含まれており、メッセージのデータに基づいて(「ペイロード」または「ヘッダー」から)ファイル名または既存のディレクトリパスを動的に計算できる式を使用できます。前の例では、元の名前に基づいてファイル名を計算し、サフィックス "-mysuffix" を追加する式の値で remote-filename-generator-expression 属性を定義します。

バージョン 4.1 以降では、ファイルを転送するときに mode を指定できます。デフォルトでは、既存のファイルは上書きされます。モードは FileExistsMode 列挙で定義され、次の値が含まれます。

  • REPLACE (default)

  • REPLACE_IF_MODIFIED

  • APPEND

  • APPEND_NO_FLUSH

  • IGNORE

  • FAIL

IGNORE および FAIL では、ファイルは転送されません。FAIL は例外をスローしますが、IGNORE は通知なしで転送を無視します(DEBUG ログエントリは生成されます)。

バージョン 4.3 は chmod 属性を導入しました。これを使用して、アップロード後にリモートファイルの許可を変更できます。従来の Unix 8 進形式を使用できます(たとえば、600 では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。java を使用してアダプターを構成する場合、setChmodOctal("600") または setChmod(0600) を使用できます。

部分的に書き込まれたファイルの回避

ファイル転送を処理する際の一般的な問題の 1 つは、部分的なファイルを処理する可能性です。転送が実際に完了する前に、ファイルがファイルシステムに表示される場合があります。

この課題に対処するために、Spring Integration SFTP アダプターは、ファイルが一時的な名前で転送され、完全に転送された後に名前が変更される一般的なアルゴリズムを使用します。

デフォルトでは、転送中のすべてのファイルは、追加のサフィックス(デフォルトでは .writing)とともにファイルシステムに表示されます。temporary-file-suffix 属性を設定して変更できます。

ただし、この手法を使用したくない場合があります(たとえば、サーバーがファイル名の変更を許可していない場合)。このような状況では、use-temporary-file-name を false に設定することにより、この機能を無効にできます(デフォルトは true です)。この属性が false の場合、ファイルは最終的な名前で書き込まれ、消費側アプリケーションは、ファイルにアクセスする前にファイルが完全にアップロードされたことを検出する他のメカニズムを必要とします。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java で送信アダプターを構成する方法の例を示しています。

@SpringBootApplication
@IntegrationComponentScan
public class SftpJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                    new SpringApplicationBuilder(SftpJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToSftp(new File("/foo/bar.txt"));
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost("localhost");
        factory.setPort(port);
        factory.setUser("foo");
        factory.setPassword("foo");
        factory.setAllowUnknownKeys(true);
        factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(factory);
    }

    @Bean
    @ServiceActivator(inputChannel = "toSftpChannel")
    public MessageHandler handler() {
        SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
        handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
        handler.setFileNameGenerator(new FileNameGenerator() {

            @Override
            public String generateFileName(Message<?> message) {
                 return "handlerContent.test";
            }

        });
        return handler;
    }

    @MessagingGateway
    public interface MyGateway {

         @Gateway(requestChannel = "toSftpChannel")
         void sendToSftp(File file);

    }
}

Java DSL を使用した構成

次の Spring Boot アプリケーションは、Java DSL で送信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow sftpOutboundFlow() {
        return IntegrationFlows.from("toSftpChannel")
            .handle(Sftp.outboundAdapter(this.sftpSessionFactory, FileExistsMode.FAIL)
                         .useTemporaryFileName(false)
                         .remoteDirectory("/foo")
            ).get();
    }

}

SFTP 送信ゲートウェイ

SFTP 送信ゲートウェイは、リモート SFTP サーバーとやり取りできる限られたコマンドセットを提供します。

  • ls (リストファイル)

  • nlst (リストファイル名)

  • get (ファイルを取得する)

  • mget (複数のファイルを取得する)

  • rm (ファイルを削除)

  • mv (ファイルの移動と名前変更)

  • put (ファイルを送る)

  • mput (複数のファイルを送信する)

ls コマンドの使用

ls はリモートファイルをリストし、次のオプションをサポートします。

  • -1: ファイル名のリストを取得します。デフォルトでは、FileInfo オブジェクトのリストを取得します

  • -a: すべてのファイルを含める ( "." で始まるものを含む)

  • -f: リストをソートしないでください

  • -dirs: インクルードディレクトリ (デフォルトで除外)

  • -links: シンボリックリンクを含める (デフォルトで除外)

  • -R: リモートディレクトリを再帰的にリストする

さらに、ファイル名のフィルタリングは inbound-channel-adapter と同じ方法で提供されます。

ls 操作から生じるメッセージペイロードは、ファイル名のリストまたは FileInfo オブジェクトのリストです(-1 スイッチを usr するかどうかによって異なります)。これらのオブジェクトは、変更された時間、権限などの情報を提供します。

ls コマンドが実行されたリモートディレクトリは、file_remoteDirectory ヘッダーで提供されます。

再帰オプション(-R)を使用する場合、fileName にはサブディレクトリ要素が含まれ、ファイルへの相対パス(リモートディレクトリに対する相対パス)を表します。-dirs オプションを使用する場合、各再帰ディレクトリもリスト内の要素として返されます。この場合、-1 オプションを使用しないことをお勧めします。FileInfo オブジェクトを使用するとファイルとディレクトリを区別できなくなるためです。

nlst コマンドの使用

バージョン 5 では、nlst コマンドのサポートが導入されました。

nlst はリモートファイル名をリストし、1 つのオプションのみをサポートします。

  • -f: リストをソートしないでください

nlst 操作から生じるメッセージペイロードは、ファイル名のリストです。

file_remoteDirectory ヘッダーには、nlst コマンドが実行されたリモートディレクトリが保持されます。

SFTP プロトコルは、名前をリストする機能を提供しません。このコマンドは、-1 オプションを指定した ls コマンドに相当し、便宜上ここに追加されます。

get コマンドの使用

get はリモートファイルを取得し、次のオプションをサポートします。

  • -P: リモートファイルのタイムスタンプを保持します。

  • -stream: リモートファイルをストリームとして取得します。

  • -D: 転送に成功したら、リモートファイルを削除します。FileExistsMode は IGNORE であり、ローカルファイルがすでに存在するため、転送が無視される場合、リモートファイルは削除されません。

file_remoteDirectory ヘッダーはリモートディレクトリを保持し、file_remoteFile ヘッダーはファイル名を保持します。

get 操作から生じるメッセージペイロードは、取得したファイルを表す File オブジェクトです。-stream オプションを使用する場合、ペイロードは File ではなく InputStream です。テキストファイルの一般的なユースケースは、この操作をファイルスプリッターまたはストリームトランスフォーマーと組み合わせることです。リモートファイルをストリームとして使用する場合、ストリームが消費された後に Session を閉じる必要があります。便宜上、Session は closeableResource ヘッダーで提供され、IntegrationMessageHeaderAccessor は便利なメソッドを提供します。

Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
    closeable.close();
}

ファイル分割ストリームトランスなどのフレームワークコンポーネントは、データの転送後にセッションを自動的に閉じます。

次の例は、ファイルをストリームとして使用する方法を示しています。

<int-sftp:outbound-gateway session-factory="ftpSessionFactory"
                            request-channel="inboundGetStream"
                            command="get"
                            command-options="-stream"
                            expression="payload"
                            remote-directory="ftpTarget"
                            reply-channel="stream" />

<int-file:splitter input-channel="stream" output-channel="lines" />
カスタムコンポーネントで入力ストリームを使用する場合は、Session を閉じる必要があります。次の例に示すように、カスタムコードでそれを行うか、メッセージのコピーを service-activator にルーティングして SpEL を使用できます。
<int:service-activator input-channel="closeSession"
    expression="headers['closeableResource'].close()" />

mget コマンドの使用

mget は、パターンに基づいて複数のリモートファイルを取得し、次のオプションをサポートします。

  • -P: リモートファイルのタイムスタンプを保持します。

  • -R: ディレクトリツリー全体を再帰的に取得します。

  • -x: パターンに一致するファイルがない場合は例外をスローします(そうでない場合は空のリストが返されます)。

  • -D: 転送が成功したら、各リモートファイルを削除します。転送が無視された場合、FileExistsMode は IGNORE であり、ローカルファイルがすでに存在するため、リモートファイルは削除されません。

mget 操作の結果のメッセージペイロードは、List<File> オブジェクト(つまり、File オブジェクトの List、それぞれが取得したファイルを表す)です。

バージョン 5.0 以降、FileExistsMode が IGNORE の場合、出力メッセージのペイロードには、ファイルがすでに存在するためにフェッチされなかったファイルが含まれなくなりました。以前は、配列には既存のファイルを含むすべてのファイルが含まれていました。

使用する式は、リモートパスが  で終わる結果を生成する必要があることを決定します。たとえば、myfiles/ は myfiles の完全なツリーをフェッチします。

バージョン 5.0 以降、FileExistsMode.REPLACE_IF_MODIFIED モードと組み合わせて再帰的な MGET を使用して、リモートディレクトリツリー全体を定期的にローカルに同期できます。このモードは、-P (タイムスタンプを保持)オプションに関係なく、ローカルファイルの最終変更タイムスタンプをリモートファイルのタイムスタンプに設定します。

再帰を使用する場合の注意 (-R)

パターンは無視され、* が想定されます。デフォルトでは、リモートツリー全体が取得されます。ただし、FileListFilter を提供することにより、ツリー内のファイルをフィルタリングできます。この方法でツリー内のディレクトリをフィルタリングすることもできます。FileListFilter は、参照によって、または filename-pattern または filename-regex 属性によって提供されます。例: filename-regex="(subDir|.*1.txt)" は、リモートディレクトリおよびサブディレクトリ subDir 内の 1.txt で終わるすべてのファイルを取得します。ただし、この注の後に利用可能な代替手段について説明します。

サブディレクトリをフィルタリングすると、そのサブディレクトリの追加の走査は実行されません。

-dirs オプションは許可されていません(再帰的な mget は再帰的な ls を使用してディレクトリツリーを取得し、ディレクトリ自体をリストに含めることはできません)。

通常、リモートディレクトリ構造がローカルに保持されるように、local-directory-expression で #remoteDirectory 変数を使用します。

永続ファイルリストフィルターにブールプロパティ forRecursion が追加されました。このプロパティを true に設定すると、alwaysAcceptDirectories も設定されます。これは、送信ゲートウェイ(ls および mget)での再帰操作が常にディレクトリツリー全体を毎回トラバースすることを意味します。これは、ディレクトリツリーの奥深くで変更が検出されなかった問題を解決するためです。さらに、forRecursion=true により、ファイルへのフルパスがメタデータストアキーとして使用されます。これにより、同じ名前のファイルが異なるディレクトリに複数回表示された場合にフィルターが正しく機能しなかった問題が解決されます。重要: これは、永続メタデータストア内の既存のキーが、最上位ディレクトリにあるファイルで見つからないことを意味します。このため、プロパティはデフォルトで false です。これは将来のリリースで変更される可能性があります。

バージョン 5.0 から、alwaysAcceptDirectorties を true に設定することにより、常にディレクトリを渡すように SftpSimplePatternFileListFilter および SftpRegexPatternFileListFilter を構成できます。これにより、次の例に示すように、単純なパターンの再帰が可能になります。

<bean id="starDotTxtFilter"
            class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
    <constructor-arg value="*.txt" />
    <property name="alwaysAcceptDirectories" value="true" />
</bean>

<bean id="dotStarDotTxtFilter"
            class="org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter">
    <constructor-arg value="^.*\.txt$" />
    <property name="alwaysAcceptDirectories" value="true" />
</bean>

ゲートウェイで filter プロパティを使用して、これらのフィルターのいずれかを提供できます。

put コマンドの使用

put はファイルをリモートサーバーに送信します。メッセージのペイロードは、java.io.Filebyte[]String にすることができます。remote-filename-generator (または式)は、リモートファイルに名前を付けるために使用されます。その他の使用可能な属性には、remote-directorytemporary-remote-directory、それらに相当する *-expression (use-temporary-file-name および auto-create-directory)が含まれます。詳細については、スキーマのドキュメント [GitHub] (英語) を参照してください。

put 操作から生じるメッセージペイロードは、転送後のサーバー上のファイルの完全パスを含む String です。

バージョン 4.3 では chmod 属性が導入され、アップロード後にリモートファイルのアクセス許可が変更されます。従来の Unix 8 進形式を使用できます(たとえば、600 では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。java を使用してアダプターを構成する場合、setChmod(0600) を使用できます。

mput コマンドの使用

mput は複数のファイルをサーバーに送信し、次のオプションをサポートします。

  • -R: 再帰 — ディレクトリとサブディレクトリ内のすべてのファイルを送信します(フィルタリングされている場合もあります)

メッセージペイロードは、ローカルディレクトリを表す java.io.File (または String)でなければなりません。バージョン 5.1 以降、File または String のコレクションもサポートされています。

put コマンドと同じ属性がサポートされています。さらに、mput-patternmput-regexmput-filter または mput-filter-expression のいずれかを使用して、ローカルディレクトリ内のファイルをフィルタリングできます。サブディレクトリ自体がフィルターを通過する限り、フィルターは再帰で機能します。フィルターを通過しないサブディレクトリは再帰されません。

mput 操作から生じるメッセージペイロードは、List<String> オブジェクト(つまり、転送から生じるリモートファイルパスの List)です。

バージョン 4.3 では、chmod 属性が導入されました。これにより、アップロード後にリモートファイルの権限を変更できます。従来の Unix 8 進形式を使用できます(たとえば、600 では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。Java を使用してアダプターを構成する場合、setChmodOctal("600") または setChmod(0600) を使用できます。

rm コマンドの使用

rm コマンドにはオプションがありません。

削除操作が成功した場合、結果のメッセージペイロードは Boolean.TRUE です。それ以外の場合、メッセージペイロードは Boolean.FALSE です。file_remoteDirectory ヘッダーはリモートディレクトリを保持し、file_remoteFile ヘッダーはファイル名を保持します。

mv コマンドの使用

mv コマンドにはオプションがありません。

expression 属性は "from" パスを定義し、rename-expression 属性は "to" パスを定義します。デフォルトでは、rename-expression は headers['file_renameTo'] です。この式は、null または空の String と評価されてはなりません。必要に応じて、必要なリモートディレクトリが作成されます。結果メッセージのペイロードは Boolean.TRUE です。file_remoteDirectory ヘッダーは元のリモートディレクトリを保持し、file_remoteFile ヘッダーはファイル名を保持します。file_renameTo ヘッダーは新しいパスを保持します。

バージョン 5.5.6 以降、remoteDirectoryExpression を mv コマンドで使用できるようになりました。"from" ファイルが完全なファイルパスでない場合、remoteDirectoryExpression の結果がリモートディレクトリとして使用されます。同じことが "to" ファイルにも当てはまります。たとえば、タスクがディレクトリ内のリモートファイルの名前を変更するだけの場合です。

追加のコマンド情報

get および mget コマンドは、local-filename-generator-expression 属性をサポートしています。転送中にローカルファイルの名前を生成する SpEL 式を定義します。評価コンテキストのルートオブジェクトはリクエストメッセージです。remoteFileName 変数も使用できます。mget で特に役立ちます(例: local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo")。

get および mget コマンドは、local-directory-expression 属性をサポートしています。転送中にローカルディレクトリの名前を生成する SpEL 式を定義します。評価コンテキストのルートオブジェクトはリクエストメッセージです。remoteDirectory 変数も使用できます。mget で特に役立ちます(例: local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.myheader")。この属性は、local-directory 属性と相互に排他的です。

すべてのコマンドについて、ゲートウェイの「式」プロパティは、コマンドが動作するパスを保持します。mget コマンドの場合、式は  に評価される場合があります。これは、すべてのファイル somedirectory/、および * で終わるその他の値を取得することを意味します。

次の例は、ls コマンド用に構成されたゲートウェイを示しています。

<int-ftp:outbound-gateway id="gateway1"
        session-factory="ftpSessionFactory"
        request-channel="inbound1"
        command="ls"
        command-options="-1"
        expression="payload"
        reply-channel="toSplitter"/>

toSplitter チャネルに送信されるメッセージのペイロードは、String オブジェクトのリストであり、各オブジェクトにはファイルの名前が含まれています。command-options="-1" を省略した場合、ペイロードは FileInfo オブジェクトのリストになります。オプションをスペース区切りリストとして提供できます(例: command-options="-1 -dirs -links")。

バージョン 4.2 以降、GETMGETPUTMPUT コマンドは FileExistsMode プロパティ(名前空間サポートを使用する場合は mode)をサポートします。これは、ローカルファイルが存在する場合(GET および MGET)またはリモートファイルが存在する場合(PUT および MPUT)の動作に影響します。サポートされているモードは REPLACEAPPENDFAILIGNORE です。下位互換性のために、PUT および MPUT 操作のデフォルトモードは REPLACE です。GET および MGET 操作の場合、デフォルトは FAIL です。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java で送信ゲートウェイを構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler handler() {
        return new SftpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
    }

}

Java DSL を使用した構成

次の Spring Boot アプリケーションは、送信ゲートウェイを Java DSL で構成する方法の例を示しています。

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public SessionFactory<LsEntry> sftpSessionFactory() {
        DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
        sf.setHost("localhost");
        sf.setPort(port);
        sf.setUsername("foo");
        sf.setPassword("foo");
        factory.setTestSession(true);
        return new CachingSessionFactory<LsEntry>(sf);
    }

    @Bean
    public QueueChannelSpec remoteFileOutputChannel() {
        return MessageChannels.queue();
    }

    @Bean
    public IntegrationFlow sftpMGetFlow() {
        return IntegrationFlows.from("sftpMgetInputChannel")
            .handle(Sftp.outboundGateway(sftpSessionFactory(),
                            AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
                    .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
                    .regexFileNameFilter("(subSftpSource|.*1.txt)")
                    .localDirectoryExpression("'myDir/' + #remoteDirectory")
                    .localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')"))
            .channel("remoteFileOutputChannel")
            .get();
    }

}

送信ゲートウェイの部分的な成功 (mget および mput)

mget および mput を使用して)複数のファイルで操作を実行する場合、1 つ以上のファイルが転送された後、しばらく時間が経過すると例外が発生することがあります。この場合(バージョン 4.2 以降)、PartialSuccessException がスローされます。通常の MessagingException プロパティ(failedMessage および cause)と同様に、この例外には 2 つの追加プロパティがあります。

  • partialResults: 正常な転送結果。

  • derivedInput: リクエストメッセージから生成されたファイルのリスト(mput 用に転送するローカルファイルなど)。

これらの属性により、正常に転送されたファイルと転送されなかったファイルを判別できます。

再帰的な mput の場合、PartialSuccessException にはネストされた PartialSuccessException インスタンスがあります。

次のディレクトリ構造を考慮してください。

root/
|- file1.txt
|- subdir/
   | - file2.txt
   | - file3.txt
|- zoo.txt

file3.txt で例外が発生した場合、ゲートウェイによってスローされた PartialSuccessException には、file1.txtsubdirzoo.txt の derivedInput と file1.txt の partialResults があります。その cause は、file2.txt の derivedInput と file2.txt の file3.txt および partialResults を備えた別の PartialSuccessException です。

SFTP/JSCH ロギング

JSch ライブラリを使用して SFTP サポートを提供するため、特に何かが適切に機能していない場合(認証の例外など)、JSch API 自体からの詳細情報が必要になる場合があります。残念ながら、JSch は commons-logging を使用しませんが、代わりに com.jcraft.jsch.Logger インターフェースのカスタム実装に依存しています。Spring Integration 2.0.1 の時点で、このインターフェースを実装しました。JSch ロギングを有効にするために、通常の方法でロガーを構成できます。例: 次の例は、Log4J を使用するロガーの有効な構成です。

log4j.category.com.jcraft.jsch=DEBUG

MessageSessionCallback

Spring Integration バージョン 4.2 以降、<int-sftp:outbound-gateway/> (SftpOutboundGateway)で MessageSessionCallback<F, T> 実装を使用して、requestMessage コンテキストで Session<LsEntry> の操作を実行できます。統合フロー定義や関数インターフェース(ラムダ)実装インジェクションからのアクセスを許可するなど、非標準または低レベルの SFTP 操作(または複数)に使用できます。次の例では、ラムダを使用しています。

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpOutboundGateway(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
    return new SftpOutboundGateway(sessionFactory,
         (session, requestMessage) -> session.list(requestMessage.getPayload()));
}

別の例は、送信または取得されるファイルデータの前処理または後処理です。

XML 構成を使用する場合、<int-sftp:outbound-gateway/> は session-callback 属性を提供し、MessageSessionCallback Bean 名を指定できます。

session-callback は、command および expression 属性と相互に排他的です。Java で構成する場合、SftpOutboundGateway クラスはさまざまなコンストラクターを提供します。

Apache Mina SFTP サーバーイベント

バージョン 5.2 で追加された ApacheMinaSftpEventListener は、特定の Apache Mina SFTP サーバーイベントをリッスンし、ApplicationListener Bean、@EventListener Bean メソッド、またはイベント受信チャネルアダプターで受信できる ApplicationEvent として公開します。

現在サポートされているイベントは次のとおりです。

  • SessionOpenedEvent - クライアントセッションが開かれました

  • DirectoryCreatedEvent - ディレクトリが作成されました

  • FileWrittenEvent - ファイルが書き込まれた

  • PathMovedEvent - ファイルまたはディレクトリの名前が変更されました

  • PathRemovedEvent - ファイルまたはディレクトリが削除されました

  • SessionClosedEvent - クライアントが切断されました

これらはそれぞれ ApacheMinaSftpEvent のサブクラスです。すべてのイベント型を受信する単一のリスナーを設定できます。各イベントの source プロパティは ServerSession であり、そこからクライアントアドレスなどの情報を取得できます。便利な getSession() メソッドが抽象イベントで提供されます。

リスナー(Spring Bean でなければなりません)でサーバーを構成するには、単に SftpSubsystemFactory に追加します。

server = SshServer.setUpDefaultServer();
...
SftpSubsystemFactory sftpFactory = new SftpSubsystemFactory();
sftpFactory.addSftpEventListener(apacheMinaSftpEventListenerBean);
...

Spring Integration イベントアダプターを使用してこれらのイベントを消費するには:

@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
    ApplicationEventListeningMessageProducer producer =
        new ApplicationEventListeningMessageProducer();
    producer.setEventTypes(ApacheMinaSftpEvent.class);
    producer.setOutputChannel(eventChannel());
    return producer;
}

リモートファイル情報

バージョン 5.2 以降、SftpStreamingMessageSource (SFTP ストリーミング受信チャネルアダプター)、SftpInboundFileSynchronizingMessageSource (SFTP 受信チャネルアダプター)、SftpOutboundGateway (SFTP 送信ゲートウェイ)の -commands の「読み取り」は、リモートファイルに関する情報を生成する追加のヘッダーをメッセージに提供します。

  • FileHeaders.REMOTE_HOST_PORT - ファイル転送操作中にリモートセッションが接続された host:port ペア。

  • FileHeaders.REMOTE_DIRECTORY - 操作が実行されたリモートディレクトリ。

  • FileHeaders.REMOTE_FILE - リモートファイル名。単一ファイル操作にのみ適用されます。

SftpInboundFileSynchronizingMessageSource はリモートファイルに対してメッセージを生成せず、ローカルコピーを使用するため、AbstractInboundFileSynchronizer はリモートファイルに関する情報を、同期操作中に URI スタイル(protocol://host:port/remoteDirectory#remoteFileName)で MetadataStore (外部で構成可能)に保存します。このメタデータは、ローカルファイルがポーリングされるときに SftpInboundFileSynchronizingMessageSource によって取得されます。ローカルファイルが削除された場合、そのメタデータエントリを削除することをお勧めします。AbstractInboundFileSynchronizer は、この目的のために removeRemoteFileMetadata() コールバックを提供します。さらに、メタデータキーで使用される setMetadataStorePrefix() があります。これらのコンポーネント間で同じ MetadataStore インスタンスが共有される場合、フィルターと AbstractInboundFileSynchronizer の両方がメタデータエントリに同じローカルファイル名を使用するため、このプレフィックスを MetadataStore ベースの FileListFilter 実装で使用されるものと異なるものにすることをお勧めします。キー。