クラス RetryTopicConfigurer

java.lang.ObjectSE
org.springframework.kafka.retrytopic.RetryTopicConfigurer
実装されたすべてのインターフェース:
AwareBeanFactoryAware

public class RetryTopicConfigurer extends ObjectSE implements BeanFactoryAware

メインエンドポイントと提供された構成に基づいてメイン、再試行、DLT トピックを構成し、順序の保証を犠牲にしてノンブロッキング方式で分散再試行 /DLT パターンを実現します。

たとえば、"main-topic" トピックがあり、指数関数的なバックオフを 1000ms、倍率を 2、再試行を 3 回行いたい場合、main-topic-retry-1000, main-topic-retry-2000, main-topic-retry-4000, main-topic-dlt トピックが作成されることになります。この設定は、RetryTopicConfigurationBuilder を使用して 1 つ以上の RetryTopicConfigurer Bean を作成するか、RetryableTopic アノテーションを使用することで実現可能です。使い方の詳細は後述します。

使い方:

メッセージ処理が例外をスローした場合、構成された DefaultErrorHandler および DeadLetterPublishingRecoverer は、DestinationTopicResolver を使用してメッセージを次のトピックに転送し、次のトピックとその遅延を認識します。

転送された各レコードにはバックオフタイムスタンプヘッダーがあり、それより前に KafkaBackoffAwareMessageListenerAdapter によって消費が試行された場合、パーティションの消費は KafkaConsumerBackoffManager によって一時停止され、KafkaBackoffException がスローされます。

パーティションが ContainerProperties の idlePartitionEventInterval プロパティで指定された時間アイドル状態になると、ListenerContainerPartitionIdleEvent が公開され、KafkaConsumerBackoffManager はこれをリッスンして、パーティションの一時停止を解除する必要があるかどうかを確認します。

消費が再開されたときに処理が再び失敗した場合、メッセージは dlt に到達するまで、次のトピックなどに転送されます。

Kafka のパーティション順序の保証と、各トピックの遅延時間が固定されていることを考慮すると、特定の再試行トピックパーティションで消費される最初のメッセージは、そのパーティションのバックオフタイムスタンプが最も早いメッセージであることがわかります。パーティションを一時停止することで、次のことがわかります。他のパーティションでのメッセージ処理を必要以上に遅らせないでください。

使用箇所:

エンドポイントを構成するには、主に 2 つのメソッドがあります。1 つ目は、次のような Configuration アノテーション付きクラスで 1 つ以上の Bean を提供することです。

     @Bean
     public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, Object> template) {
         return RetryTopicConfigurationBuilder
                 .newInstance()
                 .create(template);
      }
 

これにより、デフォルト構成を使用して、KafkaListener でアノテーションが付けられたメソッドのすべてのトピックとそのコンシューマーに対して、再試行トピックと dlt トピックが作成されます。メッセージ処理が失敗した場合、DLT トピックに到達するまで、メッセージを次のトピックに転送します。メッセージ転送には KafkaOperations インスタンスが必要です。

各トピックの再試行を処理する方法をよりきめ細かく制御するために、次のように複数の Bean を提供できます。

     @Bean
     public RetryTopicConfiguration myRetryableTopic(KafkaTemplate<String, MyPojo> template) {
         return RetryTopicConfigurationBuilder
                 .newInstance()
                 .fixedBackOff(3000)
                 .maxAttempts(5)
                 .includeTopics("my-topic", "my-other-topic")
                 .create(template);
         }
 
           @Bean
     public RetryTopicConfiguration myOtherRetryableTopic(KafkaTemplate<String, MyPojo> template) {
         return RetryTopicConfigurationBuilder
                 .newInstance()
                 .exponentialBackoff(1000, 2, 5000)
                 .maxAttempts(4)
                 .excludeTopics("my-topic", "my-other-topic")
                 .retryOn(MyException.class)
                 .create(template);
         }
 

他のいくつかのオプションが含まれます: トピックの自動作成、バックオフ、retryOn/notRetryOn/ RetryTemplate のような横断、単一トピックの固定バックオフ処理、カスタム DLT リスナー Bean、カスタムトピックサフィックス、および特定の listenerContainerFactories の提供。

エンドポイントを設定するもう 1 つの非排他的な方法は、便利な RetryableTopic アノテーションを使用することです。これは、次のような KafkaListener アノテーション付きメソッドに直接配置できます。

     @RetryableTopic(attempts = 3,
                backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
     @KafkaListener(topics = "my-annotated-topic")
     public void processMessage(MyPojo message) {
                        // ... message processing
     }

3.2 以降、RetryableTopic アノテーションは次のような KafkaListener アノテーション付きクラスをサポートします。

     @RetryableTopic(attempts = 3,
                backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
     @KafkaListener(topics = "my-annotated-topic")
     static class ListenerBean {
          @KafkaHandler
         public void processMessage(MyPojo message) {
                        // ... message processing
         }
     }

3.2 以降、RetryableTopic アノテーションは次のような KafkaListener アノテーション付きクラスをサポートします。

     @RetryableTopic(attempts = 3,
                backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
     @KafkaListener(topics = "my-annotated-topic")
     static class ListenerBean {
          @KafkaHandler
         public void processMessage(MyPojo message) {
                        // ... message processing
         }
     }

または、次のようなメタアノテーションを介して:

     @RetryableTopic(backoff = @Backoff(delay = 700, maxDelay = 12000, multiplier = 3))
     public @interface WithExponentialBackoffRetry {
             @AliasFor(attribute = "attempts", annotation = RetryableTopic.class)
                String retries();
     }

     @WithExponentialBackoffRetry(retries = "3")
     @KafkaListener(topics = "my-annotated-topic")
     public void processMessage(MyPojo message) {
                        // ... message processing
     }

アノテーションとビルダーのアプローチでは同じ構成を使用でき、両方を同時に使用できます。同じメソッド / トピックを両方で処理できる場合は、アノテーションが優先されます。

DLT 処理:

DLT ハンドラーメソッドは、RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) メソッドを通じて提供でき、DLT トピックを処理する必要があるクラスとメソッド名を提供します。この型の Bean インスタンスが BeanFactory で見つかった場合、それが使用されているインスタンスです。そうでない場合は、インスタンスが作成されます。このクラスは、依存性注入を通常の Bean として使用できます。

     @Bean
     public RetryTopicConfiguration otherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
         return RetryTopicConfigurationBuilder
                 .newInstance()
                 .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
                 .create(template);
     }

     @Component
     public class MyCustomDltProcessor {

                public void processDltMessage(MyPojo message) {
               // ... message processing, persistence, etc
                }
     }
 
DLT ハンドラーメソッドを提供するもう 1 つの方法は、対応する KafkaListener と同じクラス内で使用する必要がある DltHandler アノテーションを使用することです。
            @DltHandler
       public void processMessage(MyPojo message) {
                        // ... message processing, persistence, etc
       }
DLT ハンドラーが提供されていない場合は、デフォルトの RetryTopicConfigurer.LoggingDltListenerHandlerMethod が使用されます。
導入:
2.7
作成者:
Tomaz Fernandes, Fabio da Silva Jr., Gary Russell, Wang Zhiyang, Borahm Lee
関連事項: