最新の安定バージョンについては、Spring Integration 6.4.4 を使用してください! |
ポーラー
このセクションでは、Spring Integration でのポーリングの仕組みについて説明します。
ポーリングコンシューマー
メッセージエンドポイント(チャネルアダプター)がチャネルに接続され、インスタンス化されると、次のいずれかのインスタンスが生成されます。
実際の実装は、これらのエンドポイントが接続するチャネルの型によって異なります。org.springframework.messaging.SubscribableChannel
(Javadoc) インターフェースを実装するチャネルに接続されたチャネルアダプターは、EventDrivenConsumer
のインスタンスを生成します。一方、org.springframework.messaging.PollableChannel
(Javadoc) インターフェース(QueueChannel
など)を実装するチャネルに接続されたチャネルアダプターは、PollingConsumer
のインスタンスを生成します。
ポーリングコンシューマーにより、Spring Integration コンポーネントは、イベント駆動型の方法でメッセージを処理するのではなく、メッセージをアクティブにポーリングできます。
これらは、多くのメッセージングシナリオで重要な横断的関心事です。Spring Integration では、ポーリングコンシューマーは同じ名前のパターンに基づいています。これは、Gregor Hohpe と Bobby Woolf の書籍 Enterprise Integration Patterns で説明されています。パターンの説明は、本の Web サイト (英語) で見つけることができます。
ポーリングコンシューマー構成の詳細については、"メッセージエンドポイント" を参照してください。
ポーリング可能なメッセージソース
Spring Integration は、ポーリングコンシューマーパターンの 2 番目のバリエーションを提供します。受信チャネルアダプターが使用される場合、これらのアダプターは多くの場合 SourcePollingChannelAdapter
によってラップされます。例: リモート FTP サーバーの場所からメッセージを取得する場合、FTP 受信チャネルアダプターで説明されているアダプターは、定期的にメッセージを取得するポーラーで構成されます。そのため、コンポーネントがポーラーで構成されている場合、結果のインスタンスは次のいずれかの型になります。
つまり、ポーラーは受信と送信の両方のメッセージングシナリオで使用されます。ポーラーが使用されるいくつかの使用例は次のとおりです。
FTP サーバー、データベース、Web サービスなどの特定の外部システムのポーリング
内部(ポーリング可能)メッセージチャネルのポーリング
内部サービスのポーリング (Java クラスでメソッドを繰り返し実行するなど)
AOP アドバイスクラスは、トランザクションを開始するトランザクションアドバイスなど、advice-chain のポーラーに適用できます。バージョン 4.1 から、PollSkipAdvice が提供されます。ポーラーはトリガーを使用して、次のポーリングの時間を決定します。PollSkipAdvice を使用して、おそらくメッセージの処理を妨げるダウンストリーム条件があるため、ポーリングを抑制(スキップ)することができます。このアドバイスを使用するには、PollSkipStrategy の実装を提供する必要があります。バージョン 4.2.5 から、SimplePollSkipStrategy が提供されます。それを使用するには、インスタンスを Bean としてアプリケーションコンテキストに追加し、それを PollSkipAdvice に挿入し、それをポーラーのアドバイスチェーンに追加します。ポーリングをスキップするには、skipPolls() を呼び出します。ポーリングを再開するには、reset() を呼び出します。バージョン 4.2 は、この領域に柔軟性を追加しました。条件付きポーラーを参照してください。 |
この章の目的は、ポーリングコンシューマーの概要と、メッセージコンシューマー(メッセージチャンネルを参照)およびチャネルアダプター(チャンネルアダプターを参照)の概念にどのように適合するかを説明することのみです。一般的なメッセージングエンドポイントおよび特にポーリングコンシューマーに関する詳細については、メッセージエンドポイントを参照してください。
遅延確認応答可能なメッセージソース
バージョン 5.0.1 以降、特定のモジュールは、ダウンストリームフローが完了する(またはメッセージを別のスレッドに渡す)まで確認応答を延期することをサポートする MessageSource
実装を提供します。これは現在、AmqpMessageSource
と KafkaMessageSource
に限定されています。
これらのメッセージソースでは、IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
ヘッダー(MessageHeaderAccessor
API を参照)がメッセージに追加されます。ポーリング可能なメッセージソースで使用する場合、次の例に示すように、ヘッダーの値は AcknowledgmentCallback
のインスタンスです。
@FunctionalInterface
public interface AcknowledgmentCallback {
void acknowledge(Status status);
boolean isAcknowledged();
void noAutoAck();
default boolean isAutoAck();
enum Status {
/**
* Mark the message as accepted.
*/
ACCEPT,
/**
* Mark the message as rejected.
*/
REJECT,
/**
* Reject the message and requeue so that it will be redelivered.
*/
REQUEUE
}
}
すべてのメッセージソース(たとえば、KafkaMessageSource
)が REJECT
ステータスをサポートしているわけではありません。ACCEPT
と同様に扱われます。
次の例に示すように、アプリケーションはいつでもメッセージを確認できます。
Message<?> received = source.receive();
...
StaticMessageHeaderAccessor.getAcknowledgmentCallback(received)
.acknowledge(Status.ACCEPT);
MessageSource
が SourcePollingChannelAdapter
に接続されている場合、ダウンストリームフローの完了後にポーラースレッドがアダプターに戻ると、アダプターは確認応答がすでに確認済みであるかどうかを確認し、確認されていない場合、そのステータスを ACCEPT
it(または、フローが REJECT
例外をスローします)。ステータス値は AcknowledgmentCallback.Status
列挙 (Javadoc) で定義されています。
Spring Integration は、MessageSourcePollingTemplate
を提供して、MessageSource
のアドホックポーリングを実行します。これも、MessageHandler
コールバックが戻る(または例外をスローする)ときに AcknowledgmentCallback
で ACCEPT
または REJECT
を設定することに注意します。次の例は、MessageSourcePollingTemplate
でポーリングする方法を示しています。
MessageSourcePollingTemplate template =
new MessageSourcePollingTemplate(this.source);
template.poll(h -> {
...
});
どちらの場合も (SourcePollingChannelAdapter
と MessageSourcePollingTemplate
)、コールバックで noAutoAck()
を呼び出すことで自動 ack/nack を無効にすることができます。メッセージを別のスレッドに渡して後で確認応答したい場合にこれを行うことができます。すべての実装がこれをサポートしているわけではありません (たとえば、オフセットコミットは同じスレッドで実行する必要があるため、Apache Kafka はサポートしていません)。
メッセージソースの条件付きポーラー
このセクションでは、条件付きポーラーの使用方法について説明します。
バックグラウンド
ポーラーの advice-chain
内の Advice
オブジェクトは、ポーリングタスク全体(メッセージの取得と処理の両方)をアドバイスします。これらの「アラウンドアドバイス」メソッドは、投票のコンテキストにはアクセスできず、投票自体にのみアクセスできます。これは、前述のように、何らかの外部条件によりタスクをトランザクションにしたり、ポーリングをスキップしたりするなどの要件には適しています。ポーリングの receive
部分の結果に応じて何らかのアクションを実行する場合、または条件に応じてポーラーを調整する場合はどうなるでしょうか? それらのインスタンスに対して、Spring Integration は「スマート」ポーリングを提供します。
「スマート」ポーリング
バージョン 5.3 では、ReceiveMessageAdvice
インターフェースが導入されました。このインターフェースを実装する advice-chain
内の Advice
オブジェクトは、receive()
操作(MessageSource.receive()
および PollableChannel.receive(timeout)
)にのみ適用されます。これらは SourcePollingChannelAdapter
または PollingConsumer
にのみ適用できます。このようなクラスは、次のメソッドを実装します。
beforeReceive(Object source)
このメソッドは、Object.receive()
メソッドの前に呼び出されます。ソースを調べて再構成できます。false
を返すと、このポーリングはキャンセルされます(前述のPollSkipAdvice
と同様)。Message<?> afterReceive(Message<?> result, Object source)
このメソッドは、receive()
メソッドの後に呼び出されます。繰り返しますが、ソースを再構成するか、何らかのアクションを実行できます(おそらく、結果によって異なります。ソースによってメッセージが作成されなかった場合、null
になる可能性があります)。別のメッセージを返すこともできます
スレッドセーフ
|
アドバイスチェーンオーダー 初期化中にアドバイスチェーンがどのように処理されるかを理解する必要があります。 |
SimpleActiveIdleReceiveMessageAdvice
このアドバイスは、ReceiveMessageAdvice
の単純な実装です。DynamicPeriodicTrigger
と組み合わせて使用すると、前回のポーリングでメッセージが生成されたかどうかに応じて、ポーリングの頻度が調整されます。ポーラーには、同じ DynamicPeriodicTrigger
への参照も必要です。
重要: 非同期ハンドオフ SimpleActiveIdleReceiveMessageAdvice は、receive() の結果に基づいてトリガーを変更します。これは、ポーラースレッドでアドバイスが呼び出された場合にのみ機能します。ポーラーに task-executor がある場合は機能しません。ポーリングの結果の後に非同期操作を使用する場合にこのアドバイスを使用するには、おそらく ExecutorChannel を使用して、後で非同期ハンドオフを実行します。 |
CompoundTriggerAdvice
このアドバイスにより、ポーリングがメッセージを返すかどうかに基づいて、2 つのトリガーのいずれかを選択できます。CronTrigger
を使用するポーラーを考えてください。CronTrigger
インスタンスは不変であるため、一度構築すると変更できません。cron 式を使用して 1 時間に 1 回ポーリングをトリガーするユースケースを考えますが、メッセージが受信されない場合は 1 分に 1 回ポーリングし、メッセージが取得されたら cron 式の使用に戻ります。
アドバイス(およびポーラー)は、この目的のために CompoundTrigger
を使用します。トリガーの primary
トリガーは CronTrigger
にすることができます。アドバイスは、メッセージが受信されていないことを検出すると、二次トリガーを CompoundTrigger
に追加します。CompoundTrigger
インスタンスの nextExecutionTime
メソッドが呼び出されると、セカンダリトリガー(存在する場合)に委譲されます。それ以外の場合は、プライマリトリガーに委譲します。
ポーラーには、同じ CompoundTrigger
への参照も必要です。
次の例は、毎分にフォールバックする時間単位の cron 式の構成を示しています。
<int:inbound-channel-adapter channel="nullChannel" auto-startup="false">
<bean class="org.springframework.integration.endpoint.PollerAdviceTests.Source" />
<int:poller trigger="compoundTrigger">
<int:advice-chain>
<bean class="org.springframework.integration.aop.CompoundTriggerAdvice">
<constructor-arg ref="compoundTrigger"/>
<constructor-arg ref="secondary"/>
</bean>
</int:advice-chain>
</int:poller>
</int:inbound-channel-adapter>
<bean id="compoundTrigger" class="org.springframework.integration.util.CompoundTrigger">
<constructor-arg ref="primary" />
</bean>
<bean id="primary" class="org.springframework.scheduling.support.CronTrigger">
<constructor-arg value="0 0 * * * *" /> <!-- top of every hour -->
</bean>
<bean id="secondary" class="org.springframework.scheduling.support.PeriodicTrigger">
<constructor-arg value="60000" />
</bean>
重要: 非同期ハンドオフ CompoundTriggerAdvice は、receive() の結果に基づいてトリガーを変更します。これは、ポーラースレッドでアドバイスが呼び出された場合にのみ機能します。ポーラーに task-executor がある場合は機能しません。ポーリングの結果の後に非同期操作を使用する場合にこのアドバイスを使用するには、おそらく ExecutorChannel を使用して、後で非同期ハンドオフを実行します。 |
メッセージソースのみのアドバイス
一部のアドバイスは MessageSource.receive()
にのみ適用される可能性があり、PollableChannel
には意味がありません。この目的のために、MessageSourceMutator
インターフェース(ReceiveMessageAdvice
の拡張)はまだ存在しています。詳細については、受信チャネルアダプター: 複数のサーバーとディレクトリのポーリングを参照してください。