SFTP ストリーミング受信チャネルアダプター
Version 4.3 introduced the streaming inbound channel adapter. This adapter produces a message with payloads of type InputStream, letting you fetch files without writing to the local file system. Since the session remains open, the consuming application is responsible for closing the session when the file has been consumed. The session is provided in the closeableResource header (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE). Standard framework components, such as the FileSplitter and StreamTransformer, automatically close the session. See ファイル分割 and ストリームトランス for more information about these components. The following example shows how to configure an SFTP streaming inbound channel adapter:
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>filename-pattern、filename-regex、filter または filter-expression のいずれか 1 つのみを使用できます。
バージョン 5.0 以降、デフォルトでは、SftpStreamingMessageSource アダプターは、メモリ内 SimpleMetadataStore に基づいて SftpPersistentAcceptOnceFileListFilter を使用することにより、リモートファイルの重複を防ぎます。デフォルトでは、このフィルターもファイル名パターン(または正規表現)とともに適用されます。重複を許可する必要がある場合は、AcceptAllFileListFilter を使用できます。CompositeFileListFilter (または ChainFileListFilter)を使用して、他のユースケースを処理できます。後で示す Java 構成は、処理後にリモートファイルを削除して重複を回避する 1 つの手法を示しています。 |
SftpPersistentAcceptOnceFileListFilter の詳細と使用方法については、リモート永続ファイルリストフィルターを参照してください。
max-fetch-size 属性を使用して、フェッチが必要な場合に各ポーリングでフェッチされるファイルの数を制限できます。1 に設定し、クラスター環境で実行する場合は永続フィルターを使用します。詳細については、受信チャネルアダプター: リモートファイルフェッチの制御を参照してください。
アダプターは、リモートディレクトリとファイル名をヘッダーに入れます (それぞれ FileHeaders.REMOTE_DIRECTORY と FileHeaders.REMOTE_FILE)。バージョン 5.0 以降、FileHeaders.REMOTE_FILE_INFO ヘッダーは追加の リモートファイル情報を (JSON で) 提供します。SftpStreamingMessageSource の fileInfoJson プロパティを false に設定すると、ヘッダーに SftpFileInfo オブジェクトが含まれます。SftpFileInfo.getFileInfo() メソッドを使用して、基になる SftpClient によって提供される SftpClient.DirEntry オブジェクトにアクセスできます。XML 構成を使用する場合、fileInfoJson プロパティは使用できませんが、構成クラスの 1 つに SftpStreamingMessageSource を挿入することによって設定できます。リモートファイル情報も参照してください。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
} この例では、トランスフォーマーの下流にあるメッセージハンドラーに、処理後に リモートファイルを削除する advice があることに注意してください。