クラス RetryTopicConfigurer
- 実装されたすべてのインターフェース:
Aware
,BeanFactoryAware
メインエンドポイントと提供された構成に基づいてメイン、再試行、DLT トピックを構成し、順序の保証を犠牲にしてノンブロッキング方式で分散再試行 /DLT パターンを実現します。
たとえば、"main-topic" トピックがあり、指数関数的なバックオフを 1000ms、倍率を 2、再試行を 3 回行いたい場合、main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000, main-topic-dlt トピックが作成されることになります。この設定は、RetryTopicConfigurationBuilder
を使用して 1 つ以上の RetryTopicConfigurer
Bean を作成するか、RetryableTopic
アノテーションを使用することで実現可能です。使い方の詳細は後述します。
使い方:
メッセージ処理が例外をスローした場合、構成された DefaultErrorHandler
および DeadLetterPublishingRecoverer
は、DestinationTopicResolver
を使用してメッセージを次のトピックに転送し、次のトピックとその遅延を認識します。
転送された各レコードにはバックオフタイムスタンプヘッダーがあり、それより前に KafkaBackoffAwareMessageListenerAdapter
によって消費が試行された場合、パーティションの消費は KafkaConsumerBackoffManager
によって一時停止され、KafkaBackoffException
がスローされます。
パーティションが ContainerProperties の idlePartitionEventInterval プロパティで指定された時間アイドル状態になると、ListenerContainerPartitionIdleEvent
が公開され、KafkaConsumerBackoffManager
はこれをリッスンして、パーティションの一時停止を解除する必要があるかどうかを確認します。
消費が再開されたときに処理が再び失敗した場合、メッセージは dlt に到達するまで、次のトピックなどに転送されます。
Kafka のパーティション順序の保証と、各トピックの遅延時間が固定されていることを考慮すると、特定の再試行トピックパーティションで消費される最初のメッセージは、そのパーティションのバックオフタイムスタンプが最も早いメッセージであることがわかります。パーティションを一時停止することで、次のことがわかります。他のパーティションでのメッセージ処理を必要以上に遅らせないでください。
使用箇所:
エンドポイントを構成するには、主に 2 つのメソッドがあります。1 つ目は、次のような Configuration
アノテーション付きクラスで 1 つ以上の Bean
を提供することです。
@Bean
public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) { return RetryTopicConfigurationBuilder .newInstance() .create(template); }
これにより、デフォルト構成を使用して、KafkaListener
でアノテーションが付けられたメソッドのすべてのトピックとそのコンシューマーに対して、再試行トピックと dlt トピックが作成されます。メッセージ処理が失敗した場合、DLT トピックに到達するまで、メッセージを次のトピックに転送します。メッセージ転送には KafkaOperations
インスタンスが必要です。
各トピックの再試行を処理する方法をよりきめ細かく制御するために、次のように複数の Bean を提供できます。
@Bean
public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.includeTopics(List.of("my-topic", "my-other-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryableTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics(List.of("my-topic", "my-other-topic"))
.retryOn(MyException.class)
.create(template);
}
他のいくつかのオプションが含まれます: トピックの自動作成、バックオフ、retryOn/notRetryOn/ RetryTemplate
のような横断、単一トピックの固定バックオフ処理、カスタム DLT リスナー Bean、カスタムトピックサフィックス、および特定の listenerContainerFactories の提供。
エンドポイントを設定するもう 1 つの非排他的な方法は、便利な RetryableTopic
アノテーションを使用することです。これは、次のような KafkaListener
アノテーション付きメソッドに直接配置できます。
@RetryableTopic(attempts = 3, backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
@KafkaListener(topics = "my-annotated-topic") public void processMessage(MyPojo message) { // ... message processing }
3.2 以降、RetryableTopic
アノテーションは次のような KafkaListener
アノテーション付きクラスをサポートします。
@RetryableTopic(attempts = 3, backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
@KafkaListener(topics = "my-annotated-topic") static class ListenerBean {
@KafkaHandler public void processMessage(MyPojo message) { // ... message processing }
}
3.2 以降、RetryableTopic
アノテーションは次のような KafkaListener
アノテーション付きクラスをサポートします。
@RetryableTopic(attempts = 3, backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
@KafkaListener(topics = "my-annotated-topic") static class ListenerBean {
@KafkaHandler public void processMessage(MyPojo message) { // ... message processing }
}
または、次のようなメタアノテーション経由:
@RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
public @interface WithExponentialBackoffRetry {
@AliasFor(attribute = "attempts", annotation = RetryableTopic.class) String retries(); }
@WithExponentialBackoffRetry(retries = "3")
@KafkaListener(topics = "my-annotated-topic") public void processMessage(MyPojo message) { // ... message processing }
アノテーションとビルダーのアプローチでは同じ構成を使用でき、両方を同時に使用できます。同じメソッド / トピックを両方で処理できる場合は、アノテーションが優先されます。
DLT 処理:
DLT ハンドラーメソッドは、RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)
メソッドを通じて提供でき、DLT トピックを処理する必要があるクラスとメソッド名を提供します。この型の Bean インスタンスが BeanFactory
で見つかった場合、それが使用されているインスタンスです。そうでない場合は、インスタンスが作成されます。このクラスは、依存性注入を通常の Bean として使用できます。
DLT ハンドラーメソッドを提供するもう 1 つの方法は、対応する@Bean public RetryTopicConfiguration otherRetryTopic(KafkaTemplate<Integer, MyPojo> template) { return RetryTopicConfigurationBuilder .newInstance() .dltHandlerMethod("myCustomDltProcessor", "processDltMessage") .create(template); }
@Component public class MyCustomDltProcessor { public void processDltMessage(MyPojo message) { // ... message processing, persistence, etc } }
KafkaListener
と同じクラス内で使用する必要がある DltHandler
アノテーションを使用することです。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT ハンドラーが提供されていない場合は、デフォルトの RetryTopicConfigurer.LoggingDltListenerHandlerMethod
が使用されます。- 導入:
- 2.7
- 作成者:
- Tomaz Fernandes, Fabio da Silva Jr., Gary Russell, Wang Zhiyang, Borahm Lee
- 関連事項:
ネストされたクラスのサマリー
ネストされたクラスフィールドのサマリー
フィールドコンストラクターの概要
コンストラクターコンストラクター説明RetryTopicConfigurer
(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) 提供されたプロパティでインスタンスを作成します。メソッドのサマリー
修飾子と型メソッド説明protected EndpointCustomizer<MethodKafkaListenerEndpoint<?,
?>> createEndpointCustomizer
(EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) static EndpointHandlerMethod
createHandlerMethodWith
(ObjectSE bean, MethodSE method) static EndpointHandlerMethod
createHandlerMethodWith
(ObjectSE beanOrClass, StringSE methodName) protected void
createNewTopicBeans
(CollectionSE<StringSE> topics, org.springframework.kafka.retrytopic.RetryTopicConfiguration.TopicCreation config) protected EndpointHandlerMethod
getEndpointHandlerMethod
(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, DestinationTopic.Properties props) void
processMainAndRetryListeners
(RetryTopicConfigurer.EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, KafkaListenerEndpointRegistrar registrar, KafkaListenerContainerFactory<?> factory, StringSE defaultContainerFactoryBeanName) 再試行および dlt エンドポイント、および対応する listenerContainer を作成するコンテナーファクトリを作成および構成するためのエントリポイント。void
setBeanFactory
(BeanFactory beanFactory)
フィールドの詳細
DEFAULT_DLT_HANDLER
DLT でメッセージを処理するためのデフォルトの方法。
コンストラクターの詳細
RetryTopicConfigurer
@Autowired public RetryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor, ListenerContainerFactoryResolver containerFactoryResolver, ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) 提供されたプロパティでインスタンスを作成します。- パラメーター:
destinationTopicProcessor
- 宛先トピックプロセッサー。containerFactoryResolver
- コンテナーファクトリリゾルバー。listenerContainerFactoryConfigurer
- コンテナーファクトリコンフィギュレータ。retryTopicNamesProviderFactory
- 再試行トピック名はファクトリです。
メソッドの詳細
processMainAndRetryListeners
public void processMainAndRetryListeners(RetryTopicConfigurer.EndpointProcessor endpointProcessor, MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, KafkaListenerEndpointRegistrar registrar, @Nullable KafkaListenerContainerFactory<?> factory, StringSE defaultContainerFactoryBeanName) 再試行および dlt エンドポイント、および対応する listenerContainer を作成するコンテナーファクトリを作成および構成するためのエントリポイント。- パラメーター:
endpointProcessor
- エンドポイント processListener メソッドを処理する関数。mainEndpoint
- 再試行エンドポイントと dlt エンドポイントも作成および処理されるエンドポイント。configuration
- トピックの構成。registrar
- エンドポイントを登録するKafkaListenerEndpointRegistrar
。factory
-KafkaListener
で提供されるファクトリdefaultContainerFactoryBeanName
-KafkaListener
のデフォルトのファクトリ Bean 名
getEndpointHandlerMethod
protected EndpointHandlerMethod getEndpointHandlerMethod(MethodKafkaListenerEndpoint<?, ?> mainEndpoint, RetryTopicConfiguration configuration, DestinationTopic.Properties props) createNewTopicBeans
protected void createNewTopicBeans(CollectionSE<StringSE> topics, org.springframework.kafka.retrytopic.RetryTopicConfiguration.TopicCreation config) createEndpointCustomizer
protected EndpointCustomizer<MethodKafkaListenerEndpoint<?,?>> createEndpointCustomizer(EndpointHandlerMethod endpointBeanMethod, DestinationTopic.Properties destinationTopicProperties) createHandlerMethodWith
public static EndpointHandlerMethod createHandlerMethodWith(ObjectSE beanOrClass, StringSE methodName) createHandlerMethodWith
setBeanFactory
- 次で指定:
- インターフェース
BeanFactoryAware
のsetBeanFactory
- 例外:
BeansException