メッセージの生産

1. ReactivePulsarTemplate

Pulsar プロデューサー側では、Spring Boot 自動構成によりレコードを公開するための ReactivePulsarTemplate が提供されます。このテンプレートは、ReactivePulsarOperations と呼ばれるインターフェースを実装し、その契約を通じてレコードを公開するメソッドを提供します。

このテンプレートは、単一のメッセージを受け入れて Mono<MessageId> を返す送信メソッドを提供します。また、複数のメッセージ (ReactiveStreams Publisher 型の形式) を受け入れ、Flux<MessageId> を返す送信メソッドも提供します。

トピックパラメーターを含まない API バリアントの場合、トピック解決プロセスを使用して宛先トピックが決定されます。

1.1. Fluent API

このテンプレートは、より複雑な送信リクエストを処理するための流れるようなビルダー (Javadoc) を提供します。

1.2. メッセージのカスタマイズ

MessageSpecBuilderCustomizer を指定して、送信メッセージを構成できます。例: 次のコードは、キー付きメッセージを送信する方法を示しています。

template.newMessage(msg)
    .withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
    .send();

1.3. 送信者のカスタマイズ

ReactiveMessageSenderBuilderCustomizer を指定して、発信メッセージの送信に使用される送信者を最終的に構築する基礎となる Pulsar 送信者ビルダーを構成できます。

これにより、送信側ビルダーへの完全なアクセスが許可され、そのメソッドの一部 ( create など) を呼び出すと、意図しない副作用が発生する可能性があるため、使用には注意してください。

例: 次のコードは、バッチ処理を無効にしてチャンク処理を有効にする方法を示しています。

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
    .send();

この別の例では、パーティション化されたトピックにレコードをパブリッシュするときにカスタムルーティングを使用する方法を示します。送信側ビルダーでカスタム MessageRouter 実装を次のように指定します。

template.newMessage(msg)
    .withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
    .send();
MessageRouter を使用する場合、spring.pulsar.producer.message-routing-mode の有効な設定は custom のみであることに注意してください。

2. スキーマ情報の指定

Java プリミティブ型を使用する場合、フレームワークによってスキーマが自動検出されるため、データを公開するためにスキーマ型を指定する必要はありません。非プリミティブ型の場合、ReactivePulsarTemplate で送信操作を呼び出すときにスキーマが明示的に指定されていない場合、Spring for Apache Pulsar フレームワークはその型から Schema.JSON を構築しようとします。

現在サポートされている複雑なスキーマ型は、INLINE エンコードを使用した JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES、KEY_VALUE です。

2.1. カスタムスキーママッピング

ReactivePulsarTemplate で複合型の送信操作を呼び出すときにスキーマを指定する代わりに、型のマッピングを使用してスキーマリゾルバーを構成できます。これにより、フレームワークが送信メッセージ型を使用してリゾルバーに問い合わせるため、スキーマを指定する必要がなくなります。

2.1.1. 構成プロパティ

スキーママッピングは、spring.pulsar.defaults.type-mappings プロパティを使用して構成できます。次の例では、application.yml を使用して、それぞれ AVRO スキーマと JSON スキーマを使用する User および Address 複合オブジェクトのマッピングを追加します。

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type は、メッセージクラスの完全修飾名です。

2.1.2. スキーマリゾルバーカスタマイザー

マッピングを追加する推奨される方法は、上記のプロパティを使用することです。ただし、より詳細な制御が必要な場合は、スキーマリゾルバーカスタマイザーを提供してマッピングを追加できます。

次の例では、スキーマリゾルバーカスタマイザーを使用して、AVRO スキーマと JSON スキーマをそれぞれ使用する User および Address 複合オブジェクトのマッピングを追加します。

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 型マッピングのアノテーション

特定のメッセージ型に使用するデフォルトのスキーマ情報を指定するためのもう 1 つのオプションは、メッセージクラスに @PulsarMessage アノテーションをマークすることです。スキーマ情報は、アノテーションの schemaType 属性を介して指定できます。

次の例では、Foo 型のメッセージを生成または消費するときに、デフォルトのスキーマとして JSON を使用するようにシステムを構成します。

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

この構成を使用すると、送信操作時にスキーマを設定指定する必要はありません。

2.2. AUTO_SCHEMA を使用した生産

Pulsar トピックのスキーマの種類を事前に知る機会がない場合は、AUTO_PRODUCE [Apache] (英語) スキーマを使用して、生の JSON または Avro ペイロードを byte[] として安全に公開できます。

この場合、プロデューサーは、送信バイトが宛先トピックのスキーマと互換性があるかどうかを検証します。

以下の例に示すように、テンプレートの送信操作で Schema.AUTO_PRODUCE_BYTES() のスキーマを指定するだけです。

void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
これは、Avro および JSON スキーマ型でのみサポートされます。

3. ReactivePulsarSenderFactory

ReactivePulsarTemplate は ReactivePulsarSenderFactory に依存して、基礎となる送信者を実際に作成します。

Spring Boot は、spring.pulsar.producer.* (英語) アプリケーションプロパティのいずれかを使用して構成できるこの送信者ファクトリを提供します。

センダーファクトリ API を直接使用するときにトピック情報が指定されていない場合は、「メッセージ型のデフォルト」ステップが省略されることを除いて、ReactivePulsarTemplate で使用されるのと同じトピック解決プロセスが使用されます。

3.1. プロデューサーのキャッシュ

基礎となる各 Pulsar プロデューサーはリソースを消費します。パフォーマンスを向上させ、プロデューサーの継続的な作成を回避するために、基礎となる Apache Pulsar Reactive クライアントの ReactiveMessageSenderCache は、作成したプロデューサーをキャッシュします。これらは LRU 方式でキャッシュされ、構成された期間内に使用されなかった場合は削除されます。

spring.pulsar.producer.cache.* (英語) アプリケーションプロパティのいずれかを指定して、キャッシュ設定を構成できます。