機能
ほとんどの機能は、@RetryableTopic
アノテーションと RetryTopicConfiguration
Bean の両方で使用できます。
BackOff の設定
BackOff 構成は、Spring Retry
プロジェクトの BackOffPolicy
インターフェースに依存しています。
以下が含まれます:
固定バックオフ
指数バックオフ
ランダム指数バックオフ
均一ランダムバックオフ
ノーバックオフ
カスタムバックオフ
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
Spring Retry の SleepingBackOffPolicy
インターフェースのカスタム実装を提供することもできます。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
デフォルトのバックオフポリシーは FixedBackOffPolicy で、試行回数は最大 3 回、間隔は 1000 ミリ秒です。 |
ExponentialBackOffPolicy のデフォルトの最大遅延は 30 秒です。バックオフポリシーでそれより大きな値の遅延が必要な場合は、それに応じて maxDelay プロパティを調整します。 |
最初の試行は maxAttempts にカウントされるため、maxAttempts 値を 4 に指定すると、元の試行に 3 回の再試行が加えられます。 |
グローバルタイムアウト
再試行プロセスのグローバルタイムアウトを設定できます。その時間に達した場合、次にコンシューマーが例外をスローしたときに、メッセージは DLT に直接送信されるか、使用できる DLT がない場合は処理を終了します。
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
デフォルトではタイムアウトが設定されていません。これは、タイムアウト値として -1 を指定することでも実現できます。 |
例外分類子
再試行する例外と再試行しない例外を指定できます。ネストされた例外を検索するために原因をトラバースするように設定することもできます。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
デフォルトの動作では、すべての例外で再試行され、原因はトラバースされません。 |
2.8.3 以降、レコードが再試行なしで DLT に送信される原因となる致命的な例外のグローバルリストがあります。致命的な例外のデフォルトのリストについては、DefaultErrorHandler を参照してください。RetryTopicConfigurationSupport
を継承する @Configuration
クラスで configureNonBlockingRetries
メソッドをオーバーライドすることにより、このリストに例外を追加または削除できます。詳細については、グローバル設定と機能の構成を参照してください。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
致命的な例外の分類を無効にするには、提供されたリストをクリアするだけです。 |
トピックを含めたり除外したりする
.includeTopic(String topic)、.includeTopics(Collection<String> Topics) .excludeTopic(String topic) および .excludeTopics(Collection<String> topic) メソッドを介して、RetryTopicConfiguration
Bean によって処理されるトピックと処理されないトピックを決定できます。.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
デフォルトの動作では、すべてのトピックが含まれます。 |
トピック AutoCreation
特に指定がない限り、フレームワークは、KafkaAdmin
Bean によって消費される NewTopic
Bean を使用して必要なトピックを自動作成します。トピックの作成に使用するパーティションの数とレプリケーション係数を指定でき、この機能をオフにすることもできます。バージョン 3.0 以降、デフォルトのレプリケーション係数は -1
で、ブローカーのデフォルトを使用することを意味します。ブローカーのバージョンが 2.4 より前の場合は、明示的な値を設定する必要があります。
Spring Boot を使用していない場合、この機能を使用するには KafkaAdmin Bean を提供する必要があることに注意してください。 |
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
デフォルトでは、トピックは 1 つのパーティションと -1 のレプリケーション係数で自動作成されます (ブローカーのデフォルトを使用することを意味します)。ブローカーのバージョンが 2.4 より前の場合は、明示的な値を設定する必要があります。 |
障害ヘッダー管理
障害ヘッダー(元のヘッダーと例外ヘッダー)の管理方法を検討する場合、フレームワークは DeadLetterPublishingRecoverer
に委譲して、ヘッダーを追加するか置き換えるかを決定します。
デフォルトでは、appendOriginalHeaders
を false
に明示的に設定し、stripPreviousExceptionHeaders
を DeadLetterPublishingRecover
で使用されるデフォルトのままにします。
これは、最初の「元の」ヘッダーと最後の例外ヘッダーのみがデフォルト構成で保持されることを意味します。これは、多くの再試行手順が含まれる場合に、(スタックトレースヘッダーなどが原因で)過度に大きなメッセージが作成されないようにするためです。
詳細については、デッドレターレコードヘッダーの管理を参照してください。
これらのプロパティに異なる設定を使用するようにフレームワークを再構成するには、RetryTopicConfigurationSupport
を継承する @Configuration
クラスで configureCustomizers
メソッドをオーバーライドして、DeadLetterPublishingRecoverer
カスタマイザーを構成します。詳細については、グローバル設定と機能の構成を参照してください。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
バージョン 2.8.4 以降では、カスタムヘッダーを追加したい場合 (ファクトリによって追加された再試行情報ヘッダーに加えて、headersFunction
をファクトリに追加できます - factory.setHeadersFunction((rec, ex) -> {... })
)。
デフォルトでは、追加されたヘッダーはすべて累積されます。Kafka ヘッダーには複数の値を含めることができます。バージョン 2.9.5 以降、関数によって返される Headers
に型 DeadLetterPublishingRecoverer.SingleRecordHeader
のヘッダーが含まれている場合、そのヘッダーの既存の値はすべて削除され、新しい単一の値のみが残ります。
カスタム DeadLetterPublishingRecoverer
障害ヘッダー管理でわかるように、フレームワークによって作成されたデフォルトの DeadLetterPublishingRecoverer
インスタンスをカスタマイズできます。ただし、ユースケースによっては、DeadLetterPublishingRecoverer
をサブクラス化する必要があります。たとえば、createProducerRecord()
をオーバーライドして、再試行 (またはデッドレター) トピックに送信される内容を変更します。バージョン 3.0.9 以降では、RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()
メソッドをオーバーライドして DeadLetterPublisherCreator
インスタンスを提供できます。例:
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
カスタムインスタンスを構築するときは、提供されているリゾルバーを使用することをお勧めします。
スローされた例外に基づいたカスタム DLT へのメッセージのルーティング
バージョン 3.2.0 以降では、処理中にスローされた例外の種類に基づいて、メッセージをカスタム DLT にルーティングできるようになりました。そのためには、ルーティングを指定する必要があります。ルーティングのカスタマイズは、追加の宛先の指定で構成されます。宛先は suffix
と exceptions
の 2 つの設定で構成されます。exceptions
で指定された例外型がスローされた場合、汎用 DLT が考慮される前に、suffix
を含む DLT がメッセージのターゲットトピックとして考慮されます。アノテーションまたは RetryTopicConfiguration
Bean を使用した構成の例:
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(template);
}
suffix
は、カスタム DLT 名の一般的な dltTopicSuffix
の前に発生します。提示された例を考慮すると、DeserializationException
の原因となったメッセージは my-annotated-topic-dlt
ではなく my-annotated-topic-deserialization-dlt
にルーティングされます。カスタム DLT は、トピック AutoCreation に記載されているのと同じルールに従って作成されます。