回復力: エラーおよびブローカーの障害からのリカバリ
Spring AMQP が提供する主要な (そして最も一般的な) 高レベル機能のいくつかは、プロトコルエラーまたはブローカー障害が発生した場合の回復と自動再接続に関係しています。関連するすべてのコンポーネントについてはこのガイドですでに説明しましたが、ここですべてまとめて、機能と回復シナリオを個別に呼び出すことが役立つはずです。
主要な再接続機能は、CachingConnectionFactory
自体によって有効になります。RabbitAdmin
自動宣言機能を使用することもしばしば有益です。さらに、保証された配信を気にする場合は、おそらく RabbitTemplate
と SimpleMessageListenerContainer
で channelTransacted
フラグを使用し、SimpleMessageListenerContainer
で AcknowledgeMode.AUTO
(または自分で ack を行う場合はマニュアル) も使用する必要があります。
交換、キュー、バインディングの自動宣言
RabbitAdmin
コンポーネントは、起動時に交換、キュー、バインディングを宣言できます。これは、ConnectionListener
を介して怠惰に行われます。起動時にブローカーが存在しなくても問題ありません。Connection
が (メッセージの送信などによって) 初めて使用されると、リスナーが起動し、管理機能が適用されます。リスナーで auto 宣言を行うことのもう 1 つの利点は、接続が何らかの理由 (たとえば、ブローカーの停止、ネットワーク障害など) で切断された場合、接続が再確立されたときに再度適用されることです。
この方法で宣言されたキューには、明示的に宣言されているか、AnonymousQueue インスタンスのフレームワークによって生成された固定名が必要です。匿名キューは、永続的ではなく、排他的で、自動削除されます。 |
自動宣言は、CachingConnectionFactory キャッシュモードが CHANNEL (デフォルト) の場合にのみ実行されます。この制限が存在するのは、排他キューと自動削除キューが接続にバインドされているためです。 |
バージョン 2.2.2 以降、RabbitAdmin
は型 DeclarableCustomizer
の Bean を検出し、宣言を実際に処理する前に関数を適用します。これは、たとえば、フレームワーク内で最初のクラスがサポートされる前に新しい引数 (プロパティ) を設定する場合に便利です。
@Bean
public DeclarableCustomizer customizer() {
return dec -> {
if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
dec.addArgument("some.new.queue.argument", true);
}
return dec;
};
}
また、Declarable
Bean 定義への直接アクセスを提供しないプロジェクトでも役立ちます。
RabbitMQ 自動接続 / トポロジリカバリも参照してください。
同期操作の失敗と再試行のオプション
RabbitTemplate
を使用しているときに (たとえば) 同期シーケンスでブローカーへの接続が失われた場合、Spring AMQP は AmqpException
(通常は、常にではありませんが、AmqpIOException
) をスローします。問題が発生したという事実を隠そうとはしません。そのため、例外をキャッチして対応できる必要があります。接続が失われたと思われる場合 (そして、あなたのせいではありません) に行う最も簡単な方法は、操作を再試行することです。これは手動で行うことも、Spring Retry を使用して (命令的または宣言的に) 再試行を処理することもできます。
Spring Retry は、いくつかの AOP インターセプターと、再試行のパラメーター (試行回数、例外の種類、バックオフアルゴリズムなど) を指定するための非常に高い柔軟性を提供します。Spring AMQP は、カスタムリカバリロジックの実装に使用できる厳密に型指定されたコールバックインターフェースを使用して、AMQP ユースケースに便利な形式で Spring Retry インターセプタを作成するための便利なファクトリ Bean も提供します。詳細については、StatefulRetryOperationsInterceptor
および StatelessRetryOperationsInterceptor
の Javadoc とプロパティを参照してください。ステートレスな再試行は、トランザクションがない場合、または再試行コールバック内でトランザクションが開始された場合に適しています。ステートレスな再試行は、ステートフルな再試行よりも構成と分析が簡単ですが、ロールバックする必要がある進行中のトランザクションがある場合、または確実にロールバックする場合は、通常は適切ではありません。トランザクションの途中で切断された接続は、ロールバックと同じ効果を持つはずです。トランザクションがスタックの上位で開始される再接続では、通常、ステートフルな再試行が最適な選択です。ステートフルな再試行には、メッセージを一意に識別するメカニズムが必要です。最も簡単な方法は、送信者が MessageId
メッセージプロパティに一意の値を設定することです。提供されているメッセージコンバーターには、これを行うためのオプションがあります。createMessageIds
を true
に設定できます。それ以外の場合は、インターセプターに MessageKeyGenerator
実装を挿入できます。キージェネレーターは、メッセージごとに一意のキーを返す必要があります。バージョン 2.0 より前のバージョンでは、MissingMessageIdAdvice
が提供されていました。これにより、messageId
プロパティのないメッセージを 1 回だけ再試行できるようになりました (再試行設定は無視されます)。spring-retry
バージョン 1.2 と共に、その機能がインターセプターおよびメッセージリスナーコンテナーに組み込まれているため、このアドバイスは提供されなくなりました。
下位互換性のために、メッセージ ID が null のメッセージは、デフォルトで (1 回の再試行後) コンシューマーにとって致命的と見なされます (コンシューマーは停止されます)。MissingMessageIdAdvice によって提供される機能をレプリケートするには、リスナーコンテナーで statefulRetryFatalWithNullMessageId プロパティを false に設定します。その設定では、コンシューマーは引き続き実行され、メッセージは (1 回の再試行後に) 拒否されます。これは破棄されるか、配信不能キュー (構成されている場合) にルーティングされます。 |
バージョン 1.3 以降では、Java を使用して (@Configuration
クラスで) これらのインターセプターをアセンブルするのに役立つビルダー API が提供されています。次の例は、その方法を示しています。
@Bean
public StatefulRetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateful()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
.build();
}
この方法で構成できるのは、再試行機能のサブセットのみです。より高度な機能を使用するには、RetryTemplate
を Spring Bean として構成する必要があります。利用可能なポリシーとその構成に関する完全な情報については、Spring Retry Javadoc を参照してください。
バッチリスナーで再試行
バッチがプロデューサーによって単一のレコードで作成された場合を除き、バッチリスナーを使用して再試行を構成することはお勧めしません。コンシューマーおよびプロデューサーが作成したバッチについては、バッチメッセージを参照してください。コンシューマーが作成したバッチでは、フレームワークはバッチ内のどのメッセージが失敗の原因であるかを認識していないため、再試行が使い果たされた後の回復は不可能です。プロデューサーが作成したバッチでは、実際に失敗したメッセージは 1 つだけなので、メッセージ全体を復元できます。アプリケーションは、おそらくスローされた例外のインデックスプロパティを設定することによって、バッチ内のどこでエラーが発生したかをカスタムリカバリに通知する必要がある場合があります。
バッチリスナーのリトライリカバリは MessageBatchRecoverer
を実装する必要があります。
メッセージリスナーと非同期ケース
ビジネス例外が原因で MessageListener
が失敗した場合、その例外はメッセージリスナーコンテナーによって処理され、別のメッセージのリッスンに戻ります。失敗の原因が接続の切断 (ビジネス例外ではない) である場合、リスナーのメッセージを収集しているコンシューマーをキャンセルして再起動する必要があります。SimpleMessageListenerContainer
はこれをシームレスに処理し、リスナーが再起動中であることを示すログを残します。実際、コンシューマーを再起動しようとして無限にループします。コンシューマーの態度が非常に悪い場合にのみ、コンシューマーはあきらめます。副作用の 1 つは、コンテナーの開始時にブローカーがダウンしている場合、接続が確立されるまで試行を続けることです。
プロトコルエラーや切断された接続とは対照的に、ビジネス例外処理は、特にトランザクションまたはコンテナー ack が使用されている場合は、さらに検討し、いくつかのカスタム構成が必要になる場合があります。2.8.x より前は、RabbitMQ には配信不能動作の定義がありませんでした。デフォルトでは、ビジネス例外のために拒否またはロールバックされたメッセージは無限に再配信される可能性があります。クライアントの再配信回数を制限するには、リスナーのアドバイスチェーン で StatefulRetryOperationsInterceptor
を選択するのが 1 つの選択肢です。インターセプターには、特定の環境に適したカスタムの配信不能アクションを実装する回復コールバックを含めることができます。
もう 1 つの方法は、コンテナーの defaultRequeueRejected
プロパティを false
に設定することです。これにより、失敗したすべてのメッセージが破棄されます。RabbitMQ 2.8.x 以上を使用する場合、デッドレター交換へのメッセージの配信も容易になります。
または、AmqpRejectAndDontRequeueException
をスローすることもできます。これにより、defaultRequeueRejected
プロパティの設定に関係なく、メッセージの再キューイングが防止されます。
バージョン 2.1 から、ImmediateRequeueAmqpException
が導入され、まったく逆のロジックが実行されます。defaultRequeueRejected
プロパティの設定に関係なく、メッセージは再度キューに入れられます。
多くの場合、両方の手法を組み合わせて使用します。アドバイスチェーン で StatefulRetryOperationsInterceptor
を使用し、AmqpRejectAndDontRequeueException
をスローする MessageRecoverer
を使用できます。MessageRecover
は、すべての再試行が終了したときに呼び出されます。RejectAndDontRequeueRecoverer
はまさにそれを行います。デフォルトの MessageRecoverer
は、誤ったメッセージを消費し、WARN
メッセージを発行します。
バージョン 1.3 以降、新しい RepublishMessageRecoverer
が提供され、再試行が使い果たされた後に失敗したメッセージを公開できるようになりました。
リカバリが最終例外を消費すると、メッセージは確認応答され、ブローカーによってデッドレター交換に送信されません (構成されている場合)。
コンシューマー側で RepublishMessageRecoverer を使用する場合、受信したメッセージは receivedDeliveryMode メッセージプロパティに deliveryMode を持ちます。この場合、deliveryMode は null です。これは、ブローカーの NON_PERSISTENT 配信モードを意味します。バージョン 2.0 以降では、deliveryMode が null の場合に再公開するようにメッセージに設定するように RepublishMessageRecoverer を構成できます。デフォルトでは、MessageProperties デフォルト値 - MessageDeliveryMode.PERSISTENT を使用します。 |
次の例は、RepublishMessageRecoverer
をリカバリとして設定する方法を示しています。
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
.build();
}
RepublishMessageRecoverer
は、例外メッセージ、スタックトレース、元の交換、ルーティングキーなどの追加情報をメッセージヘッダーに含めてメッセージを発行します。サブクラスを作成して additionalHeaders()
をオーバーライドすることにより、追加のヘッダーを追加できます。次の例に示すように、deliveryMode
(またはその他のプロパティ) も additionalHeaders()
で変更できます。
RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {
protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
message.getMessageProperties()
.setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
return null;
}
};
バージョン 2.0.5 以降では、スタックトレースが大きすぎると切り捨てられる場合があります。これは、すべてのヘッダーが 1 つのフレームに収まる必要があるためです。デフォルトでは、スタックトレースによって他のヘッダーに使用できる 20,000 バイト (「ヘッドルーム」) よりも少ない場合、切り捨てられます。これは、他のヘッダー用に多かれ少なかれスペースが必要な場合は、リカバリの frameMaxHeadroom
プロパティを設定することで調整できます。バージョン 2.1.13、2.2.3 以降では、例外メッセージがこの計算に含まれ、スタックトレースの量は次のアルゴリズムを使用して最大化されます。
スタックトレースだけで制限を超える場合、例外メッセージヘッダーは 97 バイトと
…
に切り捨てられ、スタックトレースも切り捨てられます。スタックトレースが小さい場合、メッセージは利用可能なバイトに収まるように切り捨てられます (プラス
…
) (ただし、スタックトレース内のメッセージ自体は 97 バイトと…
に切り捨てられます)。
あらゆる種類の切り捨てが発生するたびに、元の例外がログに記録され、完全な情報が保持されます。例外型などの情報を式で使用できるように、ヘッダーが拡張された後に評価が実行されます。
バージョン 2.4.8 以降では、Message
を評価のルートオブジェクトとして、エラー交換とルーティングキーを SpEL 式として提供できます。
バージョン 2.3.3 から、新しいサブクラス RepublishMessageRecovererWithConfirms
が提供されています。これは発行者の確認の両方のスタイルをサポートし、戻る前に確認を待機します (または、確認されていない場合やメッセージが返された場合は例外をスローします)。
確認型が CORRELATED
の場合、サブクラスはメッセージが返されたかどうかも検出し、AmqpMessageReturnedException
をスローします。パブリケーションが否定的に承認された場合、AmqpNackReceivedException
がスローされます。
確認型が SIMPLE
の場合、サブクラスはチャネルで waitForConfirmsOrDie
メソッドを呼び出します。
確認と return の詳細については、パブリッシャーの確認と return を参照してください。
バージョン 2.1 から、ImmediateRequeueMessageRecoverer
が追加されて ImmediateRequeueAmqpException
がスローされます。これは、現在の失敗したメッセージを再キューイングするようにリスナーコンテナーに通知します。
Spring Retry の例外分類
Spring Retry には、どの例外が再試行を呼び出すことができるかを決定するための非常に高い柔軟性があります。デフォルトの構成では、すべての例外に対して再試行します。ユーザー例外が ListenerExecutionFailedException
にラップされている場合、分類で例外の原因を調べる必要があります。デフォルトの分類子は、最上位の例外のみを調べます。
Spring Retry 1.0.3 以降、BinaryExceptionClassifier
には traverseCauses
と呼ばれるプロパティがあります (デフォルト: false
)。true
の場合、一致が見つかるまで、原因がなくなるまで、例外の原因をたどります。
この分類子を再試行に使用するには、最大試行回数を取るコンストラクター、Exception
インスタンスの Map
、およびブール値 (traverseCauses
) で作成された SimpleRetryPolicy
を使用し、このポリシーを RetryTemplate
に挿入します。