エンドポイントへの動作の追加

Spring Integration 2.2 以前は、ポーラーの <advice-chain/> 要素に AOP アドバイスを追加することで、統合フロー全体に動作を追加できました。しかし、ダウンストリームエンドポイントではなく、REST Web サービスの呼び出しだけを再試行する場合を考えます。

例: 次のフローを検討します。

inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter

ポーラーのアドバイスチェーンに一部の再試行ロジックを構成し、ネットワークグリッチのために http-gateway2 への呼び出しが失敗した場合、再試行により http-gateway1 と http-gateway2 の両方が再度呼び出されます。同様に、jdbc-outbound-adapter で一時的な障害が発生すると、両方の HTTP ゲートウェイがもう一度呼び出されてから、再び jdbc-outbound-adapter が呼び出されます。

Spring Integration 2.2 は、個々のエンドポイントに動作を追加する機能を追加します。これは、多くのエンドポイントに <request-handler-advice-chain/> 要素を追加することで実現されます。次の例は、outbound-gateway 内の <request-handler-advice-chain/> 要素の使用方法を示しています。

<int-http:outbound-gateway id="withAdvice"
    url-expression="'http://localhost/test1'"
    request-channel="requests"
    reply-channel="nextChannel">
    <int-http:request-handler-advice-chain>
        <ref bean="myRetryAdvice" />
    </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>

この場合、myRetryAdvice はこのゲートウェイにローカルにのみ適用され、応答が nextChannel に送信された後にダウンストリームで実行されるその他のアクションには適用されません。アドバイスの範囲はエンドポイント自体に限定されます。

現時点では、<chain/> 全体のエンドポイントにアドバイスすることはできません。スキーマでは、チェーン自体の子要素として <request-handler-advice-chain> を許可していません。

ただし、<request-handler-advice-chain> は、<chain> 要素内の個々の応答生成エンドポイントに追加できます。例外は、応答を生成しないチェーンでは、チェーンの最後の要素が outbound-channel-adapter であるため、その最後の要素は通知できないことです。そのような要素をアドバイスする必要がある場合は、チェーンの外側に移動する必要があります(チェーンの output-channel はアダプターの input-channel です)。アダプターは、通常どおりアドバイスできます。応答を生成するチェーンの場合、すべての子要素にアドバイスできます。

提供されたアドバイスクラス

Spring Integration は、AOP アドバイスクラスを適用する一般的なメカニズムを提供することに加えて、これらのすぐに使えるアドバイスの実装を提供します。

再試行のアドバイス

再試行のアドバイス(o.s.i.handler.advice.RequestHandlerRetryAdvice)は、Spring Retry [GitHub] (英語) プロジェクトによって提供される豊富な再試行メカニズムを活用します。spring-retry のコアコンポーネントは RetryTemplate です。これにより、RetryPolicy および BackoffPolicy 戦略(多数の実装を含む)、および再試行が尽きたときに実行するアクションを決定する RecoveryCallback 戦略を含む、洗練された再試行シナリオを構成できます。

ステートレス再試行

ステートレス再試行は、再試行アクティビティがアドバイス内で完全に処理される場合です。スレッドは一時停止し(そうするように構成されている場合)、アクションを再試行します。

ステートフルリトライ

ステートフル再試行は、再試行状態がアドバイス内で管理されているが、例外がスローされ、呼び出し元がリクエストを再送信する場合です。ステートフル再試行の例は、現在のスレッドで実行するのではなく、メッセージ発信者(たとえば、JMS)に再送信を行わせたい場合です。ステートフル再試行には、再試行された送信を検出するための何らかのメカニズムが必要です。

spring-retry の詳細については、プロジェクトの Javadoc と、spring-retry の作成元である Spring Batch のリファレンスドキュメントを参照してください。

デフォルトのバックオフ動作は、バックオフしないことです。再試行はすぐに試行されます。試行間でスレッドを一時停止させるバックオフポリシーを使用すると、過剰なメモリ使用やスレッド不足などのパフォーマンスの問題が発生する可能性があります。大量の環境では、バックオフポリシーを注意して使用する必要があります。
再試行アドバイスの構成

このセクションの例では、常に例外をスローする次の <service-activator> を使用しています。

public class FailingService {

    public void service(String message) {
        throw new RuntimeException("error");
    }
}
単純なステートレス再試行

デフォルトの RetryTemplate には、3 回試行する SimpleRetryPolicy があります。BackOffPolicy はないため、3 回の試行は連続して行われ、試行間の遅延はありません。RecoveryCallback はないため、最終的な失敗した再試行が発生した後、呼び出し元に例外をスローします。Spring Integration 環境では、この最終的な例外は、受信エンドポイントで error-channel を使用して処理される場合があります。次の例では、RetryTemplate を使用し、その DEBUG 出力を示しています。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
回復を伴う単純なステートレス再試行

次の例では、前の例に RecoveryCallback を追加し、ErrorMessageSendingRecoverer を使用して ErrorMessage をチャネルに送信します。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
カスタマイズされたポリシーを使用したステートレス再試行とリカバリ

より洗練されたものにするために、カスタマイズされた RetryTemplate でアドバイスを提供できます。この例では、SimpleRetryPolicy を引き続き使用していますが、試行回数を 4 回に増やしています。また、最初の再試行が 1 秒間待機し、2 番目が 5 秒間待機し、3 番目が 25 待機する ExponentialBackoffPolicy を追加します(合計 4 回の試行)。次のリストは、例とその DEBUG 出力を示しています。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>

27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
ステートレス再試行のネームスペースサポート

バージョン 4.0 からは、次の例に示すように、再試行のアドバイスに対する名前空間のサポートのおかげで、前述の構成を大幅に簡素化できます。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>

<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>

上記の例では、アドバイスはトップレベルの Bean として定義されているため、複数の request-handler-advice-chain インスタンスで使用できます。次の例に示すように、チェーン内でアドバイスを直接定義することもできます。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>

<handler-retry-advice> は、<fixed-back-off> または <exponential-back-off> の子要素を持つことも、子要素を持たないこともできます。子要素のない <handler-retry-advice> は、バックオフを使用しません。recovery-channel がない場合、再試行が使い果たされると例外がスローされます。名前空間は、ステートレス再試行でのみ使用できます。

より複雑な環境(カスタムポリシーなど)では、通常の <bean> 定義を使用します。

回復を伴う単純なステートフル再試行

再試行をステートフルにするには、RetryStateGenerator 実装でアドバイスを提供する必要があります。このクラスは、RetryTemplate がこのメッセージの再試行の現在の状態を判断できるように、メッセージを再送信として識別するために使用されます。フレームワークは、SpelExpressionRetryStateGenerator を提供します。SpelExpressionRetryStateGenerator は、SpEL 式を使用してメッセージ識別子を決定します。この例でもデフォルトのポリシー(バックオフなしの 3 回の試行)を使用しています。ステートレス再試行と同様に、これらのポリシーはカスタマイズできます。次のリストは、例とその DEBUG 出力を示しています。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN  [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]

前述の例をステートレスの例と比較すると、ステートフルリトライでは、失敗のたびに呼び出し元に例外がスローされることがわかります。

再試行の例外分類

Spring Retry には、どの例外が再試行を呼び出すことができるかを決定するための柔軟性があります。デフォルトの構成ではすべての例外が再試行され、例外分類子は最上位の例外を調べます。たとえば、MyException でのみ再試行するように構成し、アプリケーションが原因が MyException である SomeOtherException をスローする場合、再試行は発生しません。

Spring Retry 1.0.3 以降、BinaryExceptionClassifier には traverseCauses (デフォルトは false)と呼ばれるプロパティがあります。true の場合、一致が見つかるか、トラバースする原因がなくなるまで、例外の原因をトラバースします。

再試行にこの分類子を使用するには、最大試行回数、Exception オブジェクトの MaptraverseCauses ブール値を取るコンストラクターで作成された SimpleRetryPolicy を使用します。その後、このポリシーを RetryTemplate に注入できます。

ユーザー例外は MessagingException にラップされる可能性があるため、この場合は traverseCauses が必要です。
サーキットブレーカーのアドバイス

サーキットブレーカーパターンの一般的な考え方は、サービスが現在利用できない場合、それを使用しようとして時間(およびリソース)を浪費しないことです。o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice はこのパターンを実装しています。サーキットブレーカーが閉じた状態の場合、エンドポイントはサービスの呼び出しを試みます。サーキットブレーカーは、一定回数の連続試行が失敗するとオープン状態になります。オープン状態の場合、新しいリクエストは「高速で失敗」し、時間が経過するまでサービスの呼び出しは試行されません。

その時間が経過すると、サーキットブレーカーは半開状態に設定されます。この状態で、1 回の試行でも失敗すると、ブレーカーはすぐにオープン状態になります。試行が成功すると、ブレーカーは閉じた状態になり、その場合、構成された数の連続した障害が再び発生するまで、再び開いた状態になりません。正常に試行されると、ブレーカーが再びオープン状態になるタイミングを判別するために、状態が障害ゼロにリセットされます。

通常、このアドバイスは外部サービスに使用される可能性があり、失敗するまでに時間がかかる場合があります(ネットワーク接続を試行するタイムアウトなど)。

RequestHandlerCircuitBreakerAdvice には、threshold と halfOpenAfter の 2 つのプロパティがあります。threshold プロパティは、ブレーカーが開くまでに発生する必要のある連続した障害の数を表します。デフォルトは 5 です。halfOpenAfter プロパティは、最後の失敗後、ブレーカーが別のリクエストを試みる前に待機する時間を表します。デフォルトは 1000 ミリ秒です。

次の例では、サーキットブレーカーを構成し、その DEBUG および ERROR 出力を示しています。

<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator

上記の例では、しきい値は 2 に設定され、halfOpenAfter は 12 秒に設定されています。5 秒ごとに新しいリクエストが届きます。最初の 2 回の試行でサービスが呼び出されました。3 番目と 4 番目は、サーキットブレーカーが開いていることを示す例外で失敗しました。5 番目のリクエストは、最後の失敗から 15 秒後にリクエストされたために試行されました。ブレーカーがすぐに開いたため、6 回目の試行はすぐに失敗します。

式評価アドバイス

最終的に提供されるアドバイスクラスは o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice です。このアドバイスは、他の 2 つのアドバイスよりも一般的です。エンドポイントに送信された元の受信メッセージの式を評価するメカニズムを提供します。成功または失敗の後、評価するために個別の式を使用できます。オプションで、入力メッセージとともに評価結果を含むメッセージをメッセージチャネルに送信できます。

このアドバイスの一般的な使用例は、<ftp:outbound-channel-adapter/> を使用することです。おそらく、転送が成功した場合はファイルを 1 つのディレクトリに、失敗した場合は別のディレクトリに移動します。

アドバイスには、成功した場合の式、失敗した場合の式、それぞれに対応するチャネルを設定するプロパティがあります。成功した場合、successChannel に送信されるメッセージは AdviceMessage であり、ペイロードは式の評価の結果です。inputMessage という追加のプロパティには、ハンドラーに送信された元のメッセージが含まれています。failureChannel に送信されるメッセージ(ハンドラーが例外をスローする場合)は、MessageHandlingExpressionEvaluatingAdviceException のペイロードを持つ ErrorMessage です。すべての MessagingException インスタンスと同様に、このペイロードには failedMessage および cause プロパティと、式評価の結果を含む evaluationResult という追加のプロパティがあります。

バージョン 5.1.3 以降、チャネルは構成されているが式が提供されていない場合、メッセージの payload を評価するためにデフォルトの式が使用されます。

アドバイスのスコープ内で例外がスローされると、デフォルトでは、failureExpression が評価された後にその例外が呼び出し元にスローされます。例外のスローを抑制したい場合は、trapException プロパティを true に設定します。次のアドバイスは、Java DSL でアドバイスを設定する方法を示しています。

@SpringBootApplication
public class EerhaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }

    @Bean
    public IntegrationFlow advised() {
        return f -> f.handle((GenericHandler<String>) (payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }

    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }

    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }

    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }

}
レートリミッターのアドバイス

レートリミッターのアドバイス(RateLimiterRequestHandlerAdvice)により、エンドポイントがリクエストでオーバーロードにならないようにすることができます。レート制限に違反すると、リクエストはブロックされた状態になります。

このアドバイスの典型的な使用例は、外部サービスプロバイダーが 1 分あたり n を超える数のリクエストを許可しない場合です。

RateLimiterRequestHandlerAdvice の実装は、Resilience4j [GitHub] (英語) プロジェクトに完全に基づいており、RateLimiter または RateLimiterConfig のいずれかの注入が必要です。デフォルトおよび / またはカスタム名で構成することもできます。

次の例では、1 秒ごとに 1 つのリクエストでレートリミッターのアドバイスを構成しています。

@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}

@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}
キャッシングのアドバイス

バージョン 5.2 から、CacheRequestHandlerAdvice が導入されました。これは、Spring Framework のキャッシング抽象化に基づいており、@Caching アノテーションファミリーによって提供される概念と機能に沿っています。内部のロジックは CacheAspectSupport 拡張に基づいており、キャッシュ操作のプロキシは、リクエスト Message<?> を引数として AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage メソッドを中心に実行されます。このアドバイスは、キャッシュキーを評価するために SpEL 式または Function を使用して構成できます。リクエスト Message<?> は、SpEL 評価コンテキストのルートオブジェクトとして、または Function 入力引数として使用できます。デフォルトでは、リクエストメッセージの payload がキャッシュキーに使用されます。デフォルトのキャッシュ操作が CacheableOperation の場合、CacheRequestHandlerAdvice は cacheNames で構成するか、任意の CacheOperation のセットで構成する必要があります。すべての CacheOperation は個別に構成することも、CacheManagerCacheResolverCacheErrorHandler などの共有オプションを使用して CacheRequestHandlerAdvice 構成から再利用することもできます。この構成機能は、Spring Framework の @CacheConfig と @Caching アノテーションの組み合わせに似ています。CacheManager が提供されていない場合、単一の Bean がデフォルトで CacheAspectSupport の BeanFactory から解決されます。

次の例では、キャッシュ操作の異なるセットで 2 つのアドバイスを構成します。

@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}

@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}

@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}

@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}

リアクティブアドバイス

バージョン 5.3 以降、ReactiveRequestHandlerAdvice は、Mono レスポンスを生成するリクエストメッセージハンドラーに使用できます。このアドバイスには BiFunction<Message<?>, Mono<?>, Publisher<?>> を提供する必要があり、インターセプトされた handleRequestMessage() メソッドの実装によって生成されたレスポンスで Mono.transform() オペレーターから呼び出されます。通常、このような Mono のカスタマイズは、timeout()retry()、同様のサポートオペレーターを介してネットワークの変動を制御する場合に必要です。たとえば、WebFlux クライアントを介して HTTP リクエストを実行できる場合、以下の構成を使用して、5 秒を超えてレスポンスを待たないようにすることができます。

.handle(WebFlux.outboundGateway("https://somehost/"),
                       e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));

message 引数はメッセージハンドラーのリクエストメッセージであり、リクエストスコープの属性を決定するために使用できます。mono 引数は、このメッセージハンドラーの handleRequestMessage() メソッド実装の結果です。この関数からネストされた Mono.transform() を呼び出して、たとえばリアクティブサーキットブレーカーを適用することもできます。

カスタムアドバイスクラス

前述のアドバイスクラスに加えて、独自のアドバイスクラスを実装できます。org.aopalliance.aop.Advice (通常は org.aopalliance.intercept.MethodInterceptor)の実装を提供できますが、一般的に o.s.i.handler.advice.AbstractRequestHandlerAdvice をサブクラス化することをお勧めします。これには、低レベルのアスペクト指向プログラミングコードの記述を避け、この環境での使用に合わせて特別に調整された開始点を提供するという利点があります。

サブクラスは doInvoke() メソッドを実装する必要があり、その定義は次のとおりです。

/**
 * Subclasses implement this method to apply behavior to the {@link MessageHandler} callback.execute()
 * invokes the handler method and returns its result, or null).
 * @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
 * @param target The target handler.
 * @param message The message that will be sent to the handler.
 * @return the result after invoking the {@link MessageHandler}.
 * @throws Exception
 */
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;

コールバックパラメーターは、AOP を直接扱うサブクラスを回避するのに便利です。callback.execute() メソッドを呼び出すと、メッセージハンドラーが呼び出されます。

target パラメーターは、特定のハンドラーの状態を維持する必要があるサブクラスに提供されます。おそらく、ターゲットによってキー設定された Map でその状態を維持することによってです。この機能により、同じアドバイスを複数のハンドラーに適用できます。RequestHandlerCircuitBreakerAdvice は、アドバイスを使用して、各ハンドラーのサーキットブレーカーの状態を維持します。

message パラメーターは、ハンドラーに送信されるメッセージです。アドバイスは、ハンドラーを呼び出す前にメッセージを変更できませんが、ペイロードを変更できます(可変プロパティがある場合)。通常、アドバイスでは、メッセージをロギングに使用したり、ハンドラーの呼び出しの前後にメッセージのコピーを送信したりします。

通常、戻り値は callback.execute() によって返される値です。ただし、アドバイスには戻り値を変更する機能があります。AbstractReplyProducingMessageHandler インスタンスのみが値を返すことに注意してください。次の例は、AbstractRequestHandlerAdvice を継承するカスタムアドバイスクラスを示しています。

public class MyAdvice extends AbstractRequestHandlerAdvice {

    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        // add code before the invocation
        Object result = callback.execute();
        // add code after the invocation
        return result;
    }
}

execute() メソッドに加えて、ExecutionCallback は追加のメソッド cloneAndExecute() を提供します。このメソッドは、RequestHandlerRetryAdvice など、doInvoke() の単一の実行内で呼び出しが複数回呼び出される可能性がある場合に使用する必要があります。Spring AOP org.springframework.aop.framework.ReflectiveMethodInvocation オブジェクトは、チェーンのどのアドバイスが最後に呼び出されたかを追跡することで状態を維持するため、これが必要です。この状態は、呼び出しごとにリセットする必要があります。

詳細については、ReflectiveMethodInvocation (Javadoc) Javadoc を参照してください。

その他のアドバイスチェーン要素

上記の抽象クラスは便利ですが、トランザクションアドバイスを含む任意の Advice をチェーンに追加できます。

メッセージアドバイスの処理

このセクションの導入部分で説明したように、リクエストハンドラーアドバイスチェーン内のアドバイスオブジェクトは、ダウンストリームフロー (存在する場合) ではなく、現在のエンドポイントにのみ適用されます。応答を生成する MessageHandler オブジェクト ( AbstractReplyProducingMessageHandler を継承するオブジェクトなど) の場合、アドバイスは内部メソッド handleRequestMessage() ( MessageHandler.handleMessage() から呼び出される) に適用されます。他のメッセージハンドラーの場合、アドバイスは MessageHandler.handleMessage() に適用されます。

メッセージハンドラーが AbstractReplyProducingMessageHandler であっても、handleMessage メソッドにアドバイスを適用する必要がある状況がいくつかあります。例: べき等レシーバーは null を返す場合があり、ハンドラーの replyRequired プロパティが true に設定されている場合、例外が発生します。別の例は BoundRabbitChannelAdvice です - 厳密なメッセージ順序を参照してください。

バージョン 4.3.1 から、新しい HandleMessageAdvice インターフェースとその基本実装(AbstractHandleMessageAdvice)が導入されました。HandleMessageAdvice を実装する Advice オブジェクトは、ハンドラー型に関係なく、常に handleMessage() メソッドに適用されます。

HandleMessageAdvice 実装(べき等レシーバーなど)は、レスポンスを返すハンドラーに適用されると、adviceChain から分離され、MessageHandler.handleMessage() メソッドに適切に適用されることを理解することが重要です。

この関連付けが解除されているため、アドバイスチェーンの順序は尊重されません。

次の構成を検討してください。

<some-reply-producing-endpoint ... >
    <int:request-handler-advice-chain>
        <tx:advice ... />
        <ref bean="myHandleMessageAdvice" />
    </int:request-handler-advice-chain>
</some-reply-producing-endpoint>

上記の例では、<tx:advice> は AbstractReplyProducingMessageHandler.handleRequestMessage() に適用されます。ただし、myHandleMessageAdvice は MessageHandler.handleMessage() に適用されます。<tx:advice>に呼び出されます。順序を保持するには、標準の Spring AOP 構成アプローチに従って、エンドポイント id を .handler サフィックスとともに使用して、ターゲット MessageHandler Bean を取得する必要があります。その場合、ダウンストリームフロー全体がトランザクションスコープ内にあることに注意してください。

レスポンスを返さない MessageHandler の場合、アドバイスチェーンの順序は保持されます。

バージョン 5.3 以降、HandleMessageAdviceAdapter は、MessageHandler.handleMessage() に既存の MethodInterceptor を適用できるようにするため、サブフロー全体に適用されます。たとえば、RetryOperationsInterceptor は、一部のエンドポイントから開始するサブフロー全体に適用できます。これは、コンシューマーエンドポイントが AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage() に対してのみアドバイスを適用するため、デフォルトでは不可能です。バージョン 5.3 以降、HandleMessageAdviceAdapter は、MessageHandler.handleMessage() メソッドに MethodInterceptor を適用するために提供されているため、サブフロー全体に適用されます。例: RetryOperationsInterceptor は、いくつかのエンドポイントから始まるサブフロー全体に適用できます。コンシューマーエンドポイントは AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage() にのみアドバイスを適用するため、これはデフォルトでは不可能です。

トランザクションサポート

バージョン 5.0 から、HandleMessageAdvice 実装のおかげで、ダウンストリームフロー全体をトランザクション化するために、新しい TransactionHandleMessageAdvice が導入されました。<request-handler-advice-chain> 要素で通常の TransactionInterceptor が使用される場合(たとえば、<tx:advice> の構成を通じて)、開始されたトランザクションは内部 AbstractReplyProducingMessageHandler.handleRequestMessage() にのみ適用され、ダウンストリームフローに伝搬されません。

XML 構成を簡素化するために、<request-handler-advice-chain> とともに、<transactional> 要素がすべての <outbound-gateway> および <service-activator> および関連コンポーネントに追加されました。次の例は、使用中の <transactional> を示しています。

<int-rmi:outbound-gateway remote-channel="foo" host="localhost"
    request-channel="good" reply-channel="reply" port="#{@port}">
        <int-rmi:transactional/>
</int-rmi:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

JPA 統合コンポーネントに精通している場合、このような構成は新しいものではありませんが、<poller> または JMS などのメッセージ駆動型チャネルアダプターだけでなく、フローの任意のポイントからトランザクションを開始できます。

次の例に示すように、Java 構成は TransactionInterceptorBuilder を使用することで簡素化でき、結果の Bean 名をメッセージングアノテーション  adviceChain 属性で使用できます。

@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}

TransactionInterceptorBuilder コンストラクターの true パラメーターに注意してください。通常の TransactionInterceptor ではなく、TransactionHandleMessageAdvice が作成されます。

Java DSL は、次の例に示すように、エンドポイント構成の .transactional() オプションを介して Advice をサポートします。

@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}

アドバイスフィルター

Filter アドバイスを助言する際には、追加の考慮事項があります。デフォルトでは、破棄アクション(フィルターが false を返す場合)は、アドバイスチェーンのスコープ内で実行されます。これには、廃棄チャネルの下流のすべてのフローが含まれる場合があります。たとえば、破棄チャネルの下流の要素が例外をスローし、再試行のアドバイスがある場合、プロセスは再試行されます。また、throwExceptionOnRejection が true に設定されている場合(アドバイスの範囲内で例外がスローされます)。

discard-within-advice を false に設定すると、この動作が変更され、破棄(または例外)が発生します。after the アドバイスチェーンが呼び出されます。

アノテーションを使用したエンドポイントへのアドバイス

アノテーション(@Filter@ServiceActivator@Splitter@Transformer)を使用して特定のエンドポイントを構成する場合、adviceChain 属性でアドバイスチェーンの Bean 名を指定できます。さらに、@Filter アノテーションには discardWithinAdvice 属性もあります。これは、アドバイスフィルターに従って、破棄動作を構成するために使用できます。次の例では、廃棄が after the アドバイスで実行されます。

@MessageEndpoint
public class MyAdvisedFilter {

    @Filter(inputChannel="input", outputChannel="output",
            adviceChain="adviceChain", discardWithinAdvice="false")
    public boolean filter(String s) {
        return s.contains("good");
    }
}

アドバイスチェーン内のアドバイスのオーダー

アドバイスクラスは「アラウンド」アドバイスであり、ネストされた方法で適用されます。最初のアドバイスが最も外側で、最後のアドバイスが最も内側(つまり、アドバイスされているハンドラーに最も近い)です。アドバイスクラスを正しい順序で配置して、必要な機能を実現することが重要です。

例: 再試行のアドバイスとトランザクションのアドバイスを追加するとします。再試行アドバイスアドバイスを最初に配置し、その後にトランザクションアドバイスを配置することができます。各再試行は新しいトランザクションで実行されます。一方、すべての試行と回復操作(再試行 RecoveryCallback 内)をトランザクション内でスコープする場合、トランザクションアドバイスを最初に置くことができます。

推奨されるハンドラープロパティ

アドバイス内からハンドラープロパティにアクセスすると便利な場合があります。例: ほとんどのハンドラーは NamedComponent を実装して、コンポーネント名にアクセスできるようにします。

ターゲットオブジェクトには、target 引数(AbstractRequestHandlerAdvice をサブクラス化する場合)または invocation.getThis() (org.aopalliance.intercept.MethodInterceptor を実装する場合)を介してアクセスできます。

ハンドラー全体がアドバイスされる場合(ハンドラーが応答を生成しない場合やアドバイスが HandleMessageAdvice を実装する場合など)、次の例に示すように、ターゲットオブジェクトを NamedComponent などのインターフェースにキャストできます。

String componentName = ((NamedComponent) target).getComponentName();

MethodInterceptor を直接実装する場合、次のようにターゲットオブジェクトをキャストできます。

String componentName = ((NamedComponent) invocation.getThis()).getComponentName();

handleRequestMessage() メソッドのみが推奨される場合(応答生成ハンドラーで)、ハンドラー全体(AbstractReplyProducingMessageHandler)にアクセスする必要があります。次の例は、その方法を示しています。

AbstractReplyProducingMessageHandler handler =
    ((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();

String componentName = handler.getComponentName();

べき等レシーバーエンタープライズ統合パターン

バージョン 4.1 から、Spring Integration はべき等レシーバー (英語) エンタープライズ統合パターンの実装を提供します。これは関数パターンであり、べき等性ロジック全体をアプリケーションに実装する必要があります。ただし、意思決定を簡素化するために、IdempotentReceiverInterceptor コンポーネントが提供されています。これは、MessageHandler.handleMessage() メソッドに適用される AOP Advice であり、構成に応じてリクエストメッセージを filter するか、duplicate としてマークすることができます。

以前は、たとえば <filter/> でカスタム MessageSelector を使用してこのパターンを実装できました(フィルターを参照)。ただし、このパターンはエンドポイント自体ではなく、エンドポイントの動作を実際に定義するため、べき等レシーバーの実装はエンドポイントコンポーネントを提供しません。むしろ、アプリケーションで宣言されたエンドポイントに適用されます。

IdempotentReceiverInterceptor のロジックは、提供された MessageSelector に基づいており、メッセージがそのセレクターで受け入れられない場合、true に設定された duplicateMessage ヘッダーで強化されます。ターゲット MessageHandler (またはダウンストリームフロー)は、このヘッダーを参照して正しいべき等性ロジックを実装できます。IdempotentReceiverInterceptor が discardChannel または throwExceptionOnRejection = true で構成されている場合、複製メッセージはターゲット MessageHandler.handleMessage() に送信されません。むしろ、破棄されます。重複したメッセージを破棄する(何もしない)場合は、discardChannel を、デフォルトの nullChannel Bean などの NullChannel で構成する必要があります。

メッセージ間の状態を維持し、べき等性についてメッセージを比較する機能を提供するために、MetadataStoreSelector を提供します。MessageProcessor 実装(Message に基づいてルックアップキーを作成)とオプションの ConcurrentMetadataStore (メタデータストア)を受け入れます。詳細については、MetadataStoreSelector Javadoc を参照してください。追加の MessageProcessor を使用して、ConcurrentMetadataStore 用に value をカスタマイズすることもできます。デフォルトでは、MetadataStoreSelector は timestamp メッセージヘッダーを使用します。

通常、キーに既存の値がない場合、セレクターは受け入れのためにメッセージを選択します。場合によっては、キーの現在の値と新しい値を比較して、メッセージを受け入れる必要があるかどうかを判断すると便利です。バージョン 5.3 以降、BiPredicate<String, String> を参照する compareValues プロパティが提供されています。最初のパラメーターは古い値です。true を返してメッセージを受け入れ、MetadataStore の古い値を新しい値に置き換えます。これは、キーの数を減らすのに役立ちます。たとえば、ファイル内の行を処理する場合、ファイル名をキーに格納し、現在の行番号を値に格納できます。その後、再起動後、すでに処理された行をスキップできます。例については、分割ファイルを処理するべき等べき下流を参照してください。

便宜上、MetadataStoreSelector オプションは <idempotent-receiver> コンポーネントで直接構成可能です。次のリストは、可能なすべての属性を示しています。

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1IdempotentReceiverInterceptor Bean の ID。オプション。
2 このインターセプターが適用されるコンシューマーエンドポイント名またはパターン。endpoint="aaa, bbb*, ccc, *ddd, eee*fff" など、コンマ(,)で名前(パターン)を区切ります。これらのパターンに一致するエンドポイント Bean 名は、ターゲットエンドポイントの MessageHandler Bean を取得するために使用され(.handler サフィックスを使用)、IdempotentReceiverInterceptor がそれらの Bean に適用されます。必須。
3MessageSelector Bean リファレンス。metadata-store および key-strategy (key-expression) と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。
4IdempotentReceiverInterceptor がメッセージを受け入れない場合にメッセージを送信するチャネルを識別します。省略すると、重複したメッセージが duplicateMessage ヘッダーとともにハンドラーに転送されます。オプション。
5ConcurrentMetadataStore リファレンス。基礎となる MetadataStoreSelector によって使用されます。selector と相互に排他的。オプション。デフォルトの MetadataStoreSelector は、アプリケーションの実行中に状態を維持しない内部 SimpleMetadataStore を使用します。
6MessageProcessor リファレンス。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージから idempotentKey を評価します。selector および key-expression と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。
7ExpressionEvaluatingMessageProcessor に入力する SpEL 式。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージを評価コンテキストルートオブジェクトとして使用して、idempotentKey を評価します。selector および key-strategy と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。
8MessageProcessor リファレンス。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージから idempotentKey の value を評価します。selector および value-expression と相互に排他的。デフォルトでは、"MetadataStoreSelector" は "timestamp" メッセージヘッダーをメタデータの「値」として使用します。
9ExpressionEvaluatingMessageProcessor に入力する SpEL 式。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージを評価コンテキストルートオブジェクトとして使用して、idempotentKey の value を評価します。selector および value-strategy と相互に排他的。デフォルトでは、"MetadataStoreSelector" は "timestamp" メッセージヘッダーをメタデータ "value" として使用します。
10 キーの古い値と新しい値を比較することにより、オプションでメッセージを選択できる BiPredicate<String, String> Bean への参照。デフォルトでは null
11IdempotentReceiverInterceptor がメッセージを拒否した場合に例外をスローするかどうか。デフォルトは false です。discard-channel が提供されているかどうかに関係なく適用されます。

Java 構成の場合、Spring Integration はメソッドレベルの @IdempotentReceiver アノテーションを提供します。メッセージングアノテーションを持つ method をマークするために使用されます(@ServiceActivator@Router, and others) to specify which `IdempotentReceiverInterceptor オブジェクトがこのエンドポイントに適用されます。次の例は、@IdempotentReceiver アノテーションの使用方法を示しています。

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

Java DSL を使用する場合、次の例に示すように、インターセプターをエンドポイントのアドバイスチェーンに追加できます。

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor は、MessageHandler.handleMessage(Message<?>) メソッド専用に設計されています。バージョン 4.3.1 以降では、AbstractHandleMessageAdvice を基本クラスとして HandleMessageAdvice を実装し、より良い分離を実現しています。詳細については、メッセージアドバイスの処理を参照してください。