提供されたアドバイスクラス

Spring Integration は、AOP アドバイスクラスを適用する一般的なメカニズムを提供することに加えて、これらのすぐに使えるアドバイスの実装を提供します。

再試行のアドバイス

再試行のアドバイス(o.s.i.handler.advice.RequestHandlerRetryAdvice)は、Spring Retry [GitHub] (英語) プロジェクトによって提供される豊富な再試行メカニズムを活用します。spring-retry のコアコンポーネントは RetryTemplate です。これにより、RetryPolicy および BackoffPolicy 戦略(多数の実装を含む)、および再試行が尽きたときに実行するアクションを決定する RecoveryCallback 戦略を含む、洗練された再試行シナリオを構成できます。

ステートレス再試行

ステートレス再試行は、再試行アクティビティがアドバイス内で完全に処理される場合です。スレッドは一時停止し(そうするように構成されている場合)、アクションを再試行します。

ステートフルリトライ

ステートフル再試行は、再試行状態がアドバイス内で管理されているが、例外がスローされ、呼び出し元がリクエストを再送信する場合です。ステートフル再試行の例は、現在のスレッドで実行するのではなく、メッセージ発信者(たとえば、JMS)に再送信を行わせたい場合です。ステートフル再試行には、再試行された送信を検出するための何らかのメカニズムが必要です。

spring-retry の詳細については、プロジェクトの Javadoc と、spring-retry の作成元である Spring Batch のリファレンスドキュメントを参照してください。

デフォルトのバックオフ動作は、バックオフしないことです。再試行はすぐに試行されます。試行間でスレッドを一時停止させるバックオフポリシーを使用すると、過剰なメモリ使用やスレッド不足などのパフォーマンスの問題が発生する可能性があります。大量の環境では、バックオフポリシーを注意して使用する必要があります。

再試行アドバイスの構成

このセクションの例では、常に例外をスローする次の <service-activator> を使用しています。

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
単純なステートレス再試行

デフォルトの RetryTemplate には、3 回試行する SimpleRetryPolicy があります。BackOffPolicy はないため、3 回の試行は連続して行われ、試行間の遅延はありません。RecoveryCallback はないため、最終的な失敗した再試行が発生した後、呼び出し元に例外をスローします。Spring Integration 環境では、この最終的な例外は、受信エンドポイントで error-channel を使用して処理される場合があります。次の例では、RetryTemplate を使用し、その DEBUG 出力を示しています。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
回復を伴う単純なステートレス再試行

次の例では、前の例に RecoveryCallback を追加し、ErrorMessageSendingRecoverer を使用して ErrorMessage をチャネルに送信します。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
カスタマイズされたポリシーを使用したステートレス再試行とリカバリ

より洗練されたものにするために、カスタマイズされた RetryTemplate でアドバイスを提供できます。この例では、SimpleRetryPolicy を引き続き使用していますが、試行回数を 4 回に増やしています。また、最初の再試行が 1 秒間待機し、2 番目が 5 秒間待機し、3 番目が 25 待機する ExponentialBackoffPolicy を追加します(合計 4 回の試行)。次のリストは、例とその DEBUG 出力を示しています。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
ステートレス再試行のネームスペースサポート

バージョン 4.0 からは、次の例に示すように、再試行のアドバイスに対する名前空間のサポートのおかげで、前述の構成を大幅に簡素化できます。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

上記の例では、アドバイスはトップレベルの Bean として定義されているため、複数の request-handler-advice-chain インスタンスで使用できます。次の例に示すように、チェーン内でアドバイスを直接定義することもできます。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

<handler-retry-advice> は、<fixed-back-off> または <exponential-back-off> の子要素を持つことも、子要素を持たないこともできます。子要素のない <handler-retry-advice> は、バックオフを使用しません。recovery-channel がない場合、再試行が使い果たされると例外がスローされます。名前空間は、ステートレス再試行でのみ使用できます。

より複雑な環境 (カスタムポリシーなど) の場合は、通常の <bean> 定義を使用します。

回復を伴う単純なステートフル再試行

再試行をステートフルにするには、RetryStateGenerator 実装でアドバイスを提供する必要があります。このクラスは、RetryTemplate がこのメッセージの再試行の現在の状態を判断できるように、メッセージを再送信として識別するために使用されます。フレームワークは、SpelExpressionRetryStateGenerator を提供します。SpelExpressionRetryStateGenerator は、SpEL 式を使用してメッセージ識別子を決定します。この例でもデフォルトのポリシー(バックオフなしの 3 回の試行)を使用しています。ステートレス再試行と同様に、これらのポリシーはカスタマイズできます。次のリストは、例とその DEBUG 出力を示しています。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

前述の例をステートレスの例と比較すると、ステートフルリトライでは、失敗のたびに呼び出し元に例外がスローされることがわかります。

再試行の例外分類

Spring Retry には、どの例外が再試行を呼び出すことができるかを決定するための柔軟性があります。デフォルトの構成ではすべての例外が再試行され、例外分類子は最上位の例外を調べます。たとえば、MyException でのみ再試行するように構成し、アプリケーションが原因が MyException である SomeOtherException をスローする場合、再試行は発生しません。

Spring Retry 1.0.3 以降、BinaryExceptionClassifier には traverseCauses というプロパティがあります (デフォルトは false です)。true の場合、一致するものを見つけるか、トラバースする原因がなくなるまで、例外原因をトラバースします。

再試行にこの分類子を使用するには、最大試行回数、Exception オブジェクトの MaptraverseCauses ブール値を取るコンストラクターで作成された SimpleRetryPolicy を使用します。その後、このポリシーを RetryTemplate に注入できます。

ユーザー例外は MessagingException にラップされる可能性があるため、この場合は traverseCauses が必要です。

サーキットブレーカーのアドバイス

サーキットブレーカーパターンの一般的な考え方は、サービスが現在利用できない場合、それを使用しようとして時間(およびリソース)を浪費しないことです。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice はこのパターンを実装しています。サーキットブレーカーが閉じた状態の場合、エンドポイントはサービスの呼び出しを試みます。サーキットブレーカーは、一定回数の連続試行が失敗するとオープン状態になります。オープン状態の場合、新しいリクエストは「高速で失敗」し、時間が経過するまでサービスの呼び出しは試行されません。

その時間が経過すると、サーキットブレーカーは半開状態に設定されます。この状態で、1 回の試行でも失敗すると、ブレーカーはすぐにオープン状態になります。試行が成功すると、ブレーカーは閉じた状態になり、その場合、構成された数の連続した障害が再び発生するまで、再び開いた状態になりません。正常に試行されると、ブレーカーが再びオープン状態になるタイミングを判別するために、状態が障害ゼロにリセットされます。

通常、このアドバイスは外部サービスに使用される可能性があり、失敗するまでに時間がかかる場合があります(ネットワーク接続を試行するタイムアウトなど)。

RequestHandlerCircuitBreakerAdvice には、threshold と halfOpenAfter の 2 つのプロパティがあります。threshold プロパティは、ブレーカーが開くまでに発生する必要のある連続した障害の数を表します。デフォルトは 5 です。halfOpenAfter プロパティは、最後の失敗後、ブレーカーが別のリクエストを試みる前に待機する時間を表します。デフォルトは 1000 ミリ秒です。

次の例では、サーキットブレーカーを構成し、その DEBUG および ERROR 出力を示しています。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

上記の例では、しきい値は 2 に設定され、halfOpenAfter は 12 秒に設定されています。5 秒ごとに新しいリクエストが届きます。最初の 2 回の試行でサービスが呼び出されました。3 番目と 4 番目は、サーキットブレーカーが開いていることを示す例外で失敗しました。5 番目のリクエストは、最後の失敗から 15 秒後にリクエストされたために試行されました。ブレーカーがすぐに開いたため、6 回目の試行はすぐに失敗します。

式評価アドバイス

最終的に提供されるアドバイスクラスは o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice です。このアドバイスは、他の 2 つのアドバイスよりも一般的です。エンドポイントに送信された元の受信メッセージの式を評価するメカニズムを提供します。成功または失敗の後、評価するために個別の式を使用できます。オプションで、入力メッセージとともに評価結果を含むメッセージをメッセージチャネルに送信できます。

このアドバイスの一般的な使用例は、<ftp:outbound-channel-adapter/> を使用することです。おそらく、転送が成功した場合はファイルを 1 つのディレクトリに、失敗した場合は別のディレクトリに移動します。

アドバイスには、成功した場合の式、失敗した場合の式、それぞれに対応するチャネルを設定するプロパティがあります。成功した場合、successChannel に送信されるメッセージは AdviceMessage であり、ペイロードは式の評価の結果です。inputMessage という追加のプロパティには、ハンドラーに送信された元のメッセージが含まれています。failureChannel に送信されるメッセージ(ハンドラーが例外をスローする場合)は、MessageHandlingExpressionEvaluatingAdviceException のペイロードを持つ ErrorMessage です。すべての MessagingException インスタンスと同様に、このペイロードには failedMessage および cause プロパティと、式評価の結果を含む evaluationResult という追加のプロパティがあります。

バージョン 5.1.3 以降、チャネルは構成されているが式が提供されていない場合、メッセージの payload を評価するためにデフォルトの式が使用されます。

アドバイスのスコープ内で例外がスローされると、デフォルトでは、failureExpression が評価された後にその例外が呼び出し元にスローされます。例外のスローを抑制したい場合は、trapException プロパティを true に設定します。次のアドバイスは、Java DSL を使用して advice を構成する方法を示しています。

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}

レートリミッターのアドバイス

レートリミッターのアドバイス(RateLimiterRequestHandlerAdvice)により、エンドポイントがリクエストでオーバーロードにならないようにすることができます。レート制限に違反すると、リクエストはブロックされた状態になります。

このアドバイスの典型的な使用例は、外部サービスプロバイダーが 1 分あたり n を超える数のリクエストを許可しない場合です。

RateLimiterRequestHandlerAdvice の実装は、Resilience4j [GitHub] (英語) プロジェクトに完全に基づいており、RateLimiter または RateLimiterConfig のいずれかの注入が必要です。デフォルトおよび / またはカスタム名で構成することもできます。

次の例では、1 秒ごとに 1 つのリクエストでレートリミッターのアドバイスを構成しています。

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}

キャッシングのアドバイス

バージョン 5.2 から、CacheRequestHandlerAdvice が導入されました。これは、Spring Framework のキャッシング抽象化に基づいており、@Caching アノテーションファミリーによって提供される概念と機能に沿っています。内部のロジックは CacheAspectSupport 拡張に基づいており、キャッシュ操作のプロキシは、リクエスト Message<?> を引数として AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage メソッドを中心に実行されます。このアドバイスは、キャッシュキーを評価するために SpEL 式または Function を使用して構成できます。リクエスト Message<?> は、SpEL 評価コンテキストのルートオブジェクトとして、または Function 入力引数として使用できます。デフォルトでは、リクエストメッセージの payload がキャッシュキーに使用されます。デフォルトのキャッシュ操作が CacheableOperation の場合、CacheRequestHandlerAdvice は cacheNames で構成するか、任意の CacheOperation のセットで構成する必要があります。すべての CacheOperation は個別に構成することも、CacheManagerCacheResolverCacheErrorHandler などの共有オプションを使用して CacheRequestHandlerAdvice 構成から再利用することもできます。この構成機能は、Spring Framework の @CacheConfig と @Caching アノテーションの組み合わせに似ています。CacheManager が提供されていない場合、単一の Bean がデフォルトで CacheAspectSupport の BeanFactory から解決されます。

次の例では、キャッシュ操作の異なるセットで 2 つのアドバイスを構成します。

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}