メッセージ制作

1. Pulsar テンプレート

Pulsar プロデューサー側では、Spring Boot 自動構成によりレコードを公開するための PulsarTemplate が提供されます。このテンプレートは、PulsarOperations と呼ばれるインターフェースを実装し、その契約を通じてレコードを公開するメソッドを提供します。

これらの送信 API メソッドには send と sendAsync という 2 つのカテゴリがあります。send メソッドは、Pulsar プロデューサーの同期送信機能を使用して呼び出しをブロックします。メッセージがブローカー上で永続化されると、発行されたメッセージの MessageId を返します。sendAsync メソッド呼び出しは、ノンブロッキングの非同期呼び出しです。これらは CompletableFuture を返します。これを使用して、メッセージがパブリッシュされた後にメッセージ ID を非同期的に受信できます。

トピックパラメーターを含まない API バリアントの場合、トピック解決プロセスを使用して宛先トピックが決定されます。

1.1. シンプルな API

このテンプレートには、単純な送信リクエスト用のいくつかのメソッド ( 「送信」という接頭辞が付く (Javadoc) ) が用意されています。より複雑な送信リクエストの場合は、流れるような API を使用して、より多くのオプションを構成できます。

1.2. Fluent API

このテンプレートは、より複雑な送信リクエストを処理するための流れるようなビルダー (Javadoc) を提供します。

1.3. メッセージのカスタマイズ

TypedMessageBuilderCustomizer を指定して、送信メッセージを構成できます。例: 次のコードは、キー付きメッセージを送信する方法を示しています。

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

1.4. プロデューサーのカスタマイズ

ProducerBuilderCustomizer を指定して、発信メッセージの送信に使用されるプロデューサーを最終的に構築する基礎となる Pulsar プロデューサービルダーを構成できます。

これにより、プロデューサービルダーへの完全なアクセスが許可され、そのメソッドの一部 ( create など) を呼び出すと、意図しない副作用が発生する可能性があるため、使用には注意してください。

例: 次のコードは、バッチ処理を無効にしてチャンク処理を有効にする方法を示しています。

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

この別の例では、パーティション化されたトピックにレコードをパブリッシュするときにカスタムルーティングを使用する方法を示します。Producer ビルダーでカスタム MessageRouter 実装を次のように指定します。

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
MessageRouter を使用する場合、spring.pulsar.producer.message-routing-mode の有効な設定は custom のみであることに注意してください。

この他の例は、プロデューサーが受信したメッセージをブローカーにパブリッシュする前にインターセプトして変更する ProducerInterceptor を追加する方法を示しています。

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

カスタマイザは、送信操作に使用されるプロデューサーにのみ適用されます。すべてのプロデューサーにカスタマイザーを適用する場合は、グローバルプロデューサーのカスタマイズに従って、カスタマイザーをプロデューサーファクトリに提供する必要があります。

Lambda カスタマイザーを使用する場合は、"Lambda カスタマイザーに関する注意" で説明されているルールに従う必要があります。

2. スキーマ情報の指定

Java プリミティブ型を使用する場合、フレームワークによってスキーマが自動検出されるため、データを公開するためにスキーマ型を指定する必要はありません。非プリミティブ型の場合、PulsarTemplate で送信操作を呼び出すときにスキーマが明示的に指定されていない場合、Spring for Apache Pulsar フレームワークはその型から Schema.JSON を構築しようとします。

現在サポートされている複雑なスキーマ型は、INLINE エンコードを使用した JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES、KEY_VALUE です。

2.1. カスタムスキーママッピング

PulsarTemplate で複合型の送信操作を呼び出すときにスキーマを指定する代わりに、型のマッピングを使用してスキーマリゾルバーを構成できます。これにより、フレームワークが送信メッセージ型を使用してリゾルバーに問い合わせるため、スキーマを指定する必要がなくなります。

2.1.1. 構成プロパティ

スキーママッピングは、spring.pulsar.defaults.type-mappings プロパティを使用して構成できます。次の例では、application.yml を使用して、それぞれ AVRO スキーマと JSON スキーマを使用する User および Address 複合オブジェクトのマッピングを追加します。

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type は、メッセージクラスの完全修飾名です。

2.1.2. スキーマリゾルバーカスタマイザー

マッピングを追加する推奨される方法は、上記のプロパティを使用することです。ただし、より詳細な制御が必要な場合は、スキーマリゾルバーカスタマイザーを提供してマッピングを追加できます。

次の例では、スキーマリゾルバーカスタマイザーを使用して、AVRO スキーマと JSON スキーマをそれぞれ使用する User および Address 複合オブジェクトのマッピングを追加します。

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 型マッピングのアノテーション

特定のメッセージ型に使用するデフォルトのスキーマ情報を指定するためのもう 1 つのオプションは、メッセージクラスに @PulsarMessage アノテーションをマークすることです。スキーマ情報は、アノテーションの schemaType 属性を介して指定できます。

次の例では、Foo 型のメッセージを生成または消費するときに、デフォルトのスキーマとして JSON を使用するようにシステムを構成します。

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

この構成を使用すると、送信操作時にスキーマを設定指定する必要はありません。

2.2. AUTO_SCHEMA を使用した制作

Pulsar トピックのスキーマの種類を事前に知る機会がない場合は、AUTO_PRODUCE [Apache] (英語) スキーマを使用して、生の JSON または Avro ペイロードを byte[] として安全に公開できます。

この場合、プロデューサーは、送信バイトが宛先トピックのスキーマと互換性があるかどうかを検証します。

以下の例に示すように、テンプレートの送信操作で Schema.AUTO_PRODUCE_BYTES() のスキーマを指定するだけです。

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
これは、Avro および JSON スキーマ型でのみサポートされます。

3. Pulsar プロデューサーファクトリ

PulsarTemplate は PulsarProducerFactory に依存して、基礎となるプロデューサーを実際に作成します。Spring Boot 自動構成では、spring.pulsar.producer.* アプリケーションプロパティのいずれかを指定することでさらに構成できるこのプロデューサーファクトリも提供されます。

プロデューサーファクトリ API を直接使用するときにトピック情報が指定されていない場合は、PulsarTemplate で使用されるのと同じトピック解決プロセスが使用されますが、「メッセージ型のデフォルト」ステップが省略されるという 1 つの例外があります。

3.1. グローバルプロデューサーのカスタマイズ

このフレームワークは、各プロデューサーの構築に使用される基礎となるビルダーを構成できる ProducerBuilderCustomizer 契約を提供します。すべてのプロデューサーをカスタマイズするには、カスタマイザーのリストを PulsarProducerFactory コンストラクターに渡すことができます。複数のカスタマイザーを使用する場合、リストに表示されている順序で適用されます。

Spring Boot 自動構成を使用する場合、カスタマイザーを Bean として指定すると、それらは @Order アノテーションに従って順序付けされて、自動的に PulsarProducerFactory に渡されます。

カスタマイザーを 1 つのプロデューサーにのみ適用する場合は、Fluent API を使用して、送信時にカスタマイザーを指定できます。

4. Pulsar プロデューサーのキャッシュ

基礎となる各 Pulsar プロデューサーはリソースを消費します。パフォーマンスを向上させ、プロデューサーの継続的な作成を回避するために、プロデューサーファクトリは、作成したプロデューサーをキャッシュします。これらは LRU 方式でキャッシュされ、構成された期間内に使用されなかった場合は削除されます。キャッシュキー [GitHub] (英語) は、後続の作成リクエストで呼び出し元に同じプロデューサーが返されることを保証するのに十分な情報で構成されます。

さらに、spring.pulsar.producer.cache.* アプリケーションプロパティのいずれかを指定してキャッシュ設定を構成できます。

4.1. Lambda カスタマイザーに関する注意

ユーザーが提供するプロデューサーカスタマイザーもキャッシュキーに含まれます。キャッシュキーは equals/hashCode の有効な実装に依存しているため、Lambda カスタマイザーを使用する場合は注意が必要です。

RULE : Lambda として実装された 2 つのカスタマイザーは、同じ Lambda インスタンスを使用し、そのクロージャーの外側で定義された変数を必要としない場合に限り、 equals/hashCode で一致します。

上記のルールを明確にするために、いくつかの例を見てみましょう。次の例では、カスタマイザーはインライン Lambda として定義されています。これは、sendUser への各呼び出しが同じ Lambda インスタンスを使用することを意味します。さらに、クロージャの外側に変数は必要ありません。キャッシュキーとして一致します。

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

次のケースでは、カスタマイザーはインライン Lambda として定義されます。これは、sendUser への各呼び出しが同じ Lambda インスタンスを使用することを意味します。ただし、クロージャの外側に変数が必要です。キャッシュキーとしては一致しません。

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

この最後の例では、カスタマイザーはインライン Lambda として定義されています。これは、sendUser への各呼び出しが同じ Lambda インスタンスを使用することを意味します。変数名は使用されますが、クロージャの外側で発生したものではないため、キャッシュキーとして一致します。これは、変数を Lambda クロージャで使用でき、静的メソッドを呼び出すこともできることを示しています。

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
RULE : Lambda カスタマイザが 1 回だけ定義されていない場合 (同じインスタンスが後続の呼び出しで使用される) OR では、そのクロージャの外側で変数が定義されている必要があるため、カスタマイザ実装に有効な equals/hashCode 実装を提供する必要があります。
これらのルールに従わない場合、プロデューサーキャッシュは常にミスし、アプリケーションのパフォーマンスに悪影響が生じます。

5. プロデューサー上のメッセージをインターセプトする

ProducerInterceptor を追加すると、プロデューサーが受信したメッセージをブローカーにパブリッシュする前にインターセプトして変更できるようになります。これを行うには、インターセプターのリストを PulsarTemplate コンストラクターに渡すことができます。複数のインターセプターを使用する場合、それらが適用される順序は、リストに表示される順序になります。

Spring Boot 自動構成を使用する場合は、インターセプターを Bean として指定できます。これらは自動的に PulsarTemplate に渡されます。インターセプターの順序付けは、次のように @Order アノテーションを使用して実現されます。

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
スターターを使用しない場合は、前述のコンポーネントを自分で設定して登録する必要があります。