NULL ペイロードと "Tombstone" レコードのログ圧縮

ログ圧縮を使用すると、null ペイロードを含むメッセージを送受信して、キーの削除を識別できます。また、値をデシリアライズできないときに null を返すデシリアライザーなど、他の理由で null 値を受け取ることもあります。

Null ペイロードの生成

PulsarTemplate を使用して null ペイロードを送信するには、fluent API を使用し、newMessage() メソッドの value 引数に null を渡すことができます。例:

pulsarTemplate
        .newMessage(null)
        .withTopic("my-topic")
        .withSchema(Schema.STRING)
        .withMessageCustomizer((mb) -> mb.key("key:1234"))
        .send();
NULL 値を送信する場合、システムは null ペイロードからメッセージの型を判断できないため、スキーマ型を指定する必要があります。

Null ペイロードの消費

@PulsarListener および @PulsarReader の場合、null ペイロードは、次のようにメッセージパラメーターの型に基づいてリスナーメソッドに渡されます。

パラメーター型 渡された値

primitive

null

user-defined

null

org.apache.pulsar.client.api.Message<T>

getValue() が null を返す、null 以外の Pulsar メッセージ

org.springframework.messaging.Message<T>

getPayload() が PulsarNull を返す、null 以外の Spring メッセージ

List<X>

エントリ (X) が上記の型のいずれかであり、それに応じて動作する非 null リスト (つまり。プリミティブエントリは null などです。)

org.apache.pulsar.client.api.Messages<T>

getValue() が null を返す非 null Pulsar メッセージの null 以外のコンテナー

渡された値が null (つまり、プリミティブ型またはユーザー定義型の単一レコードリスナー) の場合は、required = false とともに @Payload パラメーターアノテーションを使用する必要があります。
リスナーペイロード型に Spring org.springframework.messaging.Message を使用する場合、そのジェネリクス型情報は Message<PulsarNull> (例: MessageMessage<?>、または Message<Object>) を受け入れるのに十分な幅を持つ必要があります。これは、Spring メッセージがペイロードに null 値を許可せず、代わりに PulsarNull プレースホルダーを使用するという事実によるものです。

圧縮されたログの tombstone メッセージの場合、アプリケーションがどのキーが "deleted" であったかを判断できるように、通常はキーも必要です。次の例は、そのような構成を示しています。

@PulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
void myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
@PulsarReader はまだ @Header 引数をサポートしていないため、ログ圧縮シナリオではあまり役に立ちません。