アノテーションインターフェース EnableKafkaRetryTopic
@RetentionSE(RUNTIMESE)
@TargetSE(TYPESE)
@DocumentedSE
@Import(RetryTopicConfigurationSupport.class)
@EnableKafka
public @interface EnableKafkaRetryTopic
ノンブロッキングトピックベースの遅延再試行機能を有効にします。次のように
Configuration クラスで使用します。
@EnableKafkaRetryTopic
@Configuration
public class AppConfig {
}
@Component
public class MyListener {
@RetryableTopic(sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC, backOff = @BackOff(4000))
@KafkaListener(topics = "myTopic")
public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
logger.info("Message {} received in topic {} ", message, receivedTopic);
}
@DltHandler
public void dltHandler(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
logger.info("Message {} received in dlt handler at topic {} ", message, receivedTopic);
}
このアノテーションを使用すると、デフォルトの RetryTopicConfigurationSupport Bean が構成されます。このアノテーションには @EnableKafka のメタアノテーションが付けられているため、両方を指定する必要はありません。 機能のコンポーネントを構成するには、RetryTopicConfigurationSupport クラスを継承し、@Configuration クラスの適切なメソッドを次のようにオーバーライドします。
@Configuration
@EnableKafka
public class AppConfig extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
.backOff(new FixedBackOff(50, 3));
}
@Override
protected void configureNonBlockingRetries(NonBlockingRetriesConfigurer nonBlockingRetries) {
nonBlockingRetries
.addToFatalExceptions(ShouldSkipBothRetriesException.class);
}
この場合、このアノテーションを使用しないでください。代わりに @EnableKafka を使用してください。- 導入:
- 2.9
- 作成者:
- Tomaz Fernandes