トランザクションバインダー

spring.cloud.stream.kafka.binder.transaction.transactionIdPrefix を空でない値に設定して、トランザクションを有効にします。tx-。プロセッサーアプリケーションで使用される場合、コンシューマーはトランザクションを開始します。コンシューマースレッドで送信されたすべてのレコードは、同じトランザクションに参加します。リスナーが正常に終了すると、リスナーコンテナーはオフセットをトランザクションに送信し、コミットします。spring.cloud.stream.kafka.binder.transaction.producer.* プロパティを使用して構成されたすべてのプロデューサーバーインディングには、共通のプロデューサーファクトリが使用されます。個々のバインディング Kafka プロデューサープロパティは無視されます。

通常のバインダーの再試行(およびデッドレタリング)は、トランザクションではサポートされていません。再試行は元のトランザクションで実行されるため、ロールバックされる可能性があり、公開されたレコードもロールバックされます。再試行が有効になっている場合(共通プロパティ maxAttempts がゼロより大きい場合)、再試行プロパティを使用して、コンテナーレベルで再試行を有効にするように DefaultAfterRollbackProcessor を構成します。同様に、トランザクション内で配信不能レコードを公開する代わりに、この機能は、メイントランザクションがロールバックされた後に実行される DefaultAfterRollbackProcessor を介して、リスナーコンテナーに移動されます。

ソースアプリケーションで、またはプロデューサーのみのトランザクション(@Scheduled メソッドなど)の任意のスレッドからトランザクションを使用する場合は、トランザクションプロデューサーファクトリへの参照を取得し、それを使用して KafkaTransactionManager Bean を定義する必要があります。

@Bean
public PlatformTransactionManager transactionManager(BinderFactory binders,
        @Value("${unique.tx.id.per.instance}") String txId) {

    ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
            MessageChannel.class)).getTransactionalProducerFactory();
    KafkaTransactionManager tm = new KafkaTransactionManager<>(pf);
    tm.setTransactionId(txId)
    return tm;
}

BinderFactory を使用してバインダーへの参照を取得していることに注意してください。バインダーが 1 つしか構成されていない場合は、最初の引数で null を使用します。複数のバインダーが構成されている場合は、バインダー名を使用して参照を取得します。バインダーへの参照を取得したら、ProducerFactory への参照を取得して、トランザクションマネージャーを作成できます。

次に、通常の Spring トランザクションサポートを使用します。TransactionTemplate または @Transactional、例:

public static class Sender {

    @Transactional
    public void doInTransaction(MessageChannel output, List<String> stuffToSend) {
        stuffToSend.forEach(stuff -> output.send(new GenericMessage<>(stuff)));
    }

}

プロデューサーのみのトランザクションを他のトランザクションマネージャーからのトランザクションと同期する場合は、ChainedTransactionManager を使用します。

アプリケーションの複数のインスタンスをデプロイする場合、各インスタンスには一意の transactionIdPrefix が必要です。

Kafka トランザクションにおける例外再試行動作

トランザクションロールバック再試行動作の構成

Kafka トランザクション内でメッセージを処理する場合、defaultRetryable プロパティと retryableExceptions マップを使用して、トランザクションのロールバック後に再試行する例外を構成できます。

デフォルトの再試行動作

DefaultAfterRollbackProcessor は、トランザクションのロールバック後に再試行をトリガーする例外を決定します。デフォルトでは、すべての例外が再試行されますが、この動作を変更できます。

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             defaultRetryable: false  # Change default to NOT retry exceptions

defaultRetryable が false に設定されている場合、DefaultAfterRollbackProcessor は defaultFalse(true) で構成されます。つまり、再試行可能として明示的に構成されない限り、例外は再試行されません。

例外固有の構成

きめ細かい制御を行うには、個々の例外型に対して再試行動作を指定できます。

spring:
 cloud:
   stream:
     kafka:
       bindings:
         <binding-name>:
           consumer:
             retryableExceptions:
               java.lang.IllegalStateException: true    # Always retry this exception
               java.lang.IllegalArgumentException: false  # Never retry this exception

DefaultAfterRollbackProcessor は、true としてマークされた例外には addRetryableExceptions() を使用し、false としてマークされた例外には addNotRetryableExceptions() を使用します。これらの例外固有の構成は、デフォルトの動作よりも優先されます。

実装の詳細

  • トランザクションを使用する場合、retryableExceptions では例外型(Exception のサブクラス)のみを設定できます。

  • 例外以外の型が指定された場合は IllegalArgumentException がスローされます

  • DefaultAfterRollbackProcessor は、トランザクションが有効でバッチモードが無効になっている場合にのみ構成されます。

  • この構成により、トランザクション再試行の動作が非トランザクション再試行処理と一致することが保証されます。