エラー処理

Apache Kafka Streams は、デシリアライゼーションエラーからの例外をネイティブに処理する機能を提供します。このサポートの詳細については、こちら [Apache] (英語) を参照してください。Apache Kafka Streams は、デフォルトで、LogAndContinueExceptionHandler と LogAndFailExceptionHandler の 2 種類のデシリアライゼーション例外ハンドラーを提供します。名前が示すように、前者はエラーをログに記録して次のレコードの処理を続行し、後者はエラーをログに記録して失敗します。LogAndFailExceptionHandler は、デフォルトのデシリアライゼーション例外ハンドラーです。

バインダーでの逆直列化例外の処理

Kafka Streams バインダーでは、次のプロパティを使用して、上記の逆直列化例外ハンドラーを指定できます。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue

または

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail

上記の 2 つの逆直列化例外ハンドラーに加えて、バインダーは、誤ったレコード(ポイズンピル)を DLQ(デッドレターキュー)トピックに送信するための 3 番目のハンドラーも提供します。この DLQ 例外ハンドラーを有効にする方法は次のとおりです。

spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq

上記のプロパティを設定すると、逆直列化エラーのすべてのレコードが自動的に DLQ トピックに送信されます。

DLQ メッセージを公開するトピック名は以下のように設定できます。

関数インターフェースである DlqDestinationResolver の実装を提供できます。DlqDestinationResolver は、ConsumerRecord と例外を入力として受け取り、トピック名を出力として指定できるようにします。Kafka ConsumerRecord にアクセスすることにより、BiFunction の実装でヘッダーレコードをイントロスペクトできます。

これは、DlqDestinationResolver の実装を提供する例です。

@Bean
public DlqDestinationResolver dlqDestinationResolver() {
    return (rec, ex) -> {
        if (rec.topic().equals("word1")) {
            return "topic1-dlq";
        }
        else {
            return "topic2-dlq";
        }
    };
}

DlqDestinationResolver の実装を提供する際に留意すべき重要なことの 1 つは、バインダーのプロビジョナーがアプリケーションのトピックを自動作成しないことです。これは、実装が送信する可能性のあるすべての DLQ トピックの名前をバインダーが推測する方法がないためです。この戦略を使用して DLQ 名を提供する場合、それらのトピックが事前に作成されていることを確認するのはアプリケーションの責任です。

DlqDestinationResolver がアプリケーションに Bean として存在する場合、それが優先されます。このアプローチに従わず、構成を使用して静的 DLQ 名を指定する場合は、次のプロパティを設定できます。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)

これが設定されている場合、エラーレコードはトピック custom-dlq に送信されます。アプリケーションが上記の戦略のいずれも使用していない場合、error.<input-topic-name>.<application-id> という名前の DLQ トピックが作成されます。たとえば、バインディングの宛先トピックが inputTopic で、アプリケーション ID が process-applicationId の場合、デフォルトの DLQ トピックは error.inputTopic.process-applicationId です。DLQ を有効にする場合は、入力バインディングごとに DLQ トピックを明示的に作成することを常にお勧めします。

入力コンシューマーバインディングごとの DLQ

プロパティ spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler は、アプリケーション全体に適用されます。これは、同じアプリケーションに複数の関数がある場合、このプロパティがそれらすべてに適用されることを意味します。ただし、単一のプロセッサー内に複数のプロセッサーまたは複数の入力バインディングがある場合は、入力コンシューマーバインディングごとにバインダーが提供する、よりきめ細かい DLQ コントロールを使用できます。

次のプロセッサーを使用している場合

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

最初の入力バインディングでのみ DLQ を有効にし、2 番目のバインディングで skipAndContinue を有効にしたい場合は、以下のようにコンシューマーで実行できます。

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue

この方法で逆直列化例外ハンドラーを設定すると、バインダーレベルで設定するよりも優先されます。

DLQ パーティショニング

デフォルトでは、レコードは元のレコードと同じパーティションを使用して Dead-Letter トピックに公開されます。つまり、Dead-Letter トピックには、少なくとも元のレコードと同じ数のパーティションが必要です。

この動作を変更するには、DlqPartitionFunction 実装を @Bean としてアプリケーションコンテキストに追加します。そのような Bean は 1 つだけ存在できます。この機能は、コンシューマーグループ(ほとんどの状況でアプリケーション ID と同じ)、失敗した ConsumerRecord、例外とともに提供されます。例: 常にパーティション 0 にルーティングする場合は、次を使用できます。

@Bean
public DlqPartitionFunction partitionFunction() {
    return (group, record, ex) -> 0;
}
コンシューマーバインディングの dlqPartitions プロパティを 1 に設定する(そして、バインダーの minPartitionCount が 1 に等しい)場合、DlqPartitionFunction を供給する必要はありません。フレームワークは常にパーティション 0 を使用します。コンシューマーバインディングの dlqPartitions プロパティを 1 よりも大きい値に設定した場合(または、バインダーの minPartitionCount が 1 よりも大きい場合)、パーティション数が元のトピックのものと同じであっても、DlqPartitionFunction Bean を必ず提供しなければなりません。

Kafka Streams バインダーの例外処理機能を使用する際に留意すべき点がいくつかあります。

  • プロパティ spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler は、アプリケーション全体に適用されます。これは、同じアプリケーションに複数の関数がある場合、このプロパティがそれらすべてに適用されることを意味します。

  • デシリアライズの例外処理は、ネイティブのデシリアライズおよびフレームワークが提供するメッセージ変換と一貫して機能します。

バインダーでの本番例外の処理

上記の逆直列化例外ハンドラーのサポートとは異なり、バインダーは、本番例外を処理するためのそのようなファーストクラスのメカニズムを提供しません。ただし、StreamsBuilderFactoryBean カスタマイザを使用して本番例外ハンドラーを構成することはできます。詳細については、以下の次のセクションを参照してください。

実行時エラーの処理

アプリケーションコード、つまりビジネスロジックの実行からのエラーの処理に関しては、通常、それを処理するのはアプリケーション次第です。なぜなら、Kafka Streams バインダーにはアプリケーションコードに干渉する方法がないからです。ただし、アプリケーションの処理を少し簡単にするために、バインダーには便利な RecordRecoverableProcessor が用意されており、これを使用すると、アプリケーションレベルのエラーの処理方法を指定できます。

次のコードを考えてみましょう。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .map(...);
}

上記の map 呼び出し内のビジネスコードが例外をスローした場合、そのエラーを処理するのはユーザーの責任です。ここで RecordRecoverableProcessor が役に立ちます。デフォルトでは、RecordRecoverableProcessor は単にエラーをログに記録し、アプリケーションを続行させます。失敗したレコードをアプリケーション内で処理するのではなく、DLT に公開したいとします。その場合、DltAwareProcessor と呼ばれる RecordRecoverableProcessor のカスタム実装を使用する必要があります。その方法は次のとおりです。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
    return input -> input
        .process(() -> new DltAwareProcessor<>(record -> {
					throw new RuntimeException("error");
				}, "hello-dlt-1", dltPublishingContext));
}

元の map 呼び出しのビジネスロジックコードは、ProcessorSupplier を受け取る KStream#process メソッド呼び出しの一部として移動されました。次に、DLT に発行できるカスタム DltAwareProcessor, を渡します。上記の DltAwareProcessor のコンストラクターは 3 つのパラメーターを受け取ります。Function は入力レコードを受け取り、その後 Function 本体の一部としてビジネスロジック操作を受け取り、DLT トピック、最後に DltPublishingContext を受け取ります。Function’s lambda expression throws an exception, the `DltAwareProcessor が入力レコードを DLT に送信するとき。DltPublishingContext は、DltAwareProcessor に必要な公開インフラストラクチャ Bean を提供します。DltPublishingContext はバインダーによって自動構成されるため、これをアプリケーションに直接挿入できます。

バインダーが失敗したレコードを DLT に公開しないようにするには、DltAwareProcessor の代わりに RecordRecoverableProcessor を直接使用する必要があります。入力 Record と例外を引数として受け取る独自のリカバリを BiConsumer として提供できます。レコードを DLT に送信せず、単にメッセージをログに記録して次に進むシナリオを想定します。以下にそれを実現する方法の例を示します。

@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
    return input -> input
        .process(() -> new RecordRecoverableProcessor<>(record -> {
					throw new RuntimeException("error");
				},
                (record, exception) -> {
                  // Handle the record
                }));
}

この場合、レコードが失敗すると、RecordRecoverableProcessor はユーザーが提供したリカバリを使用します。これは、失敗したレコードとスローされた例外を引数として受け取る BiConsumer です。

DltAwareProcessor でのレコードキーの処理

DltAwareProcessor を使用して失敗したレコードを DLT に送信するときに、レコードキーを DLT トピックに送信する場合は、DLT バインディングに適切なシリアライザーを設定する必要があります。これは、DltAwareProcessor が、デフォルトでキーに ByteArraySerializer を使用する通常の Kafka バインダー (メッセージチャネルベース) を使用する StreamBridge を使用するためです。レコード値の場合、Spring Cloud Stream はペイロードを適切な byte[] に変換します。ただし、キーの場合はそうではなく、ヘッダーで受け取ったものをキーとして渡すだけです。非バイト配列キーを指定している場合は、クラスキャスト例外が発生する可能性があるため、それを回避するには、以下のように DLT バインディングにシリアライザーを設定する必要があります。

DLT 宛先が hello-dlt-1 で、レコードキーが String データ型であると仮定します。

spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer