NULL ペイロードと "Tombstone" レコードのログ圧縮

ログ圧縮を使用すると、null ペイロードを含むメッセージを送受信して、キーの削除を識別できます。また、値をデシリアライズできないときに null を返すデシリアライザーなど、他の理由で null 値を受け取ることもあります。

Null ペイロードの生成

null メッセージパラメーター値を send メソッドの 1 つに渡すことで、ReactivePulsarTemplate で null 値を送信できます。例:

reactiveTemplate
        .send(null, Schema.STRING)
        .subscribe();
NULL 値を送信する場合、システムは null ペイロードからメッセージの型を判断できないため、スキーマ型を指定する必要があります。

Null ペイロードの消費

@ReactivePularListener の場合、null ペイロードは、次のようにメッセージパラメーターの型に基づいてリスナーメソッドに渡されます。

パラメーター型 渡された値

primitive

null

user-defined

null

org.apache.pulsar.client.api.Message<T>

getValue() が null を返す、null 以外の Pulsar メッセージ

org.springframework.messaging.Message<T>

getPayload() が PulsarNull を返す、null 以外の Spring メッセージ

Flux<org.apache.pulsar.client.api.Message<T>>

エントリが非 null Pulsar メッセージである非 null flux で、getValue() が null を返す

Flux<org.springframework.messaging.Message<T>>

エントリが非 null Spring メッセージである非 null flux で、getPayload() が PulsarNull を返す

渡された値が null (つまり、プリミティブ型またはユーザー定義型の単一レコードリスナー) の場合は、required = false とともに @Payload パラメーターアノテーションを使用する必要があります。
リスナーペイロード型に Spring org.springframework.messaging.Message を使用する場合、そのジェネリクス型情報は Message<PulsarNull> (例: MessageMessage<?>、または Message<Object>) を受け入れるのに十分な幅を持つ必要があります。これは、Spring メッセージがペイロードに null 値を許可せず、代わりに PulsarNull プレースホルダーを使用するという事実によるものです。

圧縮されたログの tombstone メッセージの場合、アプリケーションがどのキーが "deleted" であったかを判断できるように、通常はキーも必要です。次の例は、そのような構成を示しています。

@ReactivePulsarListener(
        topics = "my-topic",
        subscriptionName = "my-topic-sub",
        schemaType = SchemaType.STRING)
Mono<Void> myListener(
        @Payload(required = false) String msg,
        @Header(PulsarHeaders.KEY) String key) {
    ...
}
ストリーミングメッセージリスナー (Flux) を使用する場合、ヘッダーのサポートが制限されるため、ログ圧縮シナリオではあまり役に立ちません。