ヒント、コツ、レシピ

Kafka を使用したシンプルな DLQ

問題文

開発者として、Kafka トピックのレコードを処理するコンシューマーアプリケーションを作成したいと思います。ただし、処理中にエラーが発生した場合は、アプリケーションを完全に停止させたくありません。代わりに、エラーのあるレコードを DLT(Dead-Letter-Topic)に送信してから、新しいレコードの処理を続行したいと思います。

ソリューション

この問題の解決策は、Spring Cloud Stream の DLQ 機能を使用することです。この議論の目的のために、以下が私たちのプロセッサー関数であると仮定しましょう。

@Bean
public Consumer<byte[]> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

これは、処理するすべてのレコードに対して例外をスローする非常に簡単な関数ですが、この関数を使用して、他の同様の状況に拡張することができます。

エラーのあるレコードを DLT に送信するには、次の構成を提供する必要があります。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq

DLQ をアクティブ化するには、アプリケーションでグループ名を指定する必要があります。匿名のコンシューマーは DLQ 機能を使用できません。また、Kafka コンシューマーバインディングの enableDLQ プロパティを true に設定して、DLQ を有効にする必要があります。最後に、Kafka コンシューマーバインディングで dlqName を提供することにより、オプションで DLT 名を提供できます。それ以外の場合は、この場合はデフォルトで error.input-topic.my-group になります。

上記のコンシューマーの例では、ペイロードの型は byte[] であることに注意してください。デフォルトでは、Kafka バインダーの DLQ プロデューサーは型 byte[] のペイロードを想定しています。そうでない場合は、適切なシリアライザーの構成を提供する必要があります。例: コンシューマー関数を次のように書き直してみましょう。

@Bean
public Consumer<String> processData() {
  return s -> {
     throw new RuntimeException();
  };
}

次に、Spring Cloud Stream に、DLT に書き込むときにデータをどのように直列化するかを指示する必要があります。このシナリオの変更された構成は次のとおりです。

spring.cloud.stream:
  bindings:
   processData-in-0:
     group: my-group
     destination: input-topic
 kafka:
   bindings:
     processData-in-0:
       consumer:
         enableDlq: true
         dlqName: input-topic-dlq
         dlqProducerProperties:
           configuration:
             value.serializer: org.apache.kafka.common.serialization.StringSerializer

高度な再試行オプションを備えた DLQ

問題文

これは上記のレシピに似ていますが、開発者として、再試行の処理方法を構成したいと思います。

ソリューション

上記のレシピに従った場合、処理でエラーが発生したときに、Kafka バインダーに組み込まれているデフォルトの再試行オプションを取得します。

デフォルトでは、バインダーは 1 秒の初期遅延で最大 3 回試行され、2.0 乗数はそれぞれ最大遅延 10 秒でバックオフします。これらの構成はすべて、次のように変更できます。

spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval

必要に応じて、ブール値のマップを提供することにより、再試行可能な例外のリストを提供することもできます。元:

spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false

デフォルトでは、上記のマップにリストされていない例外は再試行されます。それが望ましくない場合は、次のように指定して無効にすることができます。

spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false

独自の RetryTemplate を提供し、バインダーによってスキャンされて使用される @StreamRetryTemplate としてマークすることもできます。これは、より高度な再試行戦略とポリシーが必要な場合に役立ちます。

複数の @StreamRetryTemplate Bean がある場合は、プロパティを使用して、バインディングで必要な Bean を指定できます。

spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>

DLQ を使用した逆直列化エラーの処理

問題文

Kafka コンシューマーでデシリアライゼーション例外が発生するプロセッサーがあります。Spring Cloud Stream DLQ メカニズムがそのシナリオを捉えることを期待していますが、そうではありません。どうすればこれを処理できますか ?

ソリューション

Spring Cloud Stream によって提供される通常の DLQ メカニズムは、Kafka コンシューマーが回復不能な逆直列化例外をスローした場合には役に立ちません。これは、コンシューマーの poll() メソッドが戻る前であっても、この例外が発生するためです。Spring for Apache Kafka プロジェクトは、この状況でバインダーを支援するいくつかの優れた方法を提供します。調べてみましょう。

これが私たちの機能であると仮定します:

@Bean
public Consumer<String> functionName() {
    return s -> {
        System.out.println(s);
    };
}

これは、String パラメーターを受け取る簡単な関数です。

Spring Cloud Stream が提供するメッセージコンバーターをバイパスし、代わりにネイティブデシリアライザーを使用したいと考えています。String 型の場合はあまり意味がありませんが、AVRO などのより複雑な型の場合は、外部デシリアライザーに依存する必要があるため、変換を Kafka に委譲する必要があります。

コンシューマーがデータを受信したときに、デシリアライゼーションエラーを引き起こす不良レコードがあると仮定しましょう。たとえば、誰かが String ではなく Integer を渡した可能性があります。その場合、アプリケーションで何もしないと、チェーン を介して例外が伝播され、アプリケーションは最終的に終了します。

これを処理するために、DefaultErrorHandler を構成する ListenerContainerCustomizer@Bean を追加できます。この DefaultErrorHandler は DeadLetterPublishingRecoverer で構成されています。また、コンシューマー用に ErrorHandlingDeserializer を構成する必要があります。それは多くの複雑なことのように聞こえますが、実際には、この場合、これらの 3 つの Bean に要約されます。

	@Bean
	public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
		return (container, dest, group) -> {
			container.setCommonErrorHandler(errorHandler);
		};
	}
	@Bean
	public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
		return new DefaultErrorHandler(deadLetterPublishingRecoverer);
	}
	@Bean
	public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
		return new DeadLetterPublishingRecoverer(bytesTemplate);
	}

それぞれを分析してみましょう。最初のものは、DefaultErrorHandler を取る ListenerContainerCustomizer Bean です。これで、コンテナーはその特定のエラーハンドラーでカスタマイズされます。コンテナーのカスタマイズについて詳しくは、こちらを参照してください

2 番目の Bean は、DLT への公開で構成された DefaultErrorHandler です。DefaultErrorHandler の詳細については、こちらを参照してください。

3 番目の Bean は、DLT への送信を最終的に担当する DeadLetterPublishingRecoverer です。デフォルトでは、DLT トピックの名前は ORIGINAL_TOPIC_NAME.DLT です。ただし、変更できます。詳細については、ドキュメントを参照してください。

また、アプリケーション構成を介して ErrorHandlingDeserializer を構成する必要があります。

ErrorHandlingDeserializer は実際のデシリアライザーに委譲します。エラーが発生した場合は、レコードのキー / 値を null に設定し、メッセージの生のバイトを含めます。次に、ヘッダーに例外を設定し、このレコードをリスナーに渡します。リスナーは、登録されたエラーハンドラーを呼び出します。

必要な構成は次のとおりです。

spring.cloud.stream:
  function:
    definition: functionName
  bindings:
    functionName-in-0:
      group: group-name
      destination: input-topic
      consumer:
       use-native-decoding: true
  kafka:
    bindings:
      functionName-in-0:
        consumer:
          enableDlq: true
          dlqName: dlq-topic
          dlqProducerProperties:
            configuration:
              value.serializer: org.apache.kafka.common.serialization.StringSerializer
          configuration:
            value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
            spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

バインディングの configuration プロパティを介して ErrorHandlingDeserializer を提供しています。また、委譲する実際のデシリアライザーが StringDeserializer であることも示しています。

上記の dlq プロパティは、このレシピの説明には関係がないことに注意してください。これらは、純粋にアプリケーションレベルのエラーに対処することのみを目的としています。

Kafka バインダーの基本的なオフセット管理

問題文

Spring Cloud Stream Kafka コンシューマーアプリケーションを作成したいのですが、Kafka コンシューマーオフセットをどのように管理するかがわかりません。説明できる?

ソリューション

これに関するドキュメントセクションを読んで、完全に理解することをお勧めします。

これが gist です。

Kafka は、デフォルトで 2 種類のオフセット(earliest と latest)をサポートしています。それらのセマンティクスは、それらの名前から自明です。

初めてコンシューマーを実行していると仮定します。Spring Cloud Stream アプリケーションで group.id を見逃すと、匿名のコンシューマーになります。匿名のコンシューマーがある場合は常に、Spring Cloud Stream アプリケーションはデフォルトで、トピックパーティション内の latest の使用可能なオフセットから開始されます。一方、group.id を明示的に指定すると、デフォルトで、Spring Cloud Stream アプリケーションは、トピックパーティションで使用可能な earliest オフセットから開始されます。

上記の両方の場合(明示的なグループと匿名グループを持つコンシューマー)、開始オフセットは、プロパティ spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset を使用し、earliest または latest のいずれかに設定することで切り替えることができます。

ここで、以前にコンシューマーを実行し、再度開始したと仮定します。この場合、コンシューマーがコンシューマーグループに対してすでにコミットされたオフセットを見つけるため、上記の場合の開始オフセットセマンティクスは適用されません(匿名コンシューマーの場合、アプリケーションは group.id を提供しませんが、バインダーは自動生成)。最後にコミットされたオフセット以降をピックアップするだけです。これは、startOffset 値が提供されている場合でも当てはまります。

ただし、resetOffsets プロパティを使用して、コンシューマーが最後にコミットされたオフセットから開始するデフォルトの動作をオーバーライドできます。これを行うには、プロパティ spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets を true (デフォルトでは false)に設定します。次に、startOffset 値(earliest または latest のいずれか)を指定していることを確認してください。これを実行してからコンシューマーアプリケーションを起動すると、起動するたびに、これが初めて起動するかのように起動し、パーティションのコミットされたオフセットを無視します。

Kafka で任意のオフセットを求めています

問題文

Kafka バインダーを使用すると、オフセットを earliest または latest のいずれかに設定できることがわかりますが、オフセットを途中の任意のオフセットに求める必要があります。Spring Cloud Stream Kafka バインダーを使用してこれを達成する方法はありますか ?

ソリューション

以前、Kafka バインダーを使用して基本的なオフセット管理に取り組む方法を見てきました。デフォルトでは、バインダーは、少なくともそのレシピで見たメカニズムを通じて、任意のオフセットに巻き戻すことを許可しません。ただし、このユースケースを実現するためにバインダーが提供する低レベルの戦略がいくつかあります。調べてみましょう。

まず、earliest または latest 以外の任意のオフセットにリセットする場合は、resetOffsets 構成をデフォルトの false のままにしてください。次に、型 KafkaBindingRebalanceListener のカスタム Bean を提供する必要があります。これは、すべてのコンシューマーバインディングに注入されます。これはいくつかのデフォルトのメソッドが付属しているインターフェースですが、ここに興味を持っているメソッドがあります:

/**
	 * Invoked when partitions are initially assigned or after a rebalance. Applications
	 * might only want to perform seek operations on an initial assignment. While the
	 * 'initial' argument is true for each thread (when concurrency is greater than 1),
	 * implementations should keep track of exactly which partitions have been sought.
	 * There is a race in that a rebalance could occur during startup and so a topic/
	 * partition that has been sought on one thread may be re-assigned to another
	 * thread and you may not wish to re-seek it at that time.
	 * @param bindingName the name of the binding.
	 * @param consumer the consumer.
	 * @param partitions the partitions.
	 * @param initial true if this is the initial assignment on the current thread.
	 */
	default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
			Collection<TopicPartition> partitions, boolean initial) {
		// do nothing
	}

詳細を見てみましょう。

基本的に、このメソッドは、トピックパーティションの最初の割り当て中、またはリバランス後に毎回呼び出されます。説明をわかりやすくするために、トピックが foo であり、4 つのパーティションがあると仮定します。最初は、グループ内の 1 つのコンシューマーのみを開始し、このコンシューマーはすべてのパーティションから消費します。コンシューマーが初めて起動すると、4 つのパーティションすべてが最初に割り当てられます。ただし、デフォルトで消費するパーティションを開始するのではなく(グループを定義しているため、earliest)、パーティションごとに、任意のオフセットをシークした後に消費するようにします。以下のように、特定のオフセットから消費するビジネスケースがあると想像してください。

Partition   start offset

0           1000
1           2000
2           2000
3           1000

これは、上記の方法を以下のように実装することで実現できます。

@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {

    Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
    topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
    topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
    topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);

    if (initial) {
        partitions.forEach(tp -> {
            if (topicPartitionOffset.containsKey(tp)) {
                final Long offset = topicPartitionOffset.get(tp);
                try {
                    consumer.seek(tp, offset);
                }
                catch (Exception e) {
                    // Handle exceptions carefully.
                }
            }
        });
    }
}

これは単なる基本的な実装です。実際のユースケースはこれよりもはるかに複雑であり、それに応じて調整する必要がありますが、これは確かに基本的なスケッチを提供します。コンシューマー seek が失敗すると、いくつかのランタイム例外がスローされる可能性があり、その場合の対処方法を決定する必要があります。

[[ 同じグループ ID で 2 番目のコンシューマーを開始する場合はどうすればよいですか ? ]] === 同じグループ ID で 2 番目のコンシューマーを開始したらどうなるでしょうか ?

2 番目のコンシューマーを追加すると、リバランスが発生し、一部のパーティションが移動します。新しいコンシューマーがパーティション 2 および 3 を取得するとします。この新しい Spring Cloud Stream コンシューマーがこの onPartitionsAssigned メソッドを呼び出すと、これがこのコンシューマーのパーティション 2 および 3 の初期割り当てであることがわかります。initial 引数の条件付きチェックにより、シーク操作が実行されます。最初のコンシューマーの場合、パーティション 0 と 1 しかありません。ただし、このコンシューマーの場合、これは単なるリバランスイベントであり、最初の割り当てとは見なされませんでした。initial 引数の条件付きチェックのため、指定されたオフセットに再シークすることはありません。

[[how-do-i-manually-acknowledge-using-kafka-binder?]] == Kafka バインダーの使用を手動で確認するにはどうすればよいですか ?

問題文

Kafka バインダーを使用して、コンシューマーのメッセージを手動で確認したい。それ、どうやったら出来るの?

ソリューション

デフォルトでは、Kafka バインダーは Spring for Apache Kafka プロジェクトのデフォルトのコミット設定に委譲します。Spring Kafka のデフォルトの ackMode は batch です。詳細については、こちらを参照してください。

このデフォルトのコミット動作を無効にし、手動コミットに依存したい場合があります。次の手順でそれを行うことができます。

プロパティ spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode を MANUAL または MANUAL_IMMEDIATE のいずれかに設定します。このように設定すると、コンシューマーメソッドが受信するメッセージに kafka_acknowledgment (KafkaHeaders.ACKNOWLEDGMENT から)というヘッダーが存在します。

例: これをコンシューマー向けの方法として想像してください。

@Bean
public Consumer<Message<String>> myConsumer() {
    return msg -> {
        Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
         System.out.println("Acknowledgment provided");
         acknowledgment.acknowledge();
        }
    };
}

次に、プロパティ spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode を MANUAL または MANUAL_IMMEDIATE に設定します。

[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == Spring Cloud Stream のデフォルトのバインディング名をオーバーライドするにはどうすればよいですか ?

問題文

Spring Cloud Stream は、関数の定義と署名に基づいてデフォルトのバインディングを作成しますが、これらをよりドメインに適した名前にオーバーライドするにはどうすればよいですか?

ソリューション

以下が関数のシグネチャーであると想定します。

@Bean
public Function<String, String> uppercase(){
...
}

デフォルトでは、Spring Cloud Stream は以下のようにバインディングを作成します。

  1. 大文字の 0

  2. 大文字 -out-0

次のプロパティを使用して、これらのバインディングを何かにオーバーライドできます。

spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out

この後、すべてのバインディングプロパティは、新しい名前 my-transformer-in および my-transformer-out で作成する必要があります。

これは、Kafka ストリームと複数の入力を使用した別の例です。

@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}

デフォルトでは、Spring Cloud Stream はこの関数に対して 3 つの異なるバインディング名を作成します。

  1. processOrder-in-0

  2. processOrder-in-1

  3. processOrder-out-0

これらのバインディングにいくつかの構成を設定するたびに、これらのバインディング名を使用する必要があります。それが好きではなく、たとえば、のような、よりドメインフレンドリーで読みやすいバインディング名を使用したいと考えています。

  1. オーダー

  2. アカウント

  3. enrichedOrders

これらの 3 つのプロパティを設定するだけで、簡単にそれを行うことができます

  1. spring.cloud.stream.function.bindings.processOrder-in-0= オーダー

  2. spring.cloud.stream.function.bindings.processOrder-in-1= アカウント

  3. spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders

これを行うと、デフォルトのバインディング名が上書きされ、それらに設定するプロパティは、これらの新しいバインディング名にある必要があります。

[[how-do-i-send-a-message-key-as-part-of-my-record?]] == メッセージキーをレコードの一部として送信するにはどうすればよいですか ?

問題文

レコードのペイロードと一緒にキーを送信する必要がありますが、Spring Cloud Stream でそれを行う方法はありますか?

ソリューション

多くの場合、キーと値を含むレコードとして、マップのような連想データ構造を送信する必要があります。Spring Cloud Stream を使用すると、簡単な方法でそれを行うことができます。以下は、これを行うための基本的な青写真ですが、特定のユースケースに適合させることをお勧めします。

これがサンプルプロデューサーメソッド(別名 Supplier)です。

@Bean
public Supplier<Message<String>> supplier() {
    return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}

これは、String ペイロードだけでなく、キーも使用してメッセージを送信する簡単な関数です。KafkaHeaders.MESSAGE_KEY を使用してキーをメッセージヘッダーとして設定していることに注意してください。

キーをデフォルトの kafka_messageKey から変更する場合は、構成で次のプロパティを指定する必要があります。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']

バインディング名 supplier-out-0 を使用していることに注意してください。これは関数名であるため、それに応じて更新してください。

次に、メッセージを生成するときにこの新しいキーを使用します。

[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == の代わりにネイティブシリアライザーとデシリアライザーを使用するにはどうすればよいですか ? メッセージ変換は Spring Cloud Stream によって行われましたか ?

問題文

Spring Cloud Stream でメッセージコンバーターを使用する代わりに、Kafka でネイティブシリアライザーとデシリアライザーを使用したいと思います。デフォルトでは、Spring Cloud Stream は、内部の組み込みメッセージコンバーターを使用してこの変換を処理します。これをバイパスして、Kafka に責任を委譲するにはどうすればよいですか?

ソリューション

これは本当に簡単です。

ネイティブ直列化を有効にするには、次のプロパティを指定するだけです。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true

次に、シリアライザーも設定する必要があります。これを行うにはいくつかの方法があります。

spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

またはバインダー構成を使用します。

spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer

バインダー方式を使用する場合、すべてのバインディングに対して適用されますが、バインディングでの設定はバインディングごとに行われます。

デシリアライズ側では、デシリアライザーを構成として提供する必要があります。

以下に例を示します。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer

バインダーレベルで設定することもできます。

ネイティブデコードを強制するために設定できるオプションのプロパティがあります。

spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true

ただし、Kafka バインダーの場合、バインダーに到達するまでに、Kafka は構成済みのデシリアライザーを使用してすでにデシリアライズしているため、これは不要です。

Kafka Streams バインダーでオフセットリセットがどのように機能するかを説明します

問題文

デフォルトでは、Kafka Streams バインダーは、常に新しいコンシューマーの最も早いオフセットから開始します。場合によっては、最新のオフセットから開始することが有益であるか、アプリケーションによって要求されます。Kafka Streams バインダーを使用するとそれが可能になります。

ソリューション

ソリューションを検討する前に、次のシナリオを見てみましょう。

@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
    (s, t) -> s.join(t, ...)
    ...
}

2 つの入力バインディングを必要とする BiConsumer Bean があります。この場合、最初のバインディングは KStream 用で、2 番目のバインディングは KTable 用です。このアプリケーションを初めて実行するとき、デフォルトでは、両方のバインディングは earliest オフセットから始まります。いくつかの要件のために latest オフセットから始めたいのはどうですか? これを行うには、次のプロパティを有効にします。

spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest

1 つのバインディングのみを latest オフセットから開始し、もう 1 つをデフォルトの earliest からコンシューマーに開始する場合は、後者のバインディングを構成から除外します。

コミットオフセットが存在したら、これらの設定は光栄とコミットオフセットが優先されていない、ということを覚えておいてください。

Kafka でのレコードの送信(生成)の成功を追跡する

問題文

Kafka プロデューサーアプリケーションがあり、成功したすべての送信を追跡したいと考えています。

ソリューション

アプリケーションに次のサプライヤーがあると仮定します。

@Bean
	public Supplier<Message<String>> supplier() {
		return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
	}

次に、成功したすべての送信情報をキャプチャーするために、新しい MessageChannel Bean を定義する必要があります。

@Bean
	public MessageChannel fooRecordChannel() {
		return new DirectChannel();
	}

次に、アプリケーション構成でこのプロパティを定義して、recordMetadataChannel の Bean 名を指定します。

spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel

この時点で、正常に送信された情報が fooRecordChannel に送信されます。

以下のように IntegrationFlow を記述して、情報を確認できます。

@Bean
public IntegrationFlow integrationFlow() {
    return f -> f.channel("fooRecordChannel")
                 .handle((payload, messageHeaders) -> payload);
}

handle メソッドでは、ペイロードは Kafka に送信されたものであり、メッセージヘッダーには kafka_recordMetadata と呼ばれる特別なキーが含まれています。その値は、トピックパーティション、現在のオフセットなどに関する情報を含む RecordMetadata です。

Kafka にカスタムヘッダーマッパーを追加する

問題文

いくつかのヘッダーを設定する Kafka プロデューサーアプリケーションがありますが、コンシューマーアプリケーションにありません。何故ですか?

ソリューション

通常の状況では、これで問題ありません。

次のプロデューサーがいると想像してみてください。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}

コンシューマー側では、ヘッダー "foo" が表示されるはずですが、以下では課題は発生しません。

@Bean
public Consumer<Message<String>> consume() {
    return s -> {
        final String foo = (String)s.getHeaders().get("foo");
        System.out.println(foo);
    };
}

アプリケーションでカスタムヘッダーマッパーを提供する場合、これは機能しません。アプリケーションに空の KafkaHeaderMapper があるとしましょう。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {

        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {

        }
    };
}

それが実装である場合、コンシューマーの foo ヘッダーを見逃すことになります。おそらく、これらの KafkaHeaderMapper メソッド内にいくつかのロジックがある可能性があります。foo ヘッダーにデータを入力するには、次のものが必要です。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String foo = (String) headers.get("foo");
            target.add("foo", foo.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header foo = source.lastHeader("foo");
			target.put("foo", new String(foo.value()));
        }
    }

これにより、プロデューサーからコンシューマーに foo ヘッダーが適切に入力されます。

id ヘッダーに関する特記事項

Spring Cloud Stream では、id ヘッダーは特別なヘッダーですが、一部のアプリケーションでは、custom-idIDId などの特別なカスタム ID ヘッダーが必要になる場合があります。最初のもの(custom-id)は、カスタムヘッダーマッパーなしでプロデューサーからコンシューマーに伝播します。ただし、フレームワークで予約されている id ヘッダーのバリアント(IDIdiD など)を使用して作成すると、フレームワークの内部で問題が発生します。このユースケースの詳細については、この StackOverflow スレッド (英語) を参照してください。その場合、大文字と小文字を区別する ID ヘッダーをマップするためにカスタム KafkaHeaderMapper を使用する必要があります。例: 次のプロデューサーがいるとします。

@Bean
public Supplier<Message<String>> supply() {
    return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}

上記のヘッダー Id は、フレームワーク id ヘッダーと衝突するため、消費側から削除されます。この課題を解決するために、カスタム KafkaHeaderMapper を提供できます。

@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
    return new KafkaHeaderMapper() {
        @Override
        public void fromHeaders(MessageHeaders headers, Headers target) {
            final String myId = (String) headers.get("Id");
			target.add("Id", myId.getBytes());
        }

        @Override
        public void toHeaders(Headers source, Map<String, Object> target) {
            final Header Id = source.lastHeader("Id");
			target.put("Id", new String(Id.value()));
        }
    };
}

これにより、id ヘッダーと Id ヘッダーの両方がプロデューサー側からコンシューマー側で使用できるようになります。

トランザクションで複数のトピックを作成する

問題文

複数の Kafka トピックへのトランザクションメッセージを生成するにはどうすればよいですか?

詳細については、この StackOverflow の質問 (英語) を参照してください。

ソリューション

トランザクションに Kafka バインダーのトランザクションサポートを使用してから、AfterRollbackProcessor を提供します。複数のトピックを作成するには、StreamBridge API を使用します。

以下は、このためのコードスニペットです。

@Autowired
StreamBridge bridge;

@Bean
Consumer<String> input() {
    return str -> {
        System.out.println(str);
        this.bridge.send("left", str.toUpperCase());
        this.bridge.send("right", str.toLowerCase());
        if (str.equals("Fail")) {
            throw new RuntimeException("test");
        }
    };
}

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
    return (container, dest, group) -> {
        ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
                MessageChannel.class)).getTransactionalProducerFactory();
        KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
        DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
        container.setAfterRollbackProcessor(rollbackProcessor);
    };
}

DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
    return new DefaultAfterRollbackProcessor<>(
            new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}

必要な構成

spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right

spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1

テストするには、次を使用できます。

@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
    return args -> {
        System.in.read();
        template.send("input", "Fail".getBytes());
        template.send("input", "Good".getBytes());
    };
}

いくつかの重要な注意事項:

DLT を手動で構成するため、アプリケーション構成に DLQ 設定がないことを確認してください(デフォルトでは、初期のコンシューマー機能に基づいて input.DLT という名前のトピックに公開されます)。また、バインダーによる再試行を回避するために、1 へのコンシューマーバインディングで maxAttempts をリセットします。上記の例では、最大で合計 3 回試行されます(最初の試行 + FixedBackoff での 2 回の試行)。

このコードをテストする方法の詳細については、StackOverflow スレッド (英語) を参照してください。Spring Cloud Stream を使用して、コンシューマー関数を追加してテストする場合は、コンシューマーバインディングの isolation-level を read-committed に設定してください。

この StackOverflow スレッド (英語) もこの議論に関連しています。

複数のポーリング可能なコンシューマーを実行するときに避けるべき落とし穴

問題文

ポーリング可能なコンシューマーの複数のインスタンスを実行し、インスタンスごとに一意の client.id を生成するにはどうすればよいですか?

ソリューション

次の定義を持っていると仮定します:

spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group

アプリケーションを実行すると、Kafka コンシューマーは client.id(consumer-my-group-1 など)を生成します。実行中のアプリケーションのインスタンスごとに、この client.id は同じになり、予期しない問題が発生します。

これを修正するために、アプリケーションの各インスタンスに次のプロパティを追加できます。

spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}

詳細については、この GitHub の課題 (英語) を参照してください。