フィードアダプター

Spring Integration は、フィードアダプターを介したシンジケーションのサポートを提供します。実装は ROME フレームワーク (英語) に基づいています。

この依存関係をプロジェクトに含める必要があります。

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-feed</artifactId>
    <version>5.5.8</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-feed:5.5.8"

Web シンジケーションは、ニュース記事、プレスリリース、ブログ投稿、その他の Web サイトで一般的に利用可能な資料を公開する方法ですが、RSS や ATOM などのフィード形式でも利用できます。

Spring 統合は、その「フィード」アダプターを介して Web シンジケーションをサポートし、便利な名前空間ベースの構成を提供します。'feed' 名前空間を構成するには、XML 構成ファイルのヘッダー内に次の要素を含めます。

xmlns:int-feed="http://www.springframework.org/schema/integration/feed"
xsi:schemaLocation="http://www.springframework.org/schema/integration/feed
	https://www.springframework.org/schema/integration/feed/spring-integration-feed.xsd"

フィード受信チャネルアダプター

フィードの取得をサポートするために本当に必要な唯一のアダプターは、受信チャネルアダプターです。特定の URL にサブスクライブできます。次の例は、可能な構成を示しています。

<int-feed:inbound-channel-adapter id="feedAdapter"
        channel="feedChannel"
        url="https://feeds.bbci.co.uk/news/rss.xml">
    <int:poller fixed-rate="10000" max-messages-per-poll="100" />
</int-feed:inbound-channel-adapter>

上記の構成では、url 属性によって識別される URL にサブスクライブしています。

ニュース項目が取得されると、それらはメッセージに変換され、channel 属性によって識別されるチャネルに送信されます。各メッセージのペイロードは com.sun.syndication.feed.synd.SyndEntry インスタンスです。それぞれがニュースアイテムに関するさまざまなデータ(コンテンツ、日付、作成者、その他の詳細)をカプセル化します。

受信フィードチャネルアダプターは、ポーリングコンシューマーです。つまり、ポーラー構成を提供する必要があります。ただし、フィードに関して理解しなければならない重要なことの 1 つは、その内部の動作が他のほとんどのポーリングコンシューマーとわずかに異なることです。受信フィードアダプターが開始されると、最初のポーリングを行い、com.sun.syndication.feed.synd.SyndEntryFeed インスタンスを受け取ります。そのオブジェクトには、複数の SyndEntry オブジェクトが含まれています。各エントリはローカルエントリキューに格納され、各メッセージに単一のエントリが含まれるように、max-messages-per-poll 属性の値に基づいて解放されます。エントリキューからのエントリの取得中にキューが空になった場合、アダプターはフィードの更新を試行し、それにより、利用可能なエントリ(SyndEntry インスタンス)がキューに追加されます。それ以外の場合、フィードのポーリングの次の試行は、ポーラーのトリガーによって決定されます(前の構成では 10 秒ごと)。

重複エントリ

フィードのポーリングを行うと、すでに処理されたエントリが発生する可能性があります(「そのニュース項目をすでに読みました。なぜ再び表示するのですか?」)。Spring Integration は、重複エントリを心配する必要をなくす便利なメカニズムを提供します。各フィードエントリには「公開日」フィールドがあります。新しい Message が生成および送信されるたびに、Spring Integration は MetadataStore ストラテジーのインスタンスに最新の公開日の値を保存します(メタデータストアを参照)。

最新の公開日を保持するために使用されるキーは、フィード受信チャネルアダプターコンポーネントの(必須の) id 属性の値と、アダプターの構成からの feedUrl (存在する場合)です。

その他のオプション

バージョン 5.0 から、非推奨の com.rometools.fetcher.FeedFetcher オプションが削除され、org.springframework.core.io.Resource のオーバーロードされた FeedEntryMessageSource コンストラクターが提供されます。これは、フィードソースが HTTP エンドポイントではなく、他のリソース(FTP 上のローカルまたはリモートなど)である場合に役立ちます。FeedEntryMessageSource ロジックでは、そのようなリソース(または提供された URL)は、前述の処理のために SyndFeedInput によって SyndFeed オブジェクトに解析されます。また、カスタマイズされた SyndFeedInput (たとえば、allowDoctypes オプションを使用)インスタンスを FeedEntryMessageSource に挿入することもできます。

フィードへの接続にカスタマイズが必要な場合。接続と読み取りのタイムアウトでは、FeedEntryMessageSource へのプレーンな URL インジェクションの代わりに、customizeConnection(HttpURLConnection) オーバーライドを備えた org.springframework.core.io.UrlResource 拡張機能を使用する必要があります。例:

@Bean
@InboundChannelAdapter("feedChannel")
FeedEntryMessageSource feedEntrySource() {
    UrlResource urlResource =
	    new UrlResource(url) {

	        @Override
	        protected void customizeConnection(HttpURLConnection connection) throws IOException {
	            super.customizeConnection(connection);
	            connection.setConnectTimeout(10000);
	            connection.setReadTimeout(5000);
	        }
	    };
    return new FeedEntryMessageSource(urlResource, "myKey");
}

Java DSL 設定

次の Spring Boot アプリケーションは、Java DSL で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class FeedJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FeedJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Value("org/springframework/integration/feed/sample.rss")
    private Resource feedResource;

    @Bean
    public MetadataStore metadataStore() {
        PropertiesPersistingMetadataStore metadataStore = new PropertiesPersistingMetadataStore();
        metadataStore.setBaseDirectory(tempFolder.getRoot().getAbsolutePath());
        return metadataStore;
    }

    @Bean
    public IntegrationFlow feedFlow() {
        return IntegrationFlows
                .from(Feed.inboundAdapter(this.feedResource, "feedTest")
                                .metadataStore(metadataStore()),
                        e -> e.poller(p -> p.fixedDelay(100)))
                .channel(c -> c.queue("entries"))
                .get();
    }

}