FTP/FTPS アダプター
Spring Integration は、FTP および FTPS を使用したファイル転送操作のサポートを提供します。
ファイル転送プロトコル(FTP)は、インターネット上の 2 台のコンピューター間でファイルを転送できる単純なネットワークプロトコルです。FTPS は "FTP over SSL" の略です。
この依存関係をプロジェクトに含める必要があります。
FTP 通信に関しては、クライアントとサーバーの 2 つのアクターがあります。FTP または FTPS でファイルを転送するには、FTP サーバーを実行しているリモートコンピューターへの接続を開始するクライアントを使用します。接続が確立された後、クライアントはファイルのコピーを送信または受信することを選択できます。
Spring Integration は、受信チャネルアダプター、送信チャネルアダプター、送信ゲートウェイの 3 つのクライアント側エンドポイントを提供することにより、FTP または FTPS を介したファイルの送受信をサポートします。また、これらのクライアントコンポーネントを定義するための便利な名前空間ベースの構成オプションも提供します。
FTP 名前空間を使用するには、XML ファイルのヘッダーに次を追加します。
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"
FTP セッションファクトリ
Spring Integration は、FTP(または FTPS)セッションの作成に使用できるファクトリを提供します。
デフォルトのファクトリ
バージョン 3.0 以降、セッションはデフォルトでキャッシュされなくなりました。FTP セッションキャッシングを参照してください。 |
FTP アダプターを構成する前に、FTP セッションファクトリを構成する必要があります。実装クラスが o.s.i.ftp.session.DefaultFtpSessionFactory
である通常の Bean 定義で FTP セッションファクトリを構成できます。次の例は、基本的な構成を示しています。
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="kermit"/>
<property name="password" value="frog"/>
<property name="clientMode" value="0"/>
<property name="fileType" value="2"/>
<property name="bufferSize" value="100000"/>
</bean>
FTPS 接続の場合は、代わりに o.s.i.ftp.session.DefaultFtpsSessionFactory
を使用できます。
次の例は、完全な構成を示しています。
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpsSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="oleg"/>
<property name="password" value="password"/>
<property name="clientMode" value="1"/>
<property name="fileType" value="2"/>
<property name="useClientMode" value="true"/>
<property name="cipherSuites" value="a,b.c"/>
<property name="keyManager" ref="keyManager"/>
<property name="protocol" value="SSL"/>
<property name="trustManager" ref="trustManager"/>
<property name="prot" value="P"/>
<property name="needClientAuth" value="true"/>
<property name="authValue" value="oleg"/>
<property name="sessionCreation" value="true"/>
<property name="protocols" value="SSL, TLS"/>
<property name="implicit" value="true"/>
</bean>
接続の問題が発生し、セッション作成をトレースし、どのセッションがポーリングされているかを確認したい場合は、ロガーを TRACE レベル(log4j.category.org.springframework.integration.file=TRACE など)に設定することにより、セッショントレースを有効にできます。 |
これで、これらのセッションファクトリをアダプターに挿入するだけで済みます。アダプターが使用するプロトコル(FTP または FTPS)は、アダプターに挿入されたセッションファクトリの型によって異なります。
FTP または FTPS セッションファクトリの値を提供するより実用的な方法は、Spring のプロパティプレースホルダーサポートを使用することです(https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer を参照)。 |
高度な構成
DefaultFtpSessionFactory
は、(Spring Integration 2.0 以降)Apache Commons ネット (英語) である基礎となるクライアント API を抽象化します。これにより、org.apache.commons.net.ftp.FTPClient
の低レベルの構成の詳細から解放されます。いくつかの一般的なプロパティがセッションファクトリで公開されています(バージョン 4.0 以降、これには connectTimeout
、defaultTimeout
、dataTimeout
が含まれるようになりました)。ただし、より高度な構成(アクティブモードのポート範囲の設定など)を実現するために、低レベルの FTPClient
構成にアクセスする必要がある場合があります。そのために、AbstractFtpSessionFactory
(すべての FTP セッションファクトリの基本クラス)は、次のリストに示す 2 つの後処理メソッドの形式でフックを公開します。
/**
* Will handle additional initialization after client.connect() method was invoked,
* but before any action on the client has been taken
*/
protected void postProcessClientAfterConnect(T t) throws IOException {
// NOOP
}
/**
* Will handle additional initialization before client.connect() method was invoked.
*/
protected void postProcessClientBeforeConnect(T client) throws IOException {
// NOOP
}
ご覧のとおり、これら 2 つのメソッドにはデフォルトの実装はありません。ただし、次の例に示すように、DefaultFtpSessionFactory
を継承することにより、これらのメソッドをオーバーライドして、FTPClient
のより高度な構成を提供できます。
public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {
protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
ftpClient.setActivePortRange(4000, 5000);
}
}
FTPS および共有 SSLSession
FTP over SSL または TLS を使用する場合、一部のサーバーでは、制御接続とデータ接続で同じ SSLSession
を使用する必要があります。これは、データ接続の「盗用」を防ぐためです。詳細については、https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.html (英語) を参照してください。
現在、Apache FTPSClient はこの機能をサポートしていません。NET-408 [Apache] (英語) を参照してください。
Stack Overflow (英語) の提供による以下のソリューションは、sun.security.ssl.SSLSessionContextImpl
でのリフレクションを使用しているため、他の JVM 上では動作しない可能性があります。このスタックオーバーフローの解答は 2015 年に提出されたもので、この解答は最近、Spring Integration チームによって JDK 1.8.0_112 上でテストされています。
次の例は、FTPS セッションを作成する方法を示しています。
@Bean
public DefaultFtpsSessionFactory sf() {
DefaultFtpsSessionFactory sf = new DefaultFtpsSessionFactory() {
@Override
protected FTPSClient createClientInstance() {
return new SharedSSLFTPSClient();
}
};
sf.setHost("...");
sf.setPort(21);
sf.setUsername("...");
sf.setPassword("...");
sf.setNeedClientAuth(true);
return sf;
}
private static final class SharedSSLFTPSClient extends FTPSClient {
@Override
protected void _prepareDataSocket_(final Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
// Control socket is SSL
final SSLSession session = ((SSLSocket) _socket_).getSession();
final SSLSessionContext context = session.getSessionContext();
context.setSessionCacheSize(0); // you might want to limit the cache
try {
final Field sessionHostPortCache = context.getClass()
.getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
final Object cache = sessionHostPortCache.get(context);
final Method method = cache.getClass().getDeclaredMethod("put", Object.class,
Object.class);
method.setAccessible(true);
String key = String.format("%s:%s", socket.getInetAddress().getHostName(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
key = String.format("%s:%s", socket.getInetAddress().getHostAddress(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
}
catch (NoSuchFieldException e) {
// Not running in expected JRE
logger.warn("No field sessionHostPortCache in SSLSessionContext", e);
}
catch (Exception e) {
// Not running in expected JRE
logger.warn(e.getMessage());
}
}
}
}
セッションファクトリの委譲
バージョン 4.2 では、実行時に実際のセッションファクトリを選択できる DelegatingSessionFactory
が導入されました。FTP エンドポイントを呼び出す前に、ファクトリで setThreadKey()
を呼び出して、キーを現在のスレッドに関連付けます。次に、そのキーを使用して、使用する実際のセッションファクトリを検索します。使用後に clearThreadKey()
を呼び出すことにより、キーをクリアできます。
メッセージフローから委譲セッションファクトリを簡単に使用できるように、便利なメソッドを追加しました。
次の例は、委譲セッションファクトリを宣言する方法を示しています。
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />
セッションキャッシュを使用する場合(FTP セッションキャッシングを参照)、各デリゲートをキャッシュする必要があります。DelegatingSessionFactory 自体をキャッシュすることはできません。 |
バージョン 5.0.7 以降、DelegatingSessionFactory
を RotatingServerAdvice
と組み合わせて使用して複数のサーバーをポーリングできます。受信チャネルアダプター: 複数のサーバーとディレクトリのポーリングを参照してください。
FTP 受信チャネルアダプター
FTP 受信チャネルアダプターは、FTP サーバーに接続し、ファイル転送を開始するリモートディレクトリイベント(たとえば、作成された新しいファイル)をリッスンする特別なリスナーです。次の例は、inbound-channel-adapter
を構成する方法を示しています。
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
上記の構成が示すように、inbound-channel-adapter
要素を使用して FTP 受信チャネルアダプターを構成すると同時に、local-directory
、filename-pattern
(正規表現ではなく単純なパターンマッチングに基づく)、および session-factory
デフォルトでは、転送されたファイルには元のファイルと同じ名前が付けられます。この動作をオーバーライドする場合は、local-filename-generator-expression
属性を設定できます。これにより、ローカルファイルの名前を生成する SpEL 式を提供できます。SpEL 評価コンテキストのルートオブジェクトが Message
である送信ゲートウェイおよびアダプターとは異なり、この受信アダプターは、評価時にメッセージをまだ持っていません。これは、転送されたファイルをペイロードとして最終的に生成するためです。SpEL 評価コンテキストのルートオブジェクトは、リモートファイルの元の名前(String
)です。
受信チャネルアダプターは、最初にローカルディレクトリの File
オブジェクトを取得し、ポーラー設定に従って各ファイルを発行します。バージョン 5.0 以降、新しいファイルの取得が必要な場合に FTP サーバーから取得するファイルの数を制限できるようになりました。これは、ターゲットファイルが非常に大きい場合や、後述する永続的なファイルリストフィルターを使用してクラスター化システムで実行する場合に役立ちます。この目的のために max-fetch-size
を使用します。負の値(デフォルト)は制限がないことを意味し、一致するすべてのファイルが取得されます。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。バージョン 5.0 以降、scanner
属性を設定することにより、inbound-channel-adapter
にカスタム DirectoryScanner
実装を提供することもできます。
Spring Integration 3.0 以降、preserve-timestamp
属性を指定できます(デフォルトは false
です)。true
の場合、ローカルファイルの変更されたタイムスタンプは、サーバーから取得した値に設定されます。それ以外の場合は、現在の時刻に設定されます。
バージョン 4.2 以降、remote-directory
の代わりに remote-directory-expression
を指定して、各ポーリングでディレクトリを動的に決定できます(例: remote-directory-expression="@myBean.determineRemoteDir()"
)。
バージョン 4.3 から、remote-directory
および remote-directory-expression
属性を省略できます。デフォルトは null
です。この場合、FTP プロトコルに従って、クライアントの作業ディレクトリがデフォルトのリモートディレクトリとして使用されます。
filename-pattern
属性で指定された単純なパターンに基づくファイルフィルタリングでは不十分な場合があります。この場合、filename-regex
属性を使用して、正規表現(filename-regex=".*\.test$"
など)を指定できます。また、完全な制御が必要な場合は、filter
属性を使用して、ファイルのリストをフィルタリングするための戦略インターフェースである o.s.i.file.filters.FileListFilter
のカスタム実装への参照を提供できます。このフィルターは、どのリモートファイルを取得するかを決定します。CompositeFileListFilter
を使用して、パターンベースのフィルターを他のフィルター(以前にフェッチされたファイルの同期を避けるための AcceptOnceFileListFilter
など)と組み合わせることもできます。
AcceptOnceFileListFilter
はその状態をメモリに保存します。システムの再起動後も状態を維持したい場合は、代わりに FtpPersistentAcceptOnceFileListFilter
の使用を検討してください。このフィルターは、受け入れられたファイル名を MetadataStore
ストラテジーのインスタンスに保存します(メタデータストアを参照)。このフィルターは、ファイル名とリモート変更時刻で一致します。
バージョン 4.0 以降、このフィルターには ConcurrentMetadataStore
が必要です。共有データストア(Redis
と RedisMetadataStore
など)で使用すると、フィルターキーを複数のアプリケーションまたはサーバーインスタンス間で共有できます。
バージョン 5.0 以降、FtpInboundFileSynchronizer
には、メモリ内 SimpleMetadataStore
を含む FtpPersistentAcceptOnceFileListFilter
がデフォルトで適用されます。このフィルターは、XML 構成の regex
または pattern
オプション、および Java DSL の FtpInboundChannelAdapterSpec
にも適用されます。その他のユースケースは、CompositeFileListFilter
(または ChainFileListFilter
)で管理できます。
これまでの説明では、ファイルを取得する前にフィルタリングすることについて言及しました。ファイルが取得されると、ファイルシステム上のファイルに追加のフィルターが適用されます。デフォルトでは、これは AcceptOnceFileListFilter
であり、前述のように、メモリ内の状態を保持し、ファイルの変更時刻は考慮しません。アプリケーションが処理後にファイルを削除しない限り、アダプターはアプリケーションの再起動後にデフォルトでディスク上のファイルを再処理します。
また、FtpPersistentAcceptOnceFileListFilter
を使用するように filter
を構成し、リモートファイルのタイムスタンプが変更された場合(再取得されるため)、デフォルトのローカルフィルターはこの新しいファイルの処理を許可しません。
このフィルターの詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。
local-filter
属性を使用して、ローカルファイルシステムフィルターの動作を構成できます。バージョン 4.3.8 以降、FileSystemPersistentAcceptOnceFileListFilter
はデフォルトで構成されています。このフィルターは、受け入れられたファイル名と変更されたタイムスタンプを MetadataStore
戦略のインスタンス(メタデータストアを参照)に保存し、ローカルファイルの変更時刻の変更を検出します。デフォルトの MetadataStore
は SimpleMetadataStore
であり、メモリに状態を保存します。
バージョン 4.1.5 以降、これらのフィルターには、更新ごとにメタデータストアをフラッシュする新しいプロパティ(flushOnUpdate
)があります(ストアが Flushable
を実装している場合)。
さらに、分散 MetadataStore (Redis や GemFire など)を使用する場合、同じアダプターまたはアプリケーションの複数のインスタンスを使用して、各ファイルが 1 回だけ処理されるようにすることができます。 |
実際のローカルフィルターは、指定されたフィルターと、ダウンロード中のファイルの処理を防ぐパターンフィルターを含む CompositeFileListFilter
です(temporary-file-suffix
に基づく)。ファイルはこの接尾辞(デフォルトは .writing
)でダウンロードされ、転送が完了するとファイルの名前が最終的な名前に変更され、フィルターから「見える」ようになります。
remote-file-separator
属性を使用すると、デフォルトの "/" が特定の環境に適用できない場合に使用するファイル区切り文字を構成できます。
これらの属性の詳細については、スキーマ [GitHub] (英語) を参照してください。
また、FTP 受信チャネルアダプターはポーリングコンシューマーであることも理解する必要があります。ポーラーを構成する必要があります(グローバルデフォルトまたはローカルサブ要素を使用して)。ファイルが転送されると、java.io.File
をペイロードとして持つメッセージが生成され、channel
属性によって識別されるチャネルに送信されます。
ファイルフィルタリングと不完全なファイルの詳細
モニター対象(リモート)ディレクトリに表示されたばかりのファイルが完全でない場合があります。通常、このようなファイルは一時的な拡張子(somefile.txt.writing
など)で書き込まれ、書き込みプロセスが終了すると名前が変更されます。ほとんどの場合、完全なファイルのみに関心があり、完全なファイルのみをフィルタリングしたいと考えています。これらのシナリオを処理するために、filename-pattern
、filename-regex
、filter
属性によって提供されるフィルタリングサポートを使用できます。次の例では、カスタムフィルターの実装を使用しています。
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
受信 FTP アダプターのポーラー構成に関する注意
受信 FTP アダプターのジョブは、2 つのタスクで構成されています。
リモートディレクトリからローカルディレクトリにファイルを転送するために、リモートサーバーと通信します。
転送されたファイルごとに、そのファイルをペイロードとしてメッセージを生成し、'channel' 属性によって識別されたチャネルに送信します。そのため、単に「アダプター」ではなく「チャネルアダプター」と呼ばれています。このようなアダプターの主なジョブは、メッセージを生成してメッセージチャネルに送信することです。基本的に、2 番目のタスクは、ローカルディレクトリにすでに 1 つ以上のファイルがある場合、最初にそれらからメッセージを生成するようなメソッドで優先されます。すべてのローカルファイルが処理された場合にのみ、リモート通信を開始してさらにファイルを取得します。
また、ポーラーでトリガーを構成するときは、max-messages-per-poll
属性に細心の注意を払う必要があります。デフォルト値は、すべての SourcePollingChannelAdapter
インスタンス(FTP を含む)の 1
です。これは、1 つのファイルが処理されるとすぐに、トリガー構成で決定された次の実行時間まで待機することを意味します。local-directory
に 1 つ以上のファイルが存在する場合、リモート FTP サーバーとの通信を開始する前にそれらのファイルを処理します。また、max-messages-per-poll
が 1
(デフォルト)に設定されている場合、トリガーで定義された間隔で、一度に 1 つのファイルのみを処理し、"one-poll === one-file" として機能します。
典型的なファイル転送のユースケースでは、逆の動作が必要になる可能性が高くなります。各ポーリングで可能なすべてのファイルを処理し、次のポーリングを待つだけです。その場合は、max-messages-per-poll
を -1 に設定します。次に、各ポーリングで、アダプターは可能な限り多くのメッセージを生成しようとします。つまり、ローカルディレクトリ内のすべてを処理し、リモートディレクトリに接続して、そこで使用可能なすべてを転送してローカルで処理します。その場合にのみ、ポーリング操作は完了したと見なされ、ポーラーは次の実行時間まで待機します。
あるいは、「ポーリングごとの最大メッセージ数」の値を、各ポーリングでファイルから作成されるメッセージの上限を示す正の値に設定することもできます。例: 10
の値は、各ポーリングで、10 個以下のファイルを処理しようとすることを意味します。
障害からの回復
アダプターのアーキテクチャーを理解することが重要です。ファイルを取得するファイルシンクロナイザーと、同期されたファイルごとにメッセージを送信する FileReadingMessageSource
があります。前述のように、2 つのフィルターが関係しています。filter
属性(およびパターン)は、リモート(FTP)ファイルリストを参照して、すでに取得されたファイルの取得を回避します。local-filter
は、FileReadingMessageSource
がメッセージとして送信するファイルを決定するために使用されます。
シンクロナイザーはリモートファイルをリストし、そのフィルターを調べます。その後、ファイルが転送されます。ファイル転送中に IO エラーが発生した場合、フィルターにすでに追加されているファイルはすべて削除され、次のポーリングで再取得できるようになります。これは、フィルターが ReversibleFileListFilter
(AcceptOnceFileListFilter
など)を実装する場合にのみ適用されます。
ファイルを同期した後、ファイルを処理するダウンストリームフローでエラーが発生した場合、フィルターの自動ロールバックは発生しないため、失敗したファイルはデフォルトで再処理されません。
失敗後にそのようなファイルを再処理する場合は、次のような構成を使用して、フィルターから失敗したファイルを簡単に削除できます。
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
前述の構成は、すべての ResettableFileListFilter
で機能します。
バージョン 5.0 以降、受信チャネルアダプターは、生成されたローカルファイル名に対応するサブディレクトリをローカルに構築できます。リモートサブパスでもあります。階層サポートに従って変更するためにローカルディレクトリを再帰的に読み取ることができるように、Files.walk()
アルゴリズムに基づいて新しい RecursiveDirectoryScanner
を内部 FileReadingMessageSource
に提供できるようになりました。詳細については、AbstractInboundFileSynchronizingMessageSource.setScanner()
(Javadoc) を参照してください。また、setUseWatchService()
オプションを使用して、AbstractInboundFileSynchronizingMessageSource
を WatchService
ベースの DirectoryScanner
に切り替えることができるようになりました。また、すべての WatchEventType
インスタンスがローカルディレクトリの変更に反応するように構成されています。前に示した再処理サンプルは、FileReadingMessageSource.WatchServiceDirectoryScanner
の組み込み機能に基づいており、ローカルディレクトリからファイルが削除された(StandardWatchEventKinds.ENTRY_DELETE
)ときに ResettableFileListFilter.remove()
を実行します。詳細については、WatchServiceDirectoryScanner
を参照してください。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
不完全なデータの処理
不完全なデータの処理を参照してください。
FtpSystemMarkerFilePresentFileListFilter
は、リモートシステム上に対応するマーカーファイルを持たないリモートファイルをフィルタリングするために提供されています。構成情報については、Javadoc を参照(および親クラスを参照)してください。
FTP ストリーミング受信チャネルアダプター
バージョン 4.3 は、ストリーミング受信チャネルアダプターを導入しました。このアダプターは、型 InputStream
のペイロードを持つメッセージを生成し、ローカルファイルシステムに書き込むことなくファイルを取得できるようにします。セッションは開いたままなので、ファイルを消費したときに消費側アプリケーションがセッションを閉じる必要があります。セッションは closeableResource
ヘッダー(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
)で提供されます。FileSplitter
や StreamTransformer
などの標準フレームワークコンポーネントは、セッションを自動的に閉じます。これらのコンポーネントの詳細については、ファイル分割およびストリームトランスを参照してください。次の例は、inbound-streaming-channel-adapter
を構成する方法を示しています。
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
filename-pattern
、filename-regex
、filter
または filter-expression
のいずれか 1 つのみが許可されます。
バージョン 5.0 以降、デフォルトでは、FtpStreamingMessageSource アダプターは、メモリ内 SimpleMetadataStore に基づいて FtpPersistentAcceptOnceFileListFilter を使用したリモートファイルの重複を防ぎます。デフォルトでは、このフィルターはファイル名パターン(または正規表現)でも適用されます。重複を許可する必要がある場合は、AcceptAllFileListFilter を使用できます。その他のユースケースは、CompositeFileListFilter (または ChainFileListFilter )で処理できます。Java 構成(このドキュメントで後述)には、重複を避けるために処理後にリモートファイルを削除する 1 つの手法が示されています。 |
FtpPersistentAcceptOnceFileListFilter
の詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。
max-fetch-size
属性を使用して、フェッチが必要なときに各ポーリングでフェッチされるファイルの数を制限します。1
に設定し、クラスター環境で実行する場合は永続フィルターを使用します。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。
アダプターは、リモートディレクトリとファイル名をそれぞれ FileHeaders.REMOTE_DIRECTORY
ヘッダーと FileHeaders.REMOTE_FILE
ヘッダーに挿入します。バージョン 5.0 以降、FileHeaders.REMOTE_FILE_INFO
ヘッダーは追加の リモートファイル情報を提供します (デフォルトでは JSON で表されます)。FtpStreamingMessageSource
の fileInfoJson
プロパティを false
に設定すると、ヘッダーには FtpFileInfo
オブジェクトが含まれます。基礎となる Apache Net ライブラリによって提供される FTPFile
オブジェクトには、FtpFileInfo.getFileInfo()
メソッドを使用してアクセスできます。fileInfoJson
プロパティは、XML 構成を使用する場合は使用できませんが、構成クラスの 1 つに FtpStreamingMessageSource
を挿入することで設定できます。リモートファイル情報も参照してください。
バージョン 5.1 以降、comparator
の汎用型は FTPFile
です。以前は、AbstractFileInfo<FTPFile>
でした。これは、maxFetch
をフィルタリングして適用する前に、処理の早い段階でソートが実行されるようになったためです。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
この例では、トランスフォーマーの下流のメッセージハンドラーに、処理後にリモートファイルを削除するアドバイスがあることに注意してください。
受信チャネルアダプター: 複数のサーバーとディレクトリのポーリング
バージョン 5.0.7 以降、RotatingServerAdvice
が利用可能になりました。ポーリングアドバイスとして構成されている場合、受信アダプターは複数のサーバーとディレクトリをポーリングできます。通常どおり、アドバイスを構成し、ポーラーのアドバイスチェーンに追加します。DelegatingSessionFactory
を使用してサーバーを選択します。詳細については、セッションファクトリの委譲を参照してください。アドバイス構成は、RotationPolicy.KeyDirectory
オブジェクトのリストで構成されています。
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
このアドバイスは、新しいファイルが存在しなくなるまでサーバー one
上のディレクトリ foo
をポーリングし、その後、サーバー two
上のディレクトリ bar
、次にディレクトリ baz
に移動します。
このデフォルトの動作は、fair
コンストラクター arg を使用して変更できます。
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
この場合、前回のポーリングでファイルが返されたかどうかに関係なく、アドバイスは次のサーバー / ディレクトリに移動します。
または、独自の RotationPolicy
を提供して、必要に応じてメッセージソースを再構成できます。
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
および
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
local-filename-generator-expression
属性(シンクロナイザーの localFilenameGeneratorExpression
)に #remoteDirectory
変数を含めることができるようになりました。これにより、異なるディレクトリから取得したファイルを同様のディレクトリにローカルにダウンロードできます。
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Ftp.inboundAdapter(sf())
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
このアドバイスを使用するときは、ポーラーで TaskExecutor を構成しないでください。詳細については、メッセージソースの条件付きポーラーを参照してください。 |
受信チャネルアダプター: リモートファイルフェッチの制御
受信チャネルアダプターを構成するときに考慮する必要がある 2 つのプロパティがあります。max-messages-per-poll
は、すべてのポーラーと同様に、各ポーリングで送信されるメッセージの数を制限するために使用できます(構成された値を超える準備ができている場合)。max-fetch-size
(バージョン 5.0 以降)は、一度にリモートサーバーから取得するファイルの数を制限できます。
次のシナリオでは、開始状態が空のローカルディレクトリであると想定しています。
max-messages-per-poll=2
およびmax-fetch-size=1
: アダプターは、1 つのファイルをフェッチし、それを発行し、次のファイルをフェッチし、それを発行してから、次のポーリングまでスリープします。max-messages-per-poll=2
およびmax-fetch-size=2
): アダプターは両方のファイルをフェッチしてから、それぞれを発行します。max-messages-per-poll=2
およびmax-fetch-size=4
: アダプターは最大 4 つのファイル(使用可能な場合)をフェッチし、最初の 2 つ(少なくとも 2 つある場合)を発行します。次の 2 つのファイルは、次のポーリングで発行されます。max-messages-per-poll=2
およびmax-fetch-size
が指定されていない: アダプターは、すべてのリモートファイルをフェッチし、最初の 2 つを発行します(少なくとも 2 つある場合)。後続のファイルは、後続のポーリングで発行されます(一度に 2 つ)。すべてのファイルが消費されると、リモートフェッチが再度試行され、新しいファイルが取得されます。
アプリケーションの複数のインスタンスをデプロイする場合、1 つのインスタンスがすべてのファイルを「取得」し、他のインスタンスを枯渇させないように、小さな max-fetch-size をお勧めします。 |
max-fetch-size
のもう 1 つの用途は、リモートファイルのフェッチを停止するが、すでにフェッチされたファイルの処理を続行する場合です。MessageSource
で maxFetchSize
プロパティを設定すると(プログラム、JMX、制御バスを使用)、アダプターはさらにファイルを取得できなくなりますが、ポーラーは以前に取得したファイルのメッセージを引き続き送信できます。プロパティが変更されたときにポーラーがアクティブな場合、変更は次のポーリングで有効になります。
バージョン 5.1 以降、シンクロナイザーに Comparator<FTPFile>
を提供できます。これは、maxFetchSize
でフェッチされるファイルの数を制限するときに役立ちます。
FTP 送信チャネルアダプター
FTP 発信チャネルアダプターは、FTP サーバーに接続し、受信メッセージのペイロードで受信するすべてのファイルに対して FTP 転送を開始する MessageHandler
実装に依存しています。また、ファイルのいくつかの表現もサポートしているため、java.io.File
-typed ペイロードのみに制限されません。FTP 発信チャネルアダプターは、次のペイロードをサポートしています。
java.io.File
: 実際のファイルオブジェクトbyte[]
: ファイルの内容を表すバイト配列java.lang.String
: ファイルの内容を表すテキストjava.io.InputStream
: リモートファイルに転送するデータのストリームorg.springframework.core.io.Resource
: リモートファイルに転送するデータのリソース
次の例は、outbound-channel-adapter
を構成する方法を示しています。
<int-ftp:outbound-channel-adapter id="ftpOutbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
charset="UTF-8"
remote-file-separator="/"
auto-create-directory="true"
remote-directory-expression="headers['remote_dir']"
temporary-remote-directory-expression="headers['temp_remote_dir']"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>
上記の構成は、filename-generator
(o.s.i.file.FileNameGenerator
戦略インターフェースの実装)、session-factory
への参照、その他の属性などのさまざまな属性の値を提供しながら、outbound-channel-adapter
エレメントを使用して FTP 送信・チャネルアダプターを構成する方法を示しています。また、SpEL を使用して remote-directory-expression
、temporary-remote-directory-expression
、remote-filename-generator-expression
(前の例に示した filename-generator
の代替 SpEL)などの設定を構成できる *expression
属性の例もいくつか見ることができます。SpEL の使用を許可する他のコンポーネントと同様に、ペイロードとメッセージヘッダーへのアクセスは、「ペイロード」変数と「ヘッダー」変数を介して利用できます。使用可能な属性の詳細については、スキーマ [GitHub] (英語) を参照してください。
デフォルトでは、ファイル名ジェネレーターが指定されていない場合、Spring Integration は o.s.i.file.DefaultFileNameGenerator を使用します。DefaultFileNameGenerator は、MessageHeaders の file_name ヘッダー(存在する場合)の値に基づいてファイル名を決定します。または、メッセージのペイロードがすでに java.io.File である場合、そのファイルの元の名前を使用します。 |
特定の値(remote-directory など)の定義は、プラットフォームまたは FTP サーバーに依存する場合があります。例: https://forum.spring.io/showthread.php?p=333478&posted=1#post333478 (英語) で報告されたように、一部のプラットフォームでは、ディレクトリ定義の最後にスラッシュを追加する必要があります(たとえば、remote-directory="/thing1/thing2" の代わりに remote-directory="/thing1/thing2/" )。 |
バージョン 4.1 以降、ファイルの転送時に mode
を指定できます。デフォルトでは、既存のファイルは上書きされます。モードは FileExistsMode
列挙によって定義され、次の値が含まれます。
REPLACE
(default)REPLACE_IF_MODIFIED
APPEND
APPEND_NO_FLUSH
IGNORE
FAIL
IGNORE
および FAIL
はファイルを転送しません。FAIL
は例外をスローしますが、IGNORE
はサイレントに転送を無視します(ただし、DEBUG
ログエントリが生成されます)。
バージョン 5.2 は chmod
属性を導入しました。これを使用して、アップロード後にリモートファイルの許可を変更できます。従来の Unix 8 進形式を使用できます(たとえば、600
では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。java を使用してアダプターを構成する場合、setChmodOctal("600")
または setChmod(0600)
を使用できます。FTP サーバーが SITE CHMOD
サブコマンドをサポートしている場合にのみ適用されます。
部分的に書き込まれたファイルの回避
ファイル転送を処理するときに発生する一般的な問題の 1 つは、部分的なファイルを処理する可能性です。つまり、転送が実際に完了する前にファイルがファイルシステムに表示される場合があります。
この課題に対処するために、Spring Integration FTP アダプターは一般的なアルゴリズムを使用します。ファイルは一時的な名前で転送され、完全に転送されると名前が変更されます。
デフォルトでは、転送中のすべてのファイルは、追加のサフィックス(デフォルトでは .writing
)とともにファイルシステムに表示されます。temporary-file-suffix
属性を設定することにより、このサフィックスを変更できます。
ただし、この手法を使用したくない場合があります(たとえば、サーバーがファイル名の変更を許可していない場合)。このような状況では、use-temporary-file-name
を false
に設定することにより、この機能を無効にできます(デフォルトは true
です)。この属性が false
の場合、ファイルは最終的な名前で書き込まれ、コンシュームするアプリケーションは、ファイルにアクセスする前にファイルが完全にアップロードされたことを検出する他のメカニズムを必要とします。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で送信アダプターを構成する方法の例を示しています。
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL を使用して送信アダプターを構成する方法の例を示しています。
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public IntegrationFlow ftpOutboundFlow() {
return IntegrationFlows.from("toFtpChannel")
.handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
.useTemporaryFileName(false)
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
.remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
).get();
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
FTP 送信ゲートウェイ
FTP 送信ゲートウェイは、リモート FTP または FTPS サーバーと対話するための限られたコマンドセットを提供します。サポートされているコマンドは次のとおりです。
ls
(リストファイル)nlst
(リストファイル名)get
(ファイルを取得する)mget
(ファイルの取得)rm
(ファイルを削除)mv
(ファイルの移動 / 名前変更)put
(ファイルの送信)mput
(複数のファイルを送信する)
ls
コマンドの使用
ls
はリモートファイルをリストし、次のオプションをサポートします。
-1
: ファイル名のリストを取得します。デフォルトでは、FileInfo
オブジェクトのリストを取得します。-a
: すべてのファイルを含める ( "." で始まるものを含む)-f
: リストをソートしないでください-dirs
: インクルードディレクトリ (それらはデフォルトで除外されます)-links
: シンボリックリンクを含める (それらはデフォルトで除外されます)-R
: リモートディレクトリを再帰的にリストする
さらに、inbound-channel-adapter
と同じ方法で、ファイル名のフィルタリングが提供されます。FTP 受信チャネルアダプターを参照してください。
ls
操作の結果のメッセージペイロードは、ファイル名のリストまたは FileInfo
オブジェクトのリストです。これらのオブジェクトは、変更された時間、権限、その他の詳細などの情報を提供します。
ls
コマンドが実行されたリモートディレクトリは、file_remoteDirectory
ヘッダーで提供されます。
再帰オプション(-R
)を使用する場合、fileName
にはサブディレクトリ要素が含まれ、ファイルへの相対パス(リモートディレクトリに対する相対パス)を表します。-dirs
オプションが含まれている場合、各再帰ディレクトリもリスト内の要素として返されます。この場合、-1
オプションを使用しないことをお勧めします。FileInfo
オブジェクトで実行できるディレクトリとファイルを区別できないためです。
バージョン 4.3 以降、FtpSession
は list()
および listNames()
メソッドの null
をサポートします。expression
属性は省略できます。便宜上、Java 構成には、expression
引数を持たない 2 つのコンストラクターがあります。または LS
、NLST
、PUT
、MPUT
コマンドの場合、null
は、FTP プロトコルに従ってクライアントの作業ディレクトリとして扱われます。リクエストメッセージに対してリモートパスを評価するには、他のすべてのコマンドを expression
とともに提供する必要があります。DefaultFtpSessionFactory
を継承し、postProcessClientAfterConnect()
コールバックを実装するときに、FTPClient.changeWorkingDirectory()
関数を使用して作業ディレクトリを設定できます。
nlst
コマンドの使用
バージョン 5 では、nlst
コマンドのサポートが導入されました。
nlst
はリモートファイル名をリストし、1 つのオプションのみをサポートします。
-f
: リストをソートしないでください
nlst
操作から生じるメッセージペイロードは、ファイル名のリストです。
nlst
コマンドが実行されたリモートディレクトリは、file_remoteDirectory
ヘッダーで提供されます。
LIST
コマンドを使用する ls
コマンドの -1
オプションとは異なり、nlst
コマンドは、ターゲット FTP サーバーに NLST
コマンドを送信します。このコマンドは、サーバーが LIST
をサポートしていない場合に役立ちます(たとえば、セキュリティ制限のため)。nlst
操作の結果は、他の詳細なしの名前です。フレームワークは、エンティティがディレクトリであるかどうかを判断できません。たとえば、フィルタリングや再帰的なリストを実行できます。
get
コマンドの使用
get
はリモートファイルを取得します。次のオプションをサポートしています。
-P
: リモートファイルのタイムスタンプを保持します。-stream
: リモートファイルをストリームとして取得します。-D
: 転送に成功したら、リモートファイルを削除します。FileExistsMode
はIGNORE
であり、ローカルファイルがすでに存在するため、転送が無視される場合、リモートファイルは削除されません。
file_remoteDirectory
ヘッダーはリモートディレクトリ名を提供し、file_remoteFile
ヘッダーはファイル名を提供します。
get
操作から生じるメッセージペイロードは、取得したファイルを表す File
オブジェクト、または -stream
オプションを使用する場合の InputStream
です。-stream
オプションを使用すると、ファイルをストリームとして取得できます。テキストファイルの一般的なユースケースは、この操作をファイルスプリッターまたはストリームトランスフォーマーと組み合わせることです。リモートファイルをストリームとして使用する場合、ストリームが消費された後に Session
を閉じる必要があります。便宜上、Session
は closeableResource
ヘッダーで提供され、IntegrationMessageHeaderAccessor
の簡易メソッドでアクセスできます。次の例は、簡易メソッドの使用メソッドを示しています。
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
ファイルスプリッターやストリームトランスフォーマーなどのフレームワークコンポーネントは、データの転送後にセッションを自動的に閉じます。
次の例は、ファイルをストリームとして使用する方法を示しています。
<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
カスタムコンポーネントで入力ストリームを使用する場合は、Session を閉じる必要があります。次の例に示すように、カスタムコードで、またはメッセージのコピーを service-activator にルーティングして SpEL を使用することで、これを行うことができます。 |
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
mget
コマンドの使用
mget
は、パターンに基づいて複数のリモートファイルを取得し、次のオプションをサポートします。
-P
: リモートファイルのタイムスタンプを保持します。-R
: ディレクトリツリー全体を再帰的に取得します。-x
: パターンに一致するファイルがない場合は例外をスローします(そうでない場合は空のリストが返されます)。-D
: 転送が成功したら、各リモートファイルを削除します。FileExistsMode
はIGNORE
であり、ローカルファイルがすでに存在するため、転送が無視される場合、リモートファイルは削除されません。
mget
操作の結果のメッセージペイロードは、List<File>
オブジェクト(つまり、File
オブジェクトの List
、それぞれが取得したファイルを表す)です。
バージョン 5.0 以降、FileExistsMode が IGNORE の場合、出力メッセージのペイロードには、ファイルがすでに存在するためにフェッチされなかったファイルが含まれなくなりました。以前は、リストには既存のファイルを含むすべてのファイルが含まれていました。 |
リモートパスを決定するために使用される式は、 で終わる結果を生成する必要があります。
somedir/
は、somedir
の完全なツリーをフェッチします。
バージョン 5.0 以降、再帰的な mget
を新しい FileExistsMode.REPLACE_IF_MODIFIED
モードと組み合わせて、リモートディレクトリツリー全体を定期的にローカルに同期することができます。このモードは、-P
(タイムスタンプを保持)オプションに関係なく、ローカルファイルの最終変更タイムスタンプをリモートタイムスタンプに置き換えます。
再帰を使用する ( -R ) パターンは無視され、 サブディレクトリがフィルタリングされると、そのサブディレクトリの追加の走査は実行されません。
通常、リモートディレクトリ構造がローカルに保持されるように、 |
バージョン 5.0 から、FtpSimplePatternFileListFilter
および FtpRegexPatternFileListFilter
は、alwaysAcceptDirectories
プロパティを true
に設定することにより、常にディレクトリを渡すように構成できます。これにより、次の例に示すように、単純なパターンの再帰が可能になります。
<bean id="starDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
前の例のようなフィルターを定義したら、ゲートウェイで filter
プロパティを設定することにより、フィルターを使用できます。
送信ゲートウェイの部分的な成功 (mget
および mput
) も参照してください。
put
コマンドの使用
put
コマンドは、ファイルをリモートサーバーに送信します。メッセージのペイロードは、java.io.File
、byte[]
、String
にすることができます。remote-filename-generator
(または式)は、リモートファイルに名前を付けるために使用されます。その他の使用可能な属性には、remote-directory
、temporary-remote-directory
、それらに相当する *-expression
(use-temporary-file-name
および auto-create-directory
)が含まれます。詳細については、スキーマ [GitHub] (英語) のドキュメントを参照してください。
put
操作の結果として生じるメッセージペイロードは、転送後のサーバー上のファイルの完全パスを表す String
です。
バージョン 5.2 では chmod
属性が導入され、アップロード後にリモートファイルのアクセス許可が変更されます。従来の Unix 8 進形式を使用できます(たとえば、600
では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。java を使用してアダプターを構成する場合、setChmod(0600)
を使用できます。FTP サーバーが SITE CHMOD
サブコマンドをサポートしている場合にのみ適用されます。
mput
コマンドの使用
mput
は複数のファイルをサーバーに送信し、1 つのオプションのみをサポートします。
-R
: 再帰的。ディレクトリとそのサブディレクトリ内のすべてのファイル(おそらくフィルタリングされた)を送信します。
メッセージペイロードは、ローカルディレクトリを表す java.io.File
(または String
)でなければなりません。バージョン 5.1 以降、File
または String
のコレクションもサポートされています。
このコマンドは、put
コマンドと同じ属性をサポートしています。さらに、ローカルディレクトリ内のファイルは、mput-pattern
、mput-regex
、mput-filter
または mput-filter-expression
のいずれかでフィルターできます。サブディレクトリ自体がフィルターを通過する限り、フィルターは再帰で機能します。フィルターを通過しないサブディレクトリは再帰されません。
mget
操作の結果のメッセージペイロードは、List<String>
オブジェクト(つまり、転送の結果としてのリモートファイルパスの List
)です。
送信ゲートウェイの部分的な成功 (mget
および mput
) も参照してください。
バージョン 5.2 では、chmod
属性が導入されました。これにより、アップロード後にリモートファイルの権限を変更できます。従来の Unix 8 進形式を使用できます(たとえば、600
では、ファイル所有者に対してのみ読み取り / 書き込みが許可されます)。Java を使用してアダプターを構成する場合、setChmodOctal("600")
または setChmod(0600)
を使用できます。FTP サーバーが SITE CHMOD
サブコマンドをサポートしている場合にのみ適用されます。
rm
コマンドの使用
rm
コマンドはファイルを削除します。
rm
コマンドにはオプションがありません。
rm
操作の結果としてのメッセージペイロードは、削除が成功した場合は Boolean.TRUE
、そうでない場合は Boolean.FALSE
です。file_remoteDirectory
ヘッダーはリモートディレクトリを提供し、file_remoteFile
ヘッダーはファイル名を提供します。
mv
コマンドの使用
mv
コマンドはファイルを移動します
mv
コマンドにはオプションがありません。
expression
属性は "from" パスを定義し、rename-expression
属性は "to" パスを定義します。デフォルトでは、rename-expression
は headers['file_renameTo']
です。この式は null または空の String
に評価してはなりません。必要に応じて、必要なリモートディレクトリが作成されます。結果メッセージのペイロードは Boolean.TRUE
です。file_remoteDirectory
ヘッダーは元のリモートディレクトリを提供し、file_remoteFile
ヘッダーはファイル名を提供します。新しいパスは file_renameTo
ヘッダーにあります。
FTP 送信ゲートウェイコマンドに関する追加情報
get
および mget
コマンドは、local-filename-generator-expression
属性をサポートしています。転送中にローカルファイルの名前を生成する SpEL 式を定義します。評価コンテキストのルートオブジェクトはリクエストメッセージです。mget
で特に有用な remoteFileName
変数も使用可能です(例: local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.something"
)。
get
および mget
コマンドは、local-directory-expression
属性をサポートしています。転送中にローカルディレクトリの名前を生成する SpEL 式を定義します。評価コンテキストのルートオブジェクトはリクエストメッセージですが、mget
で特に有用な remoteDirectory
変数も使用可能です(例: local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.something"
)。この属性は、local-directory
属性と相互に排他的です。
すべてのコマンドについて、ゲートウェイの「式」プロパティは、コマンドが機能するパスを提供します。mget
コマンドの場合、式は "'、すべてのファイルを取得することを意味する、または ' somedirectory/" などと評価される場合があります。
次の例は、ls
コマンド用に構成されたゲートウェイを示しています。
<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
toSplitter
チャネルに送信されるメッセージのペイロードは、それぞれファイルの名前を含む String
オブジェクトのリストです。command-options
属性が省略された場合、FileInfo
オブジェクトを保持します。スペースで区切られたオプション(たとえば、command-options="-1 -dirs -links"
)を使用します。
バージョン 4.2 以降、GET
、MGET
、PUT
、MPUT
コマンドは FileExistsMode
プロパティ(名前空間サポートを使用する場合は mode
)をサポートします。これは、ローカルファイルが存在する場合(GET
および MGET
)またはリモートファイルが存在する場合(PUT
および MPUT
)の動作に影響します。サポートされているモードは REPLACE
、APPEND
、FAIL
、IGNORE
です。下位互換性のために、PUT
および MPUT
操作のデフォルトモードは REPLACE
です。GET
および MGET
操作の場合、デフォルトは FAIL
です。
バージョン 5.0 から、setWorkingDirExpression()
(XML の working-dir-expression
)オプションが FtpOutboundGateway
(XML の <int-ftp:outbound-gateway>
)で提供されます。実行時にクライアントの作業ディレクトリを変更できます。式はリクエストメッセージに対して評価されます。前の作業ディレクトリは、各ゲートウェイ操作の後に復元されます。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で送信ゲートウェイを構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpOutboundGateway ftpOutboundGateway =
new FtpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
ftpOutboundGateway.setOutputChannelName("lsReplyChannel");
return ftpOutboundGateway;
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、送信ゲートウェイを Java DSL で構成する方法の例を示しています。
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpOutboundGatewaySpec ftpOutboundGateway() {
return Ftp.outboundGateway(ftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subFtpSource|.*1.txt)")
.localDirectoryExpression("'localDirectory/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('ftpSource', 'localTarget')");
}
@Bean
public IntegrationFlow ftpMGetFlow(AbstractRemoteFileOutboundGateway<FTPFile> ftpOutboundGateway) {
return f -> f
.handle(ftpOutboundGateway)
.channel(c -> c.queue("remoteFileOutputChannel"));
}
}
送信ゲートウェイの部分的な成功 (mget
および mput
)
(mget
および mput
を使用して)複数のファイルに対して操作を実行すると、1 つ以上のファイルが転送された後、しばらく時間が経過する場合があります。この場合(バージョン 4.2 以降)、PartialSuccessException
がスローされます。通常の MessagingException
プロパティ(failedMessage
および cause
)と同様に、この例外には 2 つの追加プロパティがあります。
partialResults
: 正常な転送結果。derivedInput
: リクエストメッセージから生成されたファイルのリスト(たとえば、mput
用に転送するローカルファイル)。
これらの属性により、正常に転送されたファイルと転送されなかったファイルを判別できます。
再帰的な mput
の場合、PartialSuccessException
にはネストされた PartialSuccessException
出現箇所があります。
次のディレクトリ構造を考慮してください。
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
file3.txt
で例外が発生した場合、ゲートウェイによってスローされた PartialSuccessException
には、file1.txt
、subdir
、zoo.txt
の derivedInput
と file1.txt
の partialResults
があります。その cause
は、file2.txt
の derivedInput
と file2.txt
の file3.txt
および partialResults
を備えた別の PartialSuccessException
です。
FTP セッションキャッシング
Spring Integration 3.0 以降、セッションはデフォルトでキャッシュされなくなりました。cache-sessions 属性は、エンドポイントではサポートされなくなりました。セッションをキャッシュする場合は、CachingSessionFactory (次の例に示す)を使用する必要があります。 |
3.0 より前のバージョンでは、セッションはデフォルトで自動的にキャッシュされていました。cache-sessions
属性は自動キャッシュを無効にするために使用できましたが、そのソリューションは他のセッションキャッシュ属性を構成する方法を提供しませんでした。例: 作成されるセッションの数を制限できませんでした。その要件と他の構成オプションをサポートするために、CachingSessionFactory
が追加されました。sessionCacheSize
および sessionWaitTimeout
プロパティを提供します。sessionCacheSize
プロパティは、ファクトリがキャッシュに保持するアクティブセッションの数を制御します(デフォルトは無制限です)。sessionCacheSize
しきい値に達した場合、キャッシュされたセッションのいずれかが使用可能になるか、セッションの待機時間が経過するまで(デフォルトの待機時間は Integer.MAX_VALUE
)、別のセッションを取得しようとするとブロックされます。sessionWaitTimeout
プロパティはその値を設定します。
セッションをキャッシュする場合は、前述のようにデフォルトのセッションファクトリを設定し、CachingSessionFactory
のインスタンスにラップして、追加のプロパティを提供できます。次の例は、その方法を示しています。
<bean id="ftpSessionFactory" class="o.s.i.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory" class="o.s.i.file.remote.session.CachingSessionFactory">
<constructor-arg ref="ftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
上記の例は、sessionCacheSize
を 10
に設定し、sessionWaitTimeout
を 1 秒に設定して作成された CachingSessionFactory
を示しています(その値はミリ秒単位です)。
Spring Integration 3.0 以降、CachingConnectionFactory
は resetCache()
メソッドを提供します。呼び出されると、すべてのアイドルセッションがすぐに閉じられ、使用中のセッションがキャッシュに戻されると閉じられます。セッションの新しいリクエストにより、必要に応じて新しいセッションが確立されます。
バージョン 5.1 以降、CachingSessionFactory
には新しいプロパティ testSession
があります。true の場合、NOOP コマンドを送信してセッションがテストされ、セッションがまだアクティブであることを確認します。そうでない場合、キャッシュから削除されます。アクティブなセッションがキャッシュにない場合、新しいセッションが作成されます。
RemoteFileTemplate
を使用する
Spring Integration 3.0 以降、FtpSession
オブジェクトに対して新しい抽象化が提供されています。テンプレートには、ファイルを送信、取得(InputStream
として)、削除、名前変更するメソッドが用意されています。さらに、execute
メソッドが提供され、呼び出し側はセッションで複数の操作を実行できます。すべての場合において、テンプレートは確実にセッションを閉じる処理を行います。詳細については、RemoteFileTemplate
の Javadoc を参照してください。FTP にはサブクラス FtpRemoteFileTemplate
があります。
バージョン 4.1 では、getClientInstance()
を含む追加のメソッドが追加されました。getClientInstance()
は、基礎となる FTPClient
へのアクセスを提供し、低レベル API へのアクセスを提供します。
すべての FTP サーバーが STAT <path>
コマンドを適切に実装しているわけではありません。存在しないパスに対して肯定的な結果を返すものもあります。NLST
コマンドは、パスがファイルで存在する場合、名前を確実に返します。ただし、パスがディレクトリの場合、NLST
は常に空のリストを返すため、空のディレクトリが存在するかどうかのチェックはサポートしていません。テンプレートはパスがディレクトリを表すかどうかを知らないため、パスが存在しないように見える場合(NLST
を使用する場合)、追加のチェックを実行する必要があります。これによりオーバーヘッドが追加され、サーバーへの複数のリクエストが必要になります。バージョン 4.1.9 以降、FtpRemoteFileTemplate
は FtpRemoteFileTemplate.ExistsMode
プロパティを提供します。これには次のオプションがあります。
STAT
:STAT
FTP コマンド(FTPClient.getStatus(path)
)を実行して、パスの存在を確認します。これはデフォルトであり、FTP サーバーがSTAT
コマンド(パス付き)を適切にサポートする必要があります。NLST
:NLST
FTP コマンド—FTPClient.listName(path)
を実行します。ファイルへのフルパスであるパスをテストする場合に使用します。空のディレクトリでは機能しません。NLST_AND_DIRS
: 最初にNLST
コマンドを実行し、ファイルが返されない場合は、FTPClient.changeWorkingDirectory(path)
を使用して一時的に作業ディレクトリを切り替える手法にフォールバックします。詳細については、FtpSession.exists()
(Javadoc) を参照してください。
FileExistsMode.FAIL
の場合は常にファイル(ディレクトリではなく)のみを検索することがわかっているため、FtpMessageHandler
および FtpOutboundGateway
コンポーネントには NLST
モードを安全に使用します。
その他の場合は、FtpRemoteFileTemplate
を継承して、オーバーライドされた exist()
メソッドでカスタムロジックを実装できます。
バージョン 5.0 から、新しい RemoteFileOperations.invoke(OperationsCallback<F, T> action)
メソッドが利用可能になりました。このメソッドを使用すると、複数の RemoteFileOperations
呼び出しを、同じスレッド限定の Session
のスコープ内で呼び出すことができます。これは、RemoteFileTemplate
の複数の高レベル操作を 1 つの作業単位として実行する必要がある場合に役立ちます。例: AbstractRemoteFileOutboundGateway
は mput
コマンドの実装でそれを使用し、提供されたディレクトリ内の各ファイルに対して、およびそのサブディレクトリに対して再帰的に put
操作を実行します。詳細については、Javadoc を参照してください。
MessageSessionCallback
を使用する
Spring Integration 4.2 から、<int-ftp:outbound-gateway/>
(Java の FtpOutboundGateway
)で MessageSessionCallback<F, T>
実装を使用して、requestMessage
コンテキストで Session<FTPFile>
の操作を実行できます。次の例に示すように、これは非標準または低レベルの FTP 操作に使用でき、統合フロー定義および関数インターフェース(Lambda)実装インジェクションからのアクセスを許可します。
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler ftpOutboundGateway(SessionFactory<FTPFile> sessionFactory) {
return new FtpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
別の例は、送信または取得されるファイルデータの前処理または後処理です。
XML 構成を使用する場合、<int-ftp:outbound-gateway/>
は session-callback
属性を提供して、MessageSessionCallback
Bean 名を指定できるようにします。
session-callback は、command および expression 属性と相互に排他的です。Java を使用して構成する場合、FtpOutboundGateway (Javadoc) クラスでさまざまなコンストラクターを使用できます。 |
Apache Mina FTP サーバーイベント
バージョン 5.2 で追加された ApacheMinaFtplet
は、特定の Apache Mina FTP サーバーイベントをリッスンし、ApplicationListener
Bean、@EventListener
Bean メソッド、またはイベント受信チャネルアダプターで受信できる ApplicationEvent
として公開します。
現在サポートされているイベントは次のとおりです。
SessionOpenedEvent
- クライアントセッションが開かれましたDirectoryCreatedEvent
- ディレクトリが作成されましたFileWrittenEvent
- ファイルが書き込まれたPathMovedEvent
- ファイルまたはディレクトリの名前が変更されましたPathRemovedEvent
- ファイルまたはディレクトリが削除されましたSessionClosedEvent
- クライアントが切断されました
これらはそれぞれ ApacheMinaFtpEvent
のサブクラスです。すべてのイベント型を受信する単一のリスナーを設定できます。各イベントの source
プロパティは FtpSession
であり、そこからクライアントアドレスなどの情報を取得できます。便利な getSession()
メソッドが抽象イベントで提供されます。
セッションのオープン / クローズ以外のイベントには、コマンドや引数などのプロパティを持つ別のプロパティ FtpRequest
があります。
リスナー(Spring Bean である必要があります)でサーバーを構成するには、サーバーファクトリに追加します。
FtpServerFactory serverFactory = new FtpServerFactory();
...
ListenerFactory factory = new ListenerFactory();
...
serverFactory.addListener("default", factory.createListener());
serverFactory.setFtplets(new HashMap<>(Collections.singletonMap("springFtplet", apacheMinaFtpletBean)));
server = serverFactory.createServer();
server.start();
Spring Integration イベントアダプターを使用してこれらのイベントを消費するには:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaFtpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
リモートファイル情報
バージョン 5.2 以降、FtpStreamingMessageSource
(FTP ストリーミング受信チャネルアダプター)、FtpInboundFileSynchronizingMessageSource
(FTP 受信チャネルアダプター)、FtpOutboundGateway
(FTP 送信ゲートウェイ)の -commands の「読み取り」は、リモートファイルに関する情報を生成する追加のヘッダーをメッセージに提供します。
FileHeaders.REMOTE_HOST_PORT
- ファイル転送操作中にリモートセッションが接続された host:port ペア。FileHeaders.REMOTE_DIRECTORY
- 操作が実行されたリモートディレクトリ。FileHeaders.REMOTE_FILE
- リモートファイル名。単一ファイル操作にのみ適用されます。
FtpInboundFileSynchronizingMessageSource
はリモートファイルに対してメッセージを生成せず、ローカルコピーを使用するため、AbstractInboundFileSynchronizer
はリモートファイルに関する情報を、同期操作中に URI スタイル(protocol://host:port/remoteDirectory#remoteFileName
)で MetadataStore
(外部で構成可能)に保存します。このメタデータは、ローカルファイルがポーリングされるときに FtpInboundFileSynchronizingMessageSource
によって取得されます。ローカルファイルが削除された場合、そのメタデータエントリを削除することをお勧めします。AbstractInboundFileSynchronizer
は、この目的のために removeRemoteFileMetadata()
コールバックを提供します。さらに、メタデータキーで使用される setMetadataStorePrefix()
があります。これらのコンポーネント間で同じ MetadataStore
インスタンスが共有される場合、フィルターと AbstractInboundFileSynchronizer
の両方がメタデータエントリに同じローカルファイル名を使用するため、このプレフィックスを MetadataStore
ベースの FileListFilter
実装で使用されるものと異なるものにすることをお勧めします。キー。