Spring Cloud Stream スキーマレジストリ

導入

組織がメッセージングベースの pub/sub アーキテクチャを採用し、複数のプロデューサーマイクロサービスとコンシューマーマイクロサービスが相互に通信する場合、多くの場合、これらすべてのマイクロサービスがスキーマに基づく契約に同意する必要があります。このようなスキーマを新しいビジネス要件に対応するために進化させる必要がある場合でも、既存のコンポーネントは引き続き機能する必要があります。Spring Cloud Stream は、スタンドアロンスキーマレジストリサーバーのサポートを提供します。これにより、前述のスキーマを登録してアプリケーションで使用できます。Spring Cloud Stream スキーマレジストリサポートは、Avro ベースのスキーマレジストリクライアントのサポートも提供します。これは基本的に、メッセージ変換中にスキーマを調整するためにスキーマレジストリと通信するメッセージコンバーターを提供します。Spring Cloud Stream が提供するスキーマ進化サポートは、前述のスタンドアロンスキーマレジストリだけでなく、特に Apache Kafka で機能する Confluent が提供するスキーマレジストリでも機能します。

Spring Cloud Stream スキーマレジストリの概要

Spring Cloud Stream スキーマレジストリは、スキーマの進化をサポートするため、データは時間の経過とともに進化し、古いまたは新しいプロデューサーやコンシューマーと連携し、その逆も可能です。ほとんどのシリアライゼーションモデル、特に異なるプラットフォームや言語間での移植性を目的とするものは、バイナリペイロードでデータがどのようにシリアライズされるかを記述するスキーマに依存しています。データを直列化して解釈するには、送信側と受信側の両方が、バイナリ形式を記述するスキーマにアクセスできる必要があります。場合によっては、シリアライゼーションでペイロード型から、またはデシリアライゼーションでターゲット型からスキーマを推測できます。ただし、多くのアプリケーションでは、バイナリデータ形式を記述する明示的なスキーマにアクセスできるとメリットがあります。スキーマレジストリを使用すると、スキーマ情報をテキスト形式 (通常は JSON) で保存し、バイナリ形式でデータを送受信するためにスキーマ情報を必要とするさまざまなアプリケーションからその情報にアクセスできるようになります。スキーマは、次のもので構成されるタプルとして参照できます。

  • スキーマの論理名であるサブジェクト

  • スキーマバージョン

  • データのバイナリ形式を記述するスキーマ形式

Spring Cloud Stream Schema Registry は、次のコンポーネントを提供します

  • スタンドアロンスキーマレジストリサーバー

    By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
  • スキーマレジストリと通信することでメッセージマーシャリングが可能なスキーマレジストリクライアント。

    Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.

スキーマレジストリクライアント

スキーマレジストリサーバーと対話するためのクライアント側の抽象化は、次の構造を持つ SchemaRegistryClient インターフェースです。

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream は、独自のスキーマサーバーと対話し、Confluent Schema Registry と対話するためのすぐに使える実装を提供します。

Spring Cloud Stream スキーマレジストリのクライアントは、次のように @EnableSchemaRegistryClient を使用して構成できます。

@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}
デフォルトのコンバーターは、リモートサーバーからのスキーマだけでなく、非常に高負荷な parse() および toString() メソッドもキャッシュするように最適化されています。このため、レスポンスをキャッシュしない DefaultSchemaRegistryClient を使用します。デフォルトの動作を変更する場合は、クライアントをコードで直接使用し、それをオーバーライドして目的の結果にすることができます。これを行うには、プロパティ spring.cloud.stream.schemaRegistryClient.cached=true をアプリケーションプロパティに追加する必要があります。

スキーマレジストリクライアントのプロパティ

スキーマレジストリクライアントは、次のプロパティをサポートしています。

spring.cloud.stream.schemaRegistryClient.endpoint

スキーマサーバーの場所。これを設定するときは、プロトコル (http または https)、ポート、コンテキストパスを含む完全な URL を使用します。

デフォルト

localhost:8990/

spring.cloud.stream.schemaRegistryClient.cached

クライアントがスキーマサーバーのレスポンスをキャッシュするかどうか。メッセージコンバーターでキャッシングが行われるため、通常は false に設定されます。スキーマレジストリクライアントを使用するクライアントは、これを true に設定する必要があります。

デフォルト

false

Avro スキーマレジストリクライアントメッセージコンバーター

アプリケーションコンテキストに SchemaRegistryClient Bean が登録されているアプリケーションの場合、Spring Cloud Stream はスキーマ管理用に Apache Avro メッセージコンバーターを自動構成します。これにより、メッセージを受信するアプリケーションが独自のリーダースキーマと調整できるライタースキーマに簡単にアクセスできるため、スキーマの進化が容易になります。

送信・メッセージの場合、バインディングのコンテンツ・型が application/*+avro に設定されている場合、次の例に示すように MessageConverter がアクティブ化されます:

spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro

送信変換中、メッセージコンバーターは、SchemaRegistryClient を使用して、各送信メッセージのスキーマ (型に基づいて) を推測し、それをサブジェクト (ペイロードの型に基づいて) に登録しようとします。同一のスキーマがすでに見つかった場合は、そのスキーマへの参照が取得されます。そうでない場合は、スキーマが登録され、新しいバージョン番号が提供されます。メッセージは、スキーム application/[prefix].[subject].v[version]+avro を使用して contentType ヘッダーとともに送信されます。ここで、prefix は構成可能であり、subject はペイロード型から推定されます。

例: 型 User のメッセージは、コンテンツ型 application/vnd.user.v2+avro のバイナリペイロードとして送信される場合があります。ここで、user は件名で、2 はバージョン番号です。

メッセージを受信すると、コンバーターは受信メッセージのヘッダーからスキーマ参照を推測し、それを取得しようとします。スキーマは、逆直列化プロセスでライタースキーマとして使用されます。

Avro スキーマレジストリメッセージコンバーターのプロパティ

spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro を設定して Avro ベースのスキーマレジストリクライアントを有効にした場合は、次のプロパティを設定して登録の動作をカスタマイズできます。

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

コンバーターでリフレクションを使用して POJO からスキーマを推測する場合に有効にします。

デフォルト: false

spring.cloud.stream.schema.avro.readerSchema

Avro は、ライタースキーマ (オリジンペイロード) とリーダースキーマ (アプリケーションペイロード) を調べて、スキーマのバージョンを比較します。詳細については、アブロのドキュメント [Apache] (英語) を参照してください。設定されている場合、これはスキーマサーバーでのルックアップをオーバーライドし、ローカルスキーマをリーダースキーマとして使用します。デフォルト: null

spring.cloud.stream.schema.avro.schemaLocations

このプロパティにリストされている .avsc ファイルをスキーマサーバーに登録します。

デフォルト: empty

spring.cloud.stream.schema.avro.prefix

Content-Type ヘッダーで使用されるプレフィックス。

デフォルト: vnd

spring.cloud.stream.schema.avro.subjectNamingStrategy

スキーマレジストリに Avro スキーマを登録するために使用されるサブジェクト名を決定します。サブジェクトがスキーマ名である org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy と、Avro スキーマの名前空間と名前を使用して完全修飾サブジェクトを返す org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy の 2 つの実装が利用可能です。org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy を実装することにより、カスタム戦略を作成できます。

デフォルト: org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy

spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer

スキーマレジストリ通信を無視します。単体テストの実行時にスキーマレジストリサーバーへの接続を不必要に試行しないように、テスト目的で役立ちます。

デフォルト: false

Apache Avro メッセージコンバーター

Spring Cloud Stream は、spring-cloud-stream-schema-registry-client モジュールを通じてスキーマベースのメッセージコンバーターのサポートを提供します。現在、スキーマベースのメッセージコンバーターですぐにサポートされている唯一の直列化形式は Apache Avro ですが、将来のバージョンではさらに多くの形式が追加される予定です。

spring-cloud-stream-schema-registry-client モジュールには、Apache Avro 直列化に使用できる 2 種類のメッセージコンバーターが含まれています。

  • 直列化または逆直列化されたオブジェクトのクラス情報、または起動時に場所がわかっているスキーマを使用するコンバーター。

  • スキーマレジストリを使用するコンバーター。実行時にスキーマを特定し、ドメインオブジェクトの進化に合わせて新しいスキーマを動的に登録します。

スキーマをサポートするコンバーター

AvroSchemaMessageConverter は、事前定義されたスキーマを使用するか、クラスで使用可能なスキーマ情報を使用して (反射的または SpecificRecord に含まれる)、メッセージのシリアライズおよびデシリアライズをサポートします。カスタムコンバーターを提供する場合、デフォルトの AvroSchemaMessageConverter Bean は作成されません。次の例は、カスタムコンバーターを示しています。

カスタムコンバーターを使用するには、それをアプリケーションコンテキストに追加するだけで、必要に応じて関連付ける 1 つ以上の MimeTypes を指定できます。デフォルトの MimeType は application/avro です。

変換のターゲット型が GenericRecord の場合、スキーマを設定する必要があります。

次の例は、定義済みのスキーマなしで Apache Avro MessageConverter を登録して、シンクアプリケーションでコンバーターを構成する方法を示しています。この例では、MIME 型値がデフォルトの application/avro ではなく、avro/bytes であることに注意してください。

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

逆に、次のアプリケーションはコンバーターを定義済みのスキーマ (クラスパスにある) に登録します。

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

スキーマレジストリサーバー

Spring Cloud Stream は、スキーマレジストリサーバーの実装を提供します。これを使用するには、最新の spring-cloud-stream-schema-registry-server リリースをダウンロードして、スタンドアロンアプリケーションとして実行します。

wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar

スキーマレジストリを既存の Spring Boot Web アプリケーションに埋め込むことができます。これを行うには、spring-cloud-stream-schema-registry-core アーティファクトをプロジェクトに追加し、@EnableSchemaRegistryServer アノテーションを使用して、スキーマレジストリサーバー REST コントローラーをアプリケーションに追加します。次の例は、スキーマレジストリを有効にする Spring Boot アプリケーションを示しています。

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

spring.cloud.stream.schema.server.path プロパティを使用して、スキーマサーバーのルートパスを制御できます (特に、他のアプリケーションに埋め込まれている場合)。spring.cloud.stream.schema.server.allowSchemaDeletion ブールプロパティは、スキーマの削除を有効にします。デフォルトでは、これは無効になっています。

スキーマレジストリサーバーは、リレーショナルデータベースを使用してスキーマを格納します。デフォルトでは、組み込みデータベースが使用されます。Spring Boot SQL データベースおよび JDBC 構成オプション (英語) を使用して、スキーマストレージをカスタマイズできます。

スキーマレジストリサーバー API

スキーマレジストリサーバー API は、次の操作で構成されます。

新しいスキーマの登録

新しいスキーマを登録するには、POST リクエストを / エンドポイントに送信します。

/ は、次のフィールドを持つ JSON ペイロードを受け入れます。

  • subject: スキーマサブジェクト

  • format: スキーマ形式

  • definition: スキーマ定義

そのレスポンスは、次のフィールドを持つ JSON のスキーマオブジェクトです。

  • id: スキーマ ID

  • subject: スキーマサブジェクト

  • format: スキーマ形式

  • version: スキーマバージョン

  • definition: スキーマ定義

サブジェクト、フォーマット、バージョンによる既存のスキーマの取得

サブジェクト、形式、バージョンによって既存のスキーマを取得するには、GET リクエストを {subject}/{format}/{version} エンドポイントに送信します。

そのレスポンスは、次のフィールドを持つ JSON のスキーマオブジェクトです。

  • id: スキーマ ID

  • subject: スキーマサブジェクト

  • format: スキーマ形式

  • version: スキーマバージョン

  • definition: スキーマ定義

サブジェクトおよびフォーマットによる既存のスキーマの取得

サブジェクトと形式で既存のスキーマを取得するには、GET リクエストを /subject/format エンドポイントに送信します。

そのレスポンスは、JSON の各スキーマオブジェクトを含むスキーマのリストであり、次のフィールドがあります。

  • id: スキーマ ID

  • subject: スキーマサブジェクト

  • format: スキーマ形式

  • version: スキーマバージョン

  • definition: スキーマ定義

ID による既存のスキーマの取得

ID でスキーマを取得するには、GET リクエストを /schemas/{id} エンドポイントに送信します。

そのレスポンスは、次のフィールドを持つ JSON のスキーマオブジェクトです。

  • id: スキーマ ID

  • subject: スキーマサブジェクト

  • format: スキーマ形式

  • version: スキーマバージョン

  • definition: スキーマ定義

サブジェクト、フォーマット、バージョンによるスキーマの削除

サブジェクト、形式、バージョンで識別されるスキーマを削除するには、DELETE リクエストを {subject}/{format}/{version} エンドポイントに送信します。

ID によるスキーマの削除

ID でスキーマを削除するには、DELETE リクエストを /schemas/{id} エンドポイントに送信します。

サブジェクトによるスキーマの削除

DELETE /{subject}

サブジェクトごとに既存のスキーマを削除します。

この注意事項は、Spring Cloud Stream 1.1.0.RELEASE のユーザーにのみ適用されます。Spring Cloud Stream 1.1.0.RELEASE は、Schema オブジェクトを格納するためにテーブル名 schema を使用しました。Schema は、多くのデータベース実装におけるキーワードです。将来の競合を避けるために、1.1.1.RELEASE 以降、ストレージテーブルの名前に SCHEMA_REPOSITORY を選択しました。アップグレードする Spring Cloud Stream 1.1.0.RELEASE ユーザーは、アップグレードする前に既存のスキーマを新しいテーブルに移行する必要があります。

Confluent のスキーマレジストリの使用

デフォルト構成では、DefaultSchemaRegistryClient Bean が作成されます。Confluent スキーマレジストリを使用する場合は、型 ConfluentSchemaRegistryClient の Bean を作成する必要があります。これは、フレームワークによってデフォルトで設定されたものに取って代わります。次の例は、そのような Bean を作成する方法を示しています。

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}
ConfluentSchemaRegistryClient は、Confluent プラットフォームバージョン 4.0.0 に対してテストされています。

スキーマの登録と解決

Spring Cloud Stream が新しいスキーマを登録および解決する方法と、Avro スキーマ比較機能の使用方法をよりよく理解するために、2 つの個別のサブセクションを提供します。

スキーマ登録プロセス (直列化)

登録プロセスの最初の部分は、チャネルを介して送信されるペイロードからスキーマを抽出することです。SpecificRecord や GenericRecord などの Avro 型には、インスタンスからすぐに取得できるスキーマがすでに含まれています。POJO の場合、spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled プロパティが true (デフォルト) に設定されている場合、スキーマが推測されます。

スキーマが取得されると、コンバーターはそのメタデータ (バージョン) を リモートサーバーからロードします。まず、ローカルキャッシュをクエリします。結果が見つからない場合は、データをサーバーに送信し、サーバーはバージョン情報で応答します。直列化が必要な新しいメッセージごとにスキーマサーバーにクエリを実行するオーバーヘッドを回避するために、コンバーターは常に結果をキャッシュします。

スキーマのバージョン情報を使用して、コンバーターはメッセージの contentType ヘッダーを設定してバージョン情報を伝えます (例: application/vnd.user.v1+avro)。

スキーマ解決プロセス (逆直列化)

バージョン情報 (つまり、Schema Registration Process (Serialization) で説明されているようなスキームを持つ contentType ヘッダー) を含むメッセージを読み取るとき、コンバーターはスキーマサーバーにクエリを実行して、メッセージのライタースキーマをフェッチします。受信メッセージの正しいスキーマが見つかると、、リーダースキーマを取得し、Avro のスキーマ解決サポートを使用して、それをリーダー定義に読み込みます (既定値と不足しているプロパティを設定します)。

ライタースキーマ (メッセージを書き込んだアプリケーション) とリーダースキーマ (受信アプリケーション) の違いを理解する必要があります。少し時間を取って Avro の用語 [Apache] (英語) を読み、プロセスを理解することをお勧めします。Spring Cloud Stream は常にライタースキーマをフェッチして、メッセージの読み取り方法を決定します。Avro のスキーマ進化サポートを機能させたい場合は、アプリケーションに対して readerSchema が適切に設定されていることを確認する必要があります。