受信チャネルアダプター: 複数のサーバーとディレクトリのポーリング

バージョン 5.0.7 以降、RotatingServerAdvice が利用可能になりました。ポーリングアドバイスとして構成されている場合、受信アダプターは複数のサーバーとディレクトリをポーリングできます。通常どおり、アドバイスを構成し、ポーラーのアドバイスチェーンに追加します。DelegatingSessionFactory を使用してサーバーを選択します。詳細については、セッションファクトリの委譲を参照してください。アドバイス構成は、RotationPolicy.KeyDirectory オブジェクトのリストで構成されています。

サンプル
@Bean
public RotatingServerAdvice advice() {
    List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
    keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
    return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

このアドバイスは、新しいファイルが存在しなくなるまでサーバー one 上のディレクトリ foo をポーリングし、その後、サーバー two 上のディレクトリ bar、次にディレクトリ baz に移動します。

このデフォルトの動作は、fair コンストラクター arg を使用して変更できます。

公平
@Bean
public RotatingServerAdvice advice() {
    ...
    return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

この場合、前回のポーリングでファイルが返されたかどうかに関係なく、アドバイスは次のサーバー / ディレクトリに移動します。

または、独自の RotationPolicy を提供して、必要に応じてメッセージソースを再構成できます。

ポリシー
public interface RotationPolicy {

    void beforeReceive(MessageSource<?> source);

    void afterReceive(boolean messageReceived, MessageSource<?> source);

}

および

カスタム
@Bean
public RotatingServerAdvice advice() {
    return new RotatingServerAdvice(myRotationPolicy());
}

local-filename-generator-expression 属性(シンクロナイザーの localFilenameGeneratorExpression)に #remoteDirectory 変数を含めることができるようになりました。これにより、異なるディレクトリから取得したファイルを同様のディレクトリにローカルにダウンロードできます。

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from(Sftp.inboundAdapter(sf())
                    .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
                    .localDirectory(new File(tmpDir))
                    .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
                    .remoteDirectory("."),
                e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
            .channel(MessageChannels.queue("files"))
            .get();
}
このアドバイスを使用するときは、ポーラーで TaskExecutor を構成しないでください。詳細については、メッセージソースの条件付きポーラーを参照してください。

取得されたすべてのファイルが単一のポーリングサイクル内で処理されず、SessionFactory が別のものにローテーションされる可能性がある場合は、便利な AbstractRemoteFileStreamingMessageSource.clearFetchedCache() API も参照してください。