トランザクションバインダー
spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix
を空でない値に設定して、トランザクションを有効にします。tx-
。プロセッサーアプリケーションで使用される場合、コンシューマーはトランザクションを開始します。コンシューマースレッドで送信されたすべてのレコードは、同じトランザクションに参加します。リスナーが正常に終了すると、リスナーコンテナーはオフセットをトランザクションに送信し、コミットします。spring.cloud.stream.kafka.binder.transaction.producer.*
プロパティを使用して構成されたすべてのプロデューサーバーインディングには、共通のプロデューサーファクトリが使用されます。個々のバインディング Kafka プロデューサープロパティは無視されます。
通常のバインダーの再試行(およびデッドレタリング)は、トランザクションではサポートされていません。再試行は元のトランザクションで実行されるため、ロールバックされる可能性があり、公開されたレコードもロールバックされます。再試行が有効になっている場合(共通プロパティ maxAttempts がゼロより大きい場合)、再試行プロパティを使用して、コンテナーレベルで再試行を有効にするように DefaultAfterRollbackProcessor を構成します。同様に、トランザクション内で配信不能レコードを公開する代わりに、この機能は、メイントランザクションがロールバックされた後に実行される DefaultAfterRollbackProcessor を介して、リスナーコンテナーに移動されます。 |
ソースアプリケーションで、またはプロデューサーのみのトランザクション(@Scheduled
メソッドなど)の任意のスレッドからトランザクションを使用する場合は、トランザクションプロデューサーファクトリへの参照を取得し、それを使用して KafkaTransactionManager
Bean を定義する必要があります。
@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
@Value("${unique.tx.id.per.instance}") String txId) {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
tm.setTransactionId(txId)
return tm;
}
BinderFactory
を使用してバインダーへの参照を取得していることに注意してください。バインダーが 1 つしか構成されていない場合は、最初の引数で null
を使用します。複数のバインダーが構成されている場合は、バインダー名を使用して参照を取得します。バインダーへの参照を取得したら、ProducerFactory
への参照を取得して、トランザクションマネージャーを作成できます。
次に、通常の Spring トランザクションサポートを使用します。TransactionTemplate
または @Transactional
、例:
public static class Sender {
@Transactional
public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
}
}
プロデューサーのみのトランザクションを他のトランザクションマネージャーからのトランザクションと同期する場合は、ChainedTransactionManager
を使用します。
アプリケーションの複数のインスタンスをデプロイする場合、各インスタンスには一意の transactionIdPrefix が必要です。 |
Kafka トランザクションにおける例外再試行動作
トランザクションロールバック再試行動作の構成
Kafka トランザクション内でメッセージを処理する場合、defaultRetryable
プロパティと retryableExceptions
マップを使用して、トランザクションのロールバック後に再試行する例外を構成できます。
デフォルトの再試行動作
DefaultAfterRollbackProcessor
は、トランザクションのロールバック後に再試行をトリガーする例外を決定します。デフォルトでは、すべての例外が再試行されますが、この動作を変更できます。
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
defaultRetryable: false # Change default to NOT retry exceptions
defaultRetryable
が false
に設定されている場合、DefaultAfterRollbackProcessor
は defaultFalse(true)
で構成されます。つまり、再試行可能として明示的に構成されない限り、例外は再試行されません。
例外固有の構成
きめ細かい制御を行うには、個々の例外型に対して再試行動作を指定できます。
spring:
cloud:
stream:
kafka:
bindings:
<binding-name>:
consumer:
retryableExceptions:
java.lang.IllegalStateException: true # Always retry this exception
java.lang.IllegalArgumentException: false # Never retry this exception
DefaultAfterRollbackProcessor
は、true
としてマークされた例外には addRetryableExceptions()
を使用し、false
としてマークされた例外には addNotRetryableExceptions()
を使用します。これらの例外固有の構成は、デフォルトの動作よりも優先されます。