デッドレタートピック処理

DLQ を有効にする

DLQ を有効にするには、Kafka バインダーベースのアプリケーションがプロパティ spring.cloud.stream.bindings.<binding-name>.group を介してコンシューマーグループを提供する必要があります。匿名のコンシューマーグループ (つまり、アプリケーションがグループを明示的に提供しないグループ) は、DLQ 機能を有効にできません。

アプリケーションがエラーのあるレコードを DLQ トピックに送信したい場合、デフォルトでは DLQ 機能が有効になっていないため、そのアプリケーションは DLQ 機能を有効にする必要があります。DLQ を有効にするには、spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq プロパティを true に設定する必要があります。

DLQ が有効になっている場合、処理中にエラーが発生し、spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts プロパティに基づいてすべての再試行が完了すると、そのレコードは DLQ トピックに送信されます。

デフォルトでは、max-attempts プロパティは 3 に設定されています。max-attempts プロパティが 1 より大きく、dlq が有効な場合、再試行では max-attempts プロパティが尊重されることがわかります。dlq が有効になっていない場合 (デフォルト)、max-attempts プロパティは再試行の処理方法に影響しません。その場合、再試行は Spring for Apache Kafka のコンテナーのデフォルト (10 再試行) に戻ります。DLQ が無効になっているときにアプリケーションが再試行を完全に無効にしたい場合、max-attempts プロパティを 1 に設定しても機能しません。この場合に再試行を完全に無効にするには、ListenerContainerCustomizer を指定してから、適切な Backoff 設定を使用する必要があります。ここに一例を示します。

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
	return (container, destinationName, group) -> {
		var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
		container.setCommonErrorHandler(commonErrorHandler);
	};
}

これにより、デフォルトのコンテナーの動作が無効になり、再試行は行われなくなります。上で記述されていたように、DLQ を有効にすると、バインダー設定が優先されます。

デッドレタートピックでのレコードの処理

フレームワークは、ユーザーがデッドレターメッセージをどのように処理するかを予測できないため、メッセージを処理するための標準的なメカニズムを提供していません。デッドレタリングの理由が一時的なものである場合は、メッセージを元のトピックにルーティングして戻すことをお勧めします。ただし、課題が永続的な課題である場合は、無限ループが発生する可能性があります。このトピック内のサンプル Spring Boot アプリケーションは、これらのメッセージを元のトピックにルーティングする方法の例ですが、3 回試行すると、メッセージは「駐車場」トピックに移動します。このアプリケーションは、デッドレターのトピックから読み取る別の spring-cloud-stream アプリケーションです。5 秒間メッセージが受信されないと終了します。

例では、元の宛先が so8400out であり、コンシューマーグループが so8400 であると想定しています。

考慮すべき戦略がいくつかあります。

  • メインアプリケーションが実行されていないときにのみ再ルーティングを実行することを検討してください。そうしないと、一時的なエラーの再試行がすぐに使い果たされてしまいます。

  • または、2 段階のアプローチを使用します。このアプリケーションを使用して 3 番目のトピックにルーティングし、別のアプリケーションを使用してそこからメイントピックにルーティングします。

次のコードリストは、サンプルアプリケーションを示しています。

spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
アプリケーション
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public Function<Message<?>, Message<?>> reRoute() {
        return failed -> {
            processed.incrementAndGet();
            Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
            if (retries == null) {
                System.out.println("First retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else if (retries < 3) {
                System.out.println("Another retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else {
                System.out.println("Retries exhausted for " + failed);
                streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
            }
            return null;
        };
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }
}