遅延器

遅延器は、メッセージフローを特定の間隔で遅延させる単純なエンドポイントです。メッセージが遅延しても、元の送信者はブロックしません。代わりに、遅延が経過した後に出力チャネルに送信されるように、org.springframework.scheduling.TaskScheduler のインスタンスで遅延メッセージがスケジュールされます。このアプローチは、ブロックされた送信者スレッドの数が多くならないため、かなり長い遅延でもスケーラブルです。それどころか、典型的なケースでは、スレッドプールはメッセージを解放する実際の実行に使用されます。このセクションには、遅延器を構成するいくつかの例が含まれています。

遅延器の構成

<delayer> 要素は、2 つのメッセージチャネル間のメッセージフローを遅延させるために使用されます。他のエンドポイントと同様に、"input-channel" および "output-channel" 属性を指定できますが、遅延器にはミリ秒数を決定する "default-delay" および "expression" 属性(および 'expression' 要素)もあります。各メッセージを遅延させる必要があります。次の例では、すべてのメッセージを 3 秒遅延させます。

<int:delayer id="delayer" input-channel="input"
             default-delay="3000" output-channel="output"/>

各メッセージの遅延を判別する必要がある場合、次の式が示すように、'expression' 属性を使用して SpEL 式を提供することもできます。

  • Java DSL

  • Kotlin DSL

  • Java

  • XML

@Bean
public IntegrationFlow flow() {
    return IntegrationFlow.from("input")
            .delay(d -> d
                    .messageGroupId("delayer.messageGroupId")
                    .defaultDelay(3_000L)
                    .delayExpression("headers['delay']"))
            .channel("output")
            .get();
}
@Bean
fun flow() =
    integrationFlow("input") {
        delay {
            messageGroupId("delayer.messageGroupId")
            defaultDelay(3000L)
            delayExpression("headers['delay']")
        }
        channel("output")
    }
@ServiceActivator(inputChannel = "input")
@Bean
public DelayHandler delayer() {
    DelayHandler handler = new DelayHandler("delayer.messageGroupId");
    handler.setDefaultDelay(3_000L);
    handler.setDelayExpressionString("headers['delay']");
    handler.setOutputChannelName("output");
    return handler;
}
<int:delayer id="delayer" input-channel="input" output-channel="output"
             default-delay="3000" expression="headers['delay']"/>

前の例では、3 秒の遅延は、指定された受信メッセージに対して式が null と評価された場合にのみ適用されます。式評価の有効な結果を持つメッセージにのみ遅延を適用する場合は、0 (デフォルト)の「デフォルト遅延」を使用できます。0 (またはそれ以下)の遅延があるメッセージの場合、メッセージは呼び出しスレッドですぐに送信されます。

XML パーサーは、<beanName>.messageGroupId のメッセージグループ ID を使用します。
遅延ハンドラーは、ミリ秒単位の間隔を表す式評価結果(toString() メソッドが Long に解析できる値を生成する Object)および絶対時間を表す java.util.Date インスタンスをサポートします。最初のケースでは、ミリ秒は現在の時刻からカウントされます(たとえば、5000 の値は、遅延機が受信した時刻から少なくとも 5 秒間メッセージを遅延させます)。Date インスタンスの場合、メッセージはその Date オブジェクトが表す時間まで解放されません。非正の遅延または過去の日付に等しい値は、遅延なしになります。代わりに、元の送信者のスレッドの出力チャネルに直接送信されます。式の評価結果が Date ではなく、Long として解析できない場合、デフォルトの遅延(ある場合 - デフォルトは 0)が適用されます。
式の評価では、無効な式やその他の条件など、さまざまな理由で評価例外がスローされる場合があります。デフォルトでは、そのような例外は無視され(DEBUG レベルで記録されます)、遅延器はデフォルトの遅延(存在する場合)にフォールバックします。ignore-expression-failures 属性を設定することにより、この動作を変更できます。デフォルトでは、この属性は true に設定されており、遅延器の動作は前述のとおりです。ただし、式評価の例外を無視せずに遅延者の呼び出し元にスローする場合は、ignore-expression-failures 属性を false に設定します。

上記の例では、遅延式は headers['delay'] として指定されています。これは、Map 要素にアクセスするための SpEL Indexer 構文です(MessageHeaders は Map を実装します)。headers.get("delay") を呼び出します。単純なマップ要素名( "." を含まない)の場合は、SpEL の「ドットアクセサー」構文も使用できます。この構文では、前に示したヘッダー式を headers.delay として指定できます。ただし、ヘッダーが欠落している場合、異なる結果が得られます。最初の場合、式は null に評価されます。2 番目の結果は、次のようになります。

 org.springframework.expression.spel.SpelEvaluationException: EL1008E:(pos 8):
		   Field or property 'delay' cannot be found on object of type 'org.springframework.messaging.MessageHeaders'

ヘッダーが省略される可能性があり、デフォルトの遅延にフォールバックしたい場合は、通常、ドットプロパティアクセサー構文ではなくインデクサー構文を使用する方が効率的 (かつ推奨) です。例外をキャッチします。

遅延器は、Spring の TaskScheduler 抽象化のインスタンスに委譲します。遅延器によって使用されるデフォルトのスケジューラは、起動時に Spring Integration によって提供される ThreadPoolTaskScheduler インスタンスです。タスクスケジューラの構成を参照してください。別のスケジューラに委譲する場合は、次の例に示すように、delayer 要素の 'scheduler' 属性を介して参照を提供できます。

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    scheduler="exampleTaskScheduler"/>

<task:scheduler id="exampleTaskScheduler" pool-size="3"/>
外部 ThreadPoolTaskScheduler を構成する場合、このプロパティで waitForTasksToCompleteOnShutdown = true を設定できます。これにより、アプリケーションのシャットダウン時にすでに実行状態にある(メッセージを解放する)「遅延」タスクを正常に完了できます。Spring Integration 2.2 より前は、DelayHandler が独自のスケジューラをバックグラウンドで作成できるため、このプロパティは <delayer> 要素で使用できました。2.2 以降、遅延器には外部スケジューラインスタンスが必要で、waitForTasksToCompleteOnShutdown が削除されました。スケジューラー独自の構成を使用する必要があります。
ThreadPoolTaskScheduler にはプロパティ errorHandler があり、org.springframework.util.ErrorHandler の実装を使用して注入できます。このハンドラーにより、遅延メッセージを送信するスケジュールされたタスクのスレッドから Exception を処理できます。デフォルトでは、org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler を使用し、ログにスタックトレースを表示できます。org.springframework.integration.channel.MessagePublishingErrorHandler の使用を検討することもできます。org.springframework.integration.channel.MessagePublishingErrorHandler は、失敗したメッセージのヘッダーから、またはデフォルトの error-channel に、ErrorMessage を error-channel に送信します。このエラー処理は、トランザクションがロールバックした後に実行されます(存在する場合)。リリースの失敗を参照してください。

遅延器とメッセージストア

DelayHandler は、遅延メッセージを指定された MessageStore のメッセージグループに保持します。( "groupId" は、<delayer> 要素の必須 'id' 属性 に基づいています。DelayHandler.setMessageGroupId(String) も参照してください) 遅延メッセージは、DelayHandler がメッセージを output-channel に送信する直前に、スケジュールされたタスクによって MessageStore から削除されます。指定された MessageStore が永続的である場合 (JdbcMessageStore など)、アプリケーションのシャットダウン時にメッセージを失わない機能が提供されます。アプリケーションの起動後、DelayHandler は MessageStore のメッセージグループからメッセージを読み取り、メッセージの元の到着時間 (遅延が数値の場合) に基づいて遅延を付けて再スケジュールします。遅延ヘッダーが Date であったメッセージの場合、再スケジュール時にその Date が使用されます。遅延メッセージが MessageStore に「遅延」時間を超えて残っている場合、起動後すぐに送信されます。messageGroupId は必須であり、生成可能な DelayHandler Bean 名に依存することはできません。このため、アプリケーションの再起動後、DelayHandler は新しく生成された Bean 名を取得できます。遅延メッセージのグループはアプリケーションによって管理されなくなるため、再スケジュールから失われる可能性があります。

<delayer> は、相互に排他的な 2 つの要素 <transactional> および <advice-chain> のいずれかで強化できます。これらの AOP アドバイスの List は、遅延の後に、スケジュールされたタスクの Thread でメッセージを解放する責任を持つプロキシ化された内部 DelayHandler.ReleaseMessageHandler に適用されます。たとえば、ダウンストリームメッセージフローが例外をスローし、ReleaseMessageHandler のトランザクションがロールバックされる場合に使用できます。この場合、遅延メッセージは永続的な MessageStore に残ります。<advice-chain> 内で任意のカスタム org.aopalliance.aop.Advice 実装を使用できます。<transactional> 要素は、トランザクションアドバイスのみを持つ単純なアドバイスチェーンを定義します。次の例は、<delayer> 内の advice-chain を示しています。

<int:delayer id="delayer" input-channel="input" output-channel="output"
    expression="headers.delay"
    message-store="jdbcMessageStore">
    <int:advice-chain>
        <beans:ref bean="customAdviceBean"/>
        <tx:advice>
            <tx:attributes>
                <tx:method name="*" read-only="true"/>
            </tx:attributes>
        </tx:advice>
    </int:advice-chain>
</int:delayer>

DelayHandler は、管理操作(getDelayedMessageCount および reschedulePersistedMessages)を備えた JMX MBean としてエクスポートできます。これにより、たとえば TaskScheduler が以前に停止されていた場合、実行時に遅延持続メッセージの再スケジュールが可能になります。これらの操作は、次の例に示すように、Control Bus コマンドを介して呼び出すことができます。

Message<String> delayerReschedulingMessage =
    MessageBuilder.withPayload("'delayer.handler'.reschedulePersistedMessages").build();
controlBusChannel.send(delayerReschedulingMessage);
メッセージストア、JMX、コントロールバスの詳細については、システムマネジメントを参照してください。

バージョン 5.3.7 以降、メッセージが MessageStore に保管されているときにトランザクションがアクティブである場合、解放タスクは TransactionSynchronization.afterCommit() コールバックでスケジュールされます。これは、トランザクションがコミットされる前にスケジュールされたリリースが実行される可能性があり、メッセージが見つからない競合状態を防ぐために必要です。この場合、メッセージは遅延後、トランザクションのコミット後のいずれか遅い方でリリースされます。

リリースの失敗

バージョン 5.0.8 以降、遅延器には 2 つの新しいプロパティがあります。

  • maxAttempts (デフォルト 5)

  • retryDelay (デフォルトは 1 秒)

メッセージがリリースされたときに、ダウンストリームフローが失敗すると、retryDelay の後にリリースが試行されます。maxAttempts に達すると、メッセージは破棄されます(リリースがトランザクションの場合を除き、その場合、メッセージはストアに残りますが、アプリケーションが再起動されるか、reschedulePersistedMessages() メソッドが呼び出されるまで、リリースのスケジュールは設定されません。上記のように)。

さらに、delayedMessageErrorChannel を構成できます。リリースが失敗すると、ErrorMessage がペイロードとして例外を使用してそのチャネルに送信され、originalMessage プロパティが設定されます。ErrorMessage には、現在のカウントを含むヘッダー IntegrationMessageHeaderAccessor.DELIVERY_ATTEMPT が含まれています。

エラーフローがエラーメッセージを消費して正常に終了した場合、それ以上のアクションは行われません。リリースがトランザクションの場合、トランザクションはコミットされ、メッセージはストアから削除されます。エラーフローが例外をスローした場合、リリースは上記のように maxAttempts まで再試行されます。