DLT 戦略

このフレームワークは、DLT を操作するためのいくつかの戦略を提供します。DLT 処理の方法を提供するか、デフォルトのログ記録方法を使用するか、DLT をまったく使用しないことができます。また、DLT 処理が失敗した場合の動作を選択することもできます。

DLT 処理方式

トピックの DLT を処理するために使用する方法と、その処理が失敗した場合の動作を指定できます。

これを行うには、@RetryableTopic アノテーションを持つクラスのメソッドで @DltHandler アノテーションを使用できます。そのクラス内のすべての @RetryableTopic アノテーション付きメソッドで同じメソッドが使用されることに注意してください。

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processDltMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

DLT ハンドラーメソッドは、RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) メソッドを通じて提供することもでき、引数として Bean 名と DLT のメッセージを処理するメソッド名を渡します。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
DLT ハンドラーが提供されていない場合は、デフォルトの RetryTopicConfigurer.LoggingDltListenerHandlerMethod が使用されます。

バージョン 2.8 以降、デフォルトのハンドラーを含め、このアプリケーションで DLT から消費したくない場合(または消費を延期したい場合)、DLT コンテナーを開始するかどうかを制御できます。コンテナーファクトリの autoStartup プロパティ。

@RetryableTopic アノテーションを使用する場合は、autoStartDltHandler プロパティを false に設定します。構成ビルダーを使用する場合は、autoStartDltHandler(false) を使用してください。

後で KafkaListenerEndpointRegistry を介して DLT ハンドラーを開始できます。

DLT 障害動作

DLT 処理が失敗した場合、ALWAYS_RETRY_ON_ERROR と FAIL_ON_ERROR の 2 つの可能な動作が利用可能です。

前者では、レコードは DLT トピックに転送されるため、他の DLT レコードの処理がブロックされることはありません。後者の場合、コンシューマーはメッセージを転送せずに実行を終了します。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
デフォルトの動作は ALWAYS_RETRY_ON_ERROR です。
バージョン 2.8.3 以降では、レコードによって DeserializationException などの致命的な例外がスローされる場合、ALWAYS_RETRY_ON_ERROR はレコードを DLT にルーティングしません。一般に、そのような例外は常にスローされるためです。

致命的と見なされる例外は次のとおりです。

  • DeserializationException

  • MessageConversionException

  • ConversionException

  • MethodArgumentResolutionException

  • NoSuchMethodException

  • ClassCastException

DestinationTopicResolver Bean のメソッドを使用して、このリストに例外を追加したり、このリストから例外を削除したりできます。

詳細については、例外分類子を参照してください。

DLT なしの構成

フレームワークは、トピックの DLT を構成しない可能性も提供します。この場合、再試行回数が尽きた後、処理は単純に終了します。

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}