ポーラー

このセクションでは、Spring Integration でのポーリングの仕組みについて説明します。

ポーリングコンシューマー

メッセージエンドポイント(チャネルアダプター)がチャネルに接続され、インスタンス化されると、次のいずれかのインスタンスが生成されます。

実際の実装は、これらのエンドポイントが接続するチャネルの型によって異なります。org.springframework.messaging.SubscribableChannel (Javadoc) インターフェースを実装するチャネルに接続されたチャネルアダプターは、EventDrivenConsumer のインスタンスを生成します。一方、org.springframework.messaging.PollableChannel (Javadoc) インターフェース(QueueChannel など)を実装するチャネルに接続されたチャネルアダプターは、PollingConsumer のインスタンスを生成します。

ポーリングコンシューマーにより、Spring Integration コンポーネントは、イベント駆動型の方法でメッセージを処理するのではなく、メッセージをアクティブにポーリングできます。

これらは、多くのメッセージングシナリオで重要な横断的関心事です。Spring Integration では、ポーリングコンシューマーは同じ名前のパターンに基づいています。これは、Gregor Hohpe と Bobby Woolf の書籍 Enterprise Integration Patterns で説明されています。パターンの説明は、本の Web サイト (英語) で見つけることができます。

ポーリング可能なメッセージソース

Spring Integration は、ポーリングコンシューマーパターンの 2 番目のバリエーションを提供します。受信チャネルアダプターが使用される場合、これらのアダプターは多くの場合 SourcePollingChannelAdapter によってラップされます。例: リモート FTP サーバーの場所からメッセージを取得する場合、FTP 受信チャネルアダプターで説明されているアダプターは、定期的にメッセージを取得するポーラーで構成されます。そのため、コンポーネントがポーラーで構成されている場合、結果のインスタンスは次のいずれかの型になります。

つまり、ポーラーは受信と送信の両方のメッセージングシナリオで使用されます。ポーラーが使用されるいくつかの使用例は次のとおりです。

  • FTP サーバー、データベース、Web サービスなどの特定の外部システムのポーリング

  • 内部(ポーリング可能)メッセージチャネルのポーリング

  • 内部サービスのポーリング (Java クラスでメソッドを繰り返し実行するなど)

AOP アドバイスクラスは、トランザクションを開始するトランザクションアドバイスなど、advice-chain のポーラーに適用できます。バージョン 4.1 から、PollSkipAdvice が提供されます。ポーラーはトリガーを使用して、次のポーリングの時間を決定します。PollSkipAdvice を使用して、おそらくメッセージの処理を妨げるダウンストリーム条件があるため、ポーリングを抑制(スキップ)することができます。このアドバイスを使用するには、PollSkipStrategy の実装を提供する必要があります。バージョン 4.2.5 から、SimplePollSkipStrategy が提供されます。それを使用するには、インスタンスを Bean としてアプリケーションコンテキストに追加し、それを PollSkipAdvice に挿入し、それをポーラーのアドバイスチェーンに追加します。ポーリングをスキップするには、skipPolls() を呼び出します。ポーリングを再開するには、reset() を呼び出します。バージョン 4.2 は、この領域に柔軟性を追加しました。メッセージソースの条件付きポーラーを参照してください。

この章の目的は、ポーリングコンシューマーの概要と、メッセージコンシューマー(メッセージチャンネルを参照)およびチャネルアダプター(チャンネルアダプターを参照)の概念にどのように適合するかを説明することのみです。一般的なメッセージングエンドポイントおよび特にポーリングコンシューマーに関する詳細については、メッセージエンドポイントを参照してください。

遅延確認応答可能なメッセージソース

バージョン 5.0.1 以降、特定のモジュールは、ダウンストリームフローが完了する(またはメッセージを別のスレッドに渡す)まで確認応答を延期することをサポートする MessageSource 実装を提供します。これは現在、AmqpMessageSource と KafkaMessageSource に限定されています。

これらのメッセージソースでは、IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK ヘッダー(MessageHeaderAccessor API を参照)がメッセージに追加されます。ポーリング可能なメッセージソースで使用する場合、次の例に示すように、ヘッダーの値は AcknowledgmentCallback のインスタンスです。

@FunctionalInterface
public interface AcknowledgmentCallback {

    void acknowledge(Status status);

    boolean isAcknowledged();

    void noAutoAck();

    default boolean isAutoAck();

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

すべてのメッセージソース(たとえば、KafkaMessageSource)が REJECT ステータスをサポートしているわけではありません。ACCEPT と同様に扱われます。

次の例に示すように、アプリケーションはいつでもメッセージを確認できます。

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

MessageSource が SourcePollingChannelAdapter に接続されている場合、ダウンストリームフローの補完後にポーラースレッドがアダプターに戻ると、アダプターは確認応答がすでに確認済みであるかどうかを確認し、確認されていない場合、そのステータスを ACCEPT it(または、フローが REJECT 例外をスローします)。ステータス値は AcknowledgmentCallback.Status 列挙 (Javadoc) で定義されています。

Spring Integration は、MessageSourcePollingTemplate を提供して、MessageSource のアドホックポーリングを実行します。これも、MessageHandler コールバックが戻る(または例外をスローする)ときに AcknowledgmentCallback で ACCEPT または REJECT を設定することに注意します。次の例は、MessageSourcePollingTemplate でポーリングする方法を示しています。

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

どちらの場合も (SourcePollingChannelAdapter と MessageSourcePollingTemplate)、コールバックで noAutoAck() を呼び出すことで自動 ack/nack を無効にすることができます。メッセージを別のスレッドに渡して後で確認応答したい場合にこれを行うことができます。すべての実装がこれをサポートしているわけではありません (たとえば、オフセットコミットは同じスレッドで実行する必要があるため、Apache Kafka はサポートしていません)。

メッセージソースの条件付きポーラー

このセクションでは、条件付きポーラーの使用方法について説明します。

バックグラウンド

ポーラーの advice-chain 内の Advice オブジェクトは、ポーリングタスク全体(メッセージの取得と処理の両方)をアドバイスします。これらの「アラウンドアドバイス」メソッドは、投票のコンテキストにはアクセスできず、投票自体にのみアクセスできます。これは、前述のように、何らかの外部条件によりタスクをトランザクションにしたり、ポーリングをスキップしたりするなどの要件には適しています。ポーリングの receive 部分の結果に応じて何らかのアクションを実行する場合、または条件に応じてポーラーを調整する場合はどうなるでしょうか? それらのインスタンスに対して、Spring Integration は「スマート」ポーリングを提供します。

「スマート」ポーリング

バージョン 5.3 は ReceiveMessageAdvice インターフェースを導入しました。(MessageSourceMutator の default メソッドのために、AbstractMessageSourceAdvice は非推奨になりました)このインターフェースを実装する advice-chain の Advice オブジェクトは、受信操作にのみ適用されます。MessageSource.receive() および PollableChannel.receive(timeout)SourcePollingChannelAdapter または PollingConsumer にのみ適用できます。そのようなクラスは、次のメソッドを実装します。

  • beforeReceive(Object source) このメソッドは、Object.receive() メソッドの前に呼び出されます。ソースを調べて再構成できます。false を返すと、このポーリングはキャンセルされます(前述の PollSkipAdvice と同様)。

  • Message<?> afterReceive(Message<?> result, Object source) このメソッドは、receive() メソッドの後に呼び出されます。繰り返しますが、ソースを再構成するか、何らかのアクションを実行できます(おそらく、結果によって異なります。ソースによってメッセージが作成されなかった場合、null になる可能性があります)。別のメッセージを返すこともできます

スレッドセーフ

アドバイスが変化する場合は、TaskExecutor を使用してポーラーを構成しないでください。アドバイスがソースを変更する場合、そのような変更はスレッドセーフではなく、特に高頻度のポーラーで予期しない結果を引き起こす可能性があります。ポーリング結果を同時に処理する必要がある場合は、ポーラーにエグゼキューターを追加する代わりに、ダウンストリーム ExecutorChannel を使用することを検討してください。

アドバイスチェーンオーダー

初期化中にアドバイスチェーンがどのように処理されるかを理解する必要があります。ReceiveMessageAdvice を実装しない Advice オブジェクトは、ポーリングプロセス全体に適用され、すべての ReceiveMessageAdvice の前に、順番に最初に呼び出されます。次に、ReceiveMessageAdvice オブジェクトが、ソース receive() メソッドの周囲に順番に呼び出されます。たとえば、Advice オブジェクト a, b, c, d があり、b および d が ReceiveMessageAdvice である場合、オブジェクトは a, c, b, d の順序で適用されます。また、ソースがすでに Proxy である場合、既存の Advice オブジェクトの後に ReceiveMessageAdvice が呼び出されます。順序を変更する場合は、自分でプロキシを接続する必要があります。

SimpleActiveIdleReceiveMessageAdvice

MessageSource のみの以前の SimpleActiveIdleMessageSourceAdvice は非推奨です)このアドバイスは ReceiveMessageAdvice の単純な実装です。DynamicPeriodicTrigger と組み合わせて使用すると、前回のポーリングでメッセージが生成されたかどうかに応じて、ポーリング頻度が調整されます。ポーラーには、同じ DynamicPeriodicTrigger への参照も必要です。

重要: 非同期ハンドオフ
SimpleActiveIdleReceiveMessageAdvice は、receive() の結果に基づいてトリガーを変更します。これは、ポーラースレッドでアドバイスが呼び出された場合にのみ機能します。ポーラーに task-executor がある場合は機能しません。ポーリングの結果の後に非同期操作を使用する場合にこのアドバイスを使用するには、おそらく ExecutorChannel を使用して、後で非同期ハンドオフを実行します。
CompoundTriggerAdvice

このアドバイスにより、ポーリングがメッセージを返すかどうかに基づいて、2 つのトリガーのいずれかを選択できます。CronTrigger を使用するポーラーを考えてください。CronTrigger インスタンスは不変であるため、一度構築すると変更できません。cron 式を使用して 1 時間に 1 回ポーリングをトリガーするユースケースを考えますが、メッセージが受信されない場合は 1 分に 1 回ポーリングし、メッセージが取得されたら cron 式の使用に戻ります。

アドバイス(およびポーラー)は、この目的のために CompoundTrigger を使用します。トリガーの primary トリガーは CronTrigger にすることができます。アドバイスは、メッセージが受信されていないことを検出すると、二次トリガーを CompoundTrigger に追加します。CompoundTrigger インスタンスの nextExecutionTime メソッドが呼び出されると、セカンダリトリガー(存在する場合)に委譲されます。それ以外の場合は、プライマリトリガーに委譲します。

ポーラーには、同じ CompoundTrigger への参照も必要です。

次の例は、毎分にフォールバックする時間単位の cron 式の構成を示しています。

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要: 非同期ハンドオフ
CompoundTriggerAdvice は、receive() の結果に基づいてトリガーを変更します。これは、ポーラースレッドでアドバイスが呼び出された場合にのみ機能します。ポーラーに task-executor がある場合は機能しません。ポーリングの結果の後に非同期操作を使用する場合にこのアドバイスを使用するには、おそらく ExecutorChannel を使用して、後で非同期ハンドオフを実行します。
メッセージソースのみのアドバイス

一部のアドバイスは MessageSource.receive() にのみ適用され、PollableChannel には意味をなさない場合があります。この目的のために、MessageSourceMutator インターフェース(ReceiveMessageAdvice の拡張)がまだ存在しています。default メソッドを使用すると、すでに非推奨の AbstractMessageSourceAdvice を完全に置き換えるため、MessageSource プロキシのみが期待される実装で使用する必要があります。詳細については、受信チャネルアダプター: 複数のサーバーとディレクトリのポーリングを参照してください。