NULL ペイロードと "Tombstone" レコードのログ圧縮
ログ圧縮 [Apache] (英語) を使用すると、キーの削除を識別するために null
ペイロードを含むメッセージを送受信できます。
値をデシリアライズできない場合に null
を返す可能性のある Deserializer
など、他の理由で null
値を受け取ることもできます。
KafkaTemplate
を使用して null
ペイロードを送信するには、send()
メソッドの value 引数に null を渡すことができます。これに対する 1 つの例外は、send(Message<?> message)
バリアントです。spring-messaging
Message<?>
は null
ペイロードを持つことができないため、KafkaNull
と呼ばれる特別なペイロード型を使用でき、フレームワークは null
を送信します。便宜上、静的 KafkaNull.INSTANCE
が提供されます。
メッセージリスナーコンテナーを使用する場合、受信した ConsumerRecord
には null
value()
があります。
null
ペイロードを処理するように @KafkaListener
を構成するには、required = false
とともに @Payload
アノテーションを使用する必要があります。圧縮されたログの tombstone メッセージの場合、アプリケーションがどのキーが "deleted
" であったかを判断できるように、通常はキーも必要です。次の例は、そのような構成を示しています。
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
複数の @KafkaHandler
メソッドでクラスレベルの @KafkaListener
を使用する場合は、追加の構成が必要です。具体的には、KafkaNull
ペイロードを持つ @KafkaHandler
メソッドが必要です。次の例は、1 つの設定方法を示しています。
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
引数は KafkaNull
ではなく null
であることに注意してください。
すべてのパーティションを手動で割り当てるを参照してください。 |
この機能では、デフォルトの MessageHandlerMethodFactory を使用するときにフレームワークが構成する KafkaNullAwarePayloadArgumentResolver を使用する必要があります。カスタム MessageHandlerMethodFactory を使用する場合は、カスタム HandlerMethodArgumentResolver を @KafkaListener に追加するを参照してください。 |