トピックの命名
トピックの再試行と DLT は、メイントピックに提供された値またはデフォルト値をサフィックスとして付け、そのトピックの遅延またはインデックスを追加することで名前が付けられます。
例:
"my-topic" → "my-topic-retry-0"、"my-topic-retry-1"、…、"my-topic-dlt"
"my-other-topic" → "my-topic-myRetrySuffix-1000","my-topic-myRetrySuffix-2000"、…、"my-topic-myDltSuffix"
デフォルトの動作では、試行ごとに個別の再試行トピックが作成され、インデックス値が追加されます: retry-0、retry-1、…、retry-n。デフォルトでは、再試行トピックの数は、構成された maxAttempts から 1 を引いた数になります。 |
サフィックスを構成し、試行インデックスまたは遅延を追加するかどうかを選択し、固定バックオフを使用する場合は単一の再試行トピックを使用し、指数バックオフを使用する場合は maxInterval による試行に単一の再試行トピックを使用できます。
再試行トピックと DLT サフィックス
再試行および DLT トピックで使用されるサフィックスを指定できます。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
デフォルトのサフィックスは、"-retry" および "-dlt" で、それぞれ再試行トピックと dlt です。 |
トピックのインデックスまたは遅延の追加
サフィックスの後にトピックのインデックスまたは遅延値を追加できます。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
デフォルトの動作では、複数のトピックでの固定遅延構成を除き、遅延値を末尾に付けます。この場合、トピックの末尾にはトピックのインデックスが付きます。 |
固定遅延再試行の単一トピック
FixedBackOffPolicy
や NoBackOffPolicy
などの固定遅延ポリシーを使用している場合は、単一のトピックを使用してノンブロッキング再試行を実行できます。このトピックには、提供されたサフィックスまたはデフォルトのサフィックスがサフィックスとして付けられ、インデックスまたは遅延値はアペンドされません。
以前の FixedDelayStrategy は廃止され、SameIntervalTopicReuseStrategy に置き換えることができます。 |
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
デフォルトの動作では、試行ごとに個別の再試行トピックが作成され、そのインデックス値が追加されます: retry-0、retry-1、… |
maxInterval 指数遅延の単一トピック
指数バックオフポリシー (ExponentialBackOffPolicy
) を使用している場合は、単一の再試行トピックを使用して、遅延が構成された maxInterval
である試行の非ブロック再試行を実行できます。
この「最終」再試行トピックには、指定されたサフィックスまたはデフォルトのサフィックスが付加され、インデックスまたは maxInterval
値が付加されます。
maxInterval 遅延を使用した再試行に単一のトピックを使用することを選択することで、長時間再試行を続ける指数再試行ポリシーを構成することがより実行可能になる可能性があります。このアプローチでは大量のトピックが必要ないためです。 |
3.2 以降、デフォルトの動作では再試行トピックを同じ間隔で再利用します。指数バックオフを使用する場合、再試行トピックには遅延値が接尾辞として付加され、最後の再試行トピックは同じ間隔で再利用されます (maxInterval
遅延に対応)。
たとえば、initialInterval=1_000
、multiplier=2
、maxInterval=16_000
で指数バックオフを構成する場合、1 時間試行し続けるには、maxAttempts
を 229 として構成する必要があり、デフォルトで必要な再試行トピックは次のようになります。
- 再試行 -1000
- 再試行 -2000
- 再試行 -4000
- 再試行 -8000
- 再試行 -16000
設定された maxAttempts
から 1 を引いた数の再試行トピックで機能する戦略を使用する場合、追加のインデックスが末尾に付加される最後の再試行トピック (maxInterval
遅延に対応) は次のようになります。
- 再試行 -1000
- 再試行 -2000
- 再試行 -4000
- 再試行 -8000
- 再試行 -16000-0
- 再試行 -16000-1
- 再試行 -16000-2
…
- 再試行 -16000-224
複数のトピックが必要な場合は、次の構成を使用して実行できます。
@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1_000, 2, 16_000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
カスタム命名戦略
RetryTopicNamesProviderFactory
を実装する Bean を登録することで、より複雑な命名戦略を実現できます。デフォルトの実装は SuffixingRetryTopicNamesProviderFactory
であり、次の方法で別の実装を登録できます。
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
例として、次の実装では、標準のサフィックスに加えて、retry/dlt トピック名にプレフィックスを追加します。
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}