ファイル分割
FileSplitter
はバージョン 4.1.2 で追加され、その名前空間サポートはバージョン 4.2 で追加されました。FileSplitter
は、BufferedReader.readLine()
に基づいてテキストファイルを個々の行に分割します。デフォルトでは、スプリッターは Iterator
を使用して、ファイルから読み取られる行を一度に 1 行ずつ出力します。iterator
プロパティを false
に設定すると、メッセージとして送信する前にすべての行がメモリに読み込まれます。この使用例の 1 つは、行を含むメッセージを送信する前にファイルの I/O エラーを検出する場合です。ただし、比較的短いファイルに対してのみ実用的です。
受信ペイロードは、File
、String
(File
パス)、InputStream
、Reader
です。他のペイロード型は変更されずに発行されます。
次のリストは、FileSplitter
を構成するための可能な方法を示しています。
Java DSL
Kotlin DSL
Java
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | スプリッターの Bean 名。 |
2 | true (デフォルト)に設定すると、イテレーターまたは false を使用して、行を送信する前にファイルをメモリにロードします。 |
3 | true に設定すると、ファイルデータの前後にファイル開始マーカーおよびファイル終了マーカーのメッセージが出力されます。マーカーは、FileSplitter.FileMarker ペイロードを持つメッセージです(mark プロパティに START および END 値が含まれます)。一部の行がフィルタリングされるダウンストリームフローでファイルを順次処理するときに、マーカーを使用できます。これにより、ファイルが完全に処理されたことをダウンストリーム処理で知ることができます。さらに、START または END を含む file_marker ヘッダーがこれらのメッセージに追加されます。END マーカーには行カウントが含まれます。ファイルが空の場合、START および END マーカーのみが lineCount として 0 で発行されます。デフォルトは false です。true の場合、apply-sequence はデフォルトで false です。markers-json (次の属性)も参照してください。 |
4 | markers が true の場合、これを true に設定して、FileMarker オブジェクトを JSON 文字列に変換します。(下に SimpleJsonSerializer を使用)。 |
5 | sequenceSize および sequenceNumber ヘッダーをメッセージに含めることを無効にするには、false に設定します。markers が true でない限り、デフォルトは true です。true および markers が true の場合、マーカーはシーケンスに含まれます。true および iterator が true の場合、サイズが不明であるため、sequenceSize ヘッダーは 0 に設定されます。 |
6 | true に設定すると、ファイルに行がない場合に RequiresReplyException がスローされます。デフォルトは false です。 |
7 | テキストデータを String ペイロードに読み込むときに使用する文字セット名を設定します。デフォルトはプラットフォームの文字セットです。 |
8 | 残りの行に対して発行されるメッセージのヘッダーとして搬送される最初の行のヘッダー名。バージョン 5.0 以降。 |
9 | メッセージをスプリッターに送信するために使用される入力チャネルを設定します。 |
10 | メッセージの送信先の出力チャネルを設定します。 |
11 | 送信タイムアウトを設定します。output-channel がブロックできる場合(フル QueueChannel など)にのみ適用されます。 |
12 | false に設定すると、コンテキストがリフレッシュされたときにスプリッターが自動的に開始されなくなります。デフォルトは true です。 |
13 | input-channel が <publish-subscribe-channel/> である場合、このエンドポイントの順序を設定します。 |
14 | スプリッターの起動フェーズを設定します(auto-startup が true の場合に使用)。 |
FileSplitter
はまた、テキストベースの InputStream
を行に分割します。バージョン 4.3 以降、FTP または SFTP ストリーミング受信チャネルアダプター、または stream
オプションを使用してファイルを取得する FTP または SFTP 送信ゲートウェイと併用すると、スプリッターはファイルが完全に終了したときにストリームをサポートするセッションを自動的に閉じます。消費これらの機能の詳細については、FTP ストリーミング受信チャネルアダプターおよび SFTP ストリーミング受信チャネルアダプターならびに FTP 送信ゲートウェイおよび SFTP 送信ゲートウェイを参照してください。
Java 構成を使用する場合、次の例に示すように、追加のコンストラクターを使用できます。
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
markersJson
が true の場合、マーカーは JSON 文字列として表されます(SimpleJsonSerializer
を使用)。
バージョン 5.0 では、コンテンツの最初の行がヘッダー (CSV ファイルの列名など) であることを指定する firstLineAsHeader
オプションが導入されました。このプロパティに渡される引数は、残りの行に対して発行されるメッセージで最初の行がヘッダーとして運ばれるヘッダー名です。この行は、シーケンスヘッダー (applySequence
が true の場合) にも、FileMarker.END
に関連付けられた lineCount
にも含まれません。注: バージョン 5.5 以降では、FileMarker
を JSON に直列化できるため、lineCount ` も FileMarker.END
メッセージのヘッダーに FileHeaders.LINE_COUNT
として含まれるようになりました。ファイルにヘッダー行のみが含まれている場合、ファイルは空として扱われるため、分割中に FileMarker
インスタンスのみが発行されます (マーカーが有効になっている場合。それ以外の場合はメッセージは発行されません)。デフォルトでは (ヘッダー名が設定されていない場合)、最初の行はデータと見なされ、最初に発行されたメッセージのペイロードになります。
ファイルの内容 (最初の行ではない、行の内容全体ではない、特定の 1 つのヘッダーではないなど) からのヘッダー抽出に関してより複雑なロジックが必要な場合は、FileSplitter
の前にヘッダーエンリッチャーを使用することを検討してください。ヘッダーに移動された行は、通常のコンテンツプロセスの下流でフィルターされる可能性があることに注意してください。
分割ファイルを処理するべき等べき下流
apply-sequence
が true の場合、スプリッターは SEQUENCE_NUMBER
ヘッダーに行番号を追加します(markers
が true の場合、マーカーは行としてカウントされます)。行番号をべき等レシーバーで使用すると、再起動後の行の再処理を回避できます。
例:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}