デッドレタートピック処理
DLQ を有効にする
DLQ を有効にするには、Kafka バインダーベースのアプリケーションがプロパティ spring.cloud.stream.bindings.<binding-name>.group
を介してコンシューマーグループを提供する必要があります。匿名のコンシューマーグループ (つまり、アプリケーションがグループを明示的に提供しないグループ) は、DLQ 機能を有効にできません。
アプリケーションがエラーのあるレコードを DLQ トピックに送信したい場合、デフォルトでは DLQ 機能が有効になっていないため、そのアプリケーションは DLQ 機能を有効にする必要があります。DLQ を有効にするには、spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq
プロパティを true に設定する必要があります。
DLQ が有効になっている場合、処理中にエラーが発生し、spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts
プロパティに基づいてすべての再試行が完了すると、そのレコードは DLQ トピックに送信されます。
デフォルトでは、max-attempts
プロパティは 3 に設定されています。max-attempts
プロパティが 1
より大きく、dlq が有効な場合、再試行では max-attempts
プロパティが尊重されることがわかります。dlq が有効になっていない場合 (デフォルト)、max-attempts
プロパティは再試行の処理方法に影響しません。その場合、再試行は Spring for Apache Kafka のコンテナーのデフォルト (10
再試行) に戻ります。DLQ が無効になっているときにアプリケーションが再試行を完全に無効にしたい場合、max-attempts
プロパティを 1
に設定しても機能しません。この場合に再試行を完全に無効にするには、ListenerContainerCustomizer
を指定してから、適切な Backoff
設定を使用する必要があります。ここに一例を示します。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, destinationName, group) -> {
var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
container.setCommonErrorHandler(commonErrorHandler);
};
}
これにより、デフォルトのコンテナーの動作が無効になり、再試行は行われなくなります。上で記述されていたように、DLQ を有効にすると、バインダー設定が優先されます。
デッドレタートピックでのレコードの処理
フレームワークは、ユーザーがデッドレターメッセージをどのように処理するかを予測できないため、メッセージを処理するための標準的なメカニズムを提供していません。デッドレタリングの理由が一時的なものである場合は、メッセージを元のトピックにルーティングして戻すことをお勧めします。ただし、課題が永続的な課題である場合は、無限ループが発生する可能性があります。このトピック内のサンプル Spring Boot アプリケーションは、これらのメッセージを元のトピックにルーティングする方法の例ですが、3 回試行すると、メッセージは「駐車場」トピックに移動します。このアプリケーションは、デッドレターのトピックから読み取る別の spring-cloud-stream アプリケーションです。5 秒間メッセージが受信されないと終了します。
例では、元の宛先が so8400out
であり、コンシューマーグループが so8400
であると想定しています。
考慮すべき戦略がいくつかあります。
メインアプリケーションが実行されていないときにのみ再ルーティングを実行することを検討してください。そうしないと、一時的なエラーの再試行がすぐに使い果たされてしまいます。
または、2 段階のアプローチを使用します。このアプリケーションを使用して 3 番目のトピックにルーティングし、別のアプリケーションを使用してそこからメイントピックにルーティングします。
次のコードリストは、サンプルアプリケーションを示しています。
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}