クラス FileReadingMessageSource

実装されたすべてのインターフェース:
AwareBeanFactoryAwareBeanNameAwareDisposableBeanInitializingBeanLifecycleMessageSource<FileSE>IntegrationPatternNamedComponentIntegrationInboundManagementIntegrationManagementManageableLifecycle

public class FileReadingMessageSource extends AbstractMessageSource<FileSE> implements ManageableLifecycle
ファイルシステムディレクトリからメッセージを作成する MessageSource。特定のファイルのメッセージを防ぐために、FileListFilter を指定できます。デフォルトでは、XML または DSL で構成する場合、AcceptOnceFileListFilter が使用されます。これにより、ファイルがディレクトリから 1 回だけ取得されるようになります。

ファイルの読み取りに関する一般的な問題は、準備が整う前にファイルが検出される可能性があることです。デフォルトの AcceptOnceFileListFilter はこれを防ぎません。ほとんどの場合、ファイル書き込みプロセスが読み取りの準備ができたらすぐに各ファイルの名前を変更すると、これを防ぐことができます。デフォルトの AcceptOnceFileListFilter で構成された、準備ができている(たとえば、既知のサフィックスに基づく)ファイルのみを受け入れるパターンマッチングフィルターを使用すると、これが可能になります。

外部 DirectoryScanner を使用する場合は、FileReadingMessageSource のインスタンスではなく、FileLocker および FileListFilter オブジェクトを外部 DirectoryScanner に設定する必要があります。それ以外の場合は、IllegalStateExceptionSE が発生します。

ComparatorSE を使用して、PriorityBlockingQueueSE 内のファイルの内部順序を確認できます。これは ResequencingMessageGroupProcessor と同じ保証を提供しませんが、ファイルの書き込みとダウンストリームの失敗がまれな場合はそれで十分かもしれません。

FileReadingMessageSource は、receive() の同時呼び出しとメッセージ配信コールバックで完全にスレッドセーフです。

作成者:
Iwein Fuld, Mark Fisher, Oleg Zhurakousky, Gary Russell, Artem Bilan, Steven Pearce, Patryk Ziobron
  • コンストラクターの詳細

    • FileReadingMessageSource

      public FileReadingMessageSource()
      無制限の容量の自然に順序付けられたキューを使用して FileReadingMessageSource を作成します。
    • FileReadingMessageSource

      public FileReadingMessageSource(int internalQueueCapacity)
      指定された容量の制限付きキューを使用して FileReadingMessageSource を作成します。これは、大きなディレクトリから読み取るときに、このコンポーネントのメモリフットプリントを削減するために使用できます。
      パラメーター:
      internalQueueCapacity - 内部で受信するファイルをキャッシュするために使用されるキューのサイズ。このキューを大きくして、ディレクトリスキャンを最適化することができます。scanEachPoll を false に設定し、キューを大きなサイズに設定すると、新しいディレクトリリストが作成される前に、一度いっぱいになり、完全に空になります。これは、ディレクトリ内の多数のファイルのスキャンを減らすのに特に役立ちます。
    • FileReadingMessageSource

      public FileReadingMessageSource(@Nullable ComparatorSE<FileSE> receptionOrderComparator)
      渡された ComparatorSE で順序付けされた PriorityBlockingQueueSE を使用して FileReadingMessageSource を作成します。

      使用するキューのサイズは、すべてのファイルを並べ替えるために、入力ディレクトリ内のすべてのファイルを保持するのに十分な大きさである必要があります。キューのサイズを制限することは、順序付けと相互に排他的です。同時アクセスでは、ファイル配信のオーダーについて保証することはできません。

      パラメーター:
      receptionOrderComparator - 内部キュー内のファイルを順序付けるために使用されるコンパレータ
  • メソッドの詳細

    • setDirectory

      public void setDirectory(FileSE directory)
      入力ディレクトリを指定します。
      パラメーター:
      directory - 監視するために
    • setScanner

      public void setScanner(DirectoryScanner scanner)
      オプションで、FileReadingMessageSource.WatchServiceDirectoryScanner などのカスタムスキャナーを指定します。
      パラメーター:
      scanner - スキャナーの実装
    • getScanner

      public DirectoryScanner getScanner()
      FileReadingMessageSource Bean を使用して、実行時にオプション(filterlocker など)を変更できるようにする scanner プロパティアクセサー。
      戻り値:
      この FileReadingMessageSourceDirectoryScanner
      導入:
      4.2
    • setAutoCreateDirectory

      public void setAutoCreateDirectory(boolean autoCreateDirectory)
      初期化時にソースディレクトリがまだ存在しない場合に、ソースディレクトリを自動的に作成するかどうかを指定します。デフォルトでは、この値は true です。false に設定され、ソースディレクトリが存在しない場合、初期化時に例外がスローされます。
      パラメーター:
      autoCreateDirectory - このコンポーネントの起動時に、監視対象のディレクトリを作成する必要がありますか?
    • setFilter

      public void setFilter(FileListFilter<FileSE> filter)
      FileListFilter を設定します。デフォルトでは、境界のない AcceptOnceFileListFilter が使用されます。ほとんどの場合、変更や重複の関心事に対処するために、カスタマイズされた FileListFilter が必要になります。複数のフィルターが必要な場合は、CompositeFileListFilter を使用してグループ化できます。

      付属のフィルターはスレッドセーフである必要があります

      パラメーター:
      filter - フィルター
    • setLocker

      public void setLocker(FileLocker locker)
      重複処理からファイルを保護するために使用する FileLocker を設定します。

      付属の FileLocker はスレッドセーフである必要があります

      パラメーター:
      locker - ロッカー
    • setScanEachPoll

      public void setScanEachPoll(boolean scanEachPoll)
      内部キューが各ポーリングで入力ディレクトリの最新のコンテンツでリフレッシュされるようにする場合は、このフラグを設定します。

      デフォルトでは、この実装はディレクトリを再度参照する前にキューを空にします。順序が関係する場合は、このフラグを設定した場合の影響を考慮することが重要です。このクラスが保持している内部 BlockingQueueSE は、このフラグが false に設定されている場合はファイルシステムと同期していない可能性が高くなりますが、true に設定されている場合はより頻繁に変更されます (コストのかかる再順序付けが発生します)。

      パラメーター:
      scanEachPoll - コンポーネントを再スキャンする必要があるかどうか (バックログ全体が配信されるまで再スキャンしないのとは対照的に)
    • setUseWatchService

      public void setUseWatchService(boolean useWatchService)
      この FileReadingMessageSource を切り替えて、内部 FileReadingMessageSource.WatchServiceDirectoryScanner を使用します。
      パラメーター:
      useWatchService - true で FileReadingMessageSource.WatchServiceDirectoryScanner に切り替える boolean フラグ。
      導入:
      4.3
      関連事項:
    • isUseWatchService

      public boolean isUseWatchService()
    • setWatchEvents

      public void setWatchEvents(FileReadingMessageSource.WatchEventType... watchEvents)
      WatchServiceSE イベント型。setUseWatchService(boolean) が true でない場合、このオプションは無視されます。
      パラメーター:
      watchEvents - FileReadingMessageSource.WatchEventType のセット。
      導入:
      4.3
      関連事項:
    • setWatchMaxDepth

      public void setWatchMaxDepth(int watchMaxDepth)
      useWatchService が有効な場合、Files.walkFileTree(Path, Set, int, FileVisitor)SE API の最大深さを設定します。デフォルトは Integer.MAX_VALUESE - ツリー全体を歩きます。
      パラメーター:
      watchMaxDepth - Files.walkFileTree(Path, Set, int, FileVisitor)SE の深さ。
      導入:
      6.1
    • setWatchDirPredicate

      public void setWatchDirPredicate(PredicateSE<PathSE> watchDirPredicate)
      Files.walkFileTree(Path, Set, int, FileVisitor)SE 呼び出しでディレクトリが WatchServiceSE に適格であるかどうかを確認するように PredicateSE を設定します。
      パラメーター:
      watchDirPredicate - PredicateSE を使用して、ウォーキングのディレクトリを確認します。
      導入:
      6.1
    • getComponentType

      public StringSE getComponentType()
      次で指定:
      インターフェース NamedComponentgetComponentType 
    • start

      public void start()
      次で指定:
      インターフェース Lifecyclestart 
      次で指定:
      インターフェース ManageableLifecyclestart 
    • stop

      public void stop()
      次で指定:
      インターフェース Lifecyclestop 
      次で指定:
      インターフェース ManageableLifecyclestop 
    • isRunning

      public boolean isRunning()
      次で指定:
      インターフェース LifecycleisRunning 
      次で指定:
      インターフェース ManageableLifecycleisRunning 
    • onInit

      protected void onInit()
      オーバーライド:
      クラス AbstractExpressionEvaluatoronInit 
    • doReceive

      protected AbstractIntegrationMessageBuilder<FileSE> doReceive()
      クラスからコピーされた説明: AbstractMessageSource
      サブクラスはこのメソッドを実装する必要があります。通常、戻り値は T 型の payload ですが、戻り値はペイロードが T 型の Message インスタンスでもかまいません。追加のヘッダーの作成に使用される AbstractIntegrationMessageBuilder にすることもできます。
      次で指定:
      クラス AbstractMessageSource<FileSE>doReceive 
      戻り値:
      返された値。
    • onFailure

      public void onFailure(Message<FileSE> failedMessage)
      余裕がある場合は、失敗したメッセージを "toBeReceived" キューに追加し直します。
      パラメーター:
      failedMessage - 失敗した Message