クラス RetryTopicConfigurer
- 実装されているすべてのインターフェース:
Aware,BeanFactoryAware
メインエンドポイントと提供された構成に基づいてメイン、再試行、DLT トピックを構成し、順序の保証を犠牲にしてノンブロッキング方式で分散再試行 /DLT パターンを実現します。
たとえば、"main-topic" トピックがあり、指数関数的なバックオフを 1000ms、倍率を 2、再試行を 3 回行いたい場合、main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000, main-topic-dlt トピックが作成されることになります。この設定は、RetryTopicConfigurationBuilder を使用して 1 つ以上の RetryTopicConfigurer Bean を作成するか、RetryableTopic アノテーションを使用することで実現可能です。使い方の詳細は後述します。
使い方:
メッセージ処理が例外をスローした場合、構成された DefaultErrorHandler および DeadLetterPublishingRecoverer は、DestinationTopicResolver を使用してメッセージを次のトピックに転送し、次のトピックとその遅延を認識します。
転送された各レコードにはバックオフタイムスタンプヘッダーがあり、それより前に KafkaBackoffAwareMessageListenerAdapter によって消費が試行された場合、パーティションの消費は KafkaConsumerBackoffManager によって一時停止され、KafkaBackoffException がスローされます。
パーティションが ContainerProperties の idlePartitionEventInterval プロパティで指定された時間アイドル状態になると、ListenerContainerPartitionIdleEvent が公開され、KafkaConsumerBackoffManager はこれをリッスンして、パーティションの一時停止を解除する必要があるかどうかを確認します。
消費が再開されたときに処理が再び失敗した場合、メッセージは dlt に到達するまで、次のトピックなどに転送されます。
Kafka のパーティション順序の保証と、各トピックの遅延時間が固定されていることを考慮すると、特定の再試行トピックパーティションで消費される最初のメッセージは、そのパーティションのバックオフタイムスタンプが最も早いメッセージであることがわかります。パーティションを一時停止することで、次のことがわかります。他のパーティションでのメッセージ処理を必要以上に遅らせないでください。
使用箇所:
エンドポイントを構成するには、主に 2 つのメソッドがあります。1 つ目は、次のような Configuration アノテーション付きクラスで 1 つ以上の Bean を提供することです。
@Bean
public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
これにより、デフォルト構成を使用して、KafkaListener でアノテーションが付けられたメソッドのすべてのトピックとそのコンシューマーに対して、再試行トピックと dlt トピックが作成されます。メッセージ処理が失敗した場合、DLT トピックに到達するまで、メッセージを次のトピックに転送します。メッセージ転送には KafkaOperations インスタンスが必要です。
各トピックの再試行を処理する方法をよりきめ細かく制御するために、次のように複数の Bean を提供できます。
@Bean
public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.includeTopics(List.of("my-topic", "my-other-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryableTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics(List.of("my-topic", "my-other-topic"))
.retryOn(MyException.class)
.create(template);
}
他のいくつかのオプションが含まれます: トピックの自動作成、バックオフ、retryOn/notRetryOn/RetryTemplate のような横断、単一トピックの固定バックオフ処理、カスタム DLT リスナー Bean、カスタムトピックサフィックス、および特定の listenerContainerFactories の提供。
エンドポイントを設定するもう 1 つの非排他的な方法は、便利な RetryableTopic アノテーションを使用することです。これは、次のような KafkaListener アノテーション付きメソッドに直接配置できます。
@RetryableTopic(attempts = 3,
backOff = @BackOff(delay = 700, maxDelay = 12000, multiplier = 3))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
3.2 以降、RetryableTopic アノテーションは次のような KafkaListener アノテーション付きクラスをサポートします。
@RetryableTopic(attempts = 3,
backOff = @BackOff(delay = 700, maxDelay = 12000, multiplier = 3))
@KafkaListener(topics = "my-annotated-topic")
static class ListenerBean {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
3.2 以降、RetryableTopic アノテーションは次のような KafkaListener アノテーション付きクラスをサポートします。
@RetryableTopic(attempts = 3,
backOff = @BackOff(delay = 700, maxDelay = 12000, multiplier = 3))
@KafkaListener(topics = "my-annotated-topic")
static class ListenerBean {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
または、次のようなメタアノテーション経由:
@RetryableTopic(backOff = @BackOff(delay = 700, maxDelay = 12000, multiplier = 3))
public @interface WithExponentialBackoffRetry {
@AliasFor(attribute = "attempts", annotation = RetryableTopic.class)
String retries();
}
@WithExponentialBackoffRetry(retries = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
アノテーションとビルダーのアプローチでは同じ構成を使用でき、両方を同時に使用できます。同じメソッド / トピックを両方で処理できる場合は、アノテーションが優先されます。
DLT 処理:
DLT ハンドラーメソッドは、RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) メソッドを通じて提供でき、DLT トピックを処理する必要があるクラスとメソッド名を提供します。この型の Bean インスタンスが BeanFactory で見つかった場合、それが使用されているインスタンスです。そうでない場合は、インスタンスが作成されます。このクラスは、依存性注入を通常の Bean として使用できます。
@Bean
public RetryTopicConfiguration otherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
DLT ハンドラーメソッドを提供するもう 1 つの方法は、対応する KafkaListener と同じクラス内で使用する必要がある DltHandler アノテーションを使用することです。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT ハンドラーが提供されていない場合は、デフォルトの RetryTopicConfigurer.LoggingDltListenerHandlerMethod が使用されます。- 導入:
- 2.7
- 作成者:
- Tomaz Fernandes, Fabio da Silva Jr., Gary Russell, Wang Zhiyang, Borahm Lee
- 関連事項:
ネストされたクラスの要約
ネストされたクラスフィールドのサマリー
フィールドコンストラクターの概要
コンストラクターコンストラクター説明RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) 提供されたプロパティでインスタンスを作成します。メソッドのサマリー
修飾子と型メソッド説明protected EndpointCustomizer<MethodKafkaListenerEndpoint<?, ?>> createEndpointCustomizer(EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) static EndpointHandlerMethodcreateHandlerMethodWith(ObjectSE bean, MethodSE method) static EndpointHandlerMethodcreateHandlerMethodWith(ObjectSE beanOrClass, StringSE methodName) protected voidcreateNewTopicBeans(CollectionSE<StringSE> topics, org.springframework.kafka.retrytopic.RetryTopicConfiguration.TopicCreation config) protected EndpointHandlerMethodgetEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, DestinationTopic.Properties props) voidprocessMainAndRetryListeners(RetryTopicConfigurer.EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, KafkaListenerEndpointRegistrar registrar, @Nullable KafkaListenerContainerFactory<?> factory, StringSE defaultContainerFactoryBeanName) 再試行および dlt エンドポイント、および対応する listenerContainer を作成するコンテナーファクトリを作成および構成するためのエントリポイント。voidsetBeanFactory(BeanFactory beanFactory)
フィールドの詳細
DEFAULT_DLT_HANDLER
DLT でメッセージを処理するためのデフォルトの方法。
コンストラクターの詳細
RetryTopicConfigurer
@Autowired public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) 提供されたプロパティでインスタンスを作成します。- パラメーター:
destinationTopicProcessor- 宛先トピックプロセッサー。containerFactoryResolver- コンテナーファクトリリゾルバー。listenerContainerFactoryConfigurer- コンテナーファクトリコンフィギュレータ。retryTopicNamesProviderFactory- 再試行トピック名はファクトリです。
メソッドの詳細
processMainAndRetryListeners
public void processMainAndRetryListeners(RetryTopicConfigurer.EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, KafkaListenerEndpointRegistrar registrar, @Nullable KafkaListenerContainerFactory<?> factory, StringSE defaultContainerFactoryBeanName) 再試行および dlt エンドポイント、および対応する listenerContainer を作成するコンテナーファクトリを作成および構成するためのエントリポイント。- パラメーター:
endpointProcessor- エンドポイント processListener メソッドを処理する関数。mainEndpoint- 再試行エンドポイントと dlt エンドポイントも作成および処理されるエンドポイント。configuration- トピックの構成。registrar- エンドポイントを登録するKafkaListenerEndpointRegistrar。factory-KafkaListenerで提供されるファクトリdefaultContainerFactoryBeanName-KafkaListenerのデフォルトのファクトリ Bean 名
getEndpointHandlerMethod
protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, DestinationTopic.Properties props) createNewTopicBeans
protected void createNewTopicBeans(CollectionSE<StringSE> topics, org.springframework.kafka.retrytopic.RetryTopicConfiguration.TopicCreation config) createEndpointCustomizer
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?,?>> createEndpointCustomizer(EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) createHandlerMethodWith
public static EndpointHandlerMethod createHandlerMethodWith(ObjectSE beanOrClass, StringSE methodName) createHandlerMethodWith
setBeanFactory
- 次で指定:
- インターフェース
BeanFactoryAwareのsetBeanFactory - 例外:
BeansException