クラス DeadLetterPublishingRecoverer

実装されたすべてのインターフェース:
BiConsumerSE<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,ExceptionSE>ConsumerAwareRecordRecovererConsumerRecordRecoverer

public class DeadLetterPublishingRecoverer extends ExceptionClassifier implements ConsumerAwareRecordRecoverer
失敗したレコードをデッドレタートピックに公開する ConsumerRecordRecoverer
導入:
2.2
作成者:
Gary Russell, Tomaz Fernandes
  • フィールドの詳細

  • コンストラクターの詳細

    • DeadLetterPublishingRecoverer

      public DeadLetterPublishingRecoverer(KafkaOperations<? extends ObjectSE,? extends ObjectSE> template)
      提供されたテンプレートと、失敗したレコードの元のトピック( ".DLT" が付加された)に基づいて TopicPartition を返すデフォルトの宛先解決関数、および失敗したレコードと同じパーティションを使用してインスタンスを作成します。デッドレタートピックには、少なくとも元のトピックと同じ数のパーティションが必要です。
      パラメーター:
      template - 公開に使用する KafkaOperations
    • DeadLetterPublishingRecoverer

      public DeadLetterPublishingRecoverer(KafkaOperations<? extends ObjectSE,? extends ObjectSE> template, BiFunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,ExceptionSE,org.apache.kafka.common.TopicPartition> destinationResolver)
      提供されたテンプレートと宛先解決関数を使用してインスタンスを作成します。このインスタンスは、失敗したコンシューマーレコードと例外を受け取り、TopicPartition を返します。TopicPartition のパーティションが 0 未満の場合、トピックに公開するときにパーティションは設定されません。
      パラメーター:
      template - 公開に使用する KafkaOperations
      destinationResolver - 解決機能。
    • DeadLetterPublishingRecoverer

      public DeadLetterPublishingRecoverer(MapSE<ClassSE<?>,KafkaOperations<? extends ObjectSE,? extends ObjectSE>> templates)
      提供されたテンプレートと、失敗したレコードの元のトピック ( ".DLT" が追加された) に基づいて TopicPartition を返すデフォルトの宛先解決関数と、失敗したレコードと同じパーティションを使用してインスタンスを作成します。配信不能トピックには、少なくとも元のトピックと同じ数のパーティションが必要です。テンプレートマップキーは、クラスと、対応するテンプレートがその型のオブジェクト (プロデューサーレコード値) に使用する値です。複数のテンプレートがある場合は、マップが順番にトラバースされるように、LinkedHashMapSE をお勧めします。null 値を持つレコードを送信するには、VoidSE クラスをキーとしてテンプレートを追加します。それ以外の場合は、マップ値反復子の最初のテンプレートが使用されます。
      パラメーター:
      templates - 公開に使用する KafkaOperations
    • DeadLetterPublishingRecoverer

      public DeadLetterPublishingRecoverer(MapSE<ClassSE<?>,KafkaOperations<? extends ObjectSE,? extends ObjectSE>> templates, BiFunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,ExceptionSE,org.apache.kafka.common.TopicPartition> destinationResolver)
      提供されたテンプレートと宛先解決関数を使用して、失敗したコンシューマーレコードと例外を受け取り、TopicPartition を返すインスタンスを作成します。TopicPartition のパーティションが 0 未満の場合、トピックへのパブリッシュ時にパーティションは設定されません。テンプレートマップキーは、クラスと、対応するテンプレートがその型のオブジェクト (プロデューサーレコード値) に使用する値です。複数のテンプレートがある場合は、マップが順番にトラバースされるように、LinkedHashMapSE をお勧めします。null 値を持つレコードを送信するには、VoidSE クラスをキーとしてテンプレートを追加します。それ以外の場合は、マップ値反復子の最初のテンプレートが使用されます。
      パラメーター:
      templates - 公開に使用する KafkaOperations
      destinationResolver - 解決機能。
    • DeadLetterPublishingRecoverer

      public DeadLetterPublishingRecoverer(FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<?,?>,KafkaOperations<?,?>> templateResolver, BiFunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,ExceptionSE,org.apache.kafka.common.TopicPartition> destinationResolver)
      失敗したコンシューマーレコードと例外を受け取り、KafkaOperations と、このインスタンスからの公開がトランザクションであるかどうかに関するフラグを返すテンプレート解決関数を使用してインスタンスを作成します。同様に機能するが、代わりに TopicPartition を返す宛先解決関数も受け取ります。TopicPartition のパーティションが 0 未満の場合、トピックに公開するときにパーティションは設定されません。
      パラメーター:
      templateResolver - 公開に使用する KafkaOperations をリゾルバーする関数。
      destinationResolver - 解決機能。
      導入:
      3.0.9
    • DeadLetterPublishingRecoverer

      public DeadLetterPublishingRecoverer(FunctionSE<org.apache.kafka.clients.producer.ProducerRecord<?,?>,KafkaOperations<?,?>> templateResolver, boolean transactional, BiFunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,ExceptionSE,org.apache.kafka.common.TopicPartition> destinationResolver)
      失敗したコンシューマーレコードと例外を受け取り、KafkaOperations と、このインスタンスからの公開がトランザクションであるかどうかに関するフラグを返すテンプレート解決関数を使用してインスタンスを作成します。同様に機能するが、代わりに TopicPartition を返す宛先解決関数も受け取ります。TopicPartition のパーティションが 0 未満の場合、トピックに公開するときにパーティションは設定されません。
      パラメーター:
      templateResolver - 公開に使用する KafkaOperations をリゾルバーする関数。
      transactional - このインスタンスによる公開がトランザクションである必要があるかどうか
      destinationResolver - 解決機能。
      導入:
      2.7
  • メソッドの詳細

    • setRetainExceptionHeader

      public void setRetainExceptionHeader(boolean retainExceptionHeader)
      Java 直列化 DeserializationException ヘッダーを保持するには、true に設定します。デフォルトでは、このようなヘッダーは、キーと値の両方の逆直列化例外が発生しない限り、公開されたレコードから削除されます。発生した場合、DLT_ * ヘッダーは値の例外から作成され、キーの例外ヘッダーが保持されます。
      パラメーター:
      retainExceptionHeader - 保持するために真
      導入:
      2.5
    • setHeadersFunction

      public void setHeadersFunction(BiFunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,ExceptionSE,org.apache.kafka.common.header.Headers> headersFunction)
      公開されたレコードに追加する追加ヘッダーを取得するために呼び出される関数を設定します。返された Header が DeadLetterPublishingRecoverer.SingleRecordHeader のインスタンスである場合、そのヘッダーは、新しい値として追加されるのではなく、その名前の既存のヘッダーを置き換えます。
      パラメーター:
      headersFunction - ヘッダー機能。
      導入:
      2.5.4
      関連事項:
    • setVerifyPartition

      public void setVerifyPartition(boolean verifyPartition)
      false に設定すると、パーティションの検証が無効になります。true の場合、リゾルバーによって返されるパーティションが実際に存在することを確認します。そうでない場合は、ProducerRecord.partition() を null に設定して、プロデューサーが宛先パーティションを判別できるようにします。
      パラメーター:
      verifyPartition - 無効にする場合は false。
      導入:
      2.7
      関連事項:
    • setPartitionInfoTimeout

      public void setPartitionInfoTimeout(DurationSE partitionInfoTimeout)
      検証時にパーティション情報を待つ時間。デフォルトは 5 秒です。
      パラメーター:
      partitionInfoTimeout - タイムアウト。
      導入:
      2.7
      関連事項:
    • setAppendOriginalHeaders

      public void setAppendOriginalHeaders(boolean appendOriginalHeaders)
      現在の「元の」ヘッダー(トピック、パーティションなど)がすでに存在する場合、追加したくない場合は、false に設定します。false の場合、最初の「元の」ヘッダーのみが保持されます。
      パラメーター:
      appendOriginalHeaders - 置き換えないように false に設定します。
      導入:
      2.7.9
    • setThrowIfNoDestinationReturned

      public void setThrowIfNoDestinationReturned(boolean throwIfNoDestinationReturned)
      宛先リゾルバー関数が null TopicPartition を返した場合に例外をスローするには、true に設定します。
      パラメーター:
      throwIfNoDestinationReturned - 有効にする場合は true。
      導入:
      2.7
    • setFailIfSendResultIsError

      public void setFailIfSendResultIsError(boolean failIfSendResultIsError)
      true に設定すると、送信結果の待機が有効になり、失敗した場合は例外がスローされます。結果を待つために、waitForSendResultTimeout で指定されたミリ秒を待ちます。
      パラメーター:
      failIfSendResultIsError - 有効にする場合は true。
      導入:
      2.7
      関連事項:
    • isFailIfSendResultIsError

      protected boolean isFailIfSendResultIsError()
      true の場合、送信結果を待ち、失敗した場合は例外をスローします。結果の waitForSendResultTimeout で指定されたミリ秒を待機します。
      戻り値:
      待つのは本当です。
      導入:
      2.7.14
      関連事項:
    • setWaitForSendResultTimeout

      public void setWaitForSendResultTimeout(DurationSE waitForSendResultTimeout)
      メッセージの送信を待つ最小時間を設定します。デフォルトは、プロデューサー構成 delivery.timeout.ms と setTimeoutBuffer(long) です。
      パラメーター:
      waitForSendResultTimeout - タイムアウト。
      導入:
      2.7
      関連事項:
    • setTimeoutBuffer

      public void setTimeoutBuffer(long buffer)
      Kafka プロデューサーの前にタイムアウトにならないように、プロデューサー構成 delivery.timeout.ms プロパティに追加するミリ秒数を設定します。デフォルトは 5000 です。
      パラメーター:
      buffer - バッファ。
      導入:
      2.7
      関連事項:
    • getTimeoutBuffer

      protected long getTimeoutBuffer()
      Kafka プロデューサーの前にタイムアウトにならないようにするために、プロデューサー構成 delivery.timeout.ms プロパティに追加するミリ秒数。
      戻り値:
      バッファ。
      導入:
      2.7.14
    • setStripPreviousExceptionHeaders

      public void setStripPreviousExceptionHeaders(boolean stripPreviousExceptionHeaders)
      以前の例外ヘッダーと現在の例外のヘッダーを保持するには、false に設定します。デフォルトは true です。これは、現在のヘッダーのみが保持されることを意味します。false に設定すると、レコードが何度も再発行されるときにレコードサイズが大きくなる可能性があります。
      パラメーター:
      stripPreviousExceptionHeaders - すべてを保持するには false。
      導入:
      2.7.9
    • setSkipSameTopicFatalExceptions

      public void setSkipSameTopicFatalExceptions(boolean skipSameTopicFatalExceptions)
      このクラスの分類によって例外が致命的である場合でも、レコードを同じトピックに転送する場合は、false に設定します。このシナリオを別のレイヤーで処理します。
      パラメーター:
      skipSameTopicFatalExceptions - このシナリオで転送する場合は false。
    • setLogRecoveryRecord

      public void setLogRecoveryRecord(boolean logRecoveryRecord)
      リカバリレコードと例外をログに記録する場合は、true に設定します。
      パラメーター:
      logRecoveryRecord - ログレコードと例外の場合は true。
      導入:
      3.1
    • setExceptionHeadersCreator

      public void setExceptionHeadersCreator(DeadLetterPublishingRecoverer.ExceptionHeadersCreator headersCreator)
      DeadLetterPublishingRecoverer.ExceptionHeadersCreator 実装を設定して、出力レコードの例外ヘッダーの設定を完全に引き継ぎます。デフォルトで設定されているすべてのヘッダーを無効にします。
      パラメーター:
      headersCreator - クリエイター。
      導入:
      2.8.4
    • isTransactional

      protected boolean isTransactional()
      公開をトランザクションで実行する必要がある場合は True。
      戻り値:
      トランザクションの場合は true。
      導入:
      2.7.14
    • excludeHeader

      public void excludeHeader(DeadLetterPublishingRecoverer.HeaderNames.HeadersToAdd... headers)
      ヘッダー名のヘッダー包含ビットをクリアします。
      パラメーター:
      headers - クリアするヘッダー。
      導入:
      2.8.4
    • includeHeader

      public void includeHeader(DeadLetterPublishingRecoverer.HeaderNames.HeadersToAdd... headers)
      ヘッダー名のヘッダー包含ビットを設定します。
      パラメーター:
      headers - 設定するヘッダー。
      導入:
      2.8.4
    • addHeadersFunction

      public void addHeadersFunction(BiFunctionSE<org.apache.kafka.clients.consumer.ConsumerRecord<?,?>,ExceptionSE,org.apache.kafka.common.header.Headers> headersFunction)
      公開されたレコードに追加する追加のヘッダーを取得するために呼び出される関数を追加します。関数は、追加された順序で、関数が setHeadersFunction(BiFunction) に渡された後に呼び出されます。返された Header が DeadLetterPublishingRecoverer.SingleRecordHeader のインスタンスである場合、そのヘッダーは、新しい値として追加されるのではなく、その名前の既存のヘッダーを置き換えます。
      パラメーター:
      headersFunction - ヘッダー機能。
      導入:
      2.8.4
      関連事項:
    • accept

      public void accept(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, @Nullable org.apache.kafka.clients.consumer.Consumer<?,?> consumer, ExceptionSE exception)
      インターフェースからコピーされた説明: ConsumerAwareRecordRecoverer
      レコードを回復します。
      次で指定:
      インターフェース ConsumerAwareRecordRecovereraccept 
      パラメーター:
      record - レコード。
      consumer - コンシューマー。
      exception - 例外。
    • send

      protected void send(org.apache.kafka.clients.producer.ProducerRecord<ObjectSE,ObjectSE> outRecord, KafkaOperations<ObjectSE,ObjectSE> kafkaTemplate, org.apache.kafka.clients.consumer.ConsumerRecord<?,?> inRecord)
      レコードを送信します。
      パラメーター:
      outRecord - レコード。
      kafkaTemplate - テンプレート。
      inRecord - コンシューマー記録。
      導入:
      2.7
    • createProducerRecord

      protected org.apache.kafka.clients.producer.ProducerRecord<ObjectSE,ObjectSE> createProducerRecord(org.apache.kafka.clients.consumer.ConsumerRecord<?,?> record, org.apache.kafka.common.TopicPartition topicPartition, org.apache.kafka.common.header.Headers headers, @Nullable byte[] key, @Nullable byte[] value)
      サブクラスはこのメソッドをオーバーライドして、DLQ に送信するプロデューサーレコードをカスタマイズできます。デフォルトの実装では、コンシューマーレコードからキーと値をコピーし、ヘッダーを追加するだけです。タイムスタンプが設定されていません(元のタイムスタンプはヘッダーの 1 つにあります)。重要: TopicPartition のパーティションが 0 未満の場合、ProducerRecord では null に設定する必要があります。
      パラメーター:
      record - 失敗した記録
      topicPartition - 宛先リゾルバーによって返される TopicPartition
      headers - ヘッダー - 元のレコードヘッダーと DLT ヘッダー。
      key - コンシューマーレコードキーの代わりに使用するキー。
      value - コンシューマーレコード値の代わりに使用する値。
      戻り値:
      送信するプロデューサーレコード。
      関連事項:
    • publish

      protected void publish(org.apache.kafka.clients.producer.ProducerRecord<ObjectSE,ObjectSE> outRecord, KafkaOperations<ObjectSE,ObjectSE> kafkaTemplate, org.apache.kafka.clients.consumer.ConsumerRecord<?,?> inRecord)
      送信結果のログ記録以上のものが必要な場合は、これをオーバーライドします。
      パラメーター:
      outRecord - 送信するレコード。
      kafkaTemplate - テンプレート。
      inRecord - コンシューマー記録。
      導入:
      2.2.5
    • verifySendResult

      protected void verifySendResult(KafkaOperations<ObjectSE,ObjectSE> kafkaTemplate, org.apache.kafka.clients.producer.ProducerRecord<ObjectSE,ObjectSE> outRecord, @Nullable CompletableFutureSE<SendResult<ObjectSE,ObjectSE>> sendResult, org.apache.kafka.clients.consumer.ConsumerRecord<?,?> inRecord)
      送信先が完了するのを待ちます。
      パラメーター:
      kafkaTemplate - レコードの送信に使用されるテンプレート。
      outRecord - レコード。
      sendResult - 未来。
      inRecord - 元のコンシューマー記録。
    • determineSendTimeout

      protected DurationSE determineSendTimeout(KafkaOperations<?,?> template)
      テンプレートのプロデューサーファクトリと setWaitForSendResultTimeout(Duration) に基づいて送信タイムアウトを決定します。
      パラメーター:
      template - テンプレート。
      戻り値:
      タイムアウト。
      導入:
      2.7.14
    • getHeaderNames

      非推奨、削除予定: この API 要素は、将来のバージョンで削除される可能性があります。
      3.0.9 以降 - 代わりにサプライヤーを提供してください。
      送信されたレコードで異なるヘッダー名を使用する場合は、これをオーバーライドします。
      戻り値:
      ヘッダー名。
      導入:
      2.7
      関連事項:
    • setHeaderNamesSupplier

      public void setHeaderNamesSupplier(SupplierSE<DeadLetterPublishingRecoverer.HeaderNames> supplier)
      パラメーター:
      supplier - サプライヤー。
      導入:
      3.0.9