NULL ペイロードと "Tombstone" レコードのログ圧縮

ログ圧縮 [Apache] (英語) を使用すると、キーの削除を識別するために null ペイロードを含むメッセージを送受信できます。

値をデシリアライズできない場合に null を返す可能性のある Deserializer など、他の理由で null 値を受け取ることもできます。

KafkaTemplate を使用して null ペイロードを送信するには、send() メソッドの value 引数に null を渡すことができます。これに対する 1 つの例外は、send(Message<?> message) バリアントです。spring-messagingMessage<?> は null ペイロードを持つことができないため、KafkaNull と呼ばれる特別なペイロード型を使用でき、フレームワークは null を送信します。便宜上、静的 KafkaNull.INSTANCE が提供されます。

メッセージリスナーコンテナーを使用する場合、受信した ConsumerRecord には nullvalue() があります。

null ペイロードを処理するように @KafkaListener を構成するには、required = false とともに @Payload アノテーションを使用する必要があります。圧縮されたログの tombstone メッセージの場合、アプリケーションがどのキーが "deleted" であったかを判断できるように、通常はキーも必要です。次の例は、そのような構成を示しています。

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
    // value == null represents key deletion
}

複数の @KafkaHandler メソッドでクラスレベルの @KafkaListener を使用する場合は、追加の構成が必要です。具体的には、KafkaNull ペイロードを持つ @KafkaHandler メソッドが必要です。次の例は、1 つの設定方法を示しています。

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

    @KafkaHandler
    public void listen(String cat) {
        ...
    }

    @KafkaHandler
    public void listen(Integer hat) {
        ...
    }

    @KafkaHandler
    public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
        ...
    }

}

引数は KafkaNull ではなく null であることに注意してください。

この機能では、デフォルトの MessageHandlerMethodFactory を使用するときにフレームワークが構成する KafkaNullAwarePayloadArgumentResolver を使用する必要があります。カスタム MessageHandlerMethodFactory を使用する場合は、カスタム HandlerMethodArgumentResolver を @KafkaListener に追加するを参照してください。