べき等レシーバーエンタープライズ統合パターン

バージョン 4.1 から、Spring Integration はべき等レシーバー (英語) エンタープライズ統合パターンの実装を提供します。これは関数パターンであり、べき等性ロジック全体をアプリケーションに実装する必要があります。ただし、意思決定を簡素化するために、IdempotentReceiverInterceptor コンポーネントが提供されています。これは、MessageHandler.handleMessage() メソッドに適用される AOP Advice であり、構成に応じてリクエストメッセージを filter するか、duplicate としてマークすることができます。

以前は、たとえば <filter/> でカスタム MessageSelector を使用してこのパターンを実装できました(フィルターを参照)。ただし、このパターンはエンドポイント自体ではなく、エンドポイントの動作を実際に定義するため、べき等レシーバーの実装はエンドポイントコンポーネントを提供しません。むしろ、アプリケーションで宣言されたエンドポイントに適用されます。

IdempotentReceiverInterceptor のロジックは、提供された MessageSelector に基づいており、メッセージがそのセレクターで受け入れられない場合、true に設定された duplicateMessage ヘッダーで強化されます。ターゲット MessageHandler (またはダウンストリームフロー)は、このヘッダーを参照して正しいべき等性ロジックを実装できます。IdempotentReceiverInterceptor が discardChannel または throwExceptionOnRejection = true で構成されている場合、複製メッセージはターゲット MessageHandler.handleMessage() に送信されません。むしろ、破棄されます。重複したメッセージを破棄する(何もしない)場合は、discardChannel を、デフォルトの nullChannel Bean などの NullChannel で構成する必要があります。

メッセージ間の状態を維持し、べき等性についてメッセージを比較する機能を提供するために、MetadataStoreSelector を提供します。MessageProcessor 実装(Message に基づいてルックアップキーを作成)とオプションの ConcurrentMetadataStore (メタデータストア)を受け入れます。詳細については、MetadataStoreSelector Javadoc を参照してください。追加の MessageProcessor を使用して、ConcurrentMetadataStore 用に value をカスタマイズすることもできます。デフォルトでは、MetadataStoreSelector は timestamp メッセージヘッダーを使用します。

通常、キーに既存の値がない場合、セレクターは受け入れのためにメッセージを選択します。場合によっては、キーの現在の値と新しい値を比較して、メッセージを受け入れる必要があるかどうかを判断すると便利です。バージョン 5.3 以降、BiPredicate<String, String> を参照する compareValues プロパティが提供されています。最初のパラメーターは古い値です。true を返してメッセージを受け入れ、MetadataStore の古い値を新しい値に置き換えます。これは、キーの数を減らすのに役立ちます。たとえば、ファイル内の行を処理する場合、ファイル名をキーに格納し、現在の行番号を値に格納できます。その後、再起動後、すでに処理された行をスキップできます。例については、分割ファイルを処理するべき等べき下流を参照してください。

便宜上、MetadataStoreSelector オプションは <idempotent-receiver> コンポーネントで直接構成可能です。次のリストは、可能なすべての属性を示しています。

<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
1IdempotentReceiverInterceptor Bean の ID。オプション。
2 このインターセプターが適用されるコンシューマーエンドポイント名またはパターン。endpoint="aaa, bbb*, ccc, *ddd, eee*fff" など、コンマ(,)で名前(パターン)を区切ります。これらのパターンに一致するエンドポイント Bean 名は、ターゲットエンドポイントの MessageHandler Bean を取得するために使用され(.handler サフィックスを使用)、IdempotentReceiverInterceptor がそれらの Bean に適用されます。必須。
3MessageSelector Bean リファレンス。metadata-store および key-strategy (key-expression) と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。
4IdempotentReceiverInterceptor がメッセージを受け入れない場合にメッセージを送信するチャネルを識別します。省略すると、重複したメッセージが duplicateMessage ヘッダーとともにハンドラーに転送されます。オプション。
5ConcurrentMetadataStore リファレンス。基礎となる MetadataStoreSelector によって使用されます。selector と相互に排他的。オプション。デフォルトの MetadataStoreSelector は、アプリケーションの実行中に状態を維持しない内部 SimpleMetadataStore を使用します。
6MessageProcessor リファレンス。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージから idempotentKey を評価します。selector および key-expression と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。
7ExpressionEvaluatingMessageProcessor に入力する SpEL 式。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージを評価コンテキストルートオブジェクトとして使用して、idempotentKey を評価します。selector および key-strategy と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。
8MessageProcessor リファレンス。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージから idempotentKey の value を評価します。selector および value-expression と相互に排他的。デフォルトでは、"MetadataStoreSelector" は "timestamp" メッセージヘッダーをメタデータの「値」として使用します。
9ExpressionEvaluatingMessageProcessor に入力する SpEL 式。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージを評価コンテキストルートオブジェクトとして使用して、idempotentKey の value を評価します。selector および value-strategy と相互に排他的。デフォルトでは、"MetadataStoreSelector" は "timestamp" メッセージヘッダーをメタデータ "value" として使用します。
10 キーの古い値と新しい値を比較することにより、オプションでメッセージを選択できる BiPredicate<String, String> Bean への参照。デフォルトでは null
11IdempotentReceiverInterceptor がメッセージを拒否した場合に例外をスローするかどうか。デフォルトは false です。discard-channel が提供されているかどうかに関係なく適用されます。

Java 構成の場合、Spring Integration はメソッドレベルの @IdempotentReceiver アノテーションを提供します。メッセージングアノテーションを持つ method をマークするために使用されます(@ServiceActivator@Router, and others) to specify which `IdempotentReceiverInterceptor オブジェクトがこのエンドポイントに適用されます。次の例は、@IdempotentReceiver アノテーションの使用方法を示しています。

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}

@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}

Java DSL を使用する場合、次の例に示すように、インターセプターをエンドポイントのアドバイスチェーンに追加できます。

@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
IdempotentReceiverInterceptor は、MessageHandler.handleMessage(Message<?>) メソッド専用に設計されています。バージョン 4.3.1 以降では、AbstractHandleMessageAdvice を基本クラスとして HandleMessageAdvice を実装し、より良い分離を実現しています。詳細については、メッセージアドバイスの処理を参照してください。