回復力: エラーおよびブローカーの障害からのリカバリ

Spring AMQP が提供する主要な (そして最も一般的な) 高レベル機能のいくつかは、プロトコルエラーまたはブローカー障害が発生した場合の回復と自動再接続に関係しています。関連するすべてのコンポーネントについてはこのガイドですでに説明しましたが、ここですべてまとめて、機能と回復シナリオを個別に呼び出すことが役立つはずです。

主要な再接続機能は、CachingConnectionFactory 自体によって有効になります。RabbitAdmin 自動宣言機能を使用することもしばしば有益です。さらに、保証された配信を気にする場合は、おそらく RabbitTemplate と SimpleMessageListenerContainer で channelTransacted フラグを使用し、SimpleMessageListenerContainer で AcknowledgeMode.AUTO (または自分で ack を行う場合はマニュアル) も使用する必要があります。

交換、キュー、バインディングの自動宣言

RabbitAdmin コンポーネントは、起動時に交換、キュー、バインディングを宣言できます。これは、ConnectionListener を介して怠惰に行われます。起動時にブローカーが存在しなくても問題ありません。Connection が (メッセージの送信などによって) 初めて使用されると、リスナーが起動し、管理機能が適用されます。リスナーで auto 宣言を行うことのもう 1 つの利点は、接続が何らかの理由 (たとえば、ブローカーの停止、ネットワーク障害など) で切断された場合、接続が再確立されたときに再度適用されることです。

この方法で宣言されたキューには、明示的に宣言されているか、AnonymousQueue インスタンスのフレームワークによって生成された固定名が必要です。匿名キューは、永続的ではなく、排他的で、自動削除されます。
自動宣言は、CachingConnectionFactory キャッシュモードが CHANNEL (デフォルト) の場合にのみ実行されます。この制限が存在するのは、排他キューと自動削除キューが接続にバインドされているためです。

バージョン 2.2.2 以降、RabbitAdmin は型 DeclarableCustomizer の Bean を検出し、宣言を実際に処理する前に関数を適用します。これは、たとえば、フレームワーク内で最初のクラスがサポートされる前に新しい引数 (プロパティ) を設定する場合に便利です。

@Bean
public DeclarableCustomizer customizer() {
    return dec -> {
        if (dec instanceof Queue && ((Queue) dec).getName().equals("my.queue")) {
            dec.addArgument("some.new.queue.argument", true);
        }
        return dec;
    };
}

また、Declarable Bean 定義への直接アクセスを提供しないプロジェクトでも役立ちます。

同期操作の失敗と再試行のオプション

RabbitTemplate を使用しているときに (たとえば) 同期シーケンスでブローカーへの接続が失われた場合、Spring AMQP は AmqpException (通常は、常にではありませんが、AmqpIOException) をスローします。問題が発生したという事実を隠そうとはしません。そのため、例外をキャッチして対応できる必要があります。接続が失われたと思われる場合 (そして、あなたのせいではありません) に行う最も簡単な方法は、操作を再試行することです。これは手動で行うことも、Spring Retry を使用して (命令的または宣言的に) 再試行を処理することもできます。

Spring Retry は、いくつかの AOP インターセプターと、再試行のパラメーター (試行回数、例外の種類、バックオフアルゴリズムなど) を指定するための非常に高い柔軟性を提供します。Spring AMQP は、カスタムリカバリロジックの実装に使用できる厳密に型指定されたコールバックインターフェースを使用して、AMQP ユースケースに便利な形式で Spring Retry インターセプタを作成するための便利なファクトリ Bean も提供します。詳細については、StatefulRetryOperationsInterceptor および StatelessRetryOperationsInterceptor の Javadoc とプロパティを参照してください。ステートレスな再試行は、トランザクションがない場合、または再試行コールバック内でトランザクションが開始された場合に適しています。ステートレスな再試行は、ステートフルな再試行よりも構成と分析が簡単ですが、ロールバックする必要がある進行中のトランザクションがある場合、または確実にロールバックする場合は、通常は適切ではありません。トランザクションの途中で切断された接続は、ロールバックと同じ効果を持つはずです。トランザクションがスタックの上位で開始される再接続では、通常、ステートフルな再試行が最適な選択です。ステートフルな再試行には、メッセージを一意に識別するメカニズムが必要です。最も簡単な方法は、送信者が MessageId メッセージプロパティに一意の値を設定することです。提供されているメッセージコンバーターには、これを行うためのオプションがあります。createMessageIds を true に設定できます。それ以外の場合は、インターセプターに MessageKeyGenerator 実装を挿入できます。キージェネレーターは、メッセージごとに一意のキーを返す必要があります。バージョン 2.0 より前のバージョンでは、MissingMessageIdAdvice が提供されていました。これにより、messageId プロパティのないメッセージを 1 回だけ再試行できるようになりました (再試行設定は無視されます)。spring-retry バージョン 1.2 と共に、その機能がインターセプターおよびメッセージリスナーコンテナーに組み込まれているため、このアドバイスは提供されなくなりました。

下位互換性のために、メッセージ ID が null のメッセージは、デフォルトで (1 回の再試行後) コンシューマーにとって致命的と見なされます (コンシューマーは停止されます)。MissingMessageIdAdvice によって提供される機能をレプリケートするには、リスナーコンテナーで statefulRetryFatalWithNullMessageId プロパティを false に設定します。その設定では、コンシューマーは引き続き実行され、メッセージは (1 回の再試行後に) 拒否されます。これは破棄されるか、配信不能キュー (構成されている場合) にルーティングされます。

バージョン 1.3 以降では、Java を使用して (@Configuration クラスで) これらのインターセプターをアセンブルするのに役立つビルダー API が提供されています。次の例は、その方法を示しています。

@Bean
public StatefulRetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateful()
            .maxAttempts(5)
            .backOffOptions(1000, 2.0, 10000) // initialInterval, multiplier, maxInterval
            .build();
}

この方法で構成できるのは、再試行機能のサブセットのみです。より高度な機能を使用するには、RetryTemplate を Spring Bean として構成する必要があります。利用可能なポリシーとその構成に関する完全な情報については、Spring Retry Javadoc を参照してください。

バッチリスナーで再試行

バッチがプロデューサーによって単一のレコードで作成された場合を除き、バッチリスナーを使用して再試行を構成することはお勧めしません。コンシューマーおよびプロデューサーが作成したバッチについては、バッチメッセージを参照してください。コンシューマーが作成したバッチでは、フレームワークはバッチ内のどのメッセージが失敗の原因であるかを認識していないため、再試行が使い果たされた後の回復は不可能です。プロデューサーが作成したバッチでは、実際に失敗したメッセージは 1 つだけなので、メッセージ全体を復元できます。アプリケーションは、おそらくスローされた例外のインデックスプロパティを設定することによって、バッチ内のどこでエラーが発生したかをカスタムリカバリに通知する必要がある場合があります。

バッチリスナーのリトライリカバリは MessageBatchRecoverer を実装する必要があります。

メッセージリスナーと非同期ケース

ビジネス例外が原因で MessageListener が失敗した場合、その例外はメッセージリスナーコンテナーによって処理され、別のメッセージのリッスンに戻ります。失敗の原因が接続の切断 (ビジネス例外ではない) である場合、リスナーのメッセージを収集しているコンシューマーをキャンセルして再起動する必要があります。SimpleMessageListenerContainer はこれをシームレスに処理し、リスナーが再起動中であることを示すログを残します。実際、コンシューマーを再起動しようとして無限にループします。コンシューマーの態度が非常に悪い場合にのみ、コンシューマーはあきらめます。副作用の 1 つは、コンテナーの開始時にブローカーがダウンしている場合、接続が確立されるまで試行を続けることです。

プロトコルエラーや切断された接続とは対照的に、ビジネス例外処理は、特にトランザクションまたはコンテナー ack が使用されている場合は、さらに検討し、いくつかのカスタム構成が必要になる場合があります。2.8.x より前は、RabbitMQ には配信不能動作の定義がありませんでした。デフォルトでは、ビジネス例外のために拒否またはロールバックされたメッセージは無限に再配信される可能性があります。クライアントの再配信回数を制限するには、リスナーのアドバイスチェーン で StatefulRetryOperationsInterceptor を選択するのが 1 つの選択肢です。インターセプターには、特定の環境に適したカスタムの配信不能アクションを実装する回復コールバックを含めることができます。

もう 1 つの方法は、コンテナーの defaultRequeueRejected プロパティを false に設定することです。これにより、失敗したすべてのメッセージが破棄されます。RabbitMQ 2.8.x 以上を使用する場合、デッドレター交換へのメッセージの配信も容易になります。

または、AmqpRejectAndDontRequeueException をスローすることもできます。これにより、defaultRequeueRejected プロパティの設定に関係なく、メッセージの再キューイングが防止されます。

バージョン 2.1 から、ImmediateRequeueAmqpException が導入され、まったく逆のロジックが実行されます。defaultRequeueRejected プロパティの設定に関係なく、メッセージは再度キューに入れられます。

多くの場合、両方の手法を組み合わせて使用します。アドバイスチェーン で StatefulRetryOperationsInterceptor を使用し、AmqpRejectAndDontRequeueException をスローする MessageRecoverer を使用できます。MessageRecover は、すべての再試行が終了したときに呼び出されます。RejectAndDontRequeueRecoverer はまさにそれを行います。デフォルトの MessageRecoverer は、誤ったメッセージを消費し、WARN メッセージを発行します。

バージョン 1.3 以降、新しい RepublishMessageRecoverer が提供され、再試行が使い果たされた後に失敗したメッセージを公開できるようになりました。

リカバリが最終例外を消費すると、メッセージは確認応答され、ブローカーによってデッドレター交換に送信されません (構成されている場合)。

コンシューマー側で RepublishMessageRecoverer を使用する場合、受信したメッセージは receivedDeliveryMode メッセージプロパティに deliveryMode を持ちます。この場合、deliveryMode は null です。これは、ブローカーの NON_PERSISTENT 配信モードを意味します。バージョン 2.0 以降では、deliveryMode が null の場合に再公開するようにメッセージに設定するように RepublishMessageRecoverer を構成できます。デフォルトでは、MessageProperties デフォルト値 - MessageDeliveryMode.PERSISTENT を使用します。

次の例は、RepublishMessageRecoverer をリカバリとして設定する方法を示しています。

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5)
            .recoverer(new RepublishMessageRecoverer(amqpTemplate(), "something", "somethingelse"))
            .build();
}

RepublishMessageRecoverer は、例外メッセージ、スタックトレース、元の交換、ルーティングキーなどの追加情報をメッセージヘッダーに含めてメッセージを発行します。サブクラスを作成して additionalHeaders() をオーバーライドすることにより、追加のヘッダーを追加できます。次の例に示すように、deliveryMode (またはその他のプロパティ) も additionalHeaders() で変更できます。

RepublishMessageRecoverer recoverer = new RepublishMessageRecoverer(amqpTemplate, "error") {

    protected Map<? extends String, ? extends Object> additionalHeaders(Message message, Throwable cause) {
        message.getMessageProperties()
            .setDeliveryMode(message.getMessageProperties().getReceivedDeliveryMode());
        return null;
    }

};

バージョン 2.0.5 以降では、スタックトレースが大きすぎると切り捨てられる場合があります。これは、すべてのヘッダーが 1 つのフレームに収まる必要があるためです。デフォルトでは、スタックトレースによって他のヘッダーに使用できる 20,000 バイト (「ヘッドルーム」) よりも少ない場合、切り捨てられます。これは、他のヘッダー用に多かれ少なかれスペースが必要な場合は、リカバリの frameMaxHeadroom プロパティを設定することで調整できます。バージョン 2.1.13、2.2.3 以降では、例外メッセージがこの計算に含まれ、スタックトレースの量は次のアルゴリズムを使用して最大化されます。

  • スタックトレースだけで制限を超える場合、例外メッセージヘッダーは 97 バイトと …​ に切り捨てられ、スタックトレースも切り捨てられます。

  • スタックトレースが小さい場合、メッセージは利用可能なバイトに収まるように切り捨てられます (プラス …​) (ただし、スタックトレース内のメッセージ自体は 97 バイトと …​ に切り捨てられます)。

あらゆる種類の切り捨てが発生するたびに、元の例外がログに記録され、完全な情報が保持されます。例外型などの情報を式で使用できるように、ヘッダーが拡張された後に評価が実行されます。

バージョン 2.4.8 以降では、Message を評価のルートオブジェクトとして、エラー交換とルーティングキーを SpEL 式として提供できます。

バージョン 2.3.3 から、新しいサブクラス RepublishMessageRecovererWithConfirms が提供されています。これは発行者の確認の両方のスタイルをサポートし、戻る前に確認を待機します (または、確認されていない場合やメッセージが返された場合は例外をスローします)。

確認型が CORRELATED の場合、サブクラスはメッセージが返されたかどうかも検出し、AmqpMessageReturnedException をスローします。パブリケーションが否定的に承認された場合、AmqpNackReceivedException がスローされます。

確認型が SIMPLE の場合、サブクラスはチャネルで waitForConfirmsOrDie メソッドを呼び出します。

確認と return の詳細については、パブリッシャーの確認と return を参照してください。

バージョン 2.1 から、ImmediateRequeueMessageRecoverer が追加されて ImmediateRequeueAmqpException がスローされます。これは、現在の失敗したメッセージを再キューイングするようにリスナーコンテナーに通知します。

Spring Retry の例外分類

Spring Retry には、どの例外が再試行を呼び出すことができるかを決定するための非常に高い柔軟性があります。デフォルトの構成では、すべての例外に対して再試行します。ユーザー例外が ListenerExecutionFailedException にラップされている場合、分類で例外の原因を調べる必要があります。デフォルトの分類子は、最上位の例外のみを調べます。

Spring Retry 1.0.3 以降、BinaryExceptionClassifier には traverseCauses と呼ばれるプロパティがあります (デフォルト: false)。true の場合、一致が見つかるまで、原因がなくなるまで、例外の原因をたどります。

この分類子を再試行に使用するには、最大試行回数を取るコンストラクター、Exception インスタンスの Map、およびブール値 (traverseCauses) で作成された SimpleRetryPolicy を使用し、このポリシーを RetryTemplate に挿入します。