例外の処理

このセクションでは、Spring for Apache Kafka の使用時に発生する可能性のあるさまざまな例外の処理方法について説明します。

リスナーエラーハンドラー

バージョン 2.0 以降、@KafkaListener アノテーションには新しい属性 errorHandler があります。

errorHandler を使用して、KafkaListenerErrorHandler 実装の Bean 名を提供できます。次のように、この関数インターフェースには 1 つのメソッドがあります。

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

メッセージコンバーターによって生成された spring-messaging Message<?> オブジェクトと、ListenerExecutionFailedException にラップされたリスナーによってスローされた例外にアクセスできます。エラーハンドラーは、コンテナーにスローされる元の例外または新しい例外をスローできます。エラーハンドラーによって返されるものはすべて無視されます。

バージョン 2.7 以降、MessagingMessageConverter および BatchMessagingMessageConverter で rawRecordHeader プロパティを設定できます。これにより、生の ConsumerRecord が KafkaHeaders.RAW_DATA ヘッダーの変換された Message<?> に追加されます。これは、たとえば、リスナーエラーハンドラーで DeadLetterPublishingRecoverer を使用する場合に便利です。これは、デッドレタートピックで失敗したレコードをキャプチャーした後、何度か再試行した後、失敗結果を送信者に送信するリクエスト / 応答シナリオで使用される場合があります。

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

次のメソッドを介して、コンシューマーオブジェクトにアクセスできるサブインターフェース (ConsumerAwareListenerErrorHandler) があります。

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

別のサブインターフェース (ManualAckListenerErrorHandler) は、手動 AckMode を使用するときに Acknowledgment オブジェクトへのアクセスを提供します。

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

どちらの場合でも、コンテナーはシークを認識しないため、コンシューマーに対してシークを実行しないでください。

コンテナーエラーハンドラー

バージョン 2.8 以降、従来の ErrorHandler および BatchErrorHandler インターフェースは、新しい CommonErrorHandler に置き換えられました。これらのエラーハンドラーは、レコードリスナーとバッチリスナーの両方のエラーを処理できるため、単一のリスナーコンテナーファクトリで両方の型のリスナーのコンテナーを作成できます。ほとんどの従来のフレームワークエラーハンドラー実装を置き換える CommonErrorHandler 実装が提供されています。

カスタムエラーハンドラーを CommonErrorHandler に移行する方法については、カスタムレガシーエラーハンドラーの実装を CommonErrorHandler に移行するを参照してください。

トランザクションが使用されている場合、デフォルトではエラーハンドラーは構成されていないため、例外によってトランザクションがロールバックされます。トランザクションコンテナーのエラー処理は AfterRollbackProcessor によって処理されます。トランザクションの使用時にカスタムエラーハンドラーを提供する場合、トランザクションをロールバックするには、例外をスローする必要があります。

このインターフェースには、コンテナーによって呼び出されるデフォルトのメソッド isAckAfterHandle() があり、エラーハンドラーが例外をスローせずに戻った場合に、オフセットをコミットする必要があるかどうかを判断します。デフォルトでは true を返します。

通常、フレームワークによって提供されるエラーハンドラーは、エラーが「処理」されていない場合(たとえば、シーク操作の実行後)に例外をスローします。デフォルトでは、このような例外は ERROR レベルでコンテナーによってログに記録されます。すべてのフレームワークエラーハンドラーは KafkaExceptionLogLevelAware を継承し、これらの例外がログに記録されるレベルを制御できるようにします。

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

コンテナーファクトリのすべてのリスナーに使用されるグローバルエラーハンドラーを指定できます。次の例は、その方法を示しています。

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

デフォルトでは、アノテーション付きのリスナーメソッドが例外をスローすると、コンテナーにスローされ、メッセージはコンテナーの設定に従って処理されます。

コンテナーは、エラーハンドラーを呼び出す前に、保留中のオフセットコミットをすべてコミットします。

Spring Boot を使用している場合は、エラーハンドラーを @Bean として追加するだけで、Boot が自動構成されたファクトリに追加します。

バックオフハンドラー

DefaultErrorHandler などのエラーハンドラーは、BackOff を使用して、配信を再試行するまでの待機時間を決定します。バージョン 2.9 以降、カスタム BackOffHandler を構成できます。デフォルトのハンドラーは、バックオフ時間が経過するまで (またはコンテナーが停止するまで) スレッドを中断するだけです。フレームワークは、バックオフ時間が経過するまでリスナーコンテナーを一時停止し、その後コンテナーを再開する ContainerPausingBackOffHandler も提供します。これは、遅延が max.poll.interval.ms コンシューマープロパティよりも長い場合に役立ちます。実際のバックオフ時間の解決は、pollTimeout コンテナープロパティの影響を受けることに注意してください。

DefaultErrorHandler

この新しいエラーハンドラーは、現在いくつかのリリースでデフォルトのエラーハンドラーとなっている SeekToCurrentErrorHandler および RecoveringBatchErrorHandler に代わるものです。1 つの違いは、バッチリスナーのフォールバック動作(BatchListenerFailedException 以外の例外がスローされた場合)が完全なバッチの再試行と同等であることです。

バージョン 2.9 以降、DefaultErrorHandler は、以下で説明する未処理のレコードオフセットをシークするのと同じセマンティクスを提供するように構成できますが、実際にはシークしません。代わりに、レコードはリスナーコンテナーによって保持され、エラーハンドラーが終了した後 (および一時停止された poll() を 1 回実行した後、コンシューマーを存続させるためにリスナーに再送信されます。ノンブロッキング再試行または ContainerPausingBackOffHandler が使用されている場合、一時停止は複数の世論調査)。エラーハンドラーは、現在失敗しているレコードを再送信できるかどうか、回復された後はリスナーに再度送信されないかどうかを示す結果をコンテナーに返します。このモードを有効にするには、プロパティ seekAfterError を false に設定します。

エラーハンドラーは、失敗し続けるレコードを回復(スキップ)できます。デフォルトでは、10 回の失敗の後、失敗したレコードがログに記録されます(ERROR レベルで)。カスタムリカバリ(BiConsumer)と、配信の試行とそれぞれの間の遅延を制御する BackOff を使用してハンドラーを構成できます。FixedBackOff.UNLIMITED_ATTEMPTS とともに FixedBackOff を使用すると、(事実上)無限の再試行が発生します。次の例では、3 回試行した後のリカバリを構成します。

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

このハンドラーのカスタマイズされたインスタンスを使用してリスナーコンテナーを構成するには、コンテナーファクトリに追加します。

例: @KafkaListener コンテナーファクトリでは、次のように DefaultErrorHandler を追加できます。

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

レコードリスナーの場合、これにより、デフォルト構成(FixedBackOff(0L, 9))の代わりに、1 秒のバックオフで最大 2 回(3 回の配信試行)の配信が再試行されます。再試行が終了した後、失敗は単にログに記録されます。

たとえば、poll が 6 つのレコード (各パーティション 0, 1, 2 から 2 つ) を返し、リスナーが 4 番目のレコードで例外をスローした場合、コンテナーはオフセットをコミットすることで最初の 3 つのメッセージを確認します。DefaultErrorHandler は、パーティション 1 に対してオフセット 1 を、パーティション 2 に対してオフセット 0 を求めます。次の poll() は、3 つの未処理レコードを返します。

AckMode が BATCH の場合、コンテナーはエラーハンドラーを呼び出す前に、最初の 2 つのパーティションのオフセットをコミットします。

バッチリスナーの場合、リスナーは、バッチ内のどのレコードが失敗したかを示す BatchListenerFailedException をスローする必要があります。

イベントのシーケンスは次のとおりです。

  • インデックスの前にレコードのオフセットをコミットします。

  • 再試行を繰り返しても実行されない場合は、シークを実行して、残りのすべてのレコード (失敗したレコードを含む) が再配信されるようにします。

  • 再試行が終了した場合は、失敗したレコード (デフォルトログのみ) の回復を試み、残りのレコード (失敗したレコードを除く) が再配信されるようにシークを実行します。回復されたレコードのオフセットがコミットされます。

  • 再試行回数を超えて回復に失敗した場合、再試行回数に達していないかのようにシークが実行されます。

バージョン 2.9 から、DefaultErrorHandler は、上記で説明した未処理のレコードオフセットをシークするのと同じセマンティクスを提供するように構成できますが、実際にはシークしません。代わりに、エラーハンドラーは、未処理のレコードのみを含む新しい ConsumerRecords<?, ?> を作成します。これは、リスナーに送信されます (一時停止された poll() を 1 回実行した後、コンシューマーを存続させます)。このモードを有効にするには、プロパティ seekAfterError を false に設定します。

デフォルトの回復者は、再試行回数が尽きた後、失敗したレコードをログに記録します。カスタムリカバリツール、または DeadLetterPublishingRecoverer などのフレームワークによって提供されるものを使用できます。

POJO バッチリスナー (例: List<Thing>) を使用していて、例外に追加する完全なコンシューマーレコードがない場合は、失敗したレコードのインデックスを追加するだけで済みます。

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

コンテナーが AckMode.MANUAL_IMMEDIATE で構成されている場合、リカバリされたレコードのオフセットをコミットするようにエラーハンドラーを構成できます。commitRecovered プロパティを true に設定します。

デッドレターレコードの公開も参照してください。

トランザクションを使用する場合、同様の機能が DefaultAfterRollbackProcessor によって提供されます。ロールバック後のプロセッサーを参照してください。

DefaultErrorHandler は特定の例外を致命的と見なし、そのような例外の再試行はスキップされます。リカバリ装置は、最初の障害時に呼び出されます。デフォルトで致命的と見なされる例外は次のとおりです。

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

これらの例外は、再試行された配信では解決されそうにないためです。

再試行不可能なカテゴリに例外型を追加するか、分類された例外のマップを完全に置き換えることができます。詳細については、DefaultErrorHandler.addNotRetryableException() および DefaultErrorHandler.setClassifications() の Javadoc、および spring-retry BinaryExceptionClassifier の Javadoc を参照してください。

再試行できない例外に IllegalArgumentException を追加する例を次に示します。

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

エラーハンドラーは、1 つ以上の RetryListener で構成でき、再試行と回復の進行状況の通知を受け取ります。バージョン 2.8.10 以降、バッチリスナーのメソッドが追加されました。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

詳細については、JavaDocs を参照してください。

リカバリが失敗した場合(例外をスローした場合)、失敗したレコードはシークに含まれます。リカバリが失敗した場合、BackOff はデフォルトでリセットされ、リカバリが再試行される前に再配信が再度バックオフされます。リカバリが失敗した後の再試行をスキップするには、エラーハンドラーの resetStateOnRecoveryFailure を false に設定します。

エラーハンドラーに BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> を提供して、失敗したレコードや例外に基づいて、使用する BackOff を決定できます。

handler.setBackOffFunction((record, ex) -> { ... });

関数が null を返す場合、ハンドラーのデフォルトの BackOff が使用されます。

resetStateOnExceptionChange を true に設定すると、再試行シーケンスが再開されます (そのように構成されている場合は、新しい BackOff の選択を含む) 失敗の間で例外型が変更された場合。false (バージョン 2.9 より前のデフォルト) の場合、例外型は考慮されません。

バージョン 2.9 から、これはデフォルトで true になりました。

配信試行ヘッダーも参照してください。

バッチエラーハンドラーによる変換エラー

バージョン 2.8 以降、バッチリスナーは ByteArrayDeserializerBytesDeserializerStringDeserializerDefaultErrorHandler とともに MessageConverter を使用する場合に、変換エラーを適切に処理できるようになりました。変換エラーが発生すると、ペイロードは null に設定され、ErrorHandlingDeserializer と同様に、逆直列化例外がレコードヘッダーに追加されます。ConversionException のリストはリスナーで利用できるため、リスナーは、変換例外が発生した最初のインデックスを示す BatchListenerFailedException をスローできます。

例:

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

完全なバッチの再試行

これは、リスナーが BatchListenerFailedException 以外の例外をスローするバッチリスナーの DefaultErrorHandler のフォールバック動作になりました。

バッチが再配信される場合、バッチに同じ数のレコードがあること、および / または再配信されたレコードが同じ順序であるという保証はありません。バッチのリトライ状態を簡単に維持することはできません。FallbackBatchErrorHandler は次のアプローチを採用しています。バッチリスナーが BatchListenerFailedException ではない例外をスローした場合、メモリ内のレコードのバッチから再試行が実行されます。延長された再試行シーケンス中の再バランスを回避するために、エラーハンドラーはコンシューマーを一時停止し、再試行ごとにバックオフのためにスリープする前にコンシューマーをポーリングし、リスナーを再度呼び出します。再試行が限界に達した場合、バッチ内のレコードごとに ConsumerRecordRecoverer が呼び出されます。リカバリが例外をスローした場合、またはスレッドがスリープ中に中断された場合、レコードのバッチは次のポーリングで再配信されます。終了する前に、結果に関係なく、コンシューマーは再開されます。

このメカニズムはトランザクションでは使用できません。

BackOff 間隔を待機している間、エラーハンドラーは、コンテナーが停止しているかどうかを確認しながら、目的の遅延に達するまで短いスリープでループし、遅延を発生させるのではなく、stop() の直後にスリープを終了できるようにします。

コンテナー停止エラーハンドラー

CommonContainerStoppingErrorHandler は、リスナーが例外をスローした場合にコンテナーを停止します。レコードリスナーの場合、AckMode が RECORD の場合、すでに処理されたレコードのオフセットがコミットされます。レコードリスナーの場合、AckMode が手動値の場合、すでに確認応答されたレコードのオフセットがコミットされます。レコードリスナーの場合 ( AckMode が BATCH の場合)、またはバッチリスナーの場合は、コンテナーの再起動時にバッチ全体が再生されます。

コンテナーが停止した後、ListenerExecutionFailedException をラップする例外がスローされます。これにより、トランザクションがロールバックされます (トランザクションが有効になっている場合)。

エラーハンドラーの委譲

CommonDelegatingErrorHandler は、例外型に応じて、さまざまなエラーハンドラーに委譲できます。例: ほとんどの例外に対して DefaultErrorHandler を呼び出したり、他の例外に対して CommonContainerStoppingErrorHandler を呼び出したりすることができます。

すべてのデリゲートは同じ互換性のあるプロパティ (ackAfterHandleseekAfterError …) を共有する必要があります。

ロギングエラーハンドラー

CommonLoggingErrorHandler は単に例外をログに記録します。レコードリスナーを使用すると、前のポーリングの残りのレコードがリスナーに渡されます。バッチリスナーの場合、バッチ内のすべてのレコードがログに記録されます。

レコードリスナーとバッチリスナーに異なる一般的なエラーハンドラーを使用する

レコードリスナーとバッチリスナーに異なるエラー処理戦略を使用する場合は、CommonMixedErrorHandler が提供されており、リスナー型ごとに特定のエラーハンドラーを構成できます。

一般的なエラーハンドラーの概要

  • DefaultErrorHandler

  • CommonContainerStoppingErrorHandler

  • CommonDelegatingErrorHandler

  • CommonLoggingErrorHandler

  • CommonMixedErrorHandler

レガシーエラーハンドラーとその代替

レガシーエラーハンドラー 置換文字列

LoggingErrorHandler

CommonLoggingErrorHandler

BatchLoggingErrorHandler

CommonLoggingErrorHandler

ConditionalDelegatingErrorHandler

DelegatingErrorHandler

ConditionalDelegatingBatchErrorHandler

DelegatingErrorHandler

ContainerStoppingErrorHandler

CommonContainerStoppingErrorHandler

ContainerStoppingBatchErrorHandler

CommonContainerStoppingErrorHandler

SeekToCurrentErrorHandler

DefaultErrorHandler

SeekToCurrentBatchErrorHandler

代替はありません。DefaultErrorHandler と無限の BackOff を使用してください。

RecoveringBatchErrorHandler

DefaultErrorHandler

RetryingBatchErrorHandler

置換はありません。DefaultErrorHandler を使用し、BatchListenerFailedException 以外の例外をスローします。

カスタムレガシーエラーハンドラーの実装を CommonErrorHandler に移行する

CommonErrorHandler の JavaDocs を参照してください。

ErrorHandler または ConsumerAwareErrorHandler 実装を置き換えるには、handleOne() を実装し、seeksAfterHandle() が false (デフォルト) を返すようにする必要があります。また、レコード処理の範囲外で発生する例外 (コンシューマーエラーなど) を処理するには、handleOtherException() を実装する必要があります。

RemainingRecordsErrorHandler 実装を置き換えるには、handleRemaining() を実装し、seeksAfterHandle() をオーバーライドして true を返す必要があります (エラーハンドラーは必要なシークを実行する必要があります)。また、handleOtherException() を実装して、レコード処理の範囲外で発生する例外 (コンシューマーエラーなど) を処理する必要があります。

BatchErrorHandler 実装を置き換えるには、handleBatch() も実装する必要があります。handleOtherException() も実装する必要があります。これにより、レコード処理の範囲外で発生する例外(コンシューマーエラーなど)を処理できます。

ロールバック後プロセッサー

トランザクションを使用しているときに、リスナーが例外をスローした場合 (およびエラーハンドラーが存在する場合は例外をスローした場合)、トランザクションはロールバックされます。デフォルトでは、未処理のレコード (失敗したレコードを含む) は次のポーリングで再フェッチされます。これは、DefaultAfterRollbackProcessor で seek 操作を実行することで実現されます。バッチリスナーを使用すると、レコードのバッチ全体が再処理されます (コンテナーは、バッチ内のどのレコードが失敗したかを認識しません)。この動作を変更するには、カスタム AfterRollbackProcessor を使用してリスナーコンテナーを構成します。例: レコードベースのリスナーを使用している場合、失敗したレコードを追跡し、何回か試行した後、おそらくそれをデッドレタートピックに公開することであきらめたいと思うかもしれません。

バージョン 2.2 以降、DefaultAfterRollbackProcessor は失敗し続けるレコードを回復 (スキップ) できるようになりました。デフォルトでは、10 回失敗すると、失敗したレコードがログに記録されます (ERROR レベルで)。カスタムリカバリ (BiConsumer) と最大の障害を使用してプロセッサーを構成できます。maxFailures プロパティを負の数に設定すると、無限の再試行が発生します。次の例では、3 回試行した後の回復を構成します。

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

トランザクションを使用しない場合は、DefaultErrorHandler を構成することで同様の機能を実現できます。コンテナーエラーハンドラーを参照してください。

バージョン 3.2 以降、リカバリでは失敗し続けるレコードのバッチ全体をリカバリ (スキップ) できるようになりました。この機能を有効にするには、ContainerProperties.setBatchRecoverAfterRollback(true) を設定します。

デフォルトの動作では、バッチ内のどのレコードが失敗し続けるかフレームワークが認識していないため、バッチリスナーでは回復できません。このような場合、アプリケーションリスナーは失敗し続けるレコードを処理する必要があります。

デッドレターレコードの公開も参照してください。

バージョン 2.2.5 以降、新しいトランザクションで DefaultAfterRollbackProcessor を呼び出すことができます (失敗したトランザクションがロールバックした後に開始されます)。次に、DeadLetterPublishingRecoverer を使用して失敗したレコードを公開している場合、プロセッサーは元のトピック / パーティション内の回復されたレコードのオフセットをトランザクションに送信します。この機能を有効にするには、DefaultAfterRollbackProcessor で commitRecovered および kafkaTemplate プロパティを設定します。

リカバリが失敗した場合 (例外をスローした場合)、失敗したレコードがシークに含まれます。バージョン 2.5.5 以降、回復者が失敗した場合、BackOff はデフォルトでリセットされ、回復が再試行される前に再配信が再びバックオフを通過します。以前のバージョンでは、BackOff はリセットされず、次の障害で回復が再試行されました。以前の動作に戻すには、プロセッサーの resetStateOnRecoveryFailure プロパティを false に設定します。

バージョン 2.6 以降、失敗したレコードや例外に基づいて、使用する BackOff を決定する BiFunction<ConsumerRecord<?, ?>, Exception, BackOff> をプロセッサーに提供できるようになりました。

handler.setBackOffFunction((record, ex) -> { ... });

関数が null を返す場合、プロセッサーのデフォルトの BackOff が使用されます。

バージョン 2.6.3 以降、resetStateOnExceptionChange を true に設定すると、失敗の間に例外型が変更された場合に、再試行シーケンスが再開されます(構成されている場合は、新しい BackOff の選択を含む)。デフォルトでは、例外型は考慮されません。

バージョン 2.3.1 以降、DefaultErrorHandler と同様に、DefaultAfterRollbackProcessor は特定の例外を致命的とみなし、そのような例外の再試行はスキップされます。リカバリは最初の失敗時に呼び出されます。デフォルトで致命的と見なされる例外は次のとおりです。

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

これらの例外は、再試行された配信では解決されそうにないためです。

再試行不可能なカテゴリにさらに例外の種類を追加するか、分類された例外のマップを完全に置き換えることができます。詳細については、DefaultAfterRollbackProcessor.setClassifications() の Javadoc および spring-retry BinaryExceptionClassifier の Javadoc を参照してください。

再試行できない例外に IllegalArgumentException を追加する例を次に示します。

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}

配信試行ヘッダーも参照してください。

現在の kafka-clients では、コンテナーは ProducerFencedException がリバランスによって引き起こされたのか、プロデューサーの transactional.id がタイムアウトまたは期限切れのために取り消されたのかを検出できません。ほとんどの場合、これはリバランスによって引き起こされるため、コンテナーは AfterRollbackProcessor を呼び出しません (パーティションが割り当てられなくなったため、パーティションをシークするのは適切ではないためです)。タイムアウトが各トランザクションを処理し、定期的に (たとえば ListenerContainerIdleEvent を介して) 「空の」トランザクションを実行するのに十分な大きさであることを確認すると、タイムアウトと期限切れによるフェンシングを回避できます。または、stopContainerWhenFenced コンテナープロパティを true に設定すると、コンテナーが停止し、レコードの損失を回避できます。ConsumerStoppedEvent を使用し、FENCED の Reason プロパティをチェックして、この状態を検出できます。イベントにはコンテナーへの参照も含まれているため、このイベントを使用してコンテナーを再起動できます。

バージョン 2.7 以降、BackOff 間隔を待機している間、エラーハンドラーは、コンテナーが停止したかどうかを確認しながら、必要な遅延に達するまで短いスリープでループし、stop() ではなく、stop() の直後にスリープを終了できるようにします。遅延を引き起こしています。

バージョン 2.7 以降、プロセッサーは 1 つ以上の RetryListener で構成でき、再試行および回復の進行状況の通知を受け取ります。

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

詳細については、JavaDocs を参照してください。

配信試行ヘッダー

以下は、バッチリスナーではなく、レコードリスナーにのみ適用されます。

バージョン 2.5 以降、DeliveryAttemptAware を実装する ErrorHandler または AfterRollbackProcessor を使用する場合、レコードへの KafkaHeaders.DELIVERY_ATTEMPT ヘッダー (kafka_deliveryAttempt) の追加を有効にすることができます。このヘッダーの値は、1 から始まる増分整数です。生の ConsumerRecord<?, ?> を受信する場合、整数は byte[4] にあります。

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

@KafkaListener を DefaultKafkaHeaderMapper または SimpleKafkaHeaderMapper とともに使用する場合、リスナーメソッドに @Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery をパラメーターとして追加することで取得できます。

このヘッダーの設定を有効にするには、コンテナープロパティ deliveryAttemptHeader を true に設定します。各レコードの状態を調べてヘッダーを追加する (小さな) オーバーヘッドを回避するために、デフォルトでは無効になっています。

DefaultErrorHandler および DefaultAfterRollbackProcessor は、この機能をサポートしています。

バッチリスナーの配信試行ヘッダー

ConsumerRecord を BatchListener で処理する場合、KafkaHeaders.DELIVERY_ATTEMPT ヘッダーは SingleRecordListener とは異なる方法で存在することができます。

バージョン 3.3 以降、BatchListener を使用するときに KafkaHeaders.DELIVERY_ATTEMPT ヘッダーを ConsumerRecord に挿入する場合は、ErrorHandler で DeliveryAttemptAwareRetryListener を RetryListener として設定します。

以下のコードを参照してください。

final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

その後、バッチが完了に失敗するたびに、DeliveryAttemptAwareRetryListener は KafkaHeaders.DELIVERY_ATTMPT ヘッダーを ConsumerRecord に挿入します。

リスナー情報ヘッダー

場合によっては、リスナーが実行されているコンテナーを知ることができると便利です。

バージョン 2.8.4 以降、リスナーコンテナーで listenerInfo プロパティを設定したり、@KafkaListener アノテーションで info 属性を設定したりできるようになりました。次に、コンテナーはこれを KafkaListener.LISTENER_INFO ヘッダーですべての受信メッセージに追加します。その後、レコードインターセプター、フィルターなど、リスナー自体で使用できます。

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

RecordInterceptor または RecordFilterStrategy 実装で使用される場合、ヘッダーは、KafkaListenerAnnotationBeanPostProcessor の charSet プロパティを使用して変換されたバイト配列としてコンシューマーレコードにあります。

ヘッダーマッパーは、コンシューマーレコードから MessageHeaders を作成するときにも String に変換し、このヘッダーを送信レコードにマップすることはありません。

バージョン 2.8.6 以降の POJO バッチリスナーの場合、ヘッダーはバッチの各メンバーにコピーされ、変換後に単一の String パラメーターとしても使用できます。

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
バッチリスナーにフィルターがあり、フィルターの結果が空のバッチになる場合、空のバッチの情報は利用できないため、required = false を @Header パラメーターに追加する必要があります。

List<Message<Thing>> を受け取った場合、情報は各 Message<?> の KafkaHeaders.LISTENER_INFO ヘッダーにあります。

バッチの消費の詳細については、バッチリスナーを参照してください。

デッドレターレコードの公開

レコードの障害の最大数に達したときに、レコードリカバリ装置を使用して DefaultErrorHandler および DefaultAfterRollbackProcessor を構成できます。フレームワークは、失敗したメッセージを別のトピックに公開する DeadLetterPublishingRecoverer を提供します。リカバリには、レコードの送信に使用される KafkaTemplate<Object, Object> が必要です。オプションで、宛先トピックとパーティションを解決するために呼び出される BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> を使用して構成することもできます。

デフォルトでは、配信不能レコードは <originalTopic>-dlt という名前のトピック ( -dlt のサフィックスが付いた元のトピック名) と、元のレコードと同じパーティションに送信されます。デフォルトのリゾルバーを使用する場合、配信不能トピックには少なくとも元のトピックと同じ数のパーティションが必要です。

返された TopicPartition に負のパーティションがある場合、そのパーティションは ProducerRecord に設定されていないため、パーティションは Kafka によって選択されます。バージョン 2.2.4 以降、すべての ListenerExecutionFailedException (たとえば、@KafkaListener メソッドで例外が検出された場合にスローされる) は、groupId プロパティで拡張されています。これにより、宛先リゾルバーは、デッドレタートピックを選択するための ConsumerRecord 内の情報に加えて、これを使用できます。

次の例は、カスタム宛先リゾルバーを接続する方法を示しています。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

配信不能トピックに送信されるレコードは、次のヘッダーで拡張されます。

  • KafkaHeaders.DLT_EXCEPTION_FQCN: 例外クラス名(通常は ListenerExecutionFailedException ですが、他の名前にすることもできます)。

  • KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 例外の原因となるクラス名(存在する場合)(バージョン 2.8 以降)。

  • KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 例外スタックトレース。

  • KafkaHeaders.DLT_EXCEPTION_MESSAGE: 例外メッセージ。

  • KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 例外クラス名 (キーの逆直列化エラーのみ)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 例外スタックトレース (キーの逆直列化エラーのみ)。

  • KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 例外メッセージ (キーの逆直列化エラーのみ)。

  • KafkaHeaders.DLT_ORIGINAL_TOPIC: 元ネタ。

  • KafkaHeaders.DLT_ORIGINAL_PARTITION: 元のパーティション。

  • KafkaHeaders.DLT_ORIGINAL_OFFSET: 元のオフセット。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 元のタイムスタンプ。

  • KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 元のタイムスタンプ型。

  • KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: レコードの処理に失敗した元のコンシューマーグループ(バージョン 2.8 以降)。

重要な例外は DeserializationException によってのみ発生するため、DLT_KEY_EXCEPTION_CAUSE_FQCN はありません。

さらにヘッダーを追加するには、2 つのメカニズムがあります。

  1. 回復者をサブクラス化して createProducerRecord() をオーバーライドします - super.createProducerRecord() を呼び出して、さらにヘッダーを追加します。

  2. BiFunction を提供して、コンシューマーレコードと例外を受け取り、Headers オブジェクトを返します。そこからのヘッダーは、最終的なプロデューサーレコードにコピーされます。デッドレターレコードヘッダーの管理も参照してください。setHeadersFunction() を使用して BiFunction を設定します。

2 番目の方法は実装が簡単ですが、最初の方法には、すでに組み立てられている標準ヘッダーなど、より多くの情報が含まれています。

バージョン 2.3 以降、ErrorHandlingDeserializer と組み合わせて使用すると、発行者は、デッドレタープロデューサーレコード内のレコード value() を、逆直列化に失敗した元の値に復元します。以前は、value() は null であり、ユーザーコードはメッセージヘッダーから DeserializationException をデコードする必要がありました。さらに、パブリッシャーに複数の KafkaTemplate を提供できます。これは、たとえば、DeserializationException から byte[] を公開する場合や、正常に逆直列化されたレコードから別のシリアライザーを使用する値を公開する場合に必要になる場合があります。String および byte[] シリアライザーを使用する KafkaTemplate でパブリッシャーを構成する例を次に示します。

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

公開者は、マップキーを使用して、公開予定の value() に適したテンプレートを見つけます。キーが順番に検査されるように、LinkedHashMap が推奨されます。

null 値を公開するときに複数のテンプレートがある場合、リカバリは Void クラスのテンプレートを検索します。存在しない場合は、values().iterator() の最初のテンプレートが使用されます。

2.7 以降は、setFailIfSendResultIsError メソッドを使用して、メッセージの公開が失敗したときに例外がスローされるようにすることができます。setWaitForSendResultTimeout を使用した送信者の成功の検証にタイムアウトを設定することもできます。

リカバリが失敗した場合 (例外をスローした場合)、失敗したレコードがシークに含まれます。バージョン 2.5.5 以降、回復者が失敗した場合、BackOff はデフォルトでリセットされ、回復が再試行される前に再配信が再びバックオフを通過します。以前のバージョンでは、BackOff はリセットされず、次の障害で回復が再試行されました。以前の動作に戻すには、エラーハンドラーの resetStateOnRecoveryFailure プロパティを false に設定します。

バージョン 2.6.3 以降、resetStateOnExceptionChange を true に設定すると、失敗の間に例外型が変更された場合に、再試行シーケンスが再開されます(構成されている場合は、新しい BackOff の選択を含む)。デフォルトでは、例外型は考慮されません。

バージョン 2.3 以降、recoverer は Kafka ストリームでも使用できます - 詳細については、デシリアライズ例外からの回復を参照してください。

ErrorHandlingDeserializer は、ヘッダー ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER および ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER に逆直列化例外を追加します (Java 直列化を使用)。デフォルトでは、これらのヘッダーはデッドレタートピックにパブリッシュされたメッセージには保持されません。バージョン 2.7 以降では、キーと値の両方が逆直列化に失敗した場合、両方の元の値が DLT に送信されるレコードに設定されます。

受信レコードが互いに依存しているが、順不同で到着する可能性がある場合は、失敗したレコードをデッドレタートピックに直接送信するのではなく、元のトピックの末尾に (数回) 再発行すると便利な場合があります。例については、このスタックオーバーフローの質問 (英語) を参照してください。

次のエラーハンドラー構成は、まさにそれを行います。

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic.DLT", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

バージョン 2.7 以降、回復者は、宛先リゾルバーによって選択されたパーティションが実際に存在することを確認します。パーティションが存在しない場合、ProducerRecord 内のパーティションは null に設定され、KafkaProducer がパーティションを選択できるようになります。このチェックを無効にするには、verifyPartition プロパティを false に設定します。

バージョン 3.1 以降では、logRecoveryRecord プロパティを true に設定すると、リカバリレコードと例外がログに記録されます。

デッドレターレコードヘッダーの管理

上記のデッドレターレコードの公開を参照すると、DeadLetterPublishingRecoverer には、ヘッダーがすでに存在する場合(ノンブロッキング再試行の使用を含む、失敗したデッドレターレコードの再処理時など)にヘッダーを管理するために使用される 2 つのプロパティがあります。

  • appendOriginalHeaders (デフォルト true)

  • stripPreviousExceptionHeaders (バージョン 2.8 以降のデフォルト true )

Apache Kafka は同じ名前を持つ複数のヘッダーをサポートします。「最新」の値を取得するには、headers.lastHeader(headerName) を使用できます。複数のヘッダーの反復子を取得するには、headers.headers(headerName).iterator() を使用します。

失敗したレコードを繰り返し再発行すると、これらのヘッダーが大きくなる可能性があります(最終的には、RecordTooLargeException が原因で発行が失敗します)。これは、例外ヘッダー、特にスタックトレースヘッダーに特に当てはまります。

2 つのプロパティの理由は、最後の例外情報のみを保持したい場合でも、失敗ごとにレコードが通過したトピックの履歴を保持したい場合があるためです。

appendOriginalHeaders は ORIGINAL という名前のすべてのヘッダーに適用され、stripPreviousExceptionHeaders は EXCEPTION という名前のすべてのヘッダーに適用されます。

バージョン 2.8.4 以降、出力レコードに追加する標準ヘッダーを制御できるようになりました。デフォルトで追加される(現在)10 個の標準ヘッダーの総称名については enum HeadersToAdd を参照してください(これらは実際のヘッダー名ではなく、単なる抽象化です。実際のヘッダー名は、サブクラスがオーバーライドできる getHeaderNames() メソッドによって設定されます。

ヘッダーを除外するには、excludeHeaders() メソッドを使用します。たとえば、ヘッダーへの例外スタックトレースの追加を抑制するには、次を使用します。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

さらに、ExceptionHeadersCreator を追加することで、例外ヘッダーの追加を完全にカスタマイズできます。これにより、すべての標準例外ヘッダーも無効になります。

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

また、バージョン 2.8.4 以降、addHeadersFunction メソッドを介して複数のヘッダー機能を提供できるようになりました。これにより、たとえばノンブロッキング再試行を使用している場合など、別の機能がすでに登録されている場合でも、追加の機能を適用できます。

ExponentialBackOffWithMaxRetries の実装

Spring Framework は、多数の BackOff 実装を提供します。デフォルトでは、ExponentialBackOff は無期限に再試行します。何度か再試行した後にあきらめるには、maxElapsedTime を計算する必要があります。バージョン 2.7.3 以降、Spring for Apache Kafka は、maxRetries プロパティを受け取り、maxElapsedTime を自動的に計算するサブクラスである ExponentialBackOffWithMaxRetries を提供します。これは、もう少し便利です。

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

これは、1, 2, 4, 8, 10, 10 秒後、リカバリを呼び出す前に再試行します。