機能

ほとんどの機能は、@RetryableTopic アノテーションと RetryTopicConfiguration Bean の両方で使用できます。

BackOff の設定

BackOff 構成は、Spring Retry プロジェクトの BackOffPolicy インターフェースに依存しています。

以下が含まれます:

  • 固定バックオフ

  • 指数バックオフ

  • ランダム指数バックオフ

  • 均一ランダムバックオフ

  • ノーバックオフ

  • カスタムバックオフ

@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3_000)
            .maxAttempts(4)
            .create(template);
}

Spring Retry の SleepingBackOffPolicy インターフェースのカスタム実装を提供することもできます。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackOff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
デフォルトのバックオフポリシーは FixedBackOffPolicy で、試行回数は最大 3 回、間隔は 1000 ミリ秒です。
ExponentialBackOffPolicy のデフォルトの最大遅延は 30 秒です。バックオフポリシーでそれより大きな値の遅延が必要な場合は、それに応じて maxDelay プロパティを調整します。
最初の試行は maxAttempts にカウントされるため、maxAttempts 値を 4 に指定すると、元の試行に 3 回の再試行が加えられます。

グローバルタイムアウト

再試行プロセスのグローバルタイムアウトを設定できます。その時間に達した場合、次にコンシューマーが例外をスローしたときに、メッセージは DLT に直接送信されるか、使用できる DLT がない場合は処理を終了します。

@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
デフォルトではタイムアウトが設定されていません。これは、タイムアウト値として -1 を指定することでも実現できます。

例外分類子

再試行する例外と再試行しない例外を指定できます。ネストされた例外を検索するために原因をトラバースするように設定することもできます。

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
デフォルトの動作では、すべての例外で再試行され、原因はトラバースされません。

2.8.3 以降、レコードが再試行なしで DLT に送信される原因となる致命的な例外のグローバルリストがあります。致命的な例外のデフォルトのリストについては、DefaultErrorHandler を参照してください。RetryTopicConfigurationSupport を継承する @Configuration クラスで configureNonBlockingRetries メソッドをオーバーライドすることにより、このリストに例外を追加または削除できます。詳細については、グローバル設定と機能の構成を参照してください。

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
致命的な例外の分類を無効にするには、提供されたリストをクリアするだけです。

トピックを含めたり除外したりする

.includeTopic(String topic)、.includeTopics(Collection<String> Topics) .excludeTopic(String topic) および .excludeTopics(Collection<String> topic) メソッドを介して、RetryTopicConfiguration Bean によって処理されるトピックと処理されないトピックを決定できます。.

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
デフォルトの動作では、すべてのトピックが含まれます。

トピック AutoCreation

特に指定がない限り、フレームワークは、KafkaAdmin Bean によって消費される NewTopic Bean を使用して必要なトピックを自動作成します。トピックの作成に使用するパーティションの数とレプリケーション係数を指定でき、この機能をオフにすることもできます。バージョン 3.0 以降、デフォルトのレプリケーション係数は -1 で、ブローカーのデフォルトを使用することを意味します。ブローカーのバージョンが 2.4 より前の場合は、明示的な値を設定する必要があります。

Spring Boot を使用していない場合、この機能を使用するには KafkaAdmin Bean を提供する必要があることに注意してください。
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
デフォルトでは、トピックは 1 つのパーティションと -1 のレプリケーション係数で自動作成されます (ブローカーのデフォルトを使用することを意味します)。ブローカーのバージョンが 2.4 より前の場合は、明示的な値を設定する必要があります。

障害ヘッダー管理

障害ヘッダー(元のヘッダーと例外ヘッダー)の管理方法を検討する場合、フレームワークは DeadLetterPublishingRecover に委譲して、ヘッダーを追加するか置き換えるかを決定します。

デフォルトでは、appendOriginalHeaders を false に明示的に設定し、stripPreviousExceptionHeaders を DeadLetterPublishingRecover で使用されるデフォルトのままにします。

これは、最初の「元の」ヘッダーと最後の例外ヘッダーのみがデフォルト構成で保持されることを意味します。これは、多くの再試行手順が含まれる場合に、(スタックトレースヘッダーなどが原因で)過度に大きなメッセージが作成されないようにするためです。

詳細については、デッドレターレコードヘッダーの管理を参照してください。

これらのプロパティに異なる設定を使用するようにフレームワークを再構成するには、RetryTopicConfigurationSupport を継承する @Configuration クラスで configureCustomizers メソッドをオーバーライドして、DeadLetterPublishingRecoverer カスタマイザーを構成します。詳細については、グローバル設定と機能の構成を参照してください。

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

バージョン 2.8.4 以降では、カスタムヘッダーを追加したい場合 (ファクトリによって追加された再試行情報ヘッダーに加えて、headersFunction をファクトリに追加できます - factory.setHeadersFunction((rec, ex) -> {... }))。

デフォルトでは、追加されたヘッダーはすべて累積されます。Kafka ヘッダーには複数の値を含めることができます。バージョン 2.9.5 以降、関数によって返される Headers に型 DeadLetterPublishingRecoverer.SingleRecordHeader のヘッダーが含まれている場合、そのヘッダーの既存の値はすべて削除され、新しい単一の値のみが残ります。

カスタム DeadLetterPublishingRecoverer

障害ヘッダー管理でわかるように、フレームワークによって作成されたデフォルトの DeadLetterPublishingRecoverer インスタンスをカスタマイズできます。ただし、ユースケースによっては、DeadLetterPublishingRecoverer をサブクラス化する必要があります。たとえば、createProducerRecord() をオーバーライドして、再試行 (またはデッドレター) トピックに送信される内容を変更します。バージョン 3.0.9 以降では、RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory() メソッドをオーバーライドして DeadLetterPublisherCreator インスタンスを提供できます。例:

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

カスタムインスタンスを構築するときは、提供されているリゾルバーを使用することをお勧めします。