このバージョンはまだ開発中であり、まだ安定しているとは見なされていません。最新の安定バージョンについては、Spring Integration 6.5.3 を使用してください!

ポーラー

このセクションでは、Spring Integration でのポーリングの仕組みについて説明します。

ポーリングコンシューマー

メッセージエンドポイント(チャネルアダプター)がチャネルに接続され、インスタンス化されると、次のいずれかのインスタンスが生成されます。

実際の実装は、これらのエンドポイントが接続するチャネルの型によって異なります。org.springframework.messaging.SubscribableChannel (Javadoc) インターフェースを実装するチャネルに接続されたチャネルアダプターは、EventDrivenConsumer のインスタンスを生成します。一方、org.springframework.messaging.PollableChannel (Javadoc) インターフェース(QueueChannel など)を実装するチャネルに接続されたチャネルアダプターは、PollingConsumer のインスタンスを生成します。

ポーリングコンシューマーにより、Spring Integration コンポーネントは、イベント駆動型の方法でメッセージを処理するのではなく、メッセージをアクティブにポーリングできます。

これらは、多くのメッセージングシナリオで重要な横断的関心事です。Spring Integration では、ポーリングコンシューマーは同じ名前のパターンに基づいています。これは、Gregor Hohpe と Bobby Woolf の書籍 Enterprise Integration Patterns で説明されています。パターンの説明は、本の Web サイト (英語) で見つけることができます。

ポーリングコンシューマー構成の詳細については、"メッセージエンドポイント" を参照してください。

ポーリング可能なメッセージソース

Spring Integration は、ポーリングコンシューマーパターンの 2 番目のバリエーションを提供します。受信チャネルアダプターが使用される場合、これらのアダプターは多くの場合 SourcePollingChannelAdapter によってラップされます。例: リモート FTP サーバーの場所からメッセージを取得する場合、FTP 受信チャネルアダプターで説明されているアダプターは、定期的にメッセージを取得するポーラーで構成されます。そのため、コンポーネントがポーラーで構成されている場合、結果のインスタンスは次のいずれかの型になります。

つまり、ポーラーは受信と送信の両方のメッセージングシナリオで使用されます。ポーラーが使用されるいくつかの使用例は次のとおりです。

  • FTP サーバー、データベース、Web サービスなどの特定の外部システムのポーリング

  • 内部(ポーリング可能)メッセージチャネルのポーリング

  • 内部サービスのポーリング (Java クラスでメソッドを繰り返し実行するなど)

AOP アドバイスクラスは、トランザクションを開始するトランザクションアドバイスなど、advice-chain のポーラーに適用できます。バージョン 4.1 から、PollSkipAdvice が提供されます。ポーラーはトリガーを使用して、次のポーリングの時間を決定します。PollSkipAdvice を使用して、おそらくメッセージの処理を妨げるダウンストリーム条件があるため、ポーリングを抑制(スキップ)することができます。このアドバイスを使用するには、PollSkipStrategy の実装を提供する必要があります。バージョン 4.2.5 から、SimplePollSkipStrategy が提供されます。それを使用するには、インスタンスを Bean としてアプリケーションコンテキストに追加し、それを PollSkipAdvice に挿入し、それをポーラーのアドバイスチェーンに追加します。ポーリングをスキップするには、skipPolls() を呼び出します。ポーリングを再開するには、reset() を呼び出します。バージョン 4.2 は、この領域に柔軟性を追加しました。条件付きポーラーを参照してください。

この章の目的は、ポーリングコンシューマーの概要と、メッセージコンシューマー(メッセージチャンネルを参照)およびチャネルアダプター(チャンネルアダプターを参照)の概念にどのように適合するかを説明することのみです。一般的なメッセージングエンドポイントおよび特にポーリングコンシューマーに関する詳細については、メッセージエンドポイントを参照してください。

遅延確認応答可能なメッセージソース

バージョン 5.0.1 以降、特定のモジュールは、ダウンストリームフローが完了する(またはメッセージを別のスレッドに渡す)まで確認応答を延期することをサポートする MessageSource 実装を提供します。これは現在、AmqpMessageSource と KafkaMessageSource に限定されています。

これらのメッセージソースでは、IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK ヘッダー(MessageHeaderAccessor API を参照)がメッセージに追加されます。ポーリング可能なメッセージソースで使用する場合、次の例に示すように、ヘッダーの値は AcknowledgmentCallback のインスタンスです。

@FunctionalInterface
public interface AcknowledgmentCallback extends SimpleAcknowledgment {

    void acknowledge(Status status);

    @Override
    default void acknowledge() {
        acknowledge(Status.ACCEPT);
    }

    default boolean isAcknowledged() {
        return false;
    }


    default void noAutoAck() {
        throw new UnsupportedOperationException("You cannot disable auto acknowledgment with this implementation");
    }

    default boolean isAutoAck() {
        return true;
    }

    enum Status {

        /**
         * Mark the message as accepted.
         */
        ACCEPT,

        /**
         * Mark the message as rejected.
         */
        REJECT,

        /**
         * Reject the message and requeue so that it will be redelivered.
         */
        REQUEUE

    }

}

Not all message sources, (for example, a KafkaMessageSource) support the REJECT status. It is treated the same as ACCEPT.

次の例に示すように、アプリケーションはいつでもメッセージを確認できます。

Message<?> received = source.receive();

...

StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
        .acknowledge(Status.ACCEPT);

MessageSource が SourcePollingChannelAdapter に接続されている場合、ダウンストリームフローの完了後にポーラースレッドがアダプターに戻ると、アダプターは確認応答がすでに確認済みであるかどうかを確認し、確認されていない場合、そのステータスを ACCEPT it(または、フローが REJECT 例外をスローします)。ステータス値は AcknowledgmentCallback.Status 列挙 (Javadoc) で定義されています。

Spring Integration は、MessageSourcePollingTemplate を提供して、MessageSource のアドホックポーリングを実行します。これも、MessageHandler コールバックが戻る(または例外をスローする)ときに AcknowledgmentCallback で ACCEPT または REJECT を設定することに注意します。次の例は、MessageSourcePollingTemplate でポーリングする方法を示しています。

MessageSourcePollingTemplate template =
    new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
    ...
});

In both cases (SourcePollingChannelAdapter and MessageSourcePollingTemplate), you can disable auto ack/nack by calling noAutoAck() on the callback. You might do this if you hand off the message to another thread and wish to acknowledge later. Not all implementations support this, for example, Apache Kafka does not, because the offset commit has to be performed on the same thread.

メッセージソースの条件付きポーラー

このセクションでは、条件付きポーラーの使用方法について説明します。

バックグラウンド

ポーラーの advice-chain 内の Advice オブジェクトは、ポーリングタスク全体(メッセージの取得と処理の両方)をアドバイスします。これらの「アラウンドアドバイス」メソッドは、投票のコンテキストにはアクセスできず、投票自体にのみアクセスできます。これは、前述のように、何らかの外部条件によりタスクをトランザクションにしたり、ポーリングをスキップしたりするなどの要件には適しています。ポーリングの receive 部分の結果に応じて何らかのアクションを実行する場合、または条件に応じてポーラーを調整する場合はどうなるでしょうか? それらのインスタンスに対して、Spring Integration は「スマート」ポーリングを提供します。

「スマート」ポーリング

バージョン 5.3 では、ReceiveMessageAdvice インターフェースが導入されました。このインターフェースを実装する advice-chain 内の Advice オブジェクトは、receive() 操作(MessageSource.receive() および PollableChannel.receive(timeout))にのみ適用されます。これらは SourcePollingChannelAdapter または PollingConsumer にのみ適用できます。このようなクラスは、次のメソッドを実装します。

  • beforeReceive(Object source) このメソッドは、Object.receive() メソッドの前に呼び出されます。ソースを調べて再構成できます。false を返すと、このポーリングはキャンセルされます(前述の PollSkipAdvice と同様)。

  • Message<?> afterReceive(Message<?> result, Object source) このメソッドは、receive() メソッドの後に呼び出されます。繰り返しますが、ソースを再構成するか、何らかのアクションを実行できます(おそらく、結果によって異なります。ソースによってメッセージが作成されなかった場合、null になる可能性があります)。別のメッセージを返すこともできます

スレッドセーフ

If an Advice mutates the source, you should not configure the poller with a TaskExecutor. If an Advice mutates the source, such mutations are not thread safe and could cause unexpected results, especially with high-frequency pollers. If you need to process poll results concurrently, consider using a downstream ExecutorChannel instead of adding an executor to the poller.

アドバイスチェーンオーダー

初期化中にアドバイスチェーンがどのように処理されるかを理解する必要があります。ReceiveMessageAdvice を実装しない Advice オブジェクトは、ポーリングプロセス全体に適用され、すべての ReceiveMessageAdvice の前に、順番に最初に呼び出されます。次に、ReceiveMessageAdvice オブジェクトが、ソース receive() メソッドの周囲に順番に呼び出されます。たとえば、Advice オブジェクト a, b, c, d があり、b および d が ReceiveMessageAdvice である場合、オブジェクトは a, c, b, d の順序で適用されます。また、ソースがすでに Proxy である場合、既存の Advice オブジェクトの後に ReceiveMessageAdvice が呼び出されます。順序を変更する場合は、自分でプロキシを接続する必要があります。

SimpleActiveIdleReceiveMessageAdvice

このアドバイスは、ReceiveMessageAdvice の単純な実装です。DynamicPeriodicTrigger と組み合わせて使用すると、前回のポーリングでメッセージが生成されたかどうかに応じて、ポーリングの頻度が調整されます。ポーラーには、同じ DynamicPeriodicTrigger への参照も必要です。

重要: 非同期ハンドオフ
SimpleActiveIdleReceiveMessageAdvice は、receive() の結果に基づいてトリガーを変更します。これは、ポーラースレッドでアドバイスが呼び出された場合にのみ機能します。ポーラーに task-executor がある場合は機能しません。ポーリングの結果の後に非同期操作を使用する場合にこのアドバイスを使用するには、おそらく ExecutorChannel を使用して、後で非同期ハンドオフを実行します。

CompoundTriggerAdvice

このアドバイスにより、ポーリングがメッセージを返すかどうかに基づいて、2 つのトリガーのいずれかを選択できます。CronTrigger を使用するポーラーを考えてください。CronTrigger インスタンスは不変であるため、一度構築すると変更できません。cron 式を使用して 1 時間に 1 回ポーリングをトリガーするユースケースを考えますが、メッセージが受信されない場合は 1 分に 1 回ポーリングし、メッセージが取得されたら cron 式の使用に戻ります。

アドバイス(およびポーラー)は、この目的のために CompoundTrigger を使用します。トリガーの primary トリガーは CronTrigger にすることができます。アドバイスは、メッセージが受信されていないことを検出すると、二次トリガーを CompoundTrigger に追加します。CompoundTrigger インスタンスの nextExecutionTime メソッドが呼び出されると、セカンダリトリガー(存在する場合)に委譲されます。それ以外の場合は、プライマリトリガーに委譲します。

ポーラーには、同じ CompoundTrigger への参照も必要です。

次の例は、毎分にフォールバックする時間単位の cron 式の構成を示しています。

<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
    <bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
    <int:poller trigger="compoundTrigger">
        <int:advice-chain>
            <bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
                <constructor-arg ref="compoundTrigger"/>
                <constructor-arg ref="secondary"/>
            </bean>
        </int:advice-chain>
    </int:poller>
</int:inbound-channel-adapter>

<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
    <constructor-arg ref="primary" />
</bean>

<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
    <constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>

<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
    <constructor-arg value="60000" />
</bean>
重要: 非同期ハンドオフ
CompoundTriggerAdvice は、receive() の結果に基づいてトリガーを変更します。これは、ポーラースレッドでアドバイスが呼び出された場合にのみ機能します。ポーラーに task-executor がある場合は機能しません。ポーリングの結果の後に非同期操作を使用する場合にこのアドバイスを使用するには、おそらく ExecutorChannel を使用して、後で非同期ハンドオフを実行します。

メッセージソースのみのアドバイス

一部のアドバイスは MessageSource.receive() にのみ適用される可能性があり、PollableChannel には意味がありません。この目的のために、MessageSourceMutator インターフェース(ReceiveMessageAdvice の拡張)はまだ存在しています。詳細については、受信チャネルアダプター: 複数のサーバーとディレクトリのポーリングを参照してください。