トランザクション

このセクションでは、Spring for Apache Kafka がトランザクションをサポートする方法について説明します。

概要

0.11.0.0 クライアントライブラリでは、トランザクションのサポートが追加されました。Spring for Apache Kafka は、次の方法でサポートを追加します。

  • KafkaTransactionManager: 通常の Spring トランザクションサポートと併用 (@TransactionalTransactionTemplate など)

  • トランザクション KafkaMessageListenerContainer

  • KafkaTemplate とのローカルトランザクション

  • 他のトランザクションマネージャーとのトランザクションの同期

トランザクションは、DefaultKafkaProducerFactory に transactionIdPrefix を提供することによって有効になります。その場合、単一の共有 Producer を管理する代わりに、ファクトリはトランザクションプロデューサーのキャッシュを維持します。ユーザーがプロデューサーで close() を呼び出すと、プロデューサーは実際に閉じられるのではなく、再利用のためにキャッシュに返されます。各プロデューサーの transactional.id プロパティは transactionIdPrefix + n です。ここで、n は 0 で始まり、新しいプロデューサーごとに増分されます。以前のバージョンの Spring for Apache Kafka では、3.0 で始まる唯一のオプションである EOSMode.V2 を使用して、フェンシングゾンビをサポートするために、レコードベースのリスナーを持つリスナーコンテナーによって開始されたトランザクションに対して transactional.id が異なる方法で生成されました。複数のインスタンスで実行されているアプリケーションの場合、transactionIdPrefix はインスタンスごとに一意である必要があります。

正確に一度セマンティクスも参照してください。

transactionIdPrefix も参照してください。

Spring Boot では、spring.kafka.producer.transaction-id-prefix プロパティを設定するだけで済みます。Spring Boot は自動的に KafkaTransactionManager Bean を構成し、それをリスナーコンテナーに接続します。

バージョン 2.5.8 以降では、プロデューサーファクトリで maxAge プロパティを構成できるようになりました。これは、ブローカーの transactional.id.expiration.ms に対してアイドル状態になる可能性のあるトランザクションプロデューサーを使用する場合に便利です。現在の kafka-clients では、リバランスを行わないと ProducerFencedException が発生する可能性があります。maxAge を transactional.id.expiration.ms 未満に設定すると、最大有効期間を過ぎた場合、ファクトリはプロデューサーをリフレッシュします。

KafkaTransactionManager を使用する

KafkaTransactionManager は、Spring Framework の PlatformTransactionManager の実装です。コンストラクターでプロデューサーファクトリへの参照が提供されます。カスタムプロデューサーファクトリを提供する場合は、トランザクションをサポートする必要があります。ProducerFactory.transactionCapable() を参照してください。

KafkaTransactionManager は、通常の Spring トランザクションサポート (@TransactionalTransactionTemplate など) とともに使用できます。トランザクションがアクティブな場合、トランザクションのスコープ内で実行されるすべての KafkaTemplate 操作は、トランザクションの Producer を使用します。マネージャーは、成功または失敗に応じて、トランザクションをコミットまたはロールバックします。トランザクションマネージャーと同じ ProducerFactory を使用するように KafkaTemplate を構成する必要があります。

トランザクションの同期

このセクションでは、プロデューサーのみのトランザクション (リスナーコンテナーによって開始されないトランザクション) を参照します。コンテナーがトランザクションを開始するときのトランザクションのチェーンについては、コンシューマー主導のトランザクションの使用を参照してください。

レコードを kafka に送信してデータベースの更新を実行したい場合は、通常の Spring トランザクション管理を、たとえば DataSourceTransactionManager で使用できます。

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

@Transactional アノテーションのインターセプターはトランザクションを開始し、KafkaTemplate はトランザクションをそのトランザクションマネージャーと同期します。各送信はそのトランザクションに参加します。メソッドが終了すると、データベーストランザクションがコミットされ、続いて Kafka トランザクションがコミットされます。コミットを逆の順序 (Kafka が最初) で実行する場合は、ネストされた @Transactional メソッドを使用します。外側のメソッドは DataSourceTransactionManager を使用するように構成され、内側のメソッドは KafkaTransactionManager を使用するように構成されます。

Kafka ファーストまたは DB ファースト構成で JDBC および Kafka トランザクションを同期するアプリケーションの例については、他のトランザクションマネージャーとの Kafka トランザクションの例を参照してください。

バージョン 2.5.17, 2.6.12, 2.7.9, 2.8.0 以降では、(プライマリトランザクションがコミットされた後) 同期トランザクションでコミットが失敗すると、呼び出し元に例外がスローされます。以前は、これは確認なしで無視されていました (デバッグレベルでログに記録されました)。アプリケーションは、コミットされたプライマリトランザクションを補うために、必要に応じて修復措置を講じる必要があります。

コンシューマー主導のトランザクションの使用

ChainedKafkaTransactionManager はバージョン 2.7 以降非推奨になりました。詳細については、スーパークラス ChainedTransactionManager の JavaDocs を参照してください。代わりに、コンテナー内で KafkaTransactionManager を使用して Kafka トランザクションを開始し、リスナーメソッドに @Transactional アノテーションを付けて他のトランザクションを開始します。

チェーン JDBC および Kafka トランザクションのサンプルアプリケーションについては、他のトランザクションマネージャーとの Kafka トランザクションの例を参照してください。

ノンブロッキング再試行コンテナートランザクションと組み合わせることはできません。リスナーコードが例外をスローすると、コンテナートランザクションのコミットが成功し、レコードが再試行可能なトピックに送信されます。

KafkaTemplate ローカルトランザクション

KafkaTemplate を使用して、ローカルトランザクション内で一連の操作を実行できます。次の例は、その方法を示しています。

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

コールバックの引数はテンプレート自体 (this) です。コールバックが正常に終了すると、トランザクションはコミットされます。例外がスローされた場合、トランザクションはロールバックされます。

処理中の KafkaTransactionManager (または同期)トランザクションがある場合、使用されません。代わりに、新しい「ネストされた」トランザクションが使用されます。

TransactionIdPrefix

唯一サポートされているモードである EOSMode.V2 (別名 BETA) では、コンシューマーが開始したトランザクションであっても、同じ transactional.id を使用する必要がなくなりました。実際、プロデューサーが開始したトランザクションと同じように、インスタンスごとに一意である必要があります。このプロパティは、アプリケーションインスタンスごとに異なる値を持つ必要があります。

TransactionIdSuffix Fixed

3.2 以降、transactional.id サフィックスを管理するために新しい TransactionIdSuffixStrategy インターフェースが導入されました。maxCache をゼロより大きく設定すると、特定の範囲内で transactional.id を再利用できる場合、デフォルトの実装は DefaultTransactionIdSuffixStrategy です。それ以外の場合は、カウンターをインクリメントすることによってサフィックスがオンザフライで生成されます。トランザクションプロデューサーがリクエストされ、transactional.id がすべて使用されている場合は、NoProducerAvailableException をスローします。その後、ユーザーは、適切に構成されたバックオフを使用して、その例外を再試行するように構成された RetryTemplate を使用できます。

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

maxCache を 5 に設定すると、transactional.id は my.txid.+ `{0-4}` になります。

KafkaTransactionManager を ConcurrentMessageListenerContainer とともに使用し、maxCache を有効にする場合、maxCache を concurrency 以上の値に設定する必要があります。MessageListenerContainer が transactional.id サフィックスを取得できない場合は、NoProducerAvailableException がスローされます。ConcurrentMessageListenerContainer でネストされたトランザクションを使用する場合は、ネストされたトランザクションの数の増加に対応するために maxCache 設定を調整する必要があります。

KafkaTemplate トランザクションおよび非トランザクションパブリッシング

通常、KafkaTemplate がトランザクション (トランザクション対応のプロデューサーファクトリで構成されている) である場合、トランザクションが必要です。トランザクションは、TransactionTemplate@Transactional メソッド、executeInTransaction の呼び出し、または KafkaTransactionManager で構成されている場合はリスナーコンテナーによって開始できます。トランザクションの範囲外でテンプレートを使用しようとすると、テンプレートは IllegalStateException をスローします。バージョン 2.4.3 以降、テンプレートの allowNonTransactional プロパティを true に設定できます。その場合、テンプレートは ProducerFactory の createNonTransactionalProducer() メソッドを呼び出すことにより、トランザクションなしで操作を実行できるようにします。プロデューサーは、再利用のために通常どおりキャッシュされるか、スレッドにバインドされます。DefaultKafkaProducerFactory の使用を参照してください。

バッチリスナーとのトランザクション

トランザクションの使用中にリスナーが失敗すると、AfterRollbackProcessor が呼び出され、ロールバックの発生後に何らかのアクションが実行されます。レコードリスナーでデフォルトの AfterRollbackProcessor を使用する場合、失敗したレコードが再配信されるようにシークが実行されます。ただし、バッチリスナーを使用すると、フレームワークはバッチ内のどのレコードが失敗したかを認識できないため、バッチ全体が再配信されます。詳細については、ロールバック後のプロセッサーを参照してください。

バッチリスナーを使用する場合、バージョン 2.4.2 では、バッチ処理中の障害に対処するための代替メカニズム BatchToRecordAdapter が導入されました。batchListener が true に設定されているコンテナーファクトリが BatchToRecordAdapter で構成されている場合、リスナーは一度に 1 つのレコードで呼び出されます。これにより、例外の種類に応じてバッチ全体の処理を停止しながら、バッチ内のエラー処理が可能になります。デフォルトの BatchToRecordAdapter が提供されており、DeadLetterPublishingRecoverer などの標準 ConsumerRecordRecoverer で構成できます。次のテストケース構成スニペットは、この機能の使用方法を示しています。

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}