構成

バージョン 2.9 以降、デフォルト設定では、@EnableKafkaRetryTopic アノテーションを @Configuration アノテーション付きクラスで使用する必要があります。これにより、機能が適切にブートストラップできるようになり、機能のコンポーネントの一部を注入して実行時に検索できるようになります。

@EnableKafkaRetryTopic は @EnableKafka でメタアノテーションが付けられているため、このアノテーションを追加する場合、@EnableKafka も追加する必要はありません。

また、そのバージョン以降、機能のコンポーネントとグローバル機能のより高度な構成のために、RetryTopicConfigurationSupport クラスを @Configuration クラスで拡張し、適切なメソッドをオーバーライドする必要があります。詳細については、グローバル設定と機能の構成を参照してください。

既定では、再試行トピックのコンテナーは、メインコンテナーと同じ同時実行性を持ちます。バージョン 3.0 以降では、再試行コンテナーに別の concurrency を設定できます (アノテーションまたは RetryConfigurationBuilder で)。

上記の手法の 1 つだけを使用でき、RetryTopicConfigurationSupport を継承できる @Configuration クラスは 1 つだけです。

@RetryableTopic アノテーションを使用する

@KafkaListener アノテーション付きメソッドの再試行トピックと dlt を設定するには、@RetryableTopic アノテーションを追加するだけで、Spring for Apache Kafka は必要なすべてのトピックとコンシューマーをデフォルト設定でブートストラップします。

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

3.2 なので、クラス上の @KafkaListener に対する @RetryableTopic のサポートは次のようになります。

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

同じクラス内のメソッドを指定して、@DltHandler アノテーションでアノテーションを付けることにより、dlt メッセージを処理できます。DltHandler メソッドが指定されていない場合、消費のみを記録するデフォルトのコンシューマーが作成されます。

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
kafkaTemplate 名を指定しない場合、defaultRetryTopicKafkaTemplate という名前の Bean が検索されます。Bean が見つからない場合は、例外がスローされます。

バージョン 3.0 以降では、@RetryableTopic アノテーションをカスタムアノテーションのメタアノテーションとして使用できます。例:

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

RetryTopicConfiguration Bean の使用

@Configuration アノテーション付きクラスで RetryTopicConfiguration Bean を作成することにより、ノンブロッキング再試行サポートを構成することもできます。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

これにより、デフォルト構成を使用して、@KafkaListener アノテーションが付けられたメソッド内のすべてのトピックに対して、再試行トピックと dlt、および対応するコンシューマーが作成されます。KafkaTemplate インスタンスはメッセージ転送に必要です。

各トピックのノンブロッキング再試行の処理方法をより細かく制御するために、複数の RetryTopicConfiguration Bean を提供できます。

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}
再試行トピックと dlt のコンシューマーは、@KafkaListener アノテーションの groupId パラメーターで指定したグループ ID とトピックのサフィックスを組み合わせたグループ ID を持つコンシューマーグループに割り当てられます。何も指定しない場合、すべて同じグループに属し、再試行トピックでリバランスすると、メイントピックで不必要なリバランスが発生します。
コンシューマーが ErrorHandlingDeserializer で構成されている場合、逆直列化例外を処理するには、通常のオブジェクトと逆直列化例外から生じる生の byte[] 値を処理できるシリアライザーを使用して KafkaTemplate とそのプロデューサーを構成することが重要です。テンプレートの汎用値型は Object である必要があります。1 つの手法は、DelegatingByTypeSerializer を使用することです。以下に例を示します。
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
複数の @KafkaListener アノテーションは、手動のパーティション割り当ての有無にかかわらず、ノンブロッキング再試行とともに同じトピックに使用できますが、特定のトピックに使用される構成は 1 つだけです。このようなトピックの構成には、単一の RetryTopicConfiguration Bean を使用するのが最適です。同じトピックに複数の @RetryableTopic アノテーションが使用されている場合、すべて同じ値である必要があります。そうでない場合、それらの 1 つがそのトピックのすべてのリスナーに適用され、他のアノテーションの値は無視されます。

グローバル設定と機能の構成

2.9 以降、コンポーネントを構成するための以前の Bean オーバーライドアプローチは削除されました (前述の API の実験的な性質のため、非推奨ではありません)。これは、RetryTopicConfiguration Bean のアプローチを変更するものではなく、インフラストラクチャコンポーネントの構成のみを変更します。ここで、RetryTopicConfigurationSupport クラスを (単一の) @Configuration クラスに拡張し、適切なメソッドをオーバーライドする必要があります。以下に例を示します。

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
この構成アプローチを使用する場合、Bean の重複によるコンテキストの開始の失敗を防ぐために @EnableKafkaRetryTopic アノテーションを使用しないでください。代わりに単純な @EnableKafka アノテーションを使用してください。

autoCreateTopics が true の場合、メイントピックと再試行トピックは、指定されたパーティション数とレプリケーション係数で作成されます。バージョン 3.0 以降、デフォルトのレプリケーション係数は -1 で、ブローカーのデフォルトを使用することを意味します。ブローカーのバージョンが 2.4 より前の場合は、明示的な値を設定する必要があります。特定のトピック (メイントピックや DLT など) のこれらの値をオーバーライドするには、必要なプロパティを備えた NewTopic@Bean を追加するだけです。これにより、自動作成プロパティがオーバーライドされます。

デフォルトでは、受信したレコードの元のパーティションを使用して、レコードが再試行トピックに発行されます。再試行トピックのパーティションがメイントピックよりも少ない場合は、フレームワークを適切に構成する必要があります。例を次に示します。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

関数のパラメーターは、コンシューマーレコードと次のトピックの名前です。特定のパーティション番号、または null を返して、KafkaProducer がパーティションを決定する必要があることを示すことができます。

デフォルトでは、レコードが再試行トピックを通過するときに、再試行ヘッダーのすべての値 (試行回数、タイムスタンプ) が保持されます。バージョン 2.9.6 以降、これらのヘッダーの最後の値のみを保持する場合は、上記の configureDeadLetterPublishingContainerFactory() メソッドを使用して、ファクトリの retainAllRetryHeaderValues プロパティを false に設定します。

RetryTopicConfiguration を探す

@RetryableTopic アノテーションからインスタンスを作成するか、アノテーションが利用できない場合は Bean コンテナーからインスタンスを作成することにより、RetryTopicConfiguration のインスタンスの提供を試みます。

コンテナー内に Bean が見つかった場合、提供されたトピックをそのようなインスタンスのいずれかで処理する必要があるかどうかを判断するためのチェックが行われます。

@RetryableTopic アノテーションが提供されている場合は、DltHandler アノテーションが付けられたメソッドが検索されます。

3.2 以降、クラスに @RetryableTopic アノテーションが付けられたときに RetryTopicConfiguration を作成するための新しい API を提供します。

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}