トピックの命名

トピックの再試行と DLT は、メイントピックに提供された値またはデフォルト値をサフィックスとして付け、そのトピックの遅延またはインデックスを追加することで名前が付けられます。

例:

"my-topic" → "my-topic-retry-0"、"my-topic-retry-1"、…、"my-topic-dlt"

"my-other-topic" → "my-topic-myRetrySuffix-1000","my-topic-myRetrySuffix-2000"、…、"my-topic-myDltSuffix"

デフォルトの動作では、試行ごとに個別の再試行トピックが作成され、インデックス値が追加されます: retry-0、retry-1、…、retry-n。デフォルトでは、再試行トピックの数は、構成された maxAttempts から 1 を引いた数になります。

再試行トピックと DLT サフィックス

再試行および DLT トピックで使用されるサフィックスを指定できます。

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}
デフォルトのサフィックスは、"-retry" および "-dlt" で、それぞれ再試行トピックと dlt です。

トピックのインデックスまたは遅延の追加

サフィックスの後にトピックのインデックスまたは遅延値を追加できます。

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }
デフォルトの動作では、複数のトピックでの固定遅延構成を除き、遅延値を末尾に付けます。この場合、トピックの末尾にはトピックのインデックスが付きます。

固定遅延再試行の単一トピック

FixedBackOffPolicy や NoBackOffPolicy などの固定遅延ポリシーを使用している場合は、単一のトピックを使用してノンブロッキング再試行を実行できます。このトピックには、提供されたサフィックスまたはデフォルトのサフィックスがサフィックスとして付けられ、インデックスまたは遅延値はアペンドされません。

以前の FixedDelayStrategy は廃止され、SameIntervalTopicReuseStrategy に置き換えることができます。
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3_000)
            .maxAttempts(5)
            .useSingleTopicForFixedDelays()
            .create(template);
}
デフォルトの動作では、試行ごとに個別の再試行トピックが作成され、そのインデックス値が追加されます: retry-0、retry-1、…

maxInterval Exponential Delay の単一トピック

指数バックオフポリシー (ExponentialBackOffPolicy) を使用している場合は、単一の再試行トピックを使用して、遅延が構成された maxInterval である試行の非ブロック再試行を実行できます。

この「最終」再試行トピックには、指定されたサフィックスまたはデフォルトのサフィックスが付加され、インデックスまたは maxInterval 値が付加されます。

maxInterval 遅延を使用した再試行に単一のトピックを使用することを選択することで、長時間再試行を続ける指数再試行ポリシーを構成することがより実行可能になる可能性があります。このアプローチでは大量のトピックが必要ないためです。

デフォルトの動作は、構成された maxAttempts から 1 を引いた数の再試行トピックで動作することです。指数バックオフを使用する場合、再試行トピックには遅延値が末尾に付けられ、最後の再試行トピック (maxInterval 遅延に対応) が末尾に付けられます。追加インデックス付き。

たとえば、initialInterval=1_000multiplier=2maxInterval=16_000 で指数バックオフを構成する場合、1 時間試行し続けるには、maxAttempts を 229 として構成する必要があり、デフォルトで必要な再試行トピックは次のようになります。

  • - 再試行 -1000

  • - 再試行 -2000

  • - 再試行 -4000

  • - 再試行 -8000

  • - 再試行 -16000-0

  • - 再試行 -16000-1

  • - 再試行 -16000-2

  • …​

  • - 再試行 -16000-224

同じ間隔で再試行トピックを再利用する戦略を使用する場合、上記の同じ構成で、必要な再試行トピックは次のようになります。

  • - 再試行 -1000

  • - 再試行 -2000

  • - 再試行 -4000

  • - 再試行 -8000

  • - 再試行 -16000

これは、将来のリリースではデフォルトになります。

@RetryableTopic(attempts = 230,
    backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
    sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1_000, 2, 16_000)
            .maxAttempts(230)
            .useSingleTopicForSameIntervals()
            .create(template);
}

カスタム命名戦略

RetryTopicNamesProviderFactory を実装する Bean を登録することで、より複雑な命名戦略を実現できます。デフォルトの実装は SuffixingRetryTopicNamesProviderFactory であり、次の方法で別の実装を登録できます。

@Override
protected RetryTopicComponentFactory createComponentFactory() {
    return new RetryTopicComponentFactory() {
        @Override
        public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
            return new CustomRetryTopicNamesProviderFactory();
        }
    };
}

例として、次の実装では、標準のサフィックスに加えて、retry/dlt トピック名にプレフィックスを追加します。

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if (properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}