このバージョンはまだ開発中であり、まだ安定しているとは見なされていません。最新の安定バージョンについては、Spring Integration 6.5.3 を使用してください! |
提供されたアドバイスクラス
Spring Integration は、AOP アドバイスクラスを適用する一般的なメカニズムを提供することに加えて、これらのすぐに使えるアドバイスの実装を提供します。
RequestHandlerRetryAdvice( 再試行のアドバイスで説明)RequestHandlerCircuitBreakerAdvice( サーキットブレーカーのアドバイスで説明)ExpressionEvaluatingRequestHandlerAdvice( 表現のアドバイスで説明)RateLimiterRequestHandlerAdvice( レートリミッターのアドバイスで説明)CacheRequestHandlerAdvice( キャッシングのアドバイスで説明)ReactiveRequestHandlerAdvice( リアクティブアドバイスで説明)ContextHolderRequestHandlerAdvice( コンテキストホルダーのアドバイスで説明)LockRequestHandlerAdvice( ロックアドバイスで説明)
再試行のアドバイス
The retry advice (o.s.i.handler.advice.RequestHandlerRetryAdvice) leverages the rich retry mechanisms provided by the Retry Support in Spring Framework. The core component of this advice is the RetryTemplate, which allows configuration of sophisticated retry scenarios, including RetryPolicy as well as a RecoveryCallback strategy to determine the action to take when retries are exhausted.
- ステートレス再試行
ステートレス再試行は、再試行アクティビティがアドバイス内で完全に処理される場合です。スレッドは一時停止し(そうするように構成されている場合)、アクションを再試行します。
- ステートフルリトライ
Stateful retry is the case where the retry state is managed within the advice but where an exception is thrown and the caller resubmits the request. An example for stateful retry is when we want the message originator, (for example, JMS) to be responsible for resubmitting, rather than performing it on the current thread. Stateful retry needs some mechanism to detect a retried submission. For this purpose, the
RequestHandlerRetryAdviceexpose thestateKeyFunction、newMessagePredicate、stateCacheSizeproperties. Where later two of them make sense only if the first is provided. Essentially, thestateKeyFunctionis an indicator to switchRequestHandlerRetryAdvicelogic from stateless to stateful. The meaning of thenewMessagePredicateis two refresh an existing retry state for the key based on the message to handle. ThestateCacheSizeis100by default, where elder entries are removed from the cache when more new retry states are coming. Perhaps those old messages are not redelivered anymore from the upstream flow, e.g., the message broker dead-lettered those messages according to its redelivery policy.
| デフォルトのバックオフ動作は、バックオフしないことです。再試行はすぐに試行されます。試行間でスレッドを一時停止させるバックオフポリシーを使用すると、過剰なメモリ使用やスレッド不足などのパフォーマンスの問題が発生する可能性があります。大量の環境では、バックオフポリシーを注意して使用する必要があります。 |
再試行アドバイスの構成
このセクションの例では、常に例外をスローする次の @ServiceActivator を使用しています。
public class FailingService {
@ServiceActivator(inputChannel = "input", adviceChain = "retryAdvice")
public void service(String message) {
throw new RuntimeException("error");
}
}- 単純なステートレス再試行
The default
RetryPolicyis to try three times, plus original call for the targetMessageHandler. There is no backoff by default, so the three attempts are made back-to-back-to-back with no delay between attempts. There is noRecoveryCallback, so the result is to throw the exception to the caller after the final failed retry occurs. In a Spring Integration environment, this final exception might be handled by using anerror-channelon the inbound endpoint. The following example uses default configuration for theRequestHandlerRetryAdvice:@Bean RequestHandlerRetryAdvice retryAdvice() { return new RequestHandlerRetryAdvice(); }- 回復を伴う単純なステートレス再試行
次の例では、前の例に
RecoveryCallbackを追加し、ErrorMessageSendingRecovererを使用してErrorMessageをチャネルに送信します。@Bean RequestHandlerRetryAdvice retryAdvice(MessageChannel recoveryChannel) { RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel)); return requestHandlerRetryAdvice; }- カスタマイズされたポリシーを使用したステートレス再試行とリカバリ
For more sophistication, a customized
RetryPolicycan be provided for theRequestHandlerRetryAdvice. This example continues to use the simpleRetryPolicybut increases the attempts to four. It also adds anExponentialBackoffwhere the first retry waits one second, the second waits five seconds, and the third waits 25 (for four attempts in all). The following listing shows the example of such a configuration:@Bean RequestHandlerRetryAdvice retryAdvice() { RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice(); requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel())); RetryPolicy retryPolicy = RetryPolicy.builder() .maxAttempts(4) .delay(Duration.ofSeconds(1)) .multiplier(5.0) .maxDelay(Duration.ofMinutes(1)) .build(); requestHandlerRetryAdvice.setRetryPolicy(retryPolicy); return requestHandlerRetryAdvice; }- ステートレス再試行のネームスペースサポート
The following examples demonstrate how the
RequestHandlerRetryAdvicecan be configured using Spring Integration XML namespace with its custom tags:<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>上記の例では、アドバイスはトップレベルの Bean として定義されているため、複数の
request-handler-advice-chainインスタンスで使用できます。次の例に示すように、チェーン内でアドバイスを直接定義することもできます。<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator><handler-retry-advice>は、<fixed-back-off>または<exponential-back-off>の子要素を持つことも、子要素を持たないこともできます。子要素のない<handler-retry-advice>は、バックオフを使用しません。recovery-channelがない場合、再試行が使い果たされると例外がスローされます。名前空間は、ステートレス再試行でのみ使用できます。より複雑な環境 (カスタムポリシーなど) の場合は、通常の
<bean>定義を使用します。- 回復を伴う単純なステートフル再試行
To make retry stateful, a
Function<Message<?>, Object> stateKeyFunctionmust be provided for theRequestHandlerRetryAdviceinstance. This function is used to identify a message as being a resubmission so that theRequestHandlerRetryAdvicecan determine the current state of retry for this message. The idea behind stateful retry is to not block the current thread, but rather cache the retry state for this message and re-throwMessageHandlerfailure back to the caller. Usually this works well with message originators which are able to resubmit (or redeliver) events, for example, message brokers like RabbitMQ withnack, or Apache Kafka with seek functionality; or JMS after rollback on the consumption. If there is no cached state yet (or thePredicate<Message<?>> newMessagePredicatereturnstruefor the message), theMessageHandlercall is treated as the first one, and on its failure an internalRetryStatebased on aBackOffExecution, is cached under the mentioned key. On the next message arrival, the cached state provides a backoff interval forThread.sleep()before an attempt to call theMessageHandler. If this backoff interval is equal toBackOffExecution.STOP(e.g.,maxAttemptshave been reached), that mean no more retries for this message: the whole retry cycle is treated as exhausted, and respectiveRetryExceptionis thrown back to the caller or used forRecoveryCallbackinvocation if provided. In general, the exception handling logic and backoff execution are similar to the stateless behavior, with only difference that thread is not blocked for all themaxAttempts. It is up to the message originator to redeliver a message for the next retry call.
サーキットブレーカーのアドバイス
サーキットブレーカーパターンの一般的な考え方は、サービスが現在利用できない場合、それを使用しようとして時間(およびリソース)を浪費しないことです。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice はこのパターンを実装しています。サーキットブレーカーが閉じた状態の場合、エンドポイントはサービスの呼び出しを試みます。サーキットブレーカーは、一定回数の連続試行が失敗するとオープン状態になります。オープン状態の場合、新しいリクエストは「高速で失敗」し、時間が経過するまでサービスの呼び出しは試行されません。
When that time has expired, the circuit breaker is set to the half-open state. When in this state, if even a single attempt fails, the breaker immediately goes to the open state. If the attempt succeeds, the breaker goes to the closed state, in which case it does not go to the open state again until the configured number of consecutive failures again occurs. Any successful attempt resets the state to zero failures for the purpose of determining when the breaker might go to the open state again.
Typically, this advice might be used for external services, where it might take some time to fail, such as a timeout attempting to make a network connection.
RequestHandlerCircuitBreakerAdvice には、threshold と halfOpenAfter の 2 つのプロパティがあります。threshold プロパティは、ブレーカーが開くまでに発生する必要のある連続した障害の数を表します。デフォルトは 5 です。halfOpenAfter プロパティは、最後の失敗後、ブレーカーが別のリクエストを試みる前に待機する時間を表します。デフォルトは 1000 ミリ秒です。
次の例では、サーキットブレーカーを構成し、その DEBUG および ERROR 出力を示しています。
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator 上記の例では、しきい値は 2 に設定され、halfOpenAfter は 12 秒に設定されています。5 秒ごとに新しいリクエストが届きます。最初の 2 回の試行でサービスが呼び出されました。3 番目と 4 番目は、サーキットブレーカーが開いていることを示す例外で失敗しました。5 番目のリクエストは、最後の失敗から 15 秒後にリクエストされたために試行されました。ブレーカーがすぐに開いたため、6 回目の試行はすぐに失敗します。
式評価アドバイス
最終的に提供されるアドバイスクラスは o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice です。このアドバイスは、他の 2 つのアドバイスよりも一般的です。エンドポイントに送信された元の受信メッセージの式を評価するメカニズムを提供します。成功または失敗の後、評価するために個別の式を使用できます。オプションで、入力メッセージとともに評価結果を含むメッセージをメッセージチャネルに送信できます。
このアドバイスの一般的な使用例は、<ftp:outbound-channel-adapter/> を使用することです。おそらく、転送が成功した場合はファイルを 1 つのディレクトリに、失敗した場合は別のディレクトリに移動します。
アドバイスには、成功した場合の式、失敗した場合の式、それぞれに対応するチャネルを設定するプロパティがあります。成功した場合、successChannel に送信されるメッセージは AdviceMessage であり、ペイロードは式の評価の結果です。inputMessage という追加のプロパティには、ハンドラーに送信された元のメッセージが含まれています。failureChannel に送信されるメッセージ(ハンドラーが例外をスローする場合)は、MessageHandlingExpressionEvaluatingAdviceException のペイロードを持つ ErrorMessage です。すべての MessagingException インスタンスと同様に、このペイロードには failedMessage および cause プロパティと、式評価の結果を含む evaluationResult という追加のプロパティがあります。
バージョン 5.1.3 以降、チャネルは構成されているが式が提供されていない場合、メッセージの payload を評価するためにデフォルトの式が使用されます。 |
アドバイスのスコープ内で例外がスローされると、デフォルトでは、failureExpression が評価された後にその例外が呼び出し元にスローされます。例外のスローを抑制したい場合は、trapException プロパティを true に設定します。次のアドバイスは、Java DSL を使用して advice を構成する方法を示しています。
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.<String>handle((payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}レートリミッターのアドバイス
The Rate Limiter advice (RateLimiterRequestHandlerAdvice) allows to ensure that an endpoint does not get overloaded with requests. When the rate limit is breached, the request will go in a blocked state.
このアドバイスの典型的な使用例は、外部サービスプロバイダーが 1 分あたり n を超える数のリクエストを許可しない場合です。
RateLimiterRequestHandlerAdvice の実装は、Resilience4j [GitHub] (英語) プロジェクトに完全に基づいており、RateLimiter または RateLimiterConfig のいずれかの注入が必要です。デフォルトおよび / またはカスタム名で構成することもできます。
次の例では、1 秒ごとに 1 つのリクエストでレートリミッターのアドバイスを構成しています。
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}キャッシングのアドバイス
バージョン 5.2 から、CacheRequestHandlerAdvice が導入されました。これは、Spring Framework のキャッシング抽象化に基づいており、@Caching アノテーションファミリーによって提供される概念と機能に沿っています。内部のロジックは CacheAspectSupport 拡張に基づいており、キャッシュ操作のプロキシは、リクエスト Message<?> を引数として AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage メソッドを中心に実行されます。このアドバイスは、キャッシュキーを評価するために SpEL 式または Function を使用して構成できます。リクエスト Message<?> は、SpEL 評価コンテキストのルートオブジェクトとして、または Function 入力引数として使用できます。デフォルトでは、リクエストメッセージの payload がキャッシュキーに使用されます。デフォルトのキャッシュ操作が CacheableOperation の場合、CacheRequestHandlerAdvice は cacheNames で構成するか、任意の CacheOperation のセットで構成する必要があります。すべての CacheOperation は個別に構成することも、CacheManager、CacheResolver、CacheErrorHandler などの共有オプションを使用して CacheRequestHandlerAdvice 構成から再利用することもできます。この構成機能は、Spring Framework の @CacheConfig と @Caching アノテーションの組み合わせに似ています。CacheManager が提供されていない場合、単一の Bean がデフォルトで CacheAspectSupport の BeanFactory から解決されます。
The following example configures two advice with different sets of caching operations:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}