DLT 戦略
このフレームワークは、DLT を操作するためのいくつかの戦略を提供します。DLT 処理の方法を提供するか、デフォルトのログ記録方法を使用するか、DLT をまったく使用しないことができます。また、DLT 処理が失敗した場合の動作を選択することもできます。
DLT 処理方式
トピックの DLT を処理するために使用する方法と、その処理が失敗した場合の動作を指定できます。
これを行うには、@RetryableTopic
アノテーションを持つクラスのメソッドで @DltHandler
アノテーションを使用できます。そのクラス内のすべての @RetryableTopic
アノテーション付きメソッドで同じメソッドが使用されることに注意してください。
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT ハンドラーメソッドは、RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)
メソッドを通じて提供することもでき、引数として Bean 名と DLT のメッセージを処理するメソッド名を渡します。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
DLT ハンドラーが提供されていない場合は、デフォルトの RetryTopicConfigurer.LoggingDltListenerHandlerMethod が使用されます。 |
バージョン 2.8 以降、デフォルトのハンドラーを含め、このアプリケーションで DLT から消費したくない場合(または消費を延期したい場合)、DLT コンテナーを開始するかどうかを制御できます。コンテナーファクトリの autoStartup
プロパティ。
@RetryableTopic
アノテーションを使用する場合は、autoStartDltHandler
プロパティを false
に設定します。構成ビルダーを使用する場合は、autoStartDltHandler(false)
を使用してください。
後で KafkaListenerEndpointRegistry
を介して DLT ハンドラーを開始できます。
DLT 障害動作
DLT 処理が失敗した場合、ALWAYS_RETRY_ON_ERROR
と FAIL_ON_ERROR
の 2 つの可能な動作が利用可能です。
前者では、レコードは DLT トピックに転送されるため、他の DLT レコードの処理がブロックされることはありません。後者の場合、コンシューマーはメッセージを転送せずに実行を終了します。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
デフォルトの動作は ALWAYS_RETRY_ON_ERROR です。 |
バージョン 2.8.3 以降では、レコードによって DeserializationException などの致命的な例外がスローされる場合、ALWAYS_RETRY_ON_ERROR はレコードを DLT にルーティングしません。一般に、そのような例外は常にスローされるためです。 |
致命的と見なされる例外は次のとおりです。
DeserializationException
MessageConversionException
ConversionException
MethodArgumentResolutionException
NoSuchMethodException
ClassCastException
DestinationTopicResolver
Bean のメソッドを使用して、このリストに例外を追加したり、このリストから例外を削除したりできます。
詳細については、例外分類子を参照してください。
DLT なしの構成
フレームワークは、トピックの DLT を構成しない可能性も提供します。この場合、再試行回数が尽きた後、処理は単純に終了します。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}