遅延器
遅延器は、メッセージフローを特定の間隔で遅延させる単純なエンドポイントです。メッセージが遅延しても、元の送信者はブロックしません。代わりに、遅延が経過した後に出力チャネルに送信されるように、org.springframework.scheduling.TaskScheduler
のインスタンスで遅延メッセージがスケジュールされます。このアプローチは、ブロックされた送信者スレッドの数が多くならないため、かなり長い遅延でもスケーラブルです。それどころか、典型的なケースでは、スレッドプールはメッセージを解放する実際の実行に使用されます。このセクションには、遅延器を構成するいくつかの例が含まれています。
遅延器の構成
<delayer>
要素は、2 つのメッセージチャネル間のメッセージフローを遅延させるために使用されます。他のエンドポイントと同様に、"input-channel" および "output-channel" 属性を指定できますが、遅延器にはミリ秒数を決定する "default-delay" および "expression" 属性(および 'expression' 要素)もあります。各メッセージを遅延させる必要があります。次の例では、すべてのメッセージを 3 秒遅延させます。
<int:delayer id="delayer" input-channel="input"
default-delay="3000" output-channel="output"/>
各メッセージの遅延を判別する必要がある場合、次の式が示すように、'expression' 属性を使用して SpEL 式を提供することもできます。
Java DSL
Kotlin DSL
Java
XML
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from("input")
.delay(d -> d
.messageGroupId("delayer.messageGroupId")
.defaultDelay(3_000L)
.delayExpression("headers['delay']"))
.channel("output")
.get();
}
@Bean
fun flow() =
integrationFlow("input") {
delay {
messageGroupId("delayer.messageGroupId")
defaultDelay(3000L)
delayExpression("headers['delay']")
}
channel("output")
}
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
DelayHandler handler = new DelayHandler("delayer.messageGroupId");
handler.setDefaultDelay(3_000L);
handler.setDelayExpressionString("headers['delay']");
handler.setOutputChannelName("output");
return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
default-delay="3000" expression="headers['delay']"/>
前の例では、3 秒の遅延は、指定された受信メッセージに対して式が null と評価された場合にのみ適用されます。式評価の有効な結果を持つメッセージにのみ遅延を適用する場合は、0
(デフォルト)の「デフォルト遅延」を使用できます。0
(またはそれ以下)の遅延があるメッセージの場合、メッセージは呼び出しスレッドですぐに送信されます。
XML パーサーは、<beanName>.messageGroupId のメッセージグループ ID を使用します。 |
遅延ハンドラーは、ミリ秒単位の間隔を表す式評価結果(toString() メソッドが Long に解析できる値を生成する Object )および絶対時間を表す java.util.Date インスタンスをサポートします。最初のケースでは、ミリ秒は現在の時刻からカウントされます(たとえば、5000 の値は、遅延機が受信した時刻から少なくとも 5 秒間メッセージを遅延させます)。Date インスタンスの場合、メッセージはその Date オブジェクトが表す時間まで解放されません。非正の遅延または過去の日付に等しい値は、遅延なしになります。代わりに、元の送信者のスレッドの出力チャネルに直接送信されます。式の評価結果が Date ではなく、Long として解析できない場合、デフォルトの遅延(ある場合 - デフォルトは 0 )が適用されます。 |
式の評価では、無効な式やその他の条件など、さまざまな理由で評価例外がスローされる場合があります。デフォルトでは、そのような例外は無視され(DEBUG レベルで記録されます)、遅延器はデフォルトの遅延(存在する場合)にフォールバックします。ignore-expression-failures 属性を設定することにより、この動作を変更できます。デフォルトでは、この属性は true に設定されており、遅延器の動作は前述のとおりです。ただし、式評価の例外を無視せずに遅延者の呼び出し元にスローする場合は、ignore-expression-failures 属性を false に設定します。 |
上記の例では、遅延式は
ヘッダーが省略される可能性があり、デフォルトの遅延にフォールバックしたい場合は、通常、ドットプロパティアクセサー構文ではなくインデクサー構文を使用する方が効率的 (かつ推奨) です。例外をキャッチします。 |
遅延器は、Spring の TaskScheduler
抽象化のインスタンスに委譲します。遅延器によって使用されるデフォルトのスケジューラは、起動時に Spring Integration によって提供される ThreadPoolTaskScheduler
インスタンスです。タスクスケジューラの構成を参照してください。別のスケジューラに委譲する場合は、次の例に示すように、delayer 要素の 'scheduler' 属性を介して参照を提供できます。
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
scheduler="exampleTaskScheduler"/>
<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
外部 ThreadPoolTaskScheduler を構成する場合、このプロパティで waitForTasksToCompleteOnShutdown = true を設定できます。これにより、アプリケーションのシャットダウン時にすでに実行状態にある(メッセージを解放する)「遅延」タスクを正常に完了できます。Spring Integration 2.2 より前は、DelayHandler が独自のスケジューラをバックグラウンドで作成できるため、このプロパティは <delayer> 要素で使用できました。2.2 以降、遅延器には外部スケジューラインスタンスが必要で、waitForTasksToCompleteOnShutdown が削除されました。スケジューラー独自の構成を使用する必要があります。 |
ThreadPoolTaskScheduler にはプロパティ errorHandler があり、org.springframework.util.ErrorHandler の実装を使用して注入できます。このハンドラーにより、遅延メッセージを送信するスケジュールされたタスクのスレッドから Exception を処理できます。デフォルトでは、org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler を使用し、ログにスタックトレースを表示できます。org.springframework.integration.channel.MessagePublishingErrorHandler の使用を検討することもできます。org.springframework.integration.channel.MessagePublishingErrorHandler は、失敗したメッセージのヘッダーから、またはデフォルトの error-channel に、ErrorMessage を error-channel に送信します。このエラー処理は、トランザクションがロールバックした後に実行されます(存在する場合)。リリースの失敗を参照してください。 |
遅延器とメッセージストア
DelayHandler
は、遅延メッセージを指定された MessageStore
のメッセージグループに保持します。( "groupId" は、<delayer>
要素の必須 'id' 属性 に基づいています。DelayHandler.setMessageGroupId(String)
も参照してください) 遅延メッセージは、DelayHandler
がメッセージを output-channel
に送信する直前に、スケジュールされたタスクによって MessageStore
から削除されます。指定された MessageStore
が永続的である場合 (JdbcMessageStore
など)、アプリケーションのシャットダウン時にメッセージを失わない機能が提供されます。アプリケーションの起動後、DelayHandler
は MessageStore
のメッセージグループからメッセージを読み取り、メッセージの元の到着時間 (遅延が数値の場合) に基づいて遅延を付けて再スケジュールします。遅延ヘッダーが Date
であったメッセージの場合、再スケジュール時にその Date
が使用されます。遅延メッセージが MessageStore
に「遅延」時間を超えて残っている場合、起動後すぐに送信されます。messageGroupId
は必須であり、生成可能な DelayHandler
Bean 名に依存することはできません。このため、アプリケーションの再起動後、DelayHandler
は新しく生成された Bean 名を取得できます。遅延メッセージのグループはアプリケーションによって管理されなくなるため、再スケジュールから失われる可能性があります。
<delayer>
は、相互に排他的な 2 つの要素 <transactional>
および <advice-chain>
のいずれかで強化できます。これらの AOP アドバイスの List
は、遅延の後に、スケジュールされたタスクの Thread
でメッセージを解放する責任を持つプロキシ化された内部 DelayHandler.ReleaseMessageHandler
に適用されます。たとえば、ダウンストリームメッセージフローが例外をスローし、ReleaseMessageHandler
のトランザクションがロールバックされる場合に使用できます。この場合、遅延メッセージは永続的な MessageStore
に残ります。<advice-chain>
内で任意のカスタム org.aopalliance.aop.Advice
実装を使用できます。<transactional>
要素は、トランザクションアドバイスのみを持つ単純なアドバイスチェーンを定義します。次の例は、<delayer>
内の advice-chain
を示しています。
<int:delayer id="delayer" input-channel="input" output-channel="output"
expression="headers.delay"
message-store="jdbcMessageStore">
<int:advice-chain>
<beans:ref bean="customAdviceBean"/>
<tx:advice>
<tx:attributes>
<tx:method name="*" read-only="true"/>
</tx:attributes>
</tx:advice>
</int:advice-chain>
</int:delayer>
DelayHandler
は、管理操作(getDelayedMessageCount
および reschedulePersistedMessages
)を備えた JMX MBean
としてエクスポートできます。これにより、たとえば TaskScheduler
が以前に停止されていた場合、実行時に遅延持続メッセージの再スケジュールが可能になります。これらの操作は、次の例に示すように、Control Bus
コマンドを介して呼び出すことができます。
Message<String> delayerReschedulingMessage =
MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
メッセージストア、JMX、コントロールバスの詳細については、システムマネジメントを参照してください。 |
バージョン 5.3.7 以降、メッセージが MessageStore
に保管されているときにトランザクションがアクティブである場合、解放タスクは TransactionSynchronization.afterCommit()
コールバックでスケジュールされます。これは、トランザクションがコミットされる前にスケジュールされたリリースが実行される可能性があり、メッセージが見つからない競合状態を防ぐために必要です。この場合、メッセージは遅延後、トランザクションのコミット後のいずれか遅い方でリリースされます。
リリースの失敗
バージョン 5.0.8 以降、遅延器には 2 つの新しいプロパティがあります。
maxAttempts
(デフォルト 5)retryDelay
(デフォルトは 1 秒)
メッセージがリリースされたときに、ダウンストリームフローが失敗すると、retryDelay
の後にリリースが試行されます。maxAttempts
に達すると、メッセージは破棄されます(リリースがトランザクションの場合を除き、その場合、メッセージはストアに残りますが、アプリケーションが再起動されるか、reschedulePersistedMessages()
メソッドが呼び出されるまで、リリースのスケジュールは設定されません。上記のように)。
さらに、delayedMessageErrorChannel
を構成できます。リリースが失敗すると、ErrorMessage
がペイロードとして例外を使用してそのチャネルに送信され、originalMessage
プロパティが設定されます。ErrorMessage
には、現在のカウントを含むヘッダー IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT
が含まれています。
エラーフローがエラーメッセージを消費して正常に終了した場合、それ以上のアクションは行われません。リリースがトランザクションの場合、トランザクションはコミットされ、メッセージはストアから削除されます。エラーフローが例外をスローした場合、リリースは上記のように maxAttempts
まで再試行されます。