最新の安定バージョンについては、spring-cloud-stream 4.3.0 を使用してください。

重要なビジネスロジックの再試行

アプリケーションにとって重要なビジネスロジックの一部を再試行したいシナリオがあります。リレーショナルデータベースへの外部呼び出し、または Kafka Streams プロセッサーからの REST エンドポイントの呼び出しがある可能性があります。これらの呼び出しは、ネットワークの課題やリモートサービスが利用できないなどのさまざまな理由で失敗する可能性があります。多くの場合、これらの障害は、再試行できれば自己解決する可能性があります。デフォルトでは、Kafka Streams バインダーはすべての入力バインディングに対して RetryTemplate Bean を作成します。

関数に次のシグネチャーがある場合

@Bean
public java.util.function.Consumer<KStream<Object, String>> process()

デフォルトのバインディング名では、RetryTemplate は process-in-0-RetryTemplate として登録されます。これは、バインディング名(process-in-0)の後にリテラル -RetryTemplate が続くという規則に従います。複数の入力バインディングの場合、バインディングごとに個別の RetryTemplate Bean を使用できます。アプリケーションで使用可能で、spring.cloud.stream.bindings.<binding-name>.consumer.retryTemplateName を介して提供されるカスタム RetryTemplate Bean がある場合、入力バインディングレベルの再試行テンプレート構成プロパティよりも優先されます。

バインディングからの RetryTemplate がアプリケーションに挿入されると、アプリケーションのクリティカルセクションを再試行するために使用できます。次に例を示します。

@Bean
public java.util.function.Consumer<KStream<Object, String>> process(@Lazy @Qualifier("process-in-0-RetryTemplate") RetryTemplate retryTemplate) {

    return input -> input
            .process(() -> new Processor<Object, String>() {
                @Override
                public void init(ProcessorContext processorContext) {
                }

                @Override
                public void process(Object o, String s) {
                    retryTemplate.execute(context -> {
                       //Critical business logic goes here.
                    });
                }

                @Override
                public void close() {
                }
            });
}

または、以下のようにカスタム RetryTemplate を使用できます。

@EnableAutoConfiguration
public static class CustomRetryTemplateApp {

    @Bean
    @StreamRetryTemplate
    RetryTemplate fooRetryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        RetryPolicy retryPolicy = new SimpleRetryPolicy(4);
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(1);

        retryTemplate.setBackOffPolicy(backOffPolicy);
        retryTemplate.setRetryPolicy(retryPolicy);

        return retryTemplate;
    }

    @Bean
    public java.util.function.Consumer<KStream<Object, String>> process() {

        return input -> input
                .process(() -> new Processor<Object, String>() {
                    @Override
                    public void init(ProcessorContext processorContext) {
                    }

                    @Override
                    public void process(Object o, String s) {
                        fooRetryTemplate().execute(context -> {
                           //Critical business logic goes here.
                        });

                    }

                    @Override
                    public void close() {
                    }
                });
    }
}

再試行が完了すると、デフォルトでは最後の例外がスローされ、プロセッサーが終了することに注意してください。例外を処理して処理を続行する場合は、RecoveryCallback を execute メソッドに追加できます。例を次に示します。

retryTemplate.execute(context -> {
    //Critical business logic goes here.
    }, context -> {
       //Recovery logic goes here.
       return null;
    ));

RetryTemplate、再試行ポリシー、バックオフポリシーなどの詳細については、Spring Retry [GitHub] (英語) プロジェクトを参照してください。