エンドポイントへの動作の追加
Spring Integration 2.2 以前は、ポーラーの <advice-chain/>
要素に AOP アドバイスを追加することで、統合フロー全体に動作を追加できました。しかし、ダウンストリームエンドポイントではなく、REST Web サービスの呼び出しだけを再試行する場合を考えます。
例: 次のフローを検討します。
inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter
ポーラーのアドバイスチェーンに一部の再試行ロジックを構成し、ネットワークグリッチのために http-gateway2
への呼び出しが失敗した場合、再試行により http-gateway1
と http-gateway2
の両方が再度呼び出されます。同様に、jdbc-outbound-adapter で一時的な障害が発生すると、両方の HTTP ゲートウェイがもう一度呼び出されてから、再び jdbc-outbound-adapter
が呼び出されます。
Spring Integration 2.2 は、個々のエンドポイントに動作を追加する機能を追加します。これは、多くのエンドポイントに <request-handler-advice-chain/>
要素を追加することで実現されます。次の例は、outbound-gateway
内の <request-handler-advice-chain/>
要素の使用方法を示しています。
<int-http:outbound-gateway id="withAdvice"
url-expression="'http://localhost/test1'"
request-channel="requests"
reply-channel="nextChannel">
<int-http:request-handler-advice-chain>
<ref bean="myRetryAdvice" />
</int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
この場合、myRetryAdvice
はこのゲートウェイにローカルにのみ適用され、応答が nextChannel
に送信された後にダウンストリームで実行されるその他のアクションには適用されません。アドバイスの範囲はエンドポイント自体に限定されます。
現時点では、 ただし、 |
提供されたアドバイスクラス
Spring Integration は、AOP アドバイスクラスを適用する一般的なメカニズムを提供することに加えて、これらのすぐに使えるアドバイスの実装を提供します。
RequestHandlerRetryAdvice
( 再試行のアドバイスで説明)RequestHandlerCircuitBreakerAdvice
( サーキットブレーカーのアドバイスで説明)ExpressionEvaluatingRequestHandlerAdvice
( 式評価アドバイスで説明)RateLimiterRequestHandlerAdvice
( レートリミッターのアドバイスで説明)CacheRequestHandlerAdvice
( キャッシングのアドバイスで説明)ReactiveRequestHandlerAdvice
( リアクティブアドバイスで説明)
再試行のアドバイス
再試行のアドバイス(o.s.i.handler.advice.RequestHandlerRetryAdvice
)は、Spring Retry [GitHub] (英語) プロジェクトによって提供される豊富な再試行メカニズムを活用します。spring-retry
のコアコンポーネントは RetryTemplate
です。これにより、RetryPolicy
および BackoffPolicy
戦略(多数の実装を含む)、および再試行が尽きたときに実行するアクションを決定する RecoveryCallback
戦略を含む、洗練された再試行シナリオを構成できます。
- ステートレス再試行
ステートレス再試行は、再試行アクティビティがアドバイス内で完全に処理される場合です。スレッドは一時停止し(そうするように構成されている場合)、アクションを再試行します。
- ステートフルリトライ
ステートフル再試行は、再試行状態がアドバイス内で管理されているが、例外がスローされ、呼び出し元がリクエストを再送信する場合です。ステートフル再試行の例は、現在のスレッドで実行するのではなく、メッセージ発信者(たとえば、JMS)に再送信を行わせたい場合です。ステートフル再試行には、再試行された送信を検出するための何らかのメカニズムが必要です。
spring-retry
の詳細については、プロジェクトの Javadoc と、spring-retry
の作成元である Spring Batch のリファレンスドキュメントを参照してください。
デフォルトのバックオフ動作は、バックオフしないことです。再試行はすぐに試行されます。試行間でスレッドを一時停止させるバックオフポリシーを使用すると、過剰なメモリ使用やスレッド不足などのパフォーマンスの問題が発生する可能性があります。大量の環境では、バックオフポリシーを注意して使用する必要があります。 |
再試行アドバイスの構成
このセクションの例では、常に例外をスローする次の <service-activator>
を使用しています。
public class FailingService {
public void service(String message) {
throw new RuntimeException("error");
}
}
- 単純なステートレス再試行
デフォルトの
RetryTemplate
には、3 回試行するSimpleRetryPolicy
があります。BackOffPolicy
はないため、3 回の試行は連続して行われ、試行間の遅延はありません。RecoveryCallback
はないため、最終的な失敗した再試行が発生した後、呼び出し元に例外をスローします。Spring Integration 環境では、この最終的な例外は、受信エンドポイントでerror-channel
を使用して処理される場合があります。次の例では、RetryTemplate
を使用し、そのDEBUG
出力を示しています。<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- 回復を伴う単純なステートレス再試行
次の例では、前の例に
RecoveryCallback
を追加し、ErrorMessageSendingRecoverer
を使用してErrorMessage
をチャネルに送信します。<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...] DEBUG [task-scheduler-2]Retry: count=0 DEBUG [task-scheduler-2]Checking for rethrow: count=1 DEBUG [task-scheduler-2]Retry: count=1 DEBUG [task-scheduler-2]Checking for rethrow: count=2 DEBUG [task-scheduler-2]Retry: count=2 DEBUG [task-scheduler-2]Checking for rethrow: count=3 DEBUG [task-scheduler-2]Retry failed last attempt: count=3 DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- カスタマイズされたポリシーを使用したステートレス再試行とリカバリ
より洗練されたものにするために、カスタマイズされた
RetryTemplate
でアドバイスを提供できます。この例では、SimpleRetryPolicy
を引き続き使用していますが、試行回数を 4 回に増やしています。また、最初の再試行が 1 秒間待機し、2 番目が 5 秒間待機し、3 番目が 25 待機するExponentialBackoffPolicy
を追加します(合計 4 回の試行)。次のリストは、例とそのDEBUG
出力を示しています。<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> <property name="retryTemplate" ref="retryTemplate" /> </bean> </int:request-handler-advice-chain> </int:service-activator> <bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> <property name="retryPolicy"> <bean class="org.springframework.retry.policy.SimpleRetryPolicy"> <property name="maxAttempts" value="4" /> </bean> </property> <property name="backOffPolicy"> <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> <property name="initialInterval" value="1000" /> <property name="multiplier" value="5.0" /> <property name="maxInterval" value="60000" /> </bean> </property> </bean> 27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...] 27.071 DEBUG [task-scheduler-1]Retry: count=0 27.080 DEBUG [task-scheduler-1]Sleeping for 1000 28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1 28.081 DEBUG [task-scheduler-1]Retry: count=1 28.081 DEBUG [task-scheduler-1]Sleeping for 5000 33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2 33.082 DEBUG [task-scheduler-1]Retry: count=2 33.083 DEBUG [task-scheduler-1]Sleeping for 25000 58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3 58.083 DEBUG [task-scheduler-1]Retry: count=3 58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4 58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4 58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- ステートレス再試行のネームスペースサポート
バージョン 4.0 からは、次の例に示すように、再試行のアドバイスに対する名前空間のサポートのおかげで、前述の構成を大幅に簡素化できます。
<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <ref bean="retrier" /> </int:request-handler-advice-chain> </int:service-activator> <int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:handler-retry-advice>
上記の例では、アドバイスはトップレベルの Bean として定義されているため、複数の
request-handler-advice-chain
インスタンスで使用できます。次の例に示すように、チェーン内でアドバイスを直接定義することもできます。<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel"> <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" /> </int:retry-advice> </int:request-handler-advice-chain> </int:service-activator>
<handler-retry-advice>
は、<fixed-back-off>
または<exponential-back-off>
の子要素を持つことも、子要素を持たないこともできます。子要素のない<handler-retry-advice>
は、バックオフを使用しません。recovery-channel
がない場合、再試行が使い果たされると例外がスローされます。名前空間は、ステートレス再試行でのみ使用できます。より複雑な環境(カスタムポリシーなど)では、通常の
<bean>
定義を使用します。- 回復を伴う単純なステートフル再試行
再試行をステートフルにするには、
RetryStateGenerator
実装でアドバイスを提供する必要があります。このクラスは、RetryTemplate
がこのメッセージの再試行の現在の状態を判断できるように、メッセージを再送信として識別するために使用されます。フレームワークは、SpelExpressionRetryStateGenerator
を提供します。SpelExpressionRetryStateGenerator
は、SpEL 式を使用してメッセージ識別子を決定します。この例でもデフォルトのポリシー(バックオフなしの 3 回の試行)を使用しています。ステートレス再試行と同様に、これらのポリシーはカスタマイズできます。次のリストは、例とそのDEBUG
出力を示しています。<int:service-activator input-channel="input" ref="failer" method="service"> <int:request-handler-advice-chain> <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"> <property name="retryStateGenerator"> <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator"> <constructor-arg value="headers['jms_messageId']" /> </bean> </property> <property name="recoveryCallback"> <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer"> <constructor-arg ref="myErrorChannel" /> </bean> </property> </bean> </int:request-handler-advice-chain> </int:service-activator> 24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 24.368 DEBUG [Container#0-1]Retry: count=0 24.387 DEBUG [Container#0-1]Checking for rethrow: count=1 24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1 24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 25.412 DEBUG [Container#0-1]Retry: count=1 25.413 DEBUG [Container#0-1]Checking for rethrow: count=2 25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2 25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 26.418 DEBUG [Container#0-1]Retry: count=2 26.419 DEBUG [Container#0-1]Checking for rethrow: count=3 26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3 26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive org.springframework.integration.MessagingException: Failed to invoke handler ... Caused by: java.lang.RuntimeException: foo ... 26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception ... 27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...] 27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3 27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
前述の例をステートレスの例と比較すると、ステートフルリトライでは、失敗のたびに呼び出し元に例外がスローされることがわかります。
- 再試行の例外分類
Spring Retry には、どの例外が再試行を呼び出すことができるかを決定するための柔軟性があります。デフォルトの構成ではすべての例外が再試行され、例外分類子は最上位の例外を調べます。たとえば、
MyException
でのみ再試行するように構成し、アプリケーションが原因がMyException
であるSomeOtherException
をスローする場合、再試行は発生しません。Spring Retry 1.0.3 以降、
BinaryExceptionClassifier
にはtraverseCauses
(デフォルトはfalse
)と呼ばれるプロパティがあります。true
の場合、一致が見つかるか、トラバースする原因がなくなるまで、例外の原因をトラバースします。再試行にこの分類子を使用するには、最大試行回数、
Exception
オブジェクトのMap
、traverseCauses
ブール値を取るコンストラクターで作成されたSimpleRetryPolicy
を使用します。その後、このポリシーをRetryTemplate
に注入できます。
ユーザー例外は MessagingException にラップされる可能性があるため、この場合は traverseCauses が必要です。 |
サーキットブレーカーのアドバイス
サーキットブレーカーパターンの一般的な考え方は、サービスが現在利用できない場合、それを使用しようとして時間(およびリソース)を浪費しないことです。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice
はこのパターンを実装しています。サーキットブレーカーが閉じた状態の場合、エンドポイントはサービスの呼び出しを試みます。サーキットブレーカーは、一定回数の連続試行が失敗するとオープン状態になります。オープン状態の場合、新しいリクエストは「高速で失敗」し、時間が経過するまでサービスの呼び出しは試行されません。
その時間が経過すると、サーキットブレーカーは半開状態に設定されます。この状態で、1 回の試行でも失敗すると、ブレーカーはすぐにオープン状態になります。試行が成功すると、ブレーカーは閉じた状態になり、その場合、構成された数の連続した障害が再び発生するまで、再び開いた状態になりません。正常に試行されると、ブレーカーが再びオープン状態になるタイミングを判別するために、状態が障害ゼロにリセットされます。
通常、このアドバイスは外部サービスに使用される可能性があり、失敗するまでに時間がかかる場合があります(ネットワーク接続を試行するタイムアウトなど)。
RequestHandlerCircuitBreakerAdvice
には、threshold
と halfOpenAfter
の 2 つのプロパティがあります。threshold
プロパティは、ブレーカーが開くまでに発生する必要のある連続した障害の数を表します。デフォルトは 5
です。halfOpenAfter
プロパティは、最後の失敗後、ブレーカーが別のリクエストを試みる前に待機する時間を表します。デフォルトは 1000 ミリ秒です。
次の例では、サーキットブレーカーを構成し、その DEBUG
および ERROR
出力を示しています。
<int:service-activator input-channel="input" ref="failer" method="service">
<int:request-handler-advice-chain>
<bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
</int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
上記の例では、しきい値は 2
に設定され、halfOpenAfter
は 12
秒に設定されています。5 秒ごとに新しいリクエストが届きます。最初の 2 回の試行でサービスが呼び出されました。3 番目と 4 番目は、サーキットブレーカーが開いていることを示す例外で失敗しました。5 番目のリクエストは、最後の失敗から 15 秒後にリクエストされたために試行されました。ブレーカーがすぐに開いたため、6 回目の試行はすぐに失敗します。
式評価アドバイス
最終的に提供されるアドバイスクラスは o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice
です。このアドバイスは、他の 2 つのアドバイスよりも一般的です。エンドポイントに送信された元の受信メッセージの式を評価するメカニズムを提供します。成功または失敗の後、評価するために個別の式を使用できます。オプションで、入力メッセージとともに評価結果を含むメッセージをメッセージチャネルに送信できます。
このアドバイスの一般的な使用例は、<ftp:outbound-channel-adapter/>
を使用することです。おそらく、転送が成功した場合はファイルを 1 つのディレクトリに、失敗した場合は別のディレクトリに移動します。
アドバイスには、成功した場合の式、失敗した場合の式、それぞれに対応するチャネルを設定するプロパティがあります。成功した場合、successChannel
に送信されるメッセージは AdviceMessage
であり、ペイロードは式の評価の結果です。inputMessage
という追加のプロパティには、ハンドラーに送信された元のメッセージが含まれています。failureChannel
に送信されるメッセージ(ハンドラーが例外をスローする場合)は、MessageHandlingExpressionEvaluatingAdviceException
のペイロードを持つ ErrorMessage
です。すべての MessagingException
インスタンスと同様に、このペイロードには failedMessage
および cause
プロパティと、式評価の結果を含む evaluationResult
という追加のプロパティがあります。
バージョン 5.1.3 以降、チャネルは構成されているが式が提供されていない場合、メッセージの payload を評価するためにデフォルトの式が使用されます。 |
アドバイスのスコープ内で例外がスローされると、デフォルトでは、failureExpression
が評価された後にその例外が呼び出し元にスローされます。例外のスローを抑制したい場合は、trapException
プロパティを true
に設定します。次のアドバイスは、Java DSL でアドバイスを設定する方法を示しています。
@SpringBootApplication
public class EerhaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
MessageChannel in = context.getBean("advised.input", MessageChannel.class);
in.send(new GenericMessage<>("good"));
in.send(new GenericMessage<>("bad"));
context.close();
}
@Bean
public IntegrationFlow advised() {
return f -> f.handle((GenericHandler<String>) (payload, headers) -> {
if (payload.equals("good")) {
return null;
}
else {
throw new RuntimeException("some failure");
}
}, c -> c.advice(expressionAdvice()));
}
@Bean
public Advice expressionAdvice() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setSuccessChannelName("success.input");
advice.setOnSuccessExpressionString("payload + ' was successful'");
advice.setFailureChannelName("failure.input");
advice.setOnFailureExpressionString(
"payload + ' was bad, with reason: ' + #exception.cause.message");
advice.setTrapException(true);
return advice;
}
@Bean
public IntegrationFlow success() {
return f -> f.handle(System.out::println);
}
@Bean
public IntegrationFlow failure() {
return f -> f.handle(System.out::println);
}
}
レートリミッターのアドバイス
レートリミッターのアドバイス(RateLimiterRequestHandlerAdvice
)により、エンドポイントがリクエストでオーバーロードにならないようにすることができます。レート制限に違反すると、リクエストはブロックされた状態になります。
このアドバイスの典型的な使用例は、外部サービスプロバイダーが 1 分あたり n
を超える数のリクエストを許可しない場合です。
RateLimiterRequestHandlerAdvice
の実装は、Resilience4j [GitHub] (英語) プロジェクトに完全に基づいており、RateLimiter
または RateLimiterConfig
のいずれかの注入が必要です。デフォルトおよび / またはカスタム名で構成することもできます。
次の例では、1 秒ごとに 1 つのリクエストでレートリミッターのアドバイスを構成しています。
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
...
}
キャッシングのアドバイス
バージョン 5.2 から、CacheRequestHandlerAdvice
が導入されました。これは、Spring Framework のキャッシング抽象化に基づいており、@Caching
アノテーションファミリーによって提供される概念と機能に沿っています。内部のロジックは CacheAspectSupport
拡張に基づいており、キャッシュ操作のプロキシは、リクエスト Message<?>
を引数として AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage
メソッドを中心に実行されます。このアドバイスは、キャッシュキーを評価するために SpEL 式または Function
を使用して構成できます。リクエスト Message<?>
は、SpEL 評価コンテキストのルートオブジェクトとして、または Function
入力引数として使用できます。デフォルトでは、リクエストメッセージの payload
がキャッシュキーに使用されます。デフォルトのキャッシュ操作が CacheableOperation
の場合、CacheRequestHandlerAdvice
は cacheNames
で構成するか、任意の CacheOperation
のセットで構成する必要があります。すべての CacheOperation
は個別に構成することも、CacheManager
、CacheResolver
、CacheErrorHandler
などの共有オプションを使用して CacheRequestHandlerAdvice
構成から再利用することもできます。この構成機能は、Spring Framework の @CacheConfig
と @Caching
アノテーションの組み合わせに似ています。CacheManager
が提供されていない場合、単一の Bean がデフォルトで CacheAspectSupport
の BeanFactory
から解決されます。
次の例では、キャッシュ操作の異なるセットで 2 つのアドバイスを構成します。
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
cacheRequestHandlerAdvice.setKeyExpressionString("payload");
CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
cachePutBuilder.setCacheName(TEST_PUT_CACHE);
CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
cacheEvictBuilder.setCacheName(TEST_CACHE);
cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
...
}
リアクティブアドバイス
バージョン 5.3 以降、ReactiveRequestHandlerAdvice
は、Mono
レスポンスを生成するリクエストメッセージハンドラーに使用できます。このアドバイスには BiFunction<Message<?>, Mono<?>, Publisher<?>>
を提供する必要があり、インターセプトされた handleRequestMessage()
メソッドの実装によって生成されたレスポンスで Mono.transform()
オペレーターから呼び出されます。通常、このような Mono
のカスタマイズは、timeout()
、retry()
、同様のサポートオペレーターを介してネットワークの変動を制御する場合に必要です。たとえば、WebFlux クライアントを介して HTTP リクエストを実行できる場合、以下の構成を使用して、5 秒を超えてレスポンスを待たないようにすることができます。
.handle(WebFlux.outboundGateway("https://somehost/"),
e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
message
引数はメッセージハンドラーのリクエストメッセージであり、リクエストスコープの属性を決定するために使用できます。mono
引数は、このメッセージハンドラーの handleRequestMessage()
メソッド実装の結果です。この関数からネストされた Mono.transform()
を呼び出して、たとえばリアクティブサーキットブレーカーを適用することもできます。
カスタムアドバイスクラス
前述のアドバイスクラスに加えて、独自のアドバイスクラスを実装できます。org.aopalliance.aop.Advice
(通常は org.aopalliance.intercept.MethodInterceptor
)の実装を提供できますが、一般的に o.s.i.handler.advice.AbstractRequestHandlerAdvice
をサブクラス化することをお勧めします。これには、低レベルのアスペクト指向プログラミングコードの記述を避け、この環境での使用に合わせて特別に調整された開始点を提供するという利点があります。
サブクラスは doInvoke()
メソッドを実装する必要があり、その定義は次のとおりです。
/**
* Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
* invokes the handler method and returns its result, or null).
* @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
* @param target The target handler.
* @param message The message that will be sent to the handler.
* @return the result after invoking the {@link MessageHandler}.
* @throws Exception
*/
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;
コールバックパラメーターは、AOP を直接扱うサブクラスを回避するのに便利です。callback.execute()
メソッドを呼び出すと、メッセージハンドラーが呼び出されます。
target
パラメーターは、特定のハンドラーの状態を維持する必要があるサブクラスに提供されます。おそらく、ターゲットによってキー設定された Map
でその状態を維持することによってです。この機能により、同じアドバイスを複数のハンドラーに適用できます。RequestHandlerCircuitBreakerAdvice
は、アドバイスを使用して、各ハンドラーのサーキットブレーカーの状態を維持します。
message
パラメーターは、ハンドラーに送信されるメッセージです。アドバイスは、ハンドラーを呼び出す前にメッセージを変更できませんが、ペイロードを変更できます(可変プロパティがある場合)。通常、アドバイスでは、メッセージをロギングに使用したり、ハンドラーの呼び出しの前後にメッセージのコピーを送信したりします。
通常、戻り値は callback.execute()
によって返される値です。ただし、アドバイスには戻り値を変更する機能があります。AbstractReplyProducingMessageHandler
インスタンスのみが値を返すことに注意してください。次の例は、AbstractRequestHandlerAdvice
を継承するカスタムアドバイスクラスを示しています。
public class MyAdvice extends AbstractRequestHandlerAdvice {
@Override
protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
// add code before the invocation
Object result = callback.execute();
// add code after the invocation
return result;
}
}
詳細については、ReflectiveMethodInvocation (Javadoc) Javadoc を参照してください。 |
メッセージアドバイスの処理
このセクションの導入部分で説明したように、リクエストハンドラーアドバイスチェーン内のアドバイスオブジェクトは、ダウンストリームフロー (存在する場合) ではなく、現在のエンドポイントにのみ適用されます。応答を生成する MessageHandler
オブジェクト ( AbstractReplyProducingMessageHandler
を継承するオブジェクトなど) の場合、アドバイスは内部メソッド handleRequestMessage()
( MessageHandler.handleMessage()
から呼び出される) に適用されます。他のメッセージハンドラーの場合、アドバイスは MessageHandler.handleMessage()
に適用されます。
メッセージハンドラーが AbstractReplyProducingMessageHandler
であっても、handleMessage
メソッドにアドバイスを適用する必要がある状況がいくつかあります。例: べき等レシーバーは null
を返す場合があり、ハンドラーの replyRequired
プロパティが true
に設定されている場合、例外が発生します。別の例は BoundRabbitChannelAdvice
です - 厳密なメッセージ順序を参照してください。
バージョン 4.3.1 から、新しい HandleMessageAdvice
インターフェースとその基本実装(AbstractHandleMessageAdvice
)が導入されました。HandleMessageAdvice
を実装する Advice
オブジェクトは、ハンドラー型に関係なく、常に handleMessage()
メソッドに適用されます。
HandleMessageAdvice
実装(べき等レシーバーなど)は、レスポンスを返すハンドラーに適用されると、adviceChain
から分離され、MessageHandler.handleMessage()
メソッドに適切に適用されることを理解することが重要です。
この関連付けが解除されているため、アドバイスチェーンの順序は尊重されません。 |
次の構成を検討してください。
<some-reply-producing-endpoint ... >
<int:request-handler-advice-chain>
<tx:advice ... />
<ref bean="myHandleMessageAdvice" />
</int:request-handler-advice-chain>
</some-reply-producing-endpoint>
上記の例では、<tx:advice>
は AbstractReplyProducingMessageHandler.handleRequestMessage()
に適用されます。ただし、myHandleMessageAdvice
は MessageHandler.handleMessage()
に適用されます。<tx:advice>
の前に呼び出されます。順序を保持するには、標準の Spring AOP 構成アプローチに従って、エンドポイント id
を .handler
サフィックスとともに使用して、ターゲット MessageHandler
Bean を取得する必要があります。その場合、ダウンストリームフロー全体がトランザクションスコープ内にあることに注意してください。
レスポンスを返さない MessageHandler
の場合、アドバイスチェーンの順序は保持されます。
バージョン 5.3 以降、HandleMessageAdviceAdapter
は、MessageHandler.handleMessage()
に既存の MethodInterceptor
を適用できるようにするため、サブフロー全体に適用されます。たとえば、RetryOperationsInterceptor
は、一部のエンドポイントから開始するサブフロー全体に適用できます。これは、コンシューマーエンドポイントが AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()
に対してのみアドバイスを適用するため、デフォルトでは不可能です。バージョン 5.3 以降、HandleMessageAdviceAdapter
は、MessageHandler.handleMessage()
メソッドに MethodInterceptor
を適用するために提供されているため、サブフロー全体に適用されます。例: RetryOperationsInterceptor
は、いくつかのエンドポイントから始まるサブフロー全体に適用できます。コンシューマーエンドポイントは AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage()
にのみアドバイスを適用するため、これはデフォルトでは不可能です。
トランザクションサポート
バージョン 5.0 から、HandleMessageAdvice
実装のおかげで、ダウンストリームフロー全体をトランザクション化するために、新しい TransactionHandleMessageAdvice
が導入されました。<request-handler-advice-chain>
要素で通常の TransactionInterceptor
が使用される場合(たとえば、<tx:advice>
の構成を通じて)、開始されたトランザクションは内部 AbstractReplyProducingMessageHandler.handleRequestMessage()
にのみ適用され、ダウンストリームフローに伝搬されません。
XML 構成を簡素化するために、<request-handler-advice-chain>
とともに、<transactional>
要素がすべての <outbound-gateway>
および <service-activator>
および関連コンポーネントに追加されました。次の例は、使用中の <transactional>
を示しています。
<int-rmi:outbound-gateway remote-channel="foo" host="localhost"
request-channel="good" reply-channel="reply" port="#{@port}">
<int-rmi:transactional/>
</int-rmi:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
JPA 統合コンポーネントに精通している場合、このような構成は新しいものではありませんが、<poller>
または JMS などのメッセージ駆動型チャネルアダプターだけでなく、フローの任意のポイントからトランザクションを開始できます。
次の例に示すように、Java 構成は TransactionInterceptorBuilder
を使用することで簡素化でき、結果の Bean 名をメッセージングアノテーション adviceChain
属性で使用できます。
@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}
TransactionInterceptorBuilder
コンストラクターの true
パラメーターに注意してください。通常の TransactionInterceptor
ではなく、TransactionHandleMessageAdvice
が作成されます。
Java DSL は、次の例に示すように、エンドポイント構成の .transactional()
オプションを介して Advice
をサポートします。
@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}
アドバイスフィルター
Filter
アドバイスを助言する際には、追加の考慮事項があります。デフォルトでは、破棄アクション(フィルターが false
を返す場合)は、アドバイスチェーンのスコープ内で実行されます。これには、廃棄チャネルの下流のすべてのフローが含まれる場合があります。たとえば、破棄チャネルの下流の要素が例外をスローし、再試行のアドバイスがある場合、プロセスは再試行されます。また、throwExceptionOnRejection
が true
に設定されている場合(アドバイスの範囲内で例外がスローされます)。
discard-within-advice
を false
に設定すると、この動作が変更され、破棄(または例外)が発生します。after the アドバイスチェーンが呼び出されます。
アノテーションを使用したエンドポイントへのアドバイス
アノテーション(@Filter
、@ServiceActivator
、@Splitter
、@Transformer
)を使用して特定のエンドポイントを構成する場合、adviceChain
属性でアドバイスチェーンの Bean 名を指定できます。さらに、@Filter
アノテーションには discardWithinAdvice
属性もあります。これは、アドバイスフィルターに従って、破棄動作を構成するために使用できます。次の例では、廃棄が after the アドバイスで実行されます。
@MessageEndpoint
public class MyAdvisedFilter {
@Filter(inputChannel="input", outputChannel="output",
adviceChain="adviceChain", discardWithinAdvice="false")
public boolean filter(String s) {
return s.contains("good");
}
}
アドバイスチェーン内のアドバイスのオーダー
アドバイスクラスは「アラウンド」アドバイスであり、ネストされた方法で適用されます。最初のアドバイスが最も外側で、最後のアドバイスが最も内側(つまり、アドバイスされているハンドラーに最も近い)です。アドバイスクラスを正しい順序で配置して、必要な機能を実現することが重要です。
例: 再試行のアドバイスとトランザクションのアドバイスを追加するとします。再試行アドバイスアドバイスを最初に配置し、その後にトランザクションアドバイスを配置することができます。各再試行は新しいトランザクションで実行されます。一方、すべての試行と回復操作(再試行 RecoveryCallback
内)をトランザクション内でスコープする場合、トランザクションアドバイスを最初に置くことができます。
推奨されるハンドラープロパティ
アドバイス内からハンドラープロパティにアクセスすると便利な場合があります。例: ほとんどのハンドラーは NamedComponent
を実装して、コンポーネント名にアクセスできるようにします。
ターゲットオブジェクトには、target
引数(AbstractRequestHandlerAdvice
をサブクラス化する場合)または invocation.getThis()
(org.aopalliance.intercept.MethodInterceptor
を実装する場合)を介してアクセスできます。
ハンドラー全体がアドバイスされる場合(ハンドラーが応答を生成しない場合やアドバイスが HandleMessageAdvice
を実装する場合など)、次の例に示すように、ターゲットオブジェクトを NamedComponent
などのインターフェースにキャストできます。
String componentName = ((NamedComponent) target).getComponentName();
MethodInterceptor
を直接実装する場合、次のようにターゲットオブジェクトをキャストできます。
String componentName = ((NamedComponent) invocation.getThis()).getComponentName();
handleRequestMessage()
メソッドのみが推奨される場合(応答生成ハンドラーで)、ハンドラー全体(AbstractReplyProducingMessageHandler
)にアクセスする必要があります。次の例は、その方法を示しています。
AbstractReplyProducingMessageHandler handler =
((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();
String componentName = handler.getComponentName();
べき等レシーバーエンタープライズ統合パターン
バージョン 4.1 から、Spring Integration はべき等レシーバー (英語) エンタープライズ統合パターンの実装を提供します。これは関数パターンであり、べき等性ロジック全体をアプリケーションに実装する必要があります。ただし、意思決定を簡素化するために、IdempotentReceiverInterceptor
コンポーネントが提供されています。これは、MessageHandler.handleMessage()
メソッドに適用される AOP Advice
であり、構成に応じてリクエストメッセージを filter
するか、duplicate
としてマークすることができます。
以前は、たとえば <filter/>
でカスタム MessageSelector
を使用してこのパターンを実装できました(フィルターを参照)。ただし、このパターンはエンドポイント自体ではなく、エンドポイントの動作を実際に定義するため、べき等レシーバーの実装はエンドポイントコンポーネントを提供しません。むしろ、アプリケーションで宣言されたエンドポイントに適用されます。
IdempotentReceiverInterceptor
のロジックは、提供された MessageSelector
に基づいており、メッセージがそのセレクターで受け入れられない場合、true
に設定された duplicateMessage
ヘッダーで強化されます。ターゲット MessageHandler
(またはダウンストリームフロー)は、このヘッダーを参照して正しいべき等性ロジックを実装できます。IdempotentReceiverInterceptor
が discardChannel
または throwExceptionOnRejection = true
で構成されている場合、複製メッセージはターゲット MessageHandler.handleMessage()
に送信されません。むしろ、破棄されます。重複したメッセージを破棄する(何もしない)場合は、discardChannel
を、デフォルトの nullChannel
Bean などの NullChannel
で構成する必要があります。
メッセージ間の状態を維持し、べき等性についてメッセージを比較する機能を提供するために、MetadataStoreSelector
を提供します。MessageProcessor
実装(Message
に基づいてルックアップキーを作成)とオプションの ConcurrentMetadataStore
(メタデータストア)を受け入れます。詳細については、MetadataStoreSelector
Javadoc を参照してください。追加の MessageProcessor
を使用して、ConcurrentMetadataStore
用に value
をカスタマイズすることもできます。デフォルトでは、MetadataStoreSelector
は timestamp
メッセージヘッダーを使用します。
通常、キーに既存の値がない場合、セレクターは受け入れのためにメッセージを選択します。場合によっては、キーの現在の値と新しい値を比較して、メッセージを受け入れる必要があるかどうかを判断すると便利です。バージョン 5.3 以降、BiPredicate<String, String>
を参照する compareValues
プロパティが提供されています。最初のパラメーターは古い値です。true
を返してメッセージを受け入れ、MetadataStore
の古い値を新しい値に置き換えます。これは、キーの数を減らすのに役立ちます。たとえば、ファイル内の行を処理する場合、ファイル名をキーに格納し、現在の行番号を値に格納できます。その後、再起動後、すでに処理された行をスキップできます。例については、分割ファイルを処理するべき等べき下流を参照してください。
便宜上、MetadataStoreSelector
オプションは <idempotent-receiver>
コンポーネントで直接構成可能です。次のリストは、可能なすべての属性を示しています。
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | IdempotentReceiverInterceptor Bean の ID。オプション。 |
2 | このインターセプターが適用されるコンシューマーエンドポイント名またはパターン。endpoint="aaa, bbb*, ccc, *ddd, eee*fff" など、コンマ(, )で名前(パターン)を区切ります。これらのパターンに一致するエンドポイント Bean 名は、ターゲットエンドポイントの MessageHandler Bean を取得するために使用され(.handler サフィックスを使用)、IdempotentReceiverInterceptor がそれらの Bean に適用されます。必須。 |
3 | MessageSelector Bean リファレンス。metadata-store および key-strategy (key-expression) と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。 |
4 | IdempotentReceiverInterceptor がメッセージを受け入れない場合にメッセージを送信するチャネルを識別します。省略すると、重複したメッセージが duplicateMessage ヘッダーとともにハンドラーに転送されます。オプション。 |
5 | ConcurrentMetadataStore リファレンス。基礎となる MetadataStoreSelector によって使用されます。selector と相互に排他的。オプション。デフォルトの MetadataStoreSelector は、アプリケーションの実行中に状態を維持しない内部 SimpleMetadataStore を使用します。 |
6 | MessageProcessor リファレンス。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージから idempotentKey を評価します。selector および key-expression と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。 |
7 | ExpressionEvaluatingMessageProcessor に入力する SpEL 式。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージを評価コンテキストルートオブジェクトとして使用して、idempotentKey を評価します。selector および key-strategy と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。 |
8 | MessageProcessor リファレンス。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージから idempotentKey の value を評価します。selector および value-expression と相互に排他的。デフォルトでは、"MetadataStoreSelector" は "timestamp" メッセージヘッダーをメタデータの「値」として使用します。 |
9 | ExpressionEvaluatingMessageProcessor に入力する SpEL 式。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージを評価コンテキストルートオブジェクトとして使用して、idempotentKey の value を評価します。selector および value-strategy と相互に排他的。デフォルトでは、"MetadataStoreSelector" は "timestamp" メッセージヘッダーをメタデータ "value" として使用します。 |
10 | キーの古い値と新しい値を比較することにより、オプションでメッセージを選択できる BiPredicate<String, String> Bean への参照。デフォルトでは null 。 |
11 | IdempotentReceiverInterceptor がメッセージを拒否した場合に例外をスローするかどうか。デフォルトは false です。discard-channel が提供されているかどうかに関係なく適用されます。 |
Java 構成の場合、Spring Integration はメソッドレベルの @IdempotentReceiver
アノテーションを提供します。メッセージングアノテーションを持つ method
をマークするために使用されます(@ServiceActivator
、@Router, and others) to specify which `IdempotentReceiverInterceptor
オブジェクトがこのエンドポイントに適用されます。次の例は、@IdempotentReceiver
アノテーションの使用方法を示しています。
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
Java DSL を使用する場合、次の例に示すように、インターセプターをエンドポイントのアドバイスチェーンに追加できます。
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor は、MessageHandler.handleMessage(Message<?>) メソッド専用に設計されています。バージョン 4.3.1 以降では、AbstractHandleMessageAdvice を基本クラスとして HandleMessageAdvice を実装し、より良い分離を実現しています。詳細については、メッセージアドバイスの処理を参照してください。 |