プログラム構築

この機能は @KafkaListener で使用するように設計されています。ただし、何人かのユーザーは、ノンブロッキングの再試行をプログラムで構成する方法についての情報をリクエストしています。次の Spring Boot アプリケーションは、その方法の例を示しています。

@SpringBootApplication
public class Application extends RetryTopicConfigurationSupport {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .maxAttempts(4)
                .autoCreateTopicsWith(2, (short) 1)
                .create(template);
    }

    @Bean
    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }

    @Bean
    @Order(0)
    SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
            KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
            Listener listener, KafkaListenerEndpointRegistry registry) {

        return () -> {
            KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
            MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
            EndpointProcessor endpointProcessor = endpoint -> {
                // customize as needed (e.g. apply attributes to retry endpoints).
                if (!endpoint.equals(mainEndpoint)) {
                    endpoint.setConcurrency(1);
                }
                // these are required
                endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
                endpoint.setTopics("topic");
                endpoint.setId("id");
                endpoint.setGroupId("group");
            };
            mainEndpoint.setBean(listener);
            try {
                mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
            }
            catch (NoSuchMethodException | SecurityException ex) {
                throw new IllegalStateException(ex);
            }
            mainEndpoint.setConcurrency(2);
            mainEndpoint.setTopics("topic");
            mainEndpoint.setId("id");
            mainEndpoint.setGroupId("group");
            configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
                    "kafkaListenerContainerFactory");
        };
    }


    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic", "test");
        };
    }

}

@Component
class Listener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(KafkaUtils.format(record));
        throw new RuntimeException("test");
    }

}
トピックの自動作成は、上記の例のように、アプリケーションコンテキストがリフレッシュされる前に構成が処理される場合にのみ発生します。実行時にコンテナーを構成するには、他の手法を使用してトピックを作成する必要があります。