ヒント、コツ、レシピ
Kafka を使用したシンプルな DLQ
問題文
開発者として、Kafka トピックのレコードを処理するコンシューマーアプリケーションを作成したいと思います。ただし、処理中にエラーが発生した場合は、アプリケーションを完全に停止させたくありません。代わりに、エラーのあるレコードを DLT(Dead-Letter-Topic)に送信してから、新しいレコードの処理を続行したいと思います。
ソリューション
この問題の解決策は、Spring Cloud Stream の DLQ 機能を使用することです。この議論の目的のために、以下が私たちのプロセッサー関数であると仮定しましょう。
@Bean
public Consumer<byte[]> processData() {
return s -> {
throw new RuntimeException();
};
}
これは、処理するすべてのレコードに対して例外をスローする非常に簡単な関数ですが、この関数を使用して、他の同様の状況に拡張することができます。
エラーのあるレコードを DLT に送信するには、次の構成を提供する必要があります。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
DLQ をアクティブ化するには、アプリケーションでグループ名を指定する必要があります。匿名のコンシューマーは DLQ 機能を使用できません。また、Kafka コンシューマーバインディングの enableDLQ
プロパティを true
に設定して、DLQ を有効にする必要があります。最後に、Kafka コンシューマーバインディングで dlqName
を提供することにより、オプションで DLT 名を提供できます。それ以外の場合は、この場合はデフォルトで error.input-topic.my-group
になります。
上記のコンシューマーの例では、ペイロードの型は byte[]
であることに注意してください。デフォルトでは、Kafka バインダーの DLQ プロデューサーは型 byte[]
のペイロードを想定しています。そうでない場合は、適切なシリアライザーの構成を提供する必要があります。例: コンシューマー関数を次のように書き直してみましょう。
@Bean
public Consumer<String> processData() {
return s -> {
throw new RuntimeException();
};
}
次に、Spring Cloud Stream に、DLT に書き込むときにデータをどのように直列化するかを指示する必要があります。このシナリオの変更された構成は次のとおりです。
spring.cloud.stream:
bindings:
processData-in-0:
group: my-group
destination: input-topic
kafka:
bindings:
processData-in-0:
consumer:
enableDlq: true
dlqName: input-topic-dlq
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
高度な再試行オプションを備えた DLQ
ソリューション
上記のレシピに従った場合、処理でエラーが発生したときに、Kafka バインダーに組み込まれているデフォルトの再試行オプションを取得します。
デフォルトでは、バインダーは 1 秒の初期遅延で最大 3 回試行され、2.0 乗数はそれぞれ最大遅延 10 秒でバックオフします。これらの構成はすべて、次のように変更できます。
spring.cloud.stream.bindings.processData-in-0.consumer.maxAtttempts
spring.cloud.stream.bindings.processData-in-0.consumer.backOffInitialInterval
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMultipler
spring.cloud.stream.bindings.processData-in-0.consumer.backOffMaxInterval
必要に応じて、ブール値のマップを提供することにより、再試行可能な例外のリストを提供することもできます。元:
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalStateException=true
spring.cloud.stream.bindings.processData-in-0.consumer.retryableExceptions.java.lang.IllegalArgumentException=false
デフォルトでは、上記のマップにリストされていない例外は再試行されます。それが望ましくない場合は、次のように指定して無効にすることができます。
spring.cloud.stream.bindings.processData-in-0.consumer.defaultRetryable=false
独自の RetryTemplate
を提供し、バインダーによってスキャンされて使用される @StreamRetryTemplate
としてマークすることもできます。これは、より高度な再試行戦略とポリシーが必要な場合に役立ちます。
複数の @StreamRetryTemplate
Bean がある場合は、プロパティを使用して、バインディングで必要な Bean を指定できます。
spring.cloud.stream.bindings.processData-in-0.consumer.retry-template-name=<your-retry-template-bean-name>
DLQ を使用した逆直列化エラーの処理
問題文
Kafka コンシューマーでデシリアライゼーション例外が発生するプロセッサーがあります。Spring Cloud Stream DLQ メカニズムがそのシナリオを捉えることを期待していますが、そうではありません。どうすればこれを処理できますか ?
ソリューション
Spring Cloud Stream によって提供される通常の DLQ メカニズムは、Kafka コンシューマーが回復不能な逆直列化例外をスローした場合には役に立ちません。これは、コンシューマーの poll()
メソッドが戻る前であっても、この例外が発生するためです。Spring for Apache Kafka プロジェクトは、この状況でバインダーを支援するいくつかの優れた方法を提供します。調べてみましょう。
これが私たちの機能であると仮定します:
@Bean
public Consumer<String> functionName() {
return s -> {
System.out.println(s);
};
}
これは、String
パラメーターを受け取る簡単な関数です。
Spring Cloud Stream が提供するメッセージコンバーターをバイパスし、代わりにネイティブデシリアライザーを使用したいと考えています。String
型の場合はあまり意味がありませんが、AVRO などのより複雑な型の場合は、外部デシリアライザーに依存する必要があるため、変換を Kafka に委譲する必要があります。
コンシューマーがデータを受信したときに、デシリアライゼーションエラーを引き起こす不良レコードがあると仮定しましょう。たとえば、誰かが String
ではなく Integer
を渡した可能性があります。その場合、アプリケーションで何もしないと、チェーン を介して例外が伝播され、アプリケーションは最終的に終了します。
これを処理するために、DefaultErrorHandler
を構成する ListenerContainerCustomizer
@Bean
を追加できます。この DefaultErrorHandler
は DeadLetterPublishingRecoverer
で構成されています。また、コンシューマー用に ErrorHandlingDeserializer
を構成する必要があります。それは多くの複雑なことのように聞こえますが、実際には、この場合、これらの 3 つの Bean に要約されます。
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> {
container.setCommonErrorHandler(errorHandler);
};
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer);
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
return new DeadLetterPublishingRecoverer(bytesTemplate);
}
それぞれを分析してみましょう。最初のものは、DefaultErrorHandler
を取る ListenerContainerCustomizer
Bean です。これで、コンテナーはその特定のエラーハンドラーでカスタマイズされます。コンテナーのカスタマイズについて詳しくは、こちらを参照してください。
2 番目の Bean は、DLT
への公開で構成された DefaultErrorHandler
です。DefaultErrorHandler
の詳細については、こちらを参照してください。
3 番目の Bean は、DLT
への送信を最終的に担当する DeadLetterPublishingRecoverer
です。デフォルトでは、DLT
トピックの名前は ORIGINAL_TOPIC_NAME.DLT です。ただし、変更できます。詳細については、ドキュメントを参照してください。
また、アプリケーション構成を介して ErrorHandlingDeserializer を構成する必要があります。
ErrorHandlingDeserializer
は実際のデシリアライザーに委譲します。エラーが発生した場合は、レコードのキー / 値を null に設定し、メッセージの生のバイトを含めます。次に、ヘッダーに例外を設定し、このレコードをリスナーに渡します。リスナーは、登録されたエラーハンドラーを呼び出します。
必要な構成は次のとおりです。
spring.cloud.stream:
function:
definition: functionName
bindings:
functionName-in-0:
group: group-name
destination: input-topic
consumer:
use-native-decoding: true
kafka:
bindings:
functionName-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
バインディングの configuration
プロパティを介して ErrorHandlingDeserializer
を提供しています。また、委譲する実際のデシリアライザーが StringDeserializer
であることも示しています。
上記の dlq プロパティは、このレシピの説明には関係がないことに注意してください。これらは、純粋にアプリケーションレベルのエラーに対処することのみを目的としています。
Kafka バインダーの基本的なオフセット管理
ソリューション
これに関するドキュメントセクションを読んで、完全に理解することをお勧めします。
これが gist です。
Kafka は、デフォルトで 2 種類のオフセット(earliest
と latest
)をサポートしています。それらのセマンティクスは、それらの名前から自明です。
初めてコンシューマーを実行していると仮定します。Spring Cloud Stream アプリケーションで group.id を見逃すと、匿名のコンシューマーになります。匿名のコンシューマーがある場合は常に、Spring Cloud Stream アプリケーションはデフォルトで、トピックパーティション内の latest
の使用可能なオフセットから開始されます。一方、group.id を明示的に指定すると、デフォルトで、Spring Cloud Stream アプリケーションは、トピックパーティションで使用可能な earliest
オフセットから開始されます。
上記の両方の場合(明示的なグループと匿名グループを持つコンシューマー)、開始オフセットは、プロパティ spring.cloud.stream.kafka.bindings.<binding-name>.consumer.startOffset
を使用し、earliest
または latest
のいずれかに設定することで切り替えることができます。
ここで、以前にコンシューマーを実行し、再度開始したと仮定します。この場合、コンシューマーがコンシューマーグループに対してすでにコミットされたオフセットを見つけるため、上記の場合の開始オフセットセマンティクスは適用されません(匿名コンシューマーの場合、アプリケーションは group.id を提供しませんが、バインダーは自動生成)。最後にコミットされたオフセット以降をピックアップするだけです。これは、startOffset
値が提供されている場合でも当てはまります。
ただし、resetOffsets
プロパティを使用して、コンシューマーが最後にコミットされたオフセットから開始するデフォルトの動作をオーバーライドできます。これを行うには、プロパティ spring.cloud.stream.kafka.bindings.<binding-name>.consumer.resetOffsets
を true
(デフォルトでは false
)に設定します。次に、startOffset
値(earliest
または latest
のいずれか)を指定していることを確認してください。これを実行してからコンシューマーアプリケーションを起動すると、起動するたびに、これが初めて起動するかのように起動し、パーティションのコミットされたオフセットを無視します。
Kafka で任意のオフセットを求めています
問題文
Kafka バインダーを使用すると、オフセットを earliest
または latest
のいずれかに設定できることがわかりますが、オフセットを途中の任意のオフセットに求める必要があります。Spring Cloud Stream Kafka バインダーを使用してこれを達成する方法はありますか ?
ソリューション
以前、Kafka バインダーを使用して基本的なオフセット管理に取り組む方法を見てきました。デフォルトでは、バインダーは、少なくともそのレシピで見たメカニズムを通じて、任意のオフセットに巻き戻すことを許可しません。ただし、このユースケースを実現するためにバインダーが提供する低レベルの戦略がいくつかあります。調べてみましょう。
まず、earliest
または latest
以外の任意のオフセットにリセットする場合は、resetOffsets
構成をデフォルトの false
のままにしてください。次に、型 KafkaBindingRebalanceListener
のカスタム Bean を提供する必要があります。これは、すべてのコンシューマーバインディングに注入されます。これはいくつかのデフォルトのメソッドが付属しているインターフェースですが、ここに興味を持っているメソッドがあります:
/**
* Invoked when partitions are initially assigned or after a rebalance. Applications
* might only want to perform seek operations on an initial assignment. While the
* 'initial' argument is true for each thread (when concurrency is greater than 1),
* implementations should keep track of exactly which partitions have been sought.
* There is a race in that a rebalance could occur during startup and so a topic/
* partition that has been sought on one thread may be re-assigned to another
* thread and you may not wish to re-seek it at that time.
* @param bindingName the name of the binding.
* @param consumer the consumer.
* @param partitions the partitions.
* @param initial true if this is the initial assignment on the current thread.
*/
default void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer,
Collection<TopicPartition> partitions, boolean initial) {
// do nothing
}
詳細を見てみましょう。
基本的に、このメソッドは、トピックパーティションの最初の割り当て中、またはリバランス後に毎回呼び出されます。説明をわかりやすくするために、トピックが foo
であり、4 つのパーティションがあると仮定します。最初は、グループ内の 1 つのコンシューマーのみを開始し、このコンシューマーはすべてのパーティションから消費します。コンシューマーが初めて起動すると、4 つのパーティションすべてが最初に割り当てられます。ただし、デフォルトで消費するパーティションを開始するのではなく(グループを定義しているため、earliest
)、パーティションごとに、任意のオフセットをシークした後に消費するようにします。以下のように、特定のオフセットから消費するビジネスケースがあると想像してください。
Partition start offset
0 1000
1 2000
2 2000
3 1000
これは、上記の方法を以下のように実装することで実現できます。
@Override
public void onPartitionsAssigned(String bindingName, Consumer<?, ?> consumer, Collection<TopicPartition> partitions, boolean initial) {
Map<TopicPartition, Long> topicPartitionOffset = new HashMap<>();
topicPartitionOffset.put(new TopicPartition("foo", 0), 1000L);
topicPartitionOffset.put(new TopicPartition("foo", 1), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 2), 2000L);
topicPartitionOffset.put(new TopicPartition("foo", 3), 1000L);
if (initial) {
partitions.forEach(tp -> {
if (topicPartitionOffset.containsKey(tp)) {
final Long offset = topicPartitionOffset.get(tp);
try {
consumer.seek(tp, offset);
}
catch (Exception e) {
// Handle exceptions carefully.
}
}
});
}
}
これは単なる基本的な実装です。実際のユースケースはこれよりもはるかに複雑であり、それに応じて調整する必要がありますが、これは確かに基本的なスケッチを提供します。コンシューマー seek
が失敗すると、いくつかのランタイム例外がスローされる可能性があり、その場合の対処方法を決定する必要があります。
[[ 同じグループ ID で 2 番目のコンシューマーを開始する場合はどうすればよいですか ? ]] === 同じグループ ID で 2 番目のコンシューマーを開始したらどうなるでしょうか ?
2 番目のコンシューマーを追加すると、リバランスが発生し、一部のパーティションが移動します。新しいコンシューマーがパーティション 2
および 3
を取得するとします。この新しい Spring Cloud Stream コンシューマーがこの onPartitionsAssigned
メソッドを呼び出すと、これがこのコンシューマーのパーティション 2
および 3
の初期割り当てであることがわかります。initial
引数の条件付きチェックにより、シーク操作が実行されます。最初のコンシューマーの場合、パーティション 0
と 1
しかありません。ただし、このコンシューマーの場合、これは単なるリバランスイベントであり、最初の割り当てとは見なされませんでした。initial
引数の条件付きチェックのため、指定されたオフセットに再シークすることはありません。
[[how-do-i-manually-acknowledge-using-kafka-binder?]] == Kafka バインダーの使用を手動で確認するにはどうすればよいですか ?
ソリューション
デフォルトでは、Kafka バインダーは Spring for Apache Kafka プロジェクトのデフォルトのコミット設定に委譲します。Spring Kafka のデフォルトの ackMode
は batch
です。詳細については、こちらを参照してください。
このデフォルトのコミット動作を無効にし、手動コミットに依存したい場合があります。次の手順でそれを行うことができます。
プロパティ spring.cloud.stream.kafka.bindings.<binding-name>.consumer.ackMode
を MANUAL
または MANUAL_IMMEDIATE
のいずれかに設定します。このように設定すると、コンシューマーメソッドが受信するメッセージに kafka_acknowledgment
(KafkaHeaders.ACKNOWLEDGMENT
から)というヘッダーが存在します。
例: これをコンシューマー向けの方法として想像してください。
@Bean
public Consumer<Message<String>> myConsumer() {
return msg -> {
Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
if (acknowledgment != null) {
System.out.println("Acknowledgment provided");
acknowledgment.acknowledge();
}
};
}
次に、プロパティ spring.cloud.stream.kafka.bindings.myConsumer-in-0.consumer.ackMode
を MANUAL
または MANUAL_IMMEDIATE
に設定します。
[[how-do-i-override-the-default-binding-names-in-spring-cloud-stream?]] == Spring Cloud Stream のデフォルトのバインディング名をオーバーライドするにはどうすればよいですか ?
ソリューション
以下が関数のシグネチャーであると想定します。
@Bean
public Function<String, String> uppercase(){
...
}
デフォルトでは、Spring Cloud Stream は以下のようにバインディングを作成します。
大文字の 0
大文字 -out-0
次のプロパティを使用して、これらのバインディングを何かにオーバーライドできます。
spring.cloud.stream.function.bindings.uppercase-in-0=my-transformer-in
spring.cloud.stream.function.bindings.uppercase-out-0=my-transformer-out
この後、すべてのバインディングプロパティは、新しい名前 my-transformer-in
および my-transformer-out
で作成する必要があります。
これは、Kafka ストリームと複数の入力を使用した別の例です。
@Bean
public BiFunction<KStream<String, Order>, KTable<String, Account>, KStream<String, EnrichedOrder>> processOrder() {
...
}
デフォルトでは、Spring Cloud Stream はこの関数に対して 3 つの異なるバインディング名を作成します。
processOrder-in-0
processOrder-in-1
processOrder-out-0
これらのバインディングにいくつかの構成を設定するたびに、これらのバインディング名を使用する必要があります。それが好きではなく、たとえば、のような、よりドメインフレンドリーで読みやすいバインディング名を使用したいと考えています。
オーダー
アカウント
enrichedOrders
これらの 3 つのプロパティを設定するだけで、簡単にそれを行うことができます
spring.cloud.stream.function.bindings.processOrder-in-0= オーダー
spring.cloud.stream.function.bindings.processOrder-in-1= アカウント
spring.cloud.stream.function.bindings.processOrder-out-0=enrichedOrders
これを行うと、デフォルトのバインディング名が上書きされ、それらに設定するプロパティは、これらの新しいバインディング名にある必要があります。
[[how-do-i-send-a-message-key-as-part-of-my-record?]] == メッセージキーをレコードの一部として送信するにはどうすればよいですか ?
ソリューション
多くの場合、キーと値を含むレコードとして、マップのような連想データ構造を送信する必要があります。Spring Cloud Stream を使用すると、簡単な方法でそれを行うことができます。以下は、これを行うための基本的な青写真ですが、特定のユースケースに適合させることをお勧めします。
これがサンプルプロデューサーメソッド(別名 Supplier
)です。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
これは、String
ペイロードだけでなく、キーも使用してメッセージを送信する簡単な関数です。KafkaHeaders.MESSAGE_KEY
を使用してキーをメッセージヘッダーとして設定していることに注意してください。
キーをデフォルトの kafka_messageKey
から変更する場合は、構成で次のプロパティを指定する必要があります。
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.messageKeyExpression=headers['my-special-key']
バインディング名 supplier-out-0
を使用していることに注意してください。これは関数名であるため、それに応じて更新してください。
次に、メッセージを生成するときにこの新しいキーを使用します。
[[how-do-i-use-native-serializer-and-deserializer-instead-of-message-conversion-done-by-spring-cloud-stream?]] == の代わりにネイティブシリアライザーとデシリアライザーを使用するにはどうすればよいですか ? メッセージ変換は Spring Cloud Stream によって行われましたか ?
問題文
Spring Cloud Stream でメッセージコンバーターを使用する代わりに、Kafka でネイティブシリアライザーとデシリアライザーを使用したいと思います。デフォルトでは、Spring Cloud Stream は、内部の組み込みメッセージコンバーターを使用してこの変換を処理します。これをバイパスして、Kafka に責任を委譲するにはどうすればよいですか?
ソリューション
これは本当に簡単です。
ネイティブ直列化を有効にするには、次のプロパティを指定するだけです。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.useNativeEncoding: true
次に、シリアライザーも設定する必要があります。これを行うにはいくつかの方法があります。
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
またはバインダー構成を使用します。
spring.cloud.stream.kafka.binder.configuration.key.serializer: org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.binder.configuration.value.serializer: org.apache.kafka.common.serialization.StringSerializer
バインダー方式を使用する場合、すべてのバインディングに対して適用されますが、バインディングでの設定はバインディングごとに行われます。
デシリアライズ側では、デシリアライザーを構成として提供する必要があります。
以下に例を示します。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.configuration.key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.cloud.stream.kafka.bindings.<binding-name>.producer.configuration.value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
バインダーレベルで設定することもできます。
ネイティブデコードを強制するために設定できるオプションのプロパティがあります。
spring.cloud.stream.kafka.bindings.<binding-name>.consumer.useNativeDecoding: true
ただし、Kafka バインダーの場合、バインダーに到達するまでに、Kafka は構成済みのデシリアライザーを使用してすでにデシリアライズしているため、これは不要です。
Kafka Streams バインダーでオフセットリセットがどのように機能するかを説明します
問題文
デフォルトでは、Kafka Streams バインダーは、常に新しいコンシューマーの最も早いオフセットから開始します。場合によっては、最新のオフセットから開始することが有益であるか、アプリケーションによって要求されます。Kafka Streams バインダーを使用するとそれが可能になります。
ソリューション
ソリューションを検討する前に、次のシナリオを見てみましょう。
@Bean
public BiConsumer<KStream<Object, Object>, KTable<Object, Object>> myBiConsumer{
(s, t) -> s.join(t, ...)
...
}
2 つの入力バインディングを必要とする BiConsumer
Bean があります。この場合、最初のバインディングは KStream
用で、2 番目のバインディングは KTable
用です。このアプリケーションを初めて実行するとき、デフォルトでは、両方のバインディングは earliest
オフセットから始まります。いくつかの要件のために latest
オフセットから始めたいのはどうですか? これを行うには、次のプロパティを有効にします。
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-0.consumer.startOffset: latest
spring.cloud.stream.kafka.streams.bindings.myBiConsumer-in-1.consumer.startOffset: latest
1 つのバインディングのみを latest
オフセットから開始し、もう 1 つをデフォルトの earliest
からコンシューマーに開始する場合は、後者のバインディングを構成から除外します。
コミットオフセットが存在したら、これらの設定は光栄とコミットオフセットが優先されていない、ということを覚えておいてください。
Kafka でのレコードの送信(生成)の成功を追跡する
ソリューション
アプリケーションに次のサプライヤーがあると仮定します。
@Bean
public Supplier<Message<String>> supplier() {
return () -> MessageBuilder.withPayload("foo").setHeader(KafkaHeaders.MESSAGE_KEY, "my-foo").build();
}
次に、成功したすべての送信情報をキャプチャーするために、新しい MessageChannel
Bean を定義する必要があります。
@Bean
public MessageChannel fooRecordChannel() {
return new DirectChannel();
}
次に、アプリケーション構成でこのプロパティを定義して、recordMetadataChannel
の Bean 名を指定します。
spring.cloud.stream.kafka.bindings.supplier-out-0.producer.recordMetadataChannel: fooRecordChannel
この時点で、正常に送信された情報が fooRecordChannel
に送信されます。
以下のように IntegrationFlow
を記述して、情報を確認できます。
@Bean
public IntegrationFlow integrationFlow() {
return f -> f.channel("fooRecordChannel")
.handle((payload, messageHeaders) -> payload);
}
handle
メソッドでは、ペイロードは Kafka に送信されたものであり、メッセージヘッダーには kafka_recordMetadata
と呼ばれる特別なキーが含まれています。その値は、トピックパーティション、現在のオフセットなどに関する情報を含む RecordMetadata
です。
Kafka にカスタムヘッダーマッパーを追加する
ソリューション
通常の状況では、これで問題ありません。
次のプロデューサーがいると想像してみてください。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("foo", "bar").build();
}
コンシューマー側では、ヘッダー "foo" が表示されるはずですが、以下では課題は発生しません。
@Bean
public Consumer<Message<String>> consume() {
return s -> {
final String foo = (String)s.getHeaders().get("foo");
System.out.println(foo);
};
}
アプリケーションでカスタムヘッダーマッパーを提供する場合、これは機能しません。アプリケーションに空の KafkaHeaderMapper
があるとしましょう。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
}
};
}
それが実装である場合、コンシューマーの foo
ヘッダーを見逃すことになります。おそらく、これらの KafkaHeaderMapper
メソッド内にいくつかのロジックがある可能性があります。foo
ヘッダーにデータを入力するには、次のものが必要です。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String foo = (String) headers.get("foo");
target.add("foo", foo.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header foo = source.lastHeader("foo");
target.put("foo", new String(foo.value()));
}
}
これにより、プロデューサーからコンシューマーに foo
ヘッダーが適切に入力されます。
id ヘッダーに関する特記事項
Spring Cloud Stream では、id
ヘッダーは特別なヘッダーですが、一部のアプリケーションでは、custom-id
、ID
、Id
などの特別なカスタム ID ヘッダーが必要になる場合があります。最初のもの(custom-id
)は、カスタムヘッダーマッパーなしでプロデューサーからコンシューマーに伝播します。ただし、フレームワークで予約されている id
ヘッダーのバリアント(ID
、Id
、iD
など)を使用して作成すると、フレームワークの内部で問題が発生します。このユースケースの詳細については、この StackOverflow スレッド (英語) を参照してください。その場合、大文字と小文字を区別する ID ヘッダーをマップするためにカスタム KafkaHeaderMapper
を使用する必要があります。例: 次のプロデューサーがいるとします。
@Bean
public Supplier<Message<String>> supply() {
return () -> MessageBuilder.withPayload("foo").setHeader("Id", "my-id").build();
}
上記のヘッダー Id
は、フレームワーク id
ヘッダーと衝突するため、消費側から削除されます。この課題を解決するために、カスタム KafkaHeaderMapper
を提供できます。
@Bean
public KafkaHeaderMapper kafkaBinderHeaderMapper1() {
return new KafkaHeaderMapper() {
@Override
public void fromHeaders(MessageHeaders headers, Headers target) {
final String myId = (String) headers.get("Id");
target.add("Id", myId.getBytes());
}
@Override
public void toHeaders(Headers source, Map<String, Object> target) {
final Header Id = source.lastHeader("Id");
target.put("Id", new String(Id.value()));
}
};
}
これにより、id
ヘッダーと Id
ヘッダーの両方がプロデューサー側からコンシューマー側で使用できるようになります。
トランザクションで複数のトピックを作成する
ソリューション
トランザクションに Kafka バインダーのトランザクションサポートを使用してから、AfterRollbackProcessor
を提供します。複数のトピックを作成するには、StreamBridge
API を使用します。
以下は、このためのコードスニペットです。
@Autowired
StreamBridge bridge;
@Bean
Consumer<String> input() {
return str -> {
System.out.println(str);
this.bridge.send("left", str.toUpperCase());
this.bridge.send("right", str.toLowerCase());
if (str.equals("Fail")) {
throw new RuntimeException("test");
}
};
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(BinderFactory binders) {
return (container, dest, group) -> {
ProducerFactory<byte[], byte[]> pf = ((KafkaMessageChannelBinder) binders.getBinder(null,
MessageChannel.class)).getTransactionalProducerFactory();
KafkaTemplate<byte[], byte[]> template = new KafkaTemplate<>(pf);
DefaultAfterRollbackProcessor rollbackProcessor = rollbackProcessor(template);
container.setAfterRollbackProcessor(rollbackProcessor);
};
}
DefaultAfterRollbackProcessor rollbackProcessor(KafkaTemplate<byte[], byte[]> template) {
return new DefaultAfterRollbackProcessor<>(
new DeadLetterPublishingRecoverer(template), new FixedBackOff(2000L, 2L), template, true);
}
必要な構成
spring.cloud.stream.kafka.binder.transaction.transaction-id-prefix: tx-
spring.cloud.stream.kafka.binder.required-acks=all
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.destination=input
spring.cloud.stream.bindings.left.destination=left
spring.cloud.stream.bindings.right.destination=right
spring.cloud.stream.kafka.bindings.input-in-0.consumer.maxAttempts=1
テストするには、次を使用できます。
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
System.in.read();
template.send("input", "Fail".getBytes());
template.send("input", "Good".getBytes());
};
}
いくつかの重要な注意事項:
DLT を手動で構成するため、アプリケーション構成に DLQ 設定がないことを確認してください(デフォルトでは、初期のコンシューマー機能に基づいて input.DLT
という名前のトピックに公開されます)。また、バインダーによる再試行を回避するために、1
へのコンシューマーバインディングで maxAttempts
をリセットします。上記の例では、最大で合計 3 回試行されます(最初の試行 + FixedBackoff
での 2 回の試行)。
このコードをテストする方法の詳細については、StackOverflow スレッド (英語) を参照してください。Spring Cloud Stream を使用して、コンシューマー関数を追加してテストする場合は、コンシューマーバインディングの isolation-level
を read-committed
に設定してください。
この StackOverflow スレッド (英語) もこの議論に関連しています。
複数のポーリング可能なコンシューマーを実行するときに避けるべき落とし穴
ソリューション
次の定義を持っていると仮定します:
spring.cloud.stream.pollable-source: foo
spring.cloud.stream.bindings.foo-in-0.group: my-group
アプリケーションを実行すると、Kafka コンシューマーは client.id(consumer-my-group-1
など)を生成します。実行中のアプリケーションのインスタンスごとに、この client.id
は同じになり、予期しない問題が発生します。
これを修正するために、アプリケーションの各インスタンスに次のプロパティを追加できます。
spring.cloud.stream.kafka.bindings.foo-in-0.consumer.configuration.client.id=${client.id}
詳細については、この GitHub の課題 (英語) を参照してください。