SFTP アダプター
Spring Integration は、SFTP を介したファイル転送操作をサポートします。
セキュアファイル転送プロトコル(SFTP)は、信頼できるストリームを介してインターネット上の 2 台のコンピューター間でファイルを転送できるネットワークプロトコルです。
SFTP プロトコルには、SSH などの安全なチャネルと、SFTP セッション全体でのクライアントの ID の可視性が必要です。
Spring Integration は、受信チャネルアダプター、送信チャネルアダプター、送信ゲートウェイの 3 つのクライアント側エンドポイントを提供することにより、SFTP を介したファイルの送受信をサポートしています。また、これらのクライアントコンポーネントを定義するための便利なネームスペース構成も提供します。
この依存関係をプロジェクトに含める必要があります。
xml 構成に SFTP 名前空間を含めるには、ルート要素に次の属性を含めます。
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
https://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd"
SFTP セッションファクトリ
バージョン 3.0 の時点で、セッションはデフォルトでキャッシュされなくなりました。SFTP セッションキャッシングを参照してください。 |
SFTP アダプターを構成する前に、SFTP セッションファクトリを構成する必要があります。次の例に示すように、SFTP セッションファクトリを通常の Bean 定義で構成できます。
<beans:bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<beans:property name="host" value="localhost"/>
<beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
<beans:property name="privateKeyPassphrase" value="springIntegration"/>
<beans:property name="port" value="22"/>
<beans:property name="user" value="kermit"/>
</beans:bean>
アダプターが SessionFactory
からセッションオブジェクトをリクエストするたびに、新しい SFTP セッションが作成されます。裏では、SFTP Session Factory は JSch (英語) ライブラリに依存して SFTP 機能を提供します。
ただし、Spring Integration は SFTP セッションのキャッシュもサポートしています。詳細については、SFTP セッションキャッシングを参照してください。
JSch は、サーバーへの接続を介して複数のチャネル(操作)をサポートします。デフォルトでは、Spring Integration セッションファクトリは、チャネルごとに個別の物理接続を使用します。Spring Integration 3.0 以降、サーバーへの単一の接続を使用し、その単一の接続上に複数の この機能を使用する場合、後述するように、セッションファクトリをキャッシングセッションファクトリでラップして、操作が完了したときに接続が物理的に閉じられないようにする必要があります。 キャッシュがリセットされると、最後のチャネルが閉じられたときにのみセッションが切断されます。 新しい操作がセッションを取得したときに接続が切断されていることが判明した場合、接続はリフレッシュされます。 |
接続の問題が発生し、セッションの作成をトレースして、どのセッションがポーリングされているかを確認したい場合は、ロガーを TRACE レベル(たとえば、log4j.category.org.springframework.integration.sftp=TRACE )に設定してトレースを有効にできます。SFTP/JSCH ロギングを参照してください。 |
あとは、この SFTP セッションファクトリをアダプターに挿入するだけです。
SFTP セッションファクトリに値を提供するより実用的な方法は、Spring のプロパティプレースホルダーサポートを使用することです。 |
プロパティの構成
次のリストは、DefaultSftpSessionFactory
(Javadoc) によって公開されるすべてのプロパティを説明しています。
isSharedSession
(コンストラクター引数): : true
の場合、単一の接続が使用され、JSch Channels
は多重化されます。デフォルトは false
です。
clientVersion
:: クライアントバージョンプロパティを設定できます。デフォルトは基礎となる JSch バージョンに依存しますが、SSH-2.0-JSCH-0.1.45 のようになります。
enableDaemonThread
:: true
の場合、すべてのスレッドはデーモンスレッドです。false
に設定すると、代わりに通常の非デーモンスレッドが使用されます。このプロパティは、基礎となるセッション (英語) で設定されます。そこでは、このプロパティのデフォルトは false
です。
host
:: 接続するホストの URL。必須。
hostKeyAlias
:: ホストキーを既知のホストリストと比較するときに使用されるホストキーエイリアスを設定します。
knownHostsResource
:: ホストキーリポジトリに使用するファイルリソースを指定します。このファイルは OpenSSH の known_hosts
ファイルと同じ形式であり、allowUnknownKeys
が false の場合は必須であり、事前に入力する必要があります。
password
:: リモートホストに対して認証するパスワード。パスワードが提供されない場合、privateKey
プロパティが必要です。userInfo
を設定した場合は許可されません。パスワードはそのオブジェクトから取得されます。
port
::SFTP 接続が確立されるポート。指定しない場合、この値はデフォルトで 22
になります。指定する場合、このプロパティは正数でなければなりません。
privateKey
:: リモートホストに対する認証に使用される秘密鍵の場所を表すリソース (Javadoc) を設定できます。privateKey
が提供されない場合、password
プロパティが必要です。
privateKeyPassphrase
:: 秘密鍵のパスワード。userInfo
を設定すると、privateKeyPassphrase
は許可されません。パスフレーズはそのオブジェクトから取得されます。オプション。
proxy
::JSch ベースのプロキシ (英語) を指定できます。設定すると、プロキシオブジェクトを使用して、プロキシを介したリモートホストへの接続が作成されます。プロキシを設定する便利な方法については、プロキシファクトリ Bean を参照してください。
serverAliveCountMax
:: 切断前にサーバーからの応答なしで送信されるサーバー生存メッセージの数を指定します。設定しない場合、このプロパティのデフォルトは 1
になります。
serverAliveInterval
:: サーバーからメッセージが受信されない場合に、サーバーアライブメッセージが送信されるまでのタイムアウト間隔(ミリ秒単位)を設定します。
sessionConfig
:: Properties
を使用すると、基礎となる JSch セッションに追加の構成設定を設定できます。
socketFactory
::SocketFactory
(英語) を渡します。ソケットファクトリは、ターゲットホストへのソケットを作成するために使用されます。プロキシを使用すると、ソケットファクトリがプロキシに渡されます。デフォルトでは、プレーン TCP ソケットが使用されます。
timeout
:: タイムアウトプロパティは、デフォルトの接続タイムアウトと同様に、ソケットタイムアウトパラメーターとして使用されます。デフォルトは 0
です。つまり、タイムアウトは発生しません。
user
:: 使用するリモートユーザー。必須。
allowUnknownKeys
:: true
に設定して、不明な(または変更された)キーを持つホストへの接続を許可します。デフォルトは "false" です。userInfo
が提供されていない場合にのみ適用されます。false
の場合、事前入力された knownHosts
ファイルが必要です。
userInfo
:: 認証中に使用されるカスタム UserInfo
を設定します。特に、promptYesNo()
は、不明な(または変更された)ホストキーを受信したときに呼び出されます。allowUnknownKeys
も参照してください。UserInfo
を指定すると、そこから password
および秘密鍵 passphrase
が取得され、個別の password
および privateKeyPassphrase
プロパティを設定できません。
プロキシファクトリ Bean
Jsch
は、HTTP または SOCKS プロキシを介してサーバーに接続するメカニズムを提供します。この機能を使用するには、前述のように、Proxy
を構成し、DefaultSftpSessionFactory
への参照を提供します。3 つの実装が Jsch
によって提供されます: HTTP
、SOCKS4
、SOCKS5
. 次の例に示すように、Spring Integration 4.3 は FactoryBean
を導入し、プロパティインジェクションを許可することでこれらのプロキシの構成を容易にしました。
<bean id="proxySocks5" class="org.springframework.integration.sftp.session.JschProxyFactoryBean">
<constructor-arg value="SOCKS5" />
<constructor-arg value="${sftp.proxy.address}" />
<constructor-arg value="${sftp.proxy.port}" />
<constructor-arg value="${sftp.proxy.user}" />
<constructor-arg value="${sftp.proxy.pw}" />
</bean>
<bean id="sessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory" >
...
<property name="proxy" ref="proxySocks5" />
...
</bean>
セッションファクトリの委譲
バージョン 4.2 では、実行時に実際のセッションファクトリを選択できる DelegatingSessionFactory
が導入されました。SFTP エンドポイントを呼び出す前に、ファクトリで setThreadKey()
を呼び出して、キーを現在のスレッドに関連付けることができます。次に、そのキーを使用して、使用する実際のセッションファクトリを検索します。使用後に clearThreadKey()
を呼び出すことにより、キーをクリアできます。
次の例に示すように、メッセージフローから簡単に行えるように便利なメソッドを追加しました。
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-sftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />
セッションキャッシングを使用する場合(SFTP セッションキャッシングを参照)、各デリゲートをキャッシュする必要があります。DelegatingSessionFactory 自体をキャッシュすることはできません。 |
バージョン 5.0.7 以降、DelegatingSessionFactory
を RotatingServerAdvice
と組み合わせて使用して複数のサーバーをポーリングできます。受信チャネルアダプター: 複数のサーバーとディレクトリのポーリングを参照してください。
SFTP セッションキャッシング
Spring Integration バージョン 3.0 以降、セッションはデフォルトでキャッシュされなくなりました。cache-sessions 属性はエンドポイントでサポートされなくなりました。セッションをキャッシュする場合は、CachingSessionFactory を使用する必要があります(次の例を参照)。 |
3.0 より前のバージョンでは、セッションはデフォルトで自動的にキャッシュされていました。cache-sessions
属性は自動キャッシュを無効にするために使用できましたが、そのソリューションは他のセッションキャッシュ属性を構成する方法を提供していませんでした。例: 作成されるセッションの数を制限できませんでした。その要件と他の構成オプションをサポートするために、CachingSessionFactory
を追加しました。sessionCacheSize
および sessionWaitTimeout
プロパティを提供します。その名前が示すように、sessionCacheSize
プロパティは、ファクトリがキャッシュに保持するアクティブセッションの数を制御します(デフォルトは無制限です)。sessionCacheSize
しきい値に達した場合、キャッシュされたセッションのいずれかが使用可能になるか、セッションの待機時間が経過するまで(デフォルトの待機時間は Integer.MAX_VALUE
)、別のセッションを取得しようとするとブロックされます。sessionWaitTimeout
プロパティにより、待機時間の構成が可能になります。
セッションをキャッシュする場合は、デフォルトのセッションファクトリを設定し(前述のとおり)、CachingSessionFactory
のインスタンスにラップして、追加のプロパティを提供します。次の例は、その方法を示しています。
<bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="sftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
上記の例では、sessionCacheSize
を 10
に設定し、sessionWaitTimeout
を 1 秒(1000 ミリ秒)に設定して CachingSessionFactory
を作成します。
Spring Integration バージョン 3.0 から、CachingConnectionFactory
は resetCache()
メソッドを提供します。呼び出されると、すべてのアイドルセッションはすぐに閉じられ、使用中のセッションはキャッシュに返されると閉じられます。isSharedSession=true
を使用すると、チャネルが閉じられ、共有セッションは最後のチャネルが閉じられたときにのみ閉じられます。セッションの新しいリクエストは、必要に応じて新しいセッションを確立します。
バージョン 5.1 以降、CachingSessionFactory
には新しいプロパティ testSession
があります。true の場合、stat(getHome())
コマンドを実行してセッションがテストされ、セッションがまだアクティブであることを確認します。そうでない場合、キャッシュから削除されます。アクティブなセッションがキャッシュにない場合、新しいセッションが作成されます。
RemoteFileTemplate
を使用する
Spring Integration バージョン 3.0 は、SftpSession
オブジェクトの新しい抽象化を提供します。このテンプレートは、ファイルを送信、取得(InputStream
として)、削除、名前変更するためのメソッドを提供します。さらに、呼び出し側がセッションで複数の操作を実行できるようにする execute
メソッドを提供します。すべての場合において、テンプレートはセッションを確実に閉じるように配慮します。詳細については、RemoteFileTemplate
の Javadoc を参照してください。SFTP にはサブクラス SftpRemoteFileTemplate
(Javadoc) があります。
getClientInstance()
を含む追加のメソッドをバージョン 4.1 に追加しました。下位の ChannelSftp
へのアクセスを提供し、低レベル API へのアクセスを可能にします。
バージョン 5.0 は RemoteFileOperations.invoke(OperationsCallback<F, T> action)
メソッドを導入しました。このメソッドを使用すると、同じスレッドでバインドされた Session
のスコープ内でいくつかの RemoteFileOperations
呼び出しを呼び出すことができます。これは、RemoteFileTemplate
の複数の高レベル操作を 1 つの作業単位として実行する必要がある場合に役立ちます。例: AbstractRemoteFileOutboundGateway
は mput
コマンドの実装でそれを使用し、提供されたディレクトリ内の各ファイルに対して、およびそのサブディレクトリに対して再帰的に put
操作を実行します。詳細については、Javadoc を参照してください。
SFTP 受信チャネルアダプター
SFTP 受信チャネルアダプターは、サーバーに接続し、リモートディレクトリイベント(作成中の新しいファイルなど)をリッスンする特別なリスナーです。この時点で、ファイル転送を開始します。次の例は、SFTP 受信チャネルアダプターを構成する方法を示しています。
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
上記の構成例は、以下を含むさまざまな属性の値を提供する方法を示しています。
local-directory
: ファイルが転送される場所remote-directory
: ファイルの転送元のリモートソースディレクトリsession-factory
: 前に構成した Bean への参照
デフォルトでは、転送されたファイルには元のファイルと同じ名前が付けられます。この動作をオーバーライドする場合は、local-filename-generator-expression
属性を設定できます。これにより、ローカルファイルの名前を生成する SpEL 式を提供できます。SpEL 評価コンテキストのルートオブジェクトが Message
である送信ゲートウェイおよびアダプターとは異なり、この受信アダプターは評価時にメッセージをまだ持っていません。これは、転送されたファイルをペイロードとして最終的に生成するためです。SpEL 評価コンテキストのルートオブジェクトは、リモートファイルの元の名前(String
)です。
受信チャネルアダプターは、最初にファイルをローカルディレクトリに取得し、ポーラー構成に従って各ファイルを発行します。バージョン 5.0 以降、新しいファイルの取得が必要な場合に SFTP サーバーから取得するファイルの数を制限できます。これは、ターゲットファイルが大きい場合、またはこのセクションで後述する永続的なファイルリストフィルターを使用してクラスター化システムで実行する場合に役立ちます。この目的には max-fetch-size
を使用してください。負の値(デフォルト)は制限がないことを意味し、一致するすべてのファイルが取得されます。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。バージョン 5.0 以降、scanner
属性を設定することにより、inbound-channel-adapter
にカスタム DirectoryScanner
実装を提供することもできます。
Spring Integration 3.0 以降、preserve-timestamp
属性を指定できます(デフォルトは false
です)。true
の場合、ローカルファイルの変更されたタイムスタンプは、サーバーから取得した値に設定されます。それ以外の場合は、現在の時刻に設定されます。
バージョン 4.2 以降、remote-directory
の代わりに remote-directory-expression
を指定できます。これにより、各ポーリングでディレクトリを動的に決定できます(例: remote-directory-expression="@myBean.determineRemoteDir()"
)。
filename-pattern
属性で指定された単純なパターンに基づいたファイルフィルタリングでは不十分な場合があります。この場合、filename-regex
属性を使用して、正規表現(たとえば、filename-regex=".*\.test$"
)を指定できます。完全な制御が必要な場合は、filter
属性を使用して、ファイルのリストをフィルタリングするための戦略インターフェースである org.springframework.integration.file.filters.FileListFilter
のカスタム実装への参照を提供できます。このフィルターは、どのリモートファイルを取得するかを決定します。CompositeFileListFilter
を使用して、パターンベースのフィルターを他のフィルター(AcceptOnceFileListFilter
など)と組み合わせて、以前にフェッチされたファイルの同期を回避することもできます。
AcceptOnceFileListFilter
はその状態をメモリに保存します。システムの再起動後も状態を維持したい場合は、代わりに SftpPersistentAcceptOnceFileListFilter
の使用を検討してください。このフィルターは、受け入れられたファイル名を MetadataStore
ストラテジーのインスタンスに保存します(メタデータストアを参照)。このフィルターは、ファイル名とリモート変更時刻で一致します。
バージョン 4.0 以降、このフィルターには ConcurrentMetadataStore
が必要です。共有データストア(Redis
と RedisMetadataStore
など)を併用すると、フィルターキーを複数のアプリケーションまたはサーバーインスタンス間で共有できます。
バージョン 5.0 以降、SftpInboundFileSynchronizer
には、メモリ内 SimpleMetadataStore
を持つ SftpPersistentAcceptOnceFileListFilter
がデフォルトで適用されます。このフィルターは、XML 構成の regex
または pattern
オプションとともに、Java DSL の SftpInboundChannelAdapterSpec
を通じても適用されます。CompositeFileListFilter
(または ChainFileListFilter
)を使用して、他のユースケースを処理できます。
上記の説明は、ファイルを取得する前にファイルをフィルタリングすることを示しています。ファイルが取得されると、ファイルシステム上のファイルに追加のフィルターが適用されます。デフォルトでは、これは `AcceptOnceFileListFilter` であり、このセクションで説明するように、状態をメモリに保持し、ファイルの変更時刻を考慮しません。アプリケーションが処理後にファイルを削除しない限り、アダプターはアプリケーションの再起動後にデフォルトでディスク上のファイルを再処理します。
また、SftpPersistentAcceptOnceFileListFilter
を使用するように filter
を構成し、リモートファイルのタイムスタンプが変更された場合(再取得されるため)、デフォルトのローカルフィルターはこの新しいファイルの処理を許可しません。
このフィルターの詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。
local-filter
属性を使用して、ローカルファイルシステムフィルターの動作を構成できます。バージョン 4.3.8 以降、FileSystemPersistentAcceptOnceFileListFilter
はデフォルトで構成されています。このフィルターは、受け入れられたファイル名と変更されたタイムスタンプを MetadataStore
戦略のインスタンス(メタデータストアを参照)に保存し、ローカルファイルの変更時刻の変更を検出します。デフォルトの MetadataStore
は、状態をメモリに保存する SimpleMetadataStore
です。
バージョン 4.1.5 以降、これらのフィルターには flushOnUpdate
と呼ばれる新しいプロパティがあり、更新ごとにメタデータストアをフラッシュします(ストアが Flushable
を実装している場合)。
さらに、分散 MetadataStore (Redis メタデータストアや Gemfire メタデータストアなど)を使用する場合、同じアダプターまたはアプリケーションの複数のインスタンスを作成し、1 つのインスタンスのみがファイルを処理するようにすることができます。 |
実際のローカルフィルターは、指定されたフィルターと、ダウンロード中のファイルの処理を防ぐパターンフィルターを含む CompositeFileListFilter
です(temporary-file-suffix
に基づく)。ファイルはこの接尾辞(デフォルトは .writing
)でダウンロードされ、ファイルは転送が完了すると最終的な名前に変更され、フィルターから「見える」ようになります。
これらの属性の詳細については、スキーマ [GitHub] (英語) を参照してください。
SFTP 受信チャネルアダプターは、ポーリングコンシューマーです。ポーラー(グローバルデフォルトまたはローカル要素)を構成する必要があります。ファイルがローカルディレクトリに転送されると、ペイロード型として java.io.File
を持つメッセージが生成され、channel
属性によって識別されるチャネルに送信されます。
ファイルフィルタリングと大きなファイルの詳細
監視対象(リモート)ディレクトリに表示されたばかりのファイルが完全でない場合があります。通常、このようなファイルは一時的な拡張子(something.txt.writing
という名前のファイルの .writing
など)で書き込まれ、書き込みプロセスの補完後に名前が変更されます。ほとんどの場合、開発者は完全なファイルのみに関心があり、それらのファイルのみをフィルタリングしたいと考えています。これらのシナリオを処理するために、filename-pattern
、filename-regex
、filter
属性によって提供されるフィルタリングサポートを使用できます。カスタムフィルターの実装が必要な場合は、filter
属性を設定することにより、アダプターに参照を含めることができます。次の例は、その方法を示しています。
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
障害からの回復
アダプターのアーキテクチャを理解する必要があります。ファイルシンクロナイザーがファイルを取得し、FileReadingMessageSource
が各同期ファイルのメッセージを送信します。前に説明したように、2 つのフィルターが関係しています。filter
属性(およびパターン)は、リモート(SFTP)ファイルリストを参照して、すでに取得されたファイルの取得を回避します。FileReadingMessageSource
は local-filter
を使用して、メッセージとして送信されるファイルを決定します。
シンクロナイザーはリモートファイルをリストし、そのフィルターを調べます。その後、ファイルが転送されます。ファイル転送中に IO エラーが発生した場合、フィルターにすでに追加されているファイルはすべて削除され、次のポーリングで再取得できるようになります。これは、フィルターが ReversibleFileListFilter
(AcceptOnceFileListFilter
など)を実装する場合にのみ適用されます。
ファイルを同期した後、ファイルを処理するダウンストリームフローでエラーが発生した場合、フィルターの自動ロールバックは発生しないため、失敗したファイルはデフォルトで再処理されません。
失敗後にそのようなファイルを再処理する場合は、次のような構成を使用して、失敗したファイルをフィルターから簡単に削除できます。
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
前述の構成は、すべての ResettableFileListFilter
で機能します。
バージョン 5.0 以降、受信チャネルアダプターは、生成されたローカルファイル名に従って、サブディレクトリをローカルに構築できます。リモートサブパスでもあります。階層サポートに従って変更するためにローカルディレクトリを再帰的に読み取ることができるように、Files.walk()
アルゴリズムに基づいて新しい RecursiveDirectoryScanner
を内部 FileReadingMessageSource
に提供できるようになりました。詳細については、AbstractInboundFileSynchronizingMessageSource.setScanner()
(Javadoc) を参照してください。また、setUseWatchService()
オプションを使用して、AbstractInboundFileSynchronizingMessageSource
を WatchService
ベースの DirectoryScanner
に切り替えることができるようになりました。また、すべての WatchEventType
インスタンスがローカルディレクトリの変更に反応するように設定されています。前に示した再処理のサンプルは、ファイルがローカルディレクトリから削除された(StandardWatchEventKinds.ENTRY_DELETE
)ときに ResettableFileListFilter.remove()
を使用する FileReadingMessageSource.WatchServiceDirectoryScanner
の組み込み機能に基づいています。詳細については、WatchServiceDirectoryScanner
を参照してください。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlows
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
不完全なデータの処理
不完全なデータの処理を参照してください。
SftpSystemMarkerFilePresentFileListFilter
は、リモートシステムに対応するマーカーファイルがないリモートファイルをフィルタリングするために提供されています。構成情報については、Javadoc を参照してください。
SFTP ストリーミング受信チャネルアダプター
バージョン 4.3 は、ストリーミング受信チャネルアダプターを導入しました。このアダプターは、型 InputStream
のペイロードを持つメッセージを生成し、ローカルファイルシステムに書き込むことなくファイルをフェッチできるようにします。セッションは開いたままなので、ファイルを消費したときに消費側アプリケーションがセッションを閉じる必要があります。セッションは closeableResource
ヘッダー(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
)で提供されます。FileSplitter
や StreamTransformer
などの標準フレームワークコンポーネントは、セッションを自動的に閉じます。これらのコンポーネントの詳細については、ファイル分割およびストリームトランスを参照してください。次の例は、SFTP ストリーミング受信チャネルアダプターを構成する方法を示しています。
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
filename-pattern
、filename-regex
、filter
または filter-expression
のいずれか 1 つのみを使用できます。
バージョン 5.0 以降、デフォルトでは、SftpStreamingMessageSource アダプターは、メモリ内 SimpleMetadataStore に基づいて SftpPersistentAcceptOnceFileListFilter を使用することにより、リモートファイルの重複を防ぎます。デフォルトでは、このフィルターもファイル名パターン(または正規表現)とともに適用されます。重複を許可する必要がある場合は、AcceptAllFileListFilter を使用できます。CompositeFileListFilter (または ChainFileListFilter )を使用して、他のユースケースを処理できます。後で示す Java 構成は、処理後にリモートファイルを削除して重複を回避する 1 つの手法を示しています。 |
SftpPersistentAcceptOnceFileListFilter
の詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。
max-fetch-size
属性を使用して、フェッチが必要な場合に各ポーリングでフェッチされるファイルの数を制限できます。1
に設定し、クラスター環境で実行する場合は永続フィルターを使用します。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。
アダプターは、リモートディレクトリとファイル名をヘッダーに入れます(それぞれ FileHeaders.REMOTE_DIRECTORY
および FileHeaders.REMOTE_FILE
)。バージョン 5.0 以降、FileHeaders.REMOTE_FILE_INFO
ヘッダーは追加のリモートファイル情報(JSON 形式)を提供します。SftpStreamingMessageSource
の fileInfoJson
プロパティを false
に設定すると、ヘッダーには SftpFileInfo
オブジェクトが含まれます。SftpFileInfo.getFileInfo()
メソッドを使用して、基礎となる Jsch ライブラリが提供する LsEntry
オブジェクトにアクセスできます。fileInfoJson
プロパティは、XML 構成を使用する場合は使用できませんが、SftpStreamingMessageSource
を構成クラスの 1 つに挿入することで設定できます。リモートファイル情報も参照してください。
バージョン 5.1 以降、comparator
の汎用型は LsEntry
です。以前は、AbstractFileInfo<LsEntry>
でした。これは、maxFetch
をフィルタリングして適用する前に、処理の早い段階でソートが実行されるようになったためです。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
この例では、トランスフォーマーの下流のメッセージハンドラーに、処理後にリモートファイルを削除するアドバイスがあることに注意してください。
受信チャネルアダプター: 複数のサーバーとディレクトリのポーリング
バージョン 5.0.7 以降、RotatingServerAdvice
が利用可能になりました。ポーリングアドバイスとして構成されている場合、受信アダプターは複数のサーバーとディレクトリをポーリングできます。通常どおり、アドバイスを構成し、ポーラーのアドバイスチェーンに追加します。DelegatingSessionFactory
を使用してサーバーを選択します。詳細については、セッションファクトリの委譲を参照してください。アドバイス構成は、RotationPolicy.KeyDirectory
オブジェクトのリストで構成されています。
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
このアドバイスは、新しいファイルが存在しなくなるまでサーバー one
上のディレクトリ foo
をポーリングし、その後、サーバー two
上のディレクトリ bar
、次にディレクトリ baz
に移動します。
このデフォルトの動作は、fair
コンストラクター arg を使用して変更できます。
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
この場合、前回のポーリングでファイルが返されたかどうかに関係なく、アドバイスは次のサーバー / ディレクトリに移動します。
または、独自の RotationPolicy
を提供して、必要に応じてメッセージソースを再構成できます。
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
および
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
local-filename-generator-expression
属性(シンクロナイザーの localFilenameGeneratorExpression
)に #remoteDirectory
変数を含めることができるようになりました。これにより、異なるディレクトリから取得したファイルを同様のディレクトリにローカルにダウンロードできます。
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
このアドバイスを使用するときは、ポーラーで TaskExecutor を構成しないでください。詳細については、メッセージソースの条件付きポーラーを参照してください。 |
受信チャネルアダプター: リモートファイルフェッチの制御
受信チャネルアダプターを構成するときは、2 つのプロパティを考慮する必要があります。max-messages-per-poll
は、すべてのポーラーと同様に、各ポーリングで送信されるメッセージの数を制限するために使用できます(構成された値を超える準備ができている場合)。max-fetch-size
(バージョン 5.0 以降)は、一度にリモートサーバーから取得するファイルの数を制限できます。
次のシナリオでは、開始状態が空のローカルディレクトリであると想定しています。
max-messages-per-poll=2
およびmax-fetch-size=1
: アダプターは、1 つのファイルを取り出して発行し、次のファイルを取り出して発行します。その後、次のポーリングまでスリープします。max-messages-per-poll=2
およびmax-fetch-size=2
): アダプターは両方のファイルをフェッチしてから、それぞれを発行します。max-messages-per-poll=2
およびmax-fetch-size=4
: アダプターは、最大 4 つのファイル(使用可能な場合)をフェッチし、最初の 2 つを発行します(少なくとも 2 つある場合)。次の 2 つのファイルは、次のポーリングで発行されます。max-messages-per-poll=2
およびmax-fetch-size
が指定されていない: アダプターは、すべてのリモートファイルをフェッチし、最初の 2 つを発行します(少なくとも 2 つある場合)。後続のファイルは、後続のポーリングで発行されます(一度に 2 つ)。すべてが消費されると、リモートフェッチが再度試行され、新しいファイルが取得されます。
アプリケーションの複数のインスタンスをデプロイするときは、小さな max-fetch-size を設定して、1 つのインスタンスがすべてのファイルを「取得」し、他のインスタンスを枯渇させないようにすることをお勧めします。 |
max-fetch-size
のもう 1 つの用途は、リモートファイルのフェッチを停止するが、すでにフェッチされたファイルの処理を続行する場合です。MessageSource
で maxFetchSize
プロパティを設定すると(プログラム、JMX、コントロールバスを介して)、アダプターはさらにファイルを取得できなくなりますが、ポーラーは以前に取得したファイルのメッセージを引き続き送信できます。プロパティが変更されたときにポーラーがアクティブな場合、変更は次のポーリングで有効になります。
バージョン 5.1 以降、シンクロナイザーに Comparator<LsEntry>
を提供できます。これは、maxFetchSize
でフェッチされるファイルの数を制限するときに役立ちます。
SFTP 送信チャネルアダプター
SFTP 発信チャネルアダプターは、リモートディレクトリに接続し、受信 Message
のペイロードとして受信するすべてのファイルに対してファイル転送を開始する特別な MessageHandler
です。また、ファイルのいくつかの表現もサポートしているため、File
オブジェクトに限定されません。FTP 送信アダプターと同様に、SFTP 送信チャネルアダプターは次のペイロードをサポートしています。
java.io.File
: 実際のファイルオブジェクトbyte[]
: ファイルの内容を表すバイト配列java.lang.String
: ファイルの内容を表すテキストjava.io.InputStream
: リモートファイルに転送するデータのストリームorg.springframework.core.io.Resource
: リモートファイルに転送するデータのリソース
次の例は、SFTP 発信チャネルアダプターを構成する方法を示しています。
<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
session-factory="sftpSessionFactory"
channel="inputChannel"
charset="UTF-8"
remote-file-separator="/"
remote-directory="foo/bar"
remote-filename-generator-expression="payload.getName() + '-mysuffix'"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>
これらの属性の詳細については、スキーマ [GitHub] (英語) を参照してください。
SpEL と SFTP 送信アダプター
Spring Integration の他の多くのコンポーネントと同様に、2 つの属性 remote-directory-expression
および remote-filename-generator-expression
(前述)を指定して SFTP 発信チャネルアダプターを構成するときに、Spring Expression Language(SpEL)を使用できます。式評価コンテキストにはメッセージがルートオブジェクトとして含まれており、メッセージのデータに基づいて(「ペイロード」または「ヘッダー」から)ファイル名または既存のディレクトリパスを動的に計算できる式を使用できます。前の例では、元の名前に基づいてファイル名を計算し、サフィックス "-mysuffix" を追加する式の値で remote-filename-generator-expression
属性を定義します。
バージョン 4.1 以降では、ファイルを転送するときに mode
を指定できます。デフォルトでは、既存のファイルは上書きされます。モードは FileExistsMode
列挙で定義され、次の値が含まれます。
REPLACE
(default)REPLACE_IF_MODIFIED
APPEND
APPEND_NO_FLUSH
IGNORE
FAIL
IGNORE
および FAIL
では、ファイルは転送されません。FAIL
は例外をスローしますが、IGNORE
は通知なしで転送を無視します(DEBUG
ログエントリは生成されます)。
バージョン 4.3 は chmod
属性を導入しました。これを使用して、アップロード後にリモートファイルの許可を変更できます。従来の Unix 8 進形式を使用できます(たとえば、600
では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。java を使用してアダプターを構成する場合、setChmodOctal("600")
または setChmod(0600)
を使用できます。
部分的に書き込まれたファイルの回避
ファイル転送を処理する際の一般的な問題の 1 つは、部分的なファイルを処理する可能性です。転送が実際に完了する前に、ファイルがファイルシステムに表示される場合があります。
この課題に対処するために、Spring Integration SFTP アダプターは、ファイルが一時的な名前で転送され、完全に転送された後に名前が変更される一般的なアルゴリズムを使用します。
デフォルトでは、転送中のすべてのファイルは、追加のサフィックス(デフォルトでは .writing
)とともにファイルシステムに表示されます。temporary-file-suffix
属性を設定して変更できます。
ただし、この手法を使用したくない場合があります(たとえば、サーバーがファイル名の変更を許可していない場合)。このような状況では、use-temporary-file-name
を false
に設定することにより、この機能を無効にできます(デフォルトは true
です)。この属性が false
の場合、ファイルは最終的な名前で書き込まれ、消費側アプリケーションは、ファイルにアクセスする前にファイルが完全にアップロードされたことを検出する他のメカニズムを必要とします。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java で送信アダプターを構成する方法の例を示しています。
@SpringBootApplication
@IntegrationComponentScan
public class SftpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToSftp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<LsEntry>(factory);
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(File file);
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL で送信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpOutboundFlow() {
return IntegrationFlows.from("toSftpChannel")
.handle(Sftp.outboundAdapter(this.sftpSessionFactory, FileExistsMode.FAIL)
.useTemporaryFileName(false)
.remoteDirectory("/foo")
).get();
}
}
SFTP 送信ゲートウェイ
SFTP 送信ゲートウェイは、リモート SFTP サーバーとやり取りできる限られたコマンドセットを提供します。
ls
(リストファイル)nlst
(リストファイル名)get
(ファイルを取得する)mget
(複数のファイルを取得する)rm
(ファイルを削除)mv
(ファイルの移動と名前変更)put
(ファイルを送る)mput
(複数のファイルを送信する)
ls
コマンドの使用
ls
はリモートファイルをリストし、次のオプションをサポートします。
-1
: ファイル名のリストを取得します。デフォルトでは、FileInfo
オブジェクトのリストを取得します-a
: すべてのファイルを含める ( "." で始まるものを含む)-f
: リストをソートしないでください-dirs
: インクルードディレクトリ (デフォルトで除外)-links
: シンボリックリンクを含める (デフォルトで除外)-R
: リモートディレクトリを再帰的にリストする
さらに、ファイル名のフィルタリングは inbound-channel-adapter
と同じ方法で提供されます。
ls
操作から生じるメッセージペイロードは、ファイル名のリストまたは FileInfo
オブジェクトのリストです(-1
スイッチを usr するかどうかによって異なります)。これらのオブジェクトは、変更された時間、権限などの情報を提供します。
ls
コマンドが実行されたリモートディレクトリは、file_remoteDirectory
ヘッダーで提供されます。
再帰オプション(-R
)を使用する場合、fileName
にはサブディレクトリ要素が含まれ、ファイルへの相対パス(リモートディレクトリに対する相対パス)を表します。-dirs
オプションを使用する場合、各再帰ディレクトリもリスト内の要素として返されます。この場合、-1
オプションを使用しないことをお勧めします。FileInfo
オブジェクトを使用するとファイルとディレクトリを区別できなくなるためです。
nlst
コマンドの使用
バージョン 5 では、nlst
コマンドのサポートが導入されました。
nlst
はリモートファイル名をリストし、1 つのオプションのみをサポートします。
-f
: リストをソートしないでください
nlst
操作から生じるメッセージペイロードは、ファイル名のリストです。
file_remoteDirectory
ヘッダーには、nlst
コマンドが実行されたリモートディレクトリが保持されます。
SFTP プロトコルは、名前をリストする機能を提供しません。このコマンドは、-1
オプションを指定した ls
コマンドに相当し、便宜上ここに追加されます。
get
コマンドの使用
get
はリモートファイルを取得し、次のオプションをサポートします。
-P
: リモートファイルのタイムスタンプを保持します。-stream
: リモートファイルをストリームとして取得します。-D
: 転送に成功したら、リモートファイルを削除します。FileExistsMode
はIGNORE
であり、ローカルファイルがすでに存在するため、転送が無視される場合、リモートファイルは削除されません。
file_remoteDirectory
ヘッダーはリモートディレクトリを保持し、file_remoteFile
ヘッダーはファイル名を保持します。
get
操作から生じるメッセージペイロードは、取得したファイルを表す File
オブジェクトです。-stream
オプションを使用する場合、ペイロードは File
ではなく InputStream
です。テキストファイルの一般的なユースケースは、この操作をファイルスプリッターまたはストリームトランスフォーマーと組み合わせることです。リモートファイルをストリームとして使用する場合、ストリームが消費された後に Session
を閉じる必要があります。便宜上、Session
は closeableResource
ヘッダーで提供され、IntegrationMessageHeaderAccessor
は便利なメソッドを提供します。
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
次の例は、ファイルをストリームとして使用する方法を示しています。
<int-sftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
カスタムコンポーネントで入力ストリームを使用する場合は、Session を閉じる必要があります。次の例に示すように、カスタムコードでそれを行うか、メッセージのコピーを service-activator にルーティングして SpEL を使用できます。 |
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
mget
コマンドの使用
mget
は、パターンに基づいて複数のリモートファイルを取得し、次のオプションをサポートします。
-P
: リモートファイルのタイムスタンプを保持します。-R
: ディレクトリツリー全体を再帰的に取得します。-x
: パターンに一致するファイルがない場合は例外をスローします(そうでない場合は空のリストが返されます)。-D
: 転送が成功したら、各リモートファイルを削除します。転送が無視された場合、FileExistsMode
はIGNORE
であり、ローカルファイルがすでに存在するため、リモートファイルは削除されません。
mget
操作の結果のメッセージペイロードは、List<File>
オブジェクト(つまり、File
オブジェクトの List
、それぞれが取得したファイルを表す)です。
バージョン 5.0 以降、FileExistsMode が IGNORE の場合、出力メッセージのペイロードには、ファイルがすでに存在するためにフェッチされなかったファイルが含まれなくなりました。以前は、配列には既存のファイルを含むすべてのファイルが含まれていました。 |
使用する式は、リモートパスが で終わる結果を生成する必要があることを決定します。たとえば、
myfiles/
は myfiles
の完全なツリーをフェッチします。
バージョン 5.0 以降、FileExistsMode.REPLACE_IF_MODIFIED
モードと組み合わせて再帰的な MGET
を使用して、リモートディレクトリツリー全体を定期的にローカルに同期できます。このモードは、-P
(タイムスタンプを保持)オプションに関係なく、ローカルファイルの最終変更タイムスタンプをリモートファイルのタイムスタンプに設定します。
再帰を使用する場合の注意 ( -R ) パターンは無視され、 サブディレクトリをフィルタリングすると、そのサブディレクトリの追加の走査は実行されません。
通常、リモートディレクトリ構造がローカルに保持されるように、 |
永続ファイルリストフィルターにブールプロパティ forRecursion
が追加されました。このプロパティを true
に設定すると、alwaysAcceptDirectories
も設定されます。これは、送信ゲートウェイ(ls
および mget
)での再帰操作が常にディレクトリツリー全体を毎回トラバースすることを意味します。これは、ディレクトリツリーの奥深くで変更が検出されなかった問題を解決するためです。さらに、forRecursion=true
により、ファイルへのフルパスがメタデータストアキーとして使用されます。これにより、同じ名前のファイルが異なるディレクトリに複数回表示された場合にフィルターが正しく機能しなかった問題が解決されます。重要: これは、永続メタデータストア内の既存のキーが、最上位ディレクトリにあるファイルで見つからないことを意味します。このため、プロパティはデフォルトで false
です。これは将来のリリースで変更される可能性があります。
バージョン 5.0 から、alwaysAcceptDirectorties
を true
に設定することにより、常にディレクトリを渡すように SftpSimplePatternFileListFilter
および SftpRegexPatternFileListFilter
を構成できます。これにより、次の例に示すように、単純なパターンの再帰が可能になります。
<bean id="starDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
ゲートウェイで filter
プロパティを使用して、これらのフィルターのいずれかを提供できます。
送信ゲートウェイの部分的な成功 (mget
および mput
) も参照してください。
put
コマンドの使用
put
はファイルをリモートサーバーに送信します。メッセージのペイロードは、java.io.File
、byte[]
、String
にすることができます。remote-filename-generator
(または式)は、リモートファイルに名前を付けるために使用されます。その他の使用可能な属性には、remote-directory
、temporary-remote-directory
、それらに相当する *-expression
(use-temporary-file-name
および auto-create-directory
)が含まれます。詳細については、スキーマのドキュメント [GitHub] (英語) を参照してください。
put
操作から生じるメッセージペイロードは、転送後のサーバー上のファイルの完全パスを含む String
です。
バージョン 4.3 では chmod
属性が導入され、アップロード後にリモートファイルのアクセス許可が変更されます。従来の Unix 8 進形式を使用できます(たとえば、600
では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。java を使用してアダプターを構成する場合、setChmod(0600)
を使用できます。
mput
コマンドの使用
mput
は複数のファイルをサーバーに送信し、次のオプションをサポートします。
-R
: 再帰 — ディレクトリとサブディレクトリ内のすべてのファイルを送信します(フィルタリングされている場合もあります)
メッセージペイロードは、ローカルディレクトリを表す java.io.File
(または String
)でなければなりません。バージョン 5.1 以降、File
または String
のコレクションもサポートされています。
put
コマンドと同じ属性がサポートされています。さらに、mput-pattern
、mput-regex
、mput-filter
または mput-filter-expression
のいずれかを使用して、ローカルディレクトリ内のファイルをフィルタリングできます。サブディレクトリ自体がフィルターを通過する限り、フィルターは再帰で機能します。フィルターを通過しないサブディレクトリは再帰されません。
mput
操作から生じるメッセージペイロードは、List<String>
オブジェクト(つまり、転送から生じるリモートファイルパスの List
)です。
送信ゲートウェイの部分的な成功 (mget
および mput
) も参照してください。
バージョン 4.3 では、chmod
属性が導入されました。これにより、アップロード後にリモートファイルの権限を変更できます。従来の Unix 8 進形式を使用できます(たとえば、600
では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。Java を使用してアダプターを構成する場合、setChmodOctal("600")
または setChmod(0600)
を使用できます。
rm
コマンドの使用
rm
コマンドにはオプションがありません。
削除操作が成功した場合、結果のメッセージペイロードは Boolean.TRUE
です。それ以外の場合、メッセージペイロードは Boolean.FALSE
です。file_remoteDirectory
ヘッダーはリモートディレクトリを保持し、file_remoteFile
ヘッダーはファイル名を保持します。
mv
コマンドの使用
mv
コマンドにはオプションがありません。
expression
属性は "from" パスを定義し、rename-expression
属性は "to" パスを定義します。デフォルトでは、rename-expression
は headers['file_renameTo']
です。この式は、null または空の String
と評価されてはなりません。必要に応じて、必要なリモートディレクトリが作成されます。結果メッセージのペイロードは Boolean.TRUE
です。file_remoteDirectory
ヘッダーは元のリモートディレクトリを保持し、file_remoteFile
ヘッダーはファイル名を保持します。file_renameTo
ヘッダーは新しいパスを保持します。
バージョン 5.5.6 以降、remoteDirectoryExpression
を mv
コマンドで使用できるようになりました。"from" ファイルが完全なファイルパスでない場合、remoteDirectoryExpression
の結果がリモートディレクトリとして使用されます。同じことが "to" ファイルにも当てはまります。たとえば、タスクがディレクトリ内のリモートファイルの名前を変更するだけの場合です。
追加のコマンド情報
get
および mget
コマンドは、local-filename-generator-expression
属性をサポートしています。転送中にローカルファイルの名前を生成する SpEL 式を定義します。評価コンテキストのルートオブジェクトはリクエストメッセージです。remoteFileName
変数も使用できます。mget
で特に役立ちます(例: local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo"
)。
get
および mget
コマンドは、local-directory-expression
属性をサポートしています。転送中にローカルディレクトリの名前を生成する SpEL 式を定義します。評価コンテキストのルートオブジェクトはリクエストメッセージです。remoteDirectory
変数も使用できます。mget で特に役立ちます(例: local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.myheader"
)。この属性は、local-directory
属性と相互に排他的です。
すべてのコマンドについて、ゲートウェイの「式」プロパティは、コマンドが動作するパスを保持します。mget
コマンドの場合、式は に評価される場合があります。これは、すべてのファイル
somedirectory/
、および *
で終わるその他の値を取得することを意味します。
次の例は、ls
コマンド用に構成されたゲートウェイを示しています。
<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
toSplitter
チャネルに送信されるメッセージのペイロードは、String
オブジェクトのリストであり、各オブジェクトにはファイルの名前が含まれています。command-options="-1"
を省略した場合、ペイロードは FileInfo
オブジェクトのリストになります。オプションをスペース区切りリストとして提供できます(例: command-options="-1 -dirs -links"
)。
バージョン 4.2 以降、GET
、MGET
、PUT
、MPUT
コマンドは FileExistsMode
プロパティ(名前空間サポートを使用する場合は mode
)をサポートします。これは、ローカルファイルが存在する場合(GET
および MGET
)またはリモートファイルが存在する場合(PUT
および MPUT
)の動作に影響します。サポートされているモードは REPLACE
、APPEND
、FAIL
、IGNORE
です。下位互換性のために、PUT
および MPUT
操作のデフォルトモードは REPLACE
です。GET
および MGET
操作の場合、デフォルトは FAIL
です。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java で送信ゲートウェイを構成する方法の例を示しています。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new SftpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、送信ゲートウェイを Java DSL で構成する方法の例を示しています。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
factory.setTestSession(true);
return new CachingSessionFactory<LsEntry>(sf);
}
@Bean
public QueueChannelSpec remoteFileOutputChannel() {
return MessageChannels.queue();
}
@Bean
public IntegrationFlow sftpMGetFlow() {
return IntegrationFlows.from("sftpMgetInputChannel")
.handle(Sftp.outboundGateway(sftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subSftpSource|.*1.txt)")
.localDirectoryExpression("'myDir/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')"))
.channel("remoteFileOutputChannel")
.get();
}
}
送信ゲートウェイの部分的な成功 (mget
および mput
)
(mget
および mput
を使用して)複数のファイルで操作を実行する場合、1 つ以上のファイルが転送された後、しばらく時間が経過すると例外が発生することがあります。この場合(バージョン 4.2 以降)、PartialSuccessException
がスローされます。通常の MessagingException
プロパティ(failedMessage
および cause
)と同様に、この例外には 2 つの追加プロパティがあります。
partialResults
: 正常な転送結果。derivedInput
: リクエストメッセージから生成されたファイルのリスト(mput
用に転送するローカルファイルなど)。
これらの属性により、正常に転送されたファイルと転送されなかったファイルを判別できます。
再帰的な mput
の場合、PartialSuccessException
にはネストされた PartialSuccessException
インスタンスがあります。
次のディレクトリ構造を考慮してください。
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
file3.txt
で例外が発生した場合、ゲートウェイによってスローされた PartialSuccessException
には、file1.txt
、subdir
、zoo.txt
の derivedInput
と file1.txt
の partialResults
があります。その cause
は、file2.txt
の derivedInput
と file2.txt
の file3.txt
および partialResults
を備えた別の PartialSuccessException
です。
SFTP/JSCH ロギング
JSch ライブラリを使用して SFTP サポートを提供するため、特に何かが適切に機能していない場合(認証の例外など)、JSch API 自体からの詳細情報が必要になる場合があります。残念ながら、JSch は commons-logging
を使用しませんが、代わりに com.jcraft.jsch.Logger
インターフェースのカスタム実装に依存しています。Spring Integration 2.0.1 の時点で、このインターフェースを実装しました。JSch ロギングを有効にするために、通常の方法でロガーを構成できます。例: 次の例は、Log4J を使用するロガーの有効な構成です。
log4j.category.com.jcraft.jsch=DEBUG
MessageSessionCallback
Spring Integration バージョン 4.2 以降、<int-sftp:outbound-gateway/>
(SftpOutboundGateway
)で MessageSessionCallback<F, T>
実装を使用して、requestMessage
コンテキストで Session<LsEntry>
の操作を実行できます。統合フロー定義や関数インターフェース(ラムダ)実装インジェクションからのアクセスを許可するなど、非標準または低レベルの SFTP 操作(または複数)に使用できます。次の例では、ラムダを使用しています。
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpOutboundGateway(SessionFactory<ChannelSftp.LsEntry> sessionFactory) {
return new SftpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
別の例は、送信または取得されるファイルデータの前処理または後処理です。
XML 構成を使用する場合、<int-sftp:outbound-gateway/>
は session-callback
属性を提供し、MessageSessionCallback
Bean 名を指定できます。
session-callback は、command および expression 属性と相互に排他的です。Java で構成する場合、SftpOutboundGateway クラスはさまざまなコンストラクターを提供します。 |
Apache Mina SFTP サーバーイベント
バージョン 5.2 で追加された ApacheMinaSftpEventListener
は、特定の Apache Mina SFTP サーバーイベントをリッスンし、ApplicationListener
Bean、@EventListener
Bean メソッド、またはイベント受信チャネルアダプターで受信できる ApplicationEvent
として公開します。
現在サポートされているイベントは次のとおりです。
SessionOpenedEvent
- クライアントセッションが開かれましたDirectoryCreatedEvent
- ディレクトリが作成されましたFileWrittenEvent
- ファイルが書き込まれたPathMovedEvent
- ファイルまたはディレクトリの名前が変更されましたPathRemovedEvent
- ファイルまたはディレクトリが削除されましたSessionClosedEvent
- クライアントが切断されました
これらはそれぞれ ApacheMinaSftpEvent
のサブクラスです。すべてのイベント型を受信する単一のリスナーを設定できます。各イベントの source
プロパティは ServerSession
であり、そこからクライアントアドレスなどの情報を取得できます。便利な getSession()
メソッドが抽象イベントで提供されます。
リスナー(Spring Bean でなければなりません)でサーバーを構成するには、単に SftpSubsystemFactory
に追加します。
server = SshServer.setUpDefaultServer();
...
SftpSubsystemFactory sftpFactory = new SftpSubsystemFactory();
sftpFactory.addSftpEventListener(apacheMinaSftpEventListenerBean);
...
Spring Integration イベントアダプターを使用してこれらのイベントを消費するには:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaSftpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
リモートファイル情報
バージョン 5.2 以降、SftpStreamingMessageSource
(SFTP ストリーミング受信チャネルアダプター)、SftpInboundFileSynchronizingMessageSource
(SFTP 受信チャネルアダプター)、SftpOutboundGateway
(SFTP 送信ゲートウェイ)の -commands の「読み取り」は、リモートファイルに関する情報を生成する追加のヘッダーをメッセージに提供します。
FileHeaders.REMOTE_HOST_PORT
- ファイル転送操作中にリモートセッションが接続された host:port ペア。FileHeaders.REMOTE_DIRECTORY
- 操作が実行されたリモートディレクトリ。FileHeaders.REMOTE_FILE
- リモートファイル名。単一ファイル操作にのみ適用されます。
SftpInboundFileSynchronizingMessageSource
はリモートファイルに対してメッセージを生成せず、ローカルコピーを使用するため、AbstractInboundFileSynchronizer
はリモートファイルに関する情報を、同期操作中に URI スタイル(protocol://host:port/remoteDirectory#remoteFileName
)で MetadataStore
(外部で構成可能)に保存します。このメタデータは、ローカルファイルがポーリングされるときに SftpInboundFileSynchronizingMessageSource
によって取得されます。ローカルファイルが削除された場合、そのメタデータエントリを削除することをお勧めします。AbstractInboundFileSynchronizer
は、この目的のために removeRemoteFileMetadata()
コールバックを提供します。さらに、メタデータキーで使用される setMetadataStorePrefix()
があります。これらのコンポーネント間で同じ MetadataStore
インスタンスが共有される場合、フィルターと AbstractInboundFileSynchronizer
の両方がメタデータエントリに同じローカルファイル名を使用するため、このプレフィックスを MetadataStore
ベースの FileListFilter
実装で使用されるものと異なるものにすることをお勧めします。キー。