トランザクション
このセクションでは、Spring for Apache Kafka がトランザクションをサポートする方法について説明します。
概要
0.11.0.0 クライアントライブラリでは、トランザクションのサポートが追加されました。Spring for Apache Kafka は、次の方法でサポートを追加します。
KafkaTransactionManager
: 通常の Spring トランザクションサポートと併用 (@Transactional
、TransactionTemplate
など)トランザクション
KafkaMessageListenerContainer
KafkaTemplate
とのローカルトランザクション他のトランザクションマネージャーとのトランザクションの同期
トランザクションは、DefaultKafkaProducerFactory
に transactionIdPrefix
を提供することによって有効になります。その場合、単一の共有 Producer
を管理する代わりに、ファクトリはトランザクションプロデューサーのキャッシュを維持します。ユーザーがプロデューサーで close()
を呼び出すと、プロデューサーは実際に閉じられるのではなく、再利用のためにキャッシュに返されます。各プロデューサーの transactional.id
プロパティは transactionIdPrefix
+ n
です。ここで、n
は 0
で始まり、新しいプロデューサーごとに増分されます。以前のバージョンの Spring for Apache Kafka では、3.0 で始まる唯一のオプションである EOSMode.V2
を使用して、フェンシングゾンビをサポートするために、レコードベースのリスナーを持つリスナーコンテナーによって開始されたトランザクションに対して transactional.id
が異なる方法で生成されました。複数のインスタンスで実行されているアプリケーションの場合、transactionIdPrefix
はインスタンスごとに一意である必要があります。
正確に一度セマンティクスも参照してください。
transactionIdPrefix
も参照してください。
Spring Boot では、spring.kafka.producer.transaction-id-prefix
プロパティを設定するだけで済みます。Spring Boot は自動的に KafkaTransactionManager
Bean を構成し、それをリスナーコンテナーに接続します。
バージョン 2.5.8 以降では、プロデューサーファクトリで maxAge プロパティを構成できるようになりました。これは、ブローカーの transactional.id.expiration.ms に対してアイドル状態になる可能性のあるトランザクションプロデューサーを使用する場合に便利です。現在の kafka-clients では、リバランスを行わないと ProducerFencedException が発生する可能性があります。maxAge を transactional.id.expiration.ms 未満に設定すると、最大有効期間を過ぎた場合、ファクトリはプロデューサーをリフレッシュします。 |
KafkaTransactionManager
を使用する
KafkaTransactionManager
は、Spring Framework の PlatformTransactionManager
の実装です。コンストラクターでプロデューサーファクトリへの参照が提供されます。カスタムプロデューサーファクトリを提供する場合は、トランザクションをサポートする必要があります。ProducerFactory.transactionCapable()
を参照してください。
KafkaTransactionManager
は、通常の Spring トランザクションサポート (@Transactional
、TransactionTemplate
など) とともに使用できます。トランザクションがアクティブな場合、トランザクションのスコープ内で実行されるすべての KafkaTemplate
操作は、トランザクションの Producer
を使用します。マネージャーは、成功または失敗に応じて、トランザクションをコミットまたはロールバックします。トランザクションマネージャーと同じ ProducerFactory
を使用するように KafkaTemplate
を構成する必要があります。
トランザクションの同期
このセクションでは、プロデューサーのみのトランザクション (リスナーコンテナーによって開始されないトランザクション) を参照します。コンテナーがトランザクションを開始するときのトランザクションのチェーンについては、コンシューマー主導のトランザクションの使用を参照してください。
レコードを kafka に送信してデータベースの更新を実行したい場合は、通常の Spring トランザクション管理を、たとえば DataSourceTransactionManager
で使用できます。
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
@Transactional
アノテーションのインターセプターはトランザクションを開始し、KafkaTemplate
はトランザクションをそのトランザクションマネージャーと同期します。各送信はそのトランザクションに参加します。メソッドが終了すると、データベーストランザクションがコミットされ、続いて Kafka トランザクションがコミットされます。コミットを逆の順序 (Kafka が最初) で実行する場合は、ネストされた @Transactional
メソッドを使用します。外側のメソッドは DataSourceTransactionManager
を使用するように構成され、内側のメソッドは KafkaTransactionManager
を使用するように構成されます。
Kafka ファーストまたは DB ファースト構成で JDBC および Kafka トランザクションを同期するアプリケーションの例については、他のトランザクションマネージャーとの Kafka トランザクションの例を参照してください。
バージョン 2.5.17, 2.6.12, 2.7.9, 2.8.0 以降では、(プライマリトランザクションがコミットされた後) 同期トランザクションでコミットが失敗すると、呼び出し元に例外がスローされます。以前は、これは確認なしで無視されていました (デバッグレベルでログに記録されました)。アプリケーションは、コミットされたプライマリトランザクションを補うために、必要に応じて修復措置を講じる必要があります。 |
コンシューマー主導のトランザクションの使用
ChainedKafkaTransactionManager
はバージョン 2.7 以降非推奨になりました。詳細については、スーパークラス ChainedTransactionManager
の JavaDocs を参照してください。代わりに、コンテナー内で KafkaTransactionManager
を使用して Kafka トランザクションを開始し、リスナーメソッドに @Transactional
アノテーションを付けて他のトランザクションを開始します。
チェーン JDBC および Kafka トランザクションのサンプルアプリケーションについては、他のトランザクションマネージャーとの Kafka トランザクションの例を参照してください。
ノンブロッキング再試行はコンテナートランザクションと組み合わせることはできません。リスナーコードが例外をスローすると、コンテナートランザクションのコミットが成功し、レコードが再試行可能なトピックに送信されます。 |
KafkaTemplate
ローカルトランザクション
KafkaTemplate
を使用して、ローカルトランザクション内で一連の操作を実行できます。次の例は、その方法を示しています。
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
コールバックの引数はテンプレート自体 (this
) です。コールバックが正常に終了すると、トランザクションはコミットされます。例外がスローされた場合、トランザクションはロールバックされます。
処理中の KafkaTransactionManager (または同期)トランザクションがある場合、使用されません。代わりに、新しい「ネストされた」トランザクションが使用されます。 |
TransactionIdPrefix
唯一サポートされているモードである EOSMode.V2
(別名 BETA
) では、コンシューマーが開始したトランザクションであっても、同じ transactional.id
を使用する必要がなくなりました。実際、プロデューサーが開始したトランザクションと同じように、インスタンスごとに一意である必要があります。このプロパティは、アプリケーションインスタンスごとに異なる値を持つ必要があります。
TransactionIdSuffix Fixed
3.2 以降、transactional.id
サフィックスを管理するために新しい TransactionIdSuffixStrategy
インターフェースが導入されました。maxCache
をゼロより大きく設定すると、特定の範囲内で transactional.id
を再利用できる場合、デフォルトの実装は DefaultTransactionIdSuffixStrategy
です。それ以外の場合は、カウンターをインクリメントすることによってサフィックスがオンザフライで生成されます。トランザクションプロデューサーがリクエストされ、transactional.id
がすべて使用されている場合は、NoProducerAvailableException
をスローします。その後、ユーザーは、適切に構成されたバックオフを使用して、その例外を再試行するように構成された RetryTemplate
を使用できます。
public static class Config {
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
pf.setTransactionIdSuffixStrategy(ss);
return pf;
}
}
maxCache
を 5 に設定すると、transactional.id
は my.txid.
+ `{0-4}` になります。
KafkaTransactionManager を ConcurrentMessageListenerContainer とともに使用し、maxCache を有効にする場合、maxCache を concurrency 以上の値に設定する必要があります。MessageListenerContainer が transactional.id サフィックスを取得できない場合は、NoProducerAvailableException がスローされます。ConcurrentMessageListenerContainer でネストされたトランザクションを使用する場合は、ネストされたトランザクションの数の増加に対応するために maxCache 設定を調整する必要があります。 |
KafkaTemplate
トランザクションおよび非トランザクションパブリッシング
通常、KafkaTemplate
がトランザクション (トランザクション対応のプロデューサーファクトリで構成されている) である場合、トランザクションが必要です。トランザクションは、TransactionTemplate
、@Transactional
メソッド、executeInTransaction
の呼び出し、または KafkaTransactionManager
で構成されている場合はリスナーコンテナーによって開始できます。トランザクションの範囲外でテンプレートを使用しようとすると、テンプレートは IllegalStateException
をスローします。バージョン 2.4.3 以降、テンプレートの allowNonTransactional
プロパティを true
に設定できます。その場合、テンプレートは ProducerFactory
の createNonTransactionalProducer()
メソッドを呼び出すことにより、トランザクションなしで操作を実行できるようにします。プロデューサーは、再利用のために通常どおりキャッシュされるか、スレッドにバインドされます。DefaultKafkaProducerFactory
の使用を参照してください。
バッチリスナーとのトランザクション
トランザクションの使用中にリスナーが失敗すると、AfterRollbackProcessor
が呼び出され、ロールバックの発生後に何らかのアクションが実行されます。レコードリスナーでデフォルトの AfterRollbackProcessor
を使用する場合、失敗したレコードが再配信されるようにシークが実行されます。ただし、バッチリスナーを使用すると、フレームワークはバッチ内のどのレコードが失敗したかを認識できないため、バッチ全体が再配信されます。詳細については、ロールバック後のプロセッサーを参照してください。
バッチリスナーを使用する場合、バージョン 2.4.2 では、バッチ処理中の障害に対処するための代替メカニズム BatchToRecordAdapter
が導入されました。batchListener
が true に設定されているコンテナーファクトリが BatchToRecordAdapter
で構成されている場合、リスナーは一度に 1 つのレコードで呼び出されます。これにより、例外の種類に応じてバッチ全体の処理を停止しながら、バッチ内のエラー処理が可能になります。デフォルトの BatchToRecordAdapter
が提供されており、DeadLetterPublishingRecoverer
などの標準 ConsumerRecordRecoverer
で構成できます。次のテストケース構成スニペットは、この機能の使用方法を示しています。
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}