Spring Cloud Stream Apache Pulsar 用バインダー

Spring for Apache Pulsar は、パブリッシュ / サブスクライブパラダイムを使用してイベント駆動型のマイクロサービスを構築するために使用できる Spring Cloud Stream 用のバインダーを提供します。このセクションでは、このバインダーの基本的な詳細について説明します。

使用方法

Spring Cloud Stream に Apache Pulsar バインダーを使用するには、アプリケーションに次の依存関係を含める必要があります。

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

概要

Apache Pulsar 用の Spring Cloud Stream バインダーを使用すると、アプリケーションは Pulsar の管理と保守に関する下位レベルの詳細を処理するのではなく、ビジネスロジックに集中できるようになります。バインダーは、アプリケーション開発者に代わってこれらすべての詳細を処理します。Spring Cloud Stream は、Spring Cloud Function に基づく強力なプログラミングモデルをもたらし、アプリ開発者が関数型スタイルを使用して複雑なイベント駆動型アプリケーションを作成できるようにします。アプリケーションはミドルウェアに中立な方法から開始し、Pulsar トピックを Spring Cloud Stream の宛先として Spring Boot 構成プロパティにマップできます。Spring Cloud Stream は Spring Boot 上に構築されており、Spring Cloud Stream を使用してイベント駆動型のマイクロサービスを作成するときは、基本的に Boot アプリケーションを作成することになります。これは簡単な Spring Cloud Stream アプリケーションです。

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上記のサンプルアプリケーションは本格的な Spring Boot アプリケーションであり、いくつかの説明が必要です。ただし、最初のパスでは、これが単なる Java といくつかの Spring および Spring Boot アノテーションであることがわかります。ここには 3 つの Bean メソッド ( java.util.function.Supplierjava.util.function.Function、最後に java.util.function.Consumer) があります。サプライヤーは現在時刻をミリ秒単位で生成し、関数はこの時間を取得し、ランダムデータを追加してそれを拡張し、コンシューマーは拡張された時間をログに記録します。

簡潔にするためにすべてのインポートを省略しましたが、アプリケーション全体で Spring Cloud Stream 固有のものは何もありません。どのようにして Apache Pulsar と対話する Spring Cloud Stream アプリケーションになるのでしょうか ? アプリケーション内のバインダーに対する上記の依存関係を含める必要があります。その依存関係を追加したら、次の構成プロパティを指定する必要があります。

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

これにより、上記の Spring Boot アプリケーションは、Spring Cloud Stream に基づいたエンドツーエンドのイベント駆動型アプリケーションになりました。クラスパス上に Pulsar バインダーがあるため、アプリケーションは Apache Pulsar と対話します。アプリケーションに関数が 1 つだけある場合は、Spring Cloud Stream がデフォルトでその関数をアクティブ化して実行するように指示する必要はありません。この例のように、アプリケーションにそのような関数が複数ある場合は、どの関数をアクティブにするかを Spring Cloud Stream に指示する必要があります。この場合、すべてアクティブにする必要があり、spring.cloud.function.definition プロパティを通じてそれを行います。Bean 名は、デフォルトで Spring Cloud Stream バインディング名の一部になります。バインディングは、Spring Cloud Stream における基本的に抽象的な概念であり、これを使用してフレームワークはミドルウェア宛先と通信します。Spring Cloud Stream の動作のほとんどすべては、コンクリートバインディング上で行われます。サプライヤーには出力バインディングのみがあります。関数には入力バインディングと出力バインディングがあり、コンシューマーには入力バインディングのみがあります。例として、サプライヤー Bean - timeSupplier. を考えてみましょう。このサプライヤーのデフォルトのバインディング名は timeSupplier-out-0 になります。同様に、timeProcessor 関数のデフォルトのバインディング名は、受信では timeProcessor-in-0、送信では timeProcessor-out-0 になります。デフォルトのバインディング名を変更する方法の詳細については、Spring Cloud Stream リファレンスドキュメントを参照してください。ほとんどの状況では、デフォルトのバインディング名を使用するだけで十分です。上に示すように、バインディング名に宛先を設定します。宛先が指定されていない場合は、timeSupplier-out-0 の場合と同様に、バインディング名が宛先の値になります。

上記のアプリを実行すると、サプライヤーが毎秒実行され、その後関数によって消費され、ロガーコンシューマーによって消費される時間が増加することがわかります。

バインダーベースのアプリケーションでのメッセージ変換

上記のサンプルアプリケーションでは、メッセージ変換のためのスキーマ情報を提供しませんでした。これは、デフォルトで、Spring Cloud Stream は、Spring メッセージングプロジェクトを通じて Spring Framework で確立されたメッセージングサポートを使用するメッセージ変換メカニズムを使用するためです。指定しない限り、Spring Cloud Stream は受信バインディングと送信バインディングの両方でメッセージ変換に application/json を content-type として使用します。送信では、データは byte[], として直列化され、Pulsar バインダーは Schema.BYTES を使用してネットワーク経由で Pulsar トピックに送信します。同様に、受信では、データは Pulsar トピックから byte[] として消費され、その後、適切なメッセージコンバーターを使用してターゲット型に変換されます。

Pulsar スキーマを使用した Pulsar でのネイティブ変換の使用

デフォルトではフレームワークが提供するメッセージ変換を使用しますが、Spring Cloud Stream では各バインダーがメッセージの変換方法を決定できます。アプリケーションがこのルートを選択したとします。その場合、Spring Cloud Stream は Spring が提供するメッセージ変換機能の使用を回避し、受信または生成するデータを渡します。Spring Cloud Stream のこの機能は、プロデューサー側ではネイティブエンコーディング、コンシューマー側ではネイティブデコーディングとして知られています。これは、エンコードとデコードがターゲットミドルウェア (この場合は Apache Pulsar) 上でネイティブに行われることを意味します。上記のアプリケーションの場合、次の構成を使用してフレームワーク変換をバイパスし、ネイティブエンコーディングとデコーディングを使用できます。

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

プロデューサー側でネイティブエンコーディングを有効にするプロパティは、コア Spring Cloud Stream のバインディングレベルプロパティです。プロデューサーバーインディングに spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding を設定し、これを true. に設定します。同様に、コンシューマーバインディングに spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding を使用し、true. に設定します。Pulsar の場合、ネイティブエンコーディングとデコーディングを使用する場合は、対応するスキーマを設定する必要があります。および基礎となるメッセージ型情報。この情報は、拡張バインディングプロパティとして提供されます。上記の構成でわかるように、プロパティはスキーマ情報の spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type と実際のターゲット型の spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type です。メッセージにキーと値の両方がある場合は、message-key-type と message-value-type を使用してターゲット型を指定できます。

schema-type プロパティが省略されると、設定されたカスタムスキーママッピングが参照されます。

メッセージヘッダーの変換

通常、各メッセージにはヘッダー情報があり、メッセージが Spring Cloud Stream 入力バインディングと出力バインディングを介して Pulsar メッセージングと Spring メッセージング間を通過する際に、これらの情報を運ぶ必要があります。この走査をサポートするために、フレームワークは必要なメッセージヘッダーの変換を処理します。

カスタムヘッダーマッパー

Pulsar バインダーは、独自の PulsarHeaderMapper Bean を提供することでオーバーライドできるデフォルトのヘッダーマッパーを使用して構成されます。

次の例では、JSON ヘッダーマッパーが次のように構成されています。

  • すべての受信ヘッダーをマップします ( "top" または "secret" キーを持つものを除く)

  • 送信ヘッダーをマップします (キーが "id"、"timestamp"、"userId" のものを除く)

  • 送信逆直列化については、"com.acme" パッケージ内のオブジェクトのみを信頼します

  • 単純な toString() エンコーディングを使用して、"com.acme.Money" ヘッダー値をデシリアライズ / 直列化します。

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

バインダーでの Pulsar プロパティの使用

バインダーは、Spring for Apache Pulsar フレームワークの基本コンポーネントを使用して、プロデューサーバーインディングとコンシューマーバインディングを構築します。バインダーベースのアプリケーションは Spring Boot アプリケーションであるため、バインダーはデフォルトで Spring for Apache Pulsar 用の Spring Boot 自動構成を使用します。コアフレームワークレベルで利用できるすべての Pulsar Spring Boot プロパティは、バインダーを通じても利用できます。例: 接頭辞 spring.pulsar.producer…​spring.pulsar.consumer…​ などのプロパティを使用できます。さらに、これらの Pulsar プロパティをバインダーレベルで設定することもできます。たとえば、spring.cloud.stream.pulsar.binder.producer…​ または spring.cloud.stream.pulsar.binder.consumer…​ も機能します。

上記のアプローチはどちらでも問題ありませんが、このようなプロパティを使用すると、アプリケーション全体に適用されます。アプリケーションに複数の関数がある場合、すべて同じプロパティを取得します。これらの Pulsar プロパティを継承バインディングプロパティレベルで設定して、これに対処することもできます。拡張バインディングプロパティはバインディング自体に適用されます。たとえば、入力バインディングと出力バインディングがあり、両方に個別の Pulsar プロパティセットが必要な場合は、拡張バインディングで設定する必要があります。プロデューサーバーインディングのパターンは spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer…​ です。同様に、コンシューマーバインディングの場合、パターンは spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer…​ です。このようにして、同じアプリケーション内の異なるバインディングに個別の Pulsar プロパティのセットを適用できます。

最も高い優先順位は拡張バインディングプロパティです。バインダー内のプロパティを適用する優先順位は extended binding properties → binder properties → Spring Boot properties. (最高から最低の順) です。

以下は、Pulsar バインダーを通じて利用可能なプロパティの詳細を見つけるために利用できるいくつかのリソースです。

Pulsar プロデューサーバーインディング構成 [GitHub] (英語) 。これらのプロパティには spring.cloud.stream.bindings.<binding-name>.producer プレフィックスが必要です。Spring Boot が提供するすべての Pulsar プロデューサーのプロパティも、この構成クラスを通じて利用できます。

Pulsar コンシューマーバインディング構成 [GitHub] (英語) 。これらのプロパティには spring.cloud.stream.bindings.<binding-name>.consumer プレフィックスが必要です。Spring Boot が提供するすべての Pulsar コンシューマー向けプロパティも、この構成クラスを通じて利用できます。

一般的な Pulsar バインダー固有の構成プロパティについては、これを参照してください。これらのプロパティには、接頭辞 spring.cloud.stream.pulsar.binder が必要です。上記で指定したプロデューサーおよびコンシューマーのプロパティ (Spring Boot プロパティを含む) は、spring.cloud.stream.pulsar.binder.producer または spring.cloud.stream.pulsar.binder.consumer プレフィックスを使用してバインダーで使用できます。

Pulsar トピックプロビジョナー

Apache Pulsar 用の Spring Cloud Stream バインダーには、Pulsar トピック用のすぐに使えるプロビジョナーが付属しています。アプリケーションの実行時に、必要なトピックが存在しない場合は、Pulsar がトピックを作成します。ただし、これは基本的な非パーティショントピックであり、パーティション化されたトピックの作成などの高度な機能が必要な場合は、バインダーのトピックプロビジョナーを利用できます。Pulsar トピックプロビジョナーは、PulsarAdminBuilder. を使用するフレームワークの PulsarAdministration を使用します。このため、デフォルトのサーバーとポートで Pulsar を実行している場合を除き、spring.pulsar.administration.service-url プロパティを設定する必要があります。

トピック作成時のパーティション数の指定

トピックを作成するとき、2 つの方法でパーティション数を設定できます。まず、プロパティ spring.cloud.stream.pulsar.binder.partition-count を使用してバインダーレベルで設定できます。上で見たように、この方法を実行すると、アプリケーションによって作成されたすべてのトピックがこのプロパティを継承します。パーティションを設定するためにバインディングレベルでのきめ細かい制御が必要だとします。その場合、spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count 形式を使用してバインディングごとに partition-count プロパティを設定できます。このようにして、同じアプリケーション内の異なる機能によって作成されたさまざまなトピックは、アプリケーションの要件に基づいて異なるパーティションを持つことになります。