べき等レシーバーエンタープライズ統合パターン
バージョン 4.1 から、Spring Integration はべき等レシーバー (英語) エンタープライズ統合パターンの実装を提供します。これは関数パターンであり、べき等性ロジック全体をアプリケーションに実装する必要があります。ただし、意思決定を簡素化するために、IdempotentReceiverInterceptor
コンポーネントが提供されています。これは、MessageHandler.handleMessage()
メソッドに適用される AOP Advice
であり、構成に応じてリクエストメッセージを filter
するか、duplicate
としてマークすることができます。
以前は、たとえば <filter/>
でカスタム MessageSelector
を使用してこのパターンを実装できました(フィルターを参照)。ただし、このパターンはエンドポイント自体ではなく、エンドポイントの動作を実際に定義するため、べき等レシーバーの実装はエンドポイントコンポーネントを提供しません。むしろ、アプリケーションで宣言されたエンドポイントに適用されます。
IdempotentReceiverInterceptor
のロジックは、提供された MessageSelector
に基づいており、メッセージがそのセレクターで受け入れられない場合、true
に設定された duplicateMessage
ヘッダーで強化されます。ターゲット MessageHandler
(またはダウンストリームフロー)は、このヘッダーを参照して正しいべき等性ロジックを実装できます。IdempotentReceiverInterceptor
が discardChannel
または throwExceptionOnRejection = true
で構成されている場合、複製メッセージはターゲット MessageHandler.handleMessage()
に送信されません。むしろ、破棄されます。重複したメッセージを破棄する(何もしない)場合は、discardChannel
を、デフォルトの nullChannel
Bean などの NullChannel
で構成する必要があります。
メッセージ間の状態を維持し、べき等性についてメッセージを比較する機能を提供するために、MetadataStoreSelector
を提供します。MessageProcessor
実装(Message
に基づいてルックアップキーを作成)とオプションの ConcurrentMetadataStore
(メタデータストア)を受け入れます。詳細については、MetadataStoreSelector
Javadoc を参照してください。追加の MessageProcessor
を使用して、ConcurrentMetadataStore
用に value
をカスタマイズすることもできます。デフォルトでは、MetadataStoreSelector
は timestamp
メッセージヘッダーを使用します。
通常、キーに既存の値がない場合、セレクターは受け入れのためにメッセージを選択します。場合によっては、キーの現在の値と新しい値を比較して、メッセージを受け入れる必要があるかどうかを判断すると便利です。バージョン 5.3 以降、BiPredicate<String, String>
を参照する compareValues
プロパティが提供されています。最初のパラメーターは古い値です。true
を返してメッセージを受け入れ、MetadataStore
の古い値を新しい値に置き換えます。これは、キーの数を減らすのに役立ちます。たとえば、ファイル内の行を処理する場合、ファイル名をキーに格納し、現在の行番号を値に格納できます。その後、再起動後、すでに処理された行をスキップできます。例については、分割ファイルを処理するべき等べき下流を参照してください。
便宜上、MetadataStoreSelector
オプションは <idempotent-receiver>
コンポーネントで直接構成可能です。次のリストは、可能なすべての属性を示しています。
<idempotent-receiver
id="" (1)
endpoint="" (2)
selector="" (3)
discard-channel="" (4)
metadata-store="" (5)
key-strategy="" (6)
key-expression="" (7)
value-strategy="" (8)
value-expression="" (9)
compare-values="" (10)
throw-exception-on-rejection="" /> (11)
1 | IdempotentReceiverInterceptor Bean の ID。オプション。 |
2 | このインターセプターが適用されるコンシューマーエンドポイント名またはパターン。endpoint="aaa, bbb*, ccc, *ddd, eee*fff" など、コンマ(, )で名前(パターン)を区切ります。これらのパターンに一致するエンドポイント Bean 名は、ターゲットエンドポイントの MessageHandler Bean を取得するために使用され(.handler サフィックスを使用)、IdempotentReceiverInterceptor がそれらの Bean に適用されます。必須。 |
3 | MessageSelector Bean リファレンス。metadata-store および key-strategy (key-expression) と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。 |
4 | IdempotentReceiverInterceptor がメッセージを受け入れない場合にメッセージを送信するチャネルを識別します。省略すると、重複したメッセージが duplicateMessage ヘッダーとともにハンドラーに転送されます。オプション。 |
5 | ConcurrentMetadataStore リファレンス。基礎となる MetadataStoreSelector によって使用されます。selector と相互に排他的。オプション。デフォルトの MetadataStoreSelector は、アプリケーションの実行中に状態を維持しない内部 SimpleMetadataStore を使用します。 |
6 | MessageProcessor リファレンス。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージから idempotentKey を評価します。selector および key-expression と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。 |
7 | ExpressionEvaluatingMessageProcessor に入力する SpEL 式。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージを評価コンテキストルートオブジェクトとして使用して、idempotentKey を評価します。selector および key-strategy と相互に排他的。selector が提供されない場合、key-strategy または key-strategy-expression のいずれかが必要です。 |
8 | MessageProcessor リファレンス。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージから idempotentKey の value を評価します。selector および value-expression と相互に排他的。デフォルトでは、"MetadataStoreSelector" は "timestamp" メッセージヘッダーをメタデータの「値」として使用します。 |
9 | ExpressionEvaluatingMessageProcessor に入力する SpEL 式。基礎となる MetadataStoreSelector によって使用されます。リクエストメッセージを評価コンテキストルートオブジェクトとして使用して、idempotentKey の value を評価します。selector および value-strategy と相互に排他的。デフォルトでは、"MetadataStoreSelector" は "timestamp" メッセージヘッダーをメタデータ "value" として使用します。 |
10 | キーの古い値と新しい値を比較することにより、オプションでメッセージを選択できる BiPredicate<String, String> Bean への参照。デフォルトでは null 。 |
11 | IdempotentReceiverInterceptor がメッセージを拒否した場合に例外をスローするかどうか。デフォルトは false です。discard-channel が提供されているかどうかに関係なく適用されます。 |
Java 構成の場合、Spring Integration はメソッドレベルの @IdempotentReceiver
アノテーションを提供します。メッセージングアノテーションを持つ method
をマークするために使用されます(@ServiceActivator
、@Router, and others) to specify which `IdempotentReceiverInterceptor
オブジェクトがこのエンドポイントに適用されます。次の例は、@IdempotentReceiver
アノテーションの使用方法を示しています。
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
....
}
Java DSL を使用する場合、次の例に示すように、インターセプターをエンドポイントのアドバイスチェーンに追加できます。
@Bean
public IntegrationFlow flow() {
...
.handle("someBean", "someMethod",
e -> e.advice(idempotentReceiverInterceptor()))
...
}
IdempotentReceiverInterceptor は、MessageHandler.handleMessage(Message<?>) メソッド専用に設計されています。バージョン 4.3.1 以降では、AbstractHandleMessageAdvice を基本クラスとして HandleMessageAdvice を実装し、より良い分離を実現しています。詳細については、メッセージアドバイスの処理を参照してください。 |