再試行とデッドレター処理

デフォルトでは、コンシューマーバインディングで再試行(maxAttemts など)と enableDlq を構成すると、これらの機能はバインダー内で実行され、リスナーコンテナーや Kafka コンシューマーは関与しません。

次のように、この機能をリスナーコンテナーに移動することが望ましい場合があります。

  • 再試行と遅延の合計は、コンシューマーの max.poll.interval.ms プロパティを超え、パーティションのリバランスを引き起こす可能性があります。

  • デッドレターを別の Kafka クラスターに公開したいとします。

  • エラーハンドラーに再試行リスナーを追加したいとします。

  • …​

この機能をバインダーからコンテナーに移動するように構成するには、型 ListenerContainerWithDlqAndRetryCustomizer の @Bean を定義します。このインターフェースには次のメソッドがあります。

/**
 * Configure the container.
 * @param container the container.
 * @param destinationName the destination name.
 * @param group the group.
 * @param dlqDestinationResolver a destination resolver for the dead letter topic (if
 * enableDlq).
 * @param backOff the backOff using retry properties (if configured).
 * @see #retryAndDlqInBinding(String, String)
 */
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
        @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
        @Nullable BackOff backOff);

/**
 * Return false to move retries and DLQ from the binding to a customized error handler
 * using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
 * configured via
 * {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
 * @param destinationName the destination name.
 * @param group the group.
 * @return false to disable retries and DLQ in the binding
 */
default boolean retryAndDlqInBinding(String destinationName, String group) {
    return true;
}

宛先リゾルバーと BackOff は、バインディングプロパティから作成されます (構成されている場合)。KafkaTemplate は、spring.kafka…​. プロパティからの構成を使用します。これらを使用して、カスタムエラーハンドラーとデッドレターパブリッシャーを作成できます。例:

@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
    return new ListenerContainerWithDlqAndRetryCustomizer() {

        @Override
        public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
                String group,
                @Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
                @Nullable BackOff backOff) {

            if (destinationName.equals("topicWithLongTotalRetryConfig")) {
                ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
                        dlqDestinationResolver);
                container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
            }
        }

        @Override
        public boolean retryAndDlqInBinding(String destinationName, String group) {
            return !destinationName.contains("topicWithLongTotalRetryConfig");
        }

    };
}

これで、1 回の再試行遅延のみがコンシューマーの max.poll.interval.ms プロパティより大きくなる必要があります。

複数のバインダーを使用する場合、"ListenerContainerWithDlqAndRetryCustomizer" Bean は "DefaultBinderFactory" によってオーバーライドされます。Bean を適用するには、'BinderCustomizer' を使用してコンテナーカスタマイザーを設定する必要があります ( 【バインダーカスタマイザー】を参照)。

@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
    return (binder, binderName) -> {
        if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
            kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
        }
        else if (binder instanceof KStreamBinder) {
            ...
        }
        else if (binder instanceof RabbitMessageChannelBinder) {
            ...
        }
    };
}