Spring Cloud Stream スキーマレジストリ
導入
組織がメッセージングベースの pub/sub アーキテクチャを採用し、複数のプロデューサーマイクロサービスとコンシューマーマイクロサービスが相互に通信する場合、多くの場合、これらすべてのマイクロサービスがスキーマに基づく契約に同意する必要があります。このようなスキーマを新しいビジネス要件に対応するために進化させる必要がある場合でも、既存のコンポーネントは引き続き機能する必要があります。Spring Cloud Stream は、スタンドアロンスキーマレジストリサーバーのサポートを提供します。これにより、前述のスキーマを登録してアプリケーションで使用できます。Spring Cloud Stream スキーマレジストリサポートは、Avro ベースのスキーマレジストリクライアントのサポートも提供します。これは基本的に、メッセージ変換中にスキーマを調整するためにスキーマレジストリと通信するメッセージコンバーターを提供します。Spring Cloud Stream が提供するスキーマ進化サポートは、前述のスタンドアロンスキーマレジストリだけでなく、特に Apache Kafka で機能する Confluent が提供するスキーマレジストリでも機能します。
Spring Cloud Stream スキーマレジストリの概要
Spring Cloud Stream スキーマレジストリは、スキーマの進化をサポートするため、データは時間の経過とともに進化し、古いまたは新しいプロデューサーやコンシューマーと連携し、その逆も可能です。ほとんどのシリアライゼーションモデル、特に異なるプラットフォームや言語間での移植性を目的とするものは、バイナリペイロードでデータがどのようにシリアライズされるかを記述するスキーマに依存しています。データを直列化して解釈するには、送信側と受信側の両方が、バイナリ形式を記述するスキーマにアクセスできる必要があります。場合によっては、シリアライゼーションでペイロード型から、またはデシリアライゼーションでターゲット型からスキーマを推測できます。ただし、多くのアプリケーションでは、バイナリデータ形式を記述する明示的なスキーマにアクセスできるとメリットがあります。スキーマレジストリを使用すると、スキーマ情報をテキスト形式 (通常は JSON) で保存し、バイナリ形式でデータを送受信するためにスキーマ情報を必要とするさまざまなアプリケーションからその情報にアクセスできるようになります。スキーマは、次のもので構成されるタプルとして参照できます。
スキーマの論理名であるサブジェクト
スキーマバージョン
データのバイナリ形式を記述するスキーマ形式
Spring Cloud Stream Schema Registry は、次のコンポーネントを提供します
スタンドアロンスキーマレジストリサーバー
By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
スキーマレジストリと通信することでメッセージマーシャリングが可能なスキーマレジストリクライアント。
Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.
スキーマレジストリクライアント
スキーマレジストリサーバーと対話するためのクライアント側の抽象化は、次の構造を持つ SchemaRegistryClient
インターフェースです。
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream は、独自のスキーマサーバーと対話し、Confluent Schema Registry と対話するためのすぐに使える実装を提供します。
Spring Cloud Stream スキーマレジストリのクライアントは、次のように @EnableSchemaRegistryClient
を使用して構成できます。
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
デフォルトのコンバーターは、リモートサーバーからのスキーマだけでなく、非常に高負荷な parse() および toString() メソッドもキャッシュするように最適化されています。このため、レスポンスをキャッシュしない DefaultSchemaRegistryClient を使用します。デフォルトの動作を変更する場合は、クライアントをコードで直接使用し、それをオーバーライドして目的の結果にすることができます。これを行うには、プロパティ spring.cloud.stream.schemaRegistryClient.cached=true をアプリケーションプロパティに追加する必要があります。 |
スキーマレジストリクライアントのプロパティ
スキーマレジストリクライアントは、次のプロパティをサポートしています。
spring.cloud.stream.schemaRegistryClient.endpoint
スキーマサーバーの場所。これを設定するときは、プロトコル (
http
またはhttps
)、ポート、コンテキストパスを含む完全な URL を使用します。- デフォルト
spring.cloud.stream.schemaRegistryClient.cached
クライアントがスキーマサーバーのレスポンスをキャッシュするかどうか。メッセージコンバーターでキャッシングが行われるため、通常は
false
に設定されます。スキーマレジストリクライアントを使用するクライアントは、これをtrue
に設定する必要があります。- デフォルト
false
Avro スキーマレジストリクライアントメッセージコンバーター
アプリケーションコンテキストに SchemaRegistryClient Bean が登録されているアプリケーションの場合、Spring Cloud Stream はスキーマ管理用に Apache Avro メッセージコンバーターを自動構成します。これにより、メッセージを受信するアプリケーションが独自のリーダースキーマと調整できるライタースキーマに簡単にアクセスできるため、スキーマの進化が容易になります。
送信・メッセージの場合、バインディングのコンテンツ・型が application/*+avro
に設定されている場合、次の例に示すように MessageConverter
がアクティブ化されます:
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
送信変換中、メッセージコンバーターは、SchemaRegistryClient
を使用して、各送信メッセージのスキーマ (型に基づいて) を推測し、それをサブジェクト (ペイロードの型に基づいて) に登録しようとします。同一のスキーマがすでに見つかった場合は、そのスキーマへの参照が取得されます。そうでない場合は、スキーマが登録され、新しいバージョン番号が提供されます。メッセージは、スキーム application/[prefix].[subject].v[version]+avro
を使用して contentType
ヘッダーとともに送信されます。ここで、prefix
は構成可能であり、subject
はペイロード型から推定されます。
例: 型 User
のメッセージは、コンテンツ型 application/vnd.user.v2+avro
のバイナリペイロードとして送信される場合があります。ここで、user
は件名で、2
はバージョン番号です。
メッセージを受信すると、コンバーターは受信メッセージのヘッダーからスキーマ参照を推測し、それを取得しようとします。スキーマは、逆直列化プロセスでライタースキーマとして使用されます。
Avro スキーマレジストリメッセージコンバーターのプロパティ
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
を設定して Avro ベースのスキーマレジストリクライアントを有効にした場合は、次のプロパティを設定して登録の動作をカスタマイズできます。
- spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
コンバーターでリフレクションを使用して POJO からスキーマを推測する場合に有効にします。
デフォルト:
false
- spring.cloud.stream.schema.avro.readerSchema
Avro は、ライタースキーマ (オリジンペイロード) とリーダースキーマ (アプリケーションペイロード) を調べて、スキーマのバージョンを比較します。詳細については、アブロのドキュメント [Apache] (英語) を参照してください。設定されている場合、これはスキーマサーバーでのルックアップをオーバーライドし、ローカルスキーマをリーダースキーマとして使用します。デフォルト:
null
- spring.cloud.stream.schema.avro.schemaLocations
このプロパティにリストされている
.avsc
ファイルをスキーマサーバーに登録します。デフォルト:
empty
- spring.cloud.stream.schema.avro.prefix
Content-Type ヘッダーで使用されるプレフィックス。
デフォルト:
vnd
- spring.cloud.stream.schema.avro.subjectNamingStrategy
スキーマレジストリに Avro スキーマを登録するために使用されるサブジェクト名を決定します。サブジェクトがスキーマ名である
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
と、Avro スキーマの名前空間と名前を使用して完全修飾サブジェクトを返すorg.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy
の 2 つの実装が利用可能です。org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy
を実装することにより、カスタム戦略を作成できます。デフォルト:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy
- spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
スキーマレジストリ通信を無視します。単体テストの実行時にスキーマレジストリサーバーへの接続を不必要に試行しないように、テスト目的で役立ちます。
デフォルト:
false
Apache Avro メッセージコンバーター
Spring Cloud Stream は、spring-cloud-stream-schema-registry-client
モジュールを通じてスキーマベースのメッセージコンバーターのサポートを提供します。現在、スキーマベースのメッセージコンバーターですぐにサポートされている唯一の直列化形式は Apache Avro ですが、将来のバージョンではさらに多くの形式が追加される予定です。
spring-cloud-stream-schema-registry-client
モジュールには、Apache Avro 直列化に使用できる 2 種類のメッセージコンバーターが含まれています。
直列化または逆直列化されたオブジェクトのクラス情報、または起動時に場所がわかっているスキーマを使用するコンバーター。
スキーマレジストリを使用するコンバーター。実行時にスキーマを特定し、ドメインオブジェクトの進化に合わせて新しいスキーマを動的に登録します。
スキーマをサポートするコンバーター
AvroSchemaMessageConverter
は、事前定義されたスキーマを使用するか、クラスで使用可能なスキーマ情報を使用して (反射的または SpecificRecord
に含まれる)、メッセージのシリアライズおよびデシリアライズをサポートします。カスタムコンバーターを提供する場合、デフォルトの AvroSchemaMessageConverter Bean は作成されません。次の例は、カスタムコンバーターを示しています。
カスタムコンバーターを使用するには、それをアプリケーションコンテキストに追加するだけで、必要に応じて関連付ける 1 つ以上の MimeTypes
を指定できます。デフォルトの MimeType
は application/avro
です。
変換のターゲット型が GenericRecord
の場合、スキーマを設定する必要があります。
次の例は、定義済みのスキーマなしで Apache Avro MessageConverter
を登録して、シンクアプリケーションでコンバーターを構成する方法を示しています。この例では、MIME 型値がデフォルトの application/avro
ではなく、avro/bytes
であることに注意してください。
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
逆に、次のアプリケーションはコンバーターを定義済みのスキーマ (クラスパスにある) に登録します。
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
スキーマレジストリサーバー
Spring Cloud Stream は、スキーマレジストリサーバーの実装を提供します。これを使用するには、最新の spring-cloud-stream-schema-registry-server
リリースをダウンロードして、スタンドアロンアプリケーションとして実行します。
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
スキーマレジストリを既存の Spring Boot Web アプリケーションに埋め込むことができます。これを行うには、
|
spring.cloud.stream.schema.server.path
プロパティを使用して、スキーマサーバーのルートパスを制御できます (特に、他のアプリケーションに埋め込まれている場合)。spring.cloud.stream.schema.server.allowSchemaDeletion
ブールプロパティは、スキーマの削除を有効にします。デフォルトでは、これは無効になっています。
スキーマレジストリサーバーは、リレーショナルデータベースを使用してスキーマを格納します。デフォルトでは、組み込みデータベースが使用されます。Spring Boot SQL データベースおよび JDBC 構成オプション (英語) を使用して、スキーマストレージをカスタマイズできます。
スキーマレジストリサーバー API
スキーマレジストリサーバー API は、次の操作で構成されます。
POST /
—Registering a New Schema
を参照GET /{subject}/{format}/{version}
—Retrieving an Existing Schema by Subject, Format, and Version
を参照GET /{subject}/{format}
—Retrieving an Existing Schema by Subject and Format
を参照GET /schemas/{id}
—Retrieving an Existing Schema by ID
を参照DELETE /{subject}/{format}/{version}
—Deleting a Schema by Subject, Format, and Version
を参照DELETE /schemas/{id}
—Deleting a Schema by ID
を参照DELETE /{subject}
—Deleting a Schema by Subject
を参照
新しいスキーマの登録
新しいスキーマを登録するには、POST
リクエストを /
エンドポイントに送信します。
/
は、次のフィールドを持つ JSON ペイロードを受け入れます。
subject
: スキーマサブジェクトformat
: スキーマ形式definition
: スキーマ定義
そのレスポンスは、次のフィールドを持つ JSON のスキーマオブジェクトです。
id
: スキーマ IDsubject
: スキーマサブジェクトformat
: スキーマ形式version
: スキーマバージョンdefinition
: スキーマ定義
サブジェクト、フォーマット、バージョンによる既存のスキーマの取得
サブジェクト、形式、バージョンによって既存のスキーマを取得するには、GET
リクエストを {subject}/{format}/{version}
エンドポイントに送信します。
そのレスポンスは、次のフィールドを持つ JSON のスキーマオブジェクトです。
id
: スキーマ IDsubject
: スキーマサブジェクトformat
: スキーマ形式version
: スキーマバージョンdefinition
: スキーマ定義
サブジェクトおよびフォーマットによる既存のスキーマの取得
サブジェクトと形式で既存のスキーマを取得するには、GET
リクエストを /subject/format
エンドポイントに送信します。
そのレスポンスは、JSON の各スキーマオブジェクトを含むスキーマのリストであり、次のフィールドがあります。
id
: スキーマ IDsubject
: スキーマサブジェクトformat
: スキーマ形式version
: スキーマバージョンdefinition
: スキーマ定義
ID による既存のスキーマの取得
ID でスキーマを取得するには、GET
リクエストを /schemas/{id}
エンドポイントに送信します。
そのレスポンスは、次のフィールドを持つ JSON のスキーマオブジェクトです。
id
: スキーマ IDsubject
: スキーマサブジェクトformat
: スキーマ形式version
: スキーマバージョンdefinition
: スキーマ定義
サブジェクト、フォーマット、バージョンによるスキーマの削除
サブジェクト、形式、バージョンで識別されるスキーマを削除するには、DELETE
リクエストを {subject}/{format}/{version}
エンドポイントに送信します。
サブジェクトによるスキーマの削除
DELETE /{subject}
サブジェクトごとに既存のスキーマを削除します。
この注意事項は、Spring Cloud Stream 1.1.0.RELEASE のユーザーにのみ適用されます。Spring Cloud Stream 1.1.0.RELEASE は、Schema オブジェクトを格納するためにテーブル名 schema を使用しました。Schema は、多くのデータベース実装におけるキーワードです。将来の競合を避けるために、1.1.1.RELEASE 以降、ストレージテーブルの名前に SCHEMA_REPOSITORY を選択しました。アップグレードする Spring Cloud Stream 1.1.0.RELEASE ユーザーは、アップグレードする前に既存のスキーマを新しいテーブルに移行する必要があります。 |
Confluent のスキーマレジストリの使用
デフォルト構成では、DefaultSchemaRegistryClient
Bean が作成されます。Confluent スキーマレジストリを使用する場合は、型 ConfluentSchemaRegistryClient
の Bean を作成する必要があります。これは、フレームワークによってデフォルトで設定されたものに取って代わります。次の例は、そのような Bean を作成する方法を示しています。
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
ConfluentSchemaRegistryClient は、Confluent プラットフォームバージョン 4.0.0 に対してテストされています。 |
スキーマの登録と解決
Spring Cloud Stream が新しいスキーマを登録および解決する方法と、Avro スキーマ比較機能の使用方法をよりよく理解するために、2 つの個別のサブセクションを提供します。
スキーマ登録プロセス (直列化)
登録プロセスの最初の部分は、チャネルを介して送信されるペイロードからスキーマを抽出することです。SpecificRecord
や GenericRecord
などの Avro 型には、インスタンスからすぐに取得できるスキーマがすでに含まれています。POJO の場合、spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
プロパティが true
(デフォルト) に設定されている場合、スキーマが推測されます。
スキーマが取得されると、コンバーターはそのメタデータ (バージョン) を リモートサーバーからロードします。まず、ローカルキャッシュをクエリします。結果が見つからない場合は、データをサーバーに送信し、サーバーはバージョン情報で応答します。直列化が必要な新しいメッセージごとにスキーマサーバーにクエリを実行するオーバーヘッドを回避するために、コンバーターは常に結果をキャッシュします。
スキーマのバージョン情報を使用して、コンバーターはメッセージの contentType
ヘッダーを設定してバージョン情報を伝えます (例: application/vnd.user.v1+avro
)。
スキーマ解決プロセス (逆直列化)
バージョン情報 (つまり、Schema Registration Process (Serialization)
で説明されているようなスキームを持つ contentType
ヘッダー) を含むメッセージを読み取るとき、コンバーターはスキーマサーバーにクエリを実行して、メッセージのライタースキーマをフェッチします。受信メッセージの正しいスキーマが見つかると、、リーダースキーマを取得し、Avro のスキーマ解決サポートを使用して、それをリーダー定義に読み込みます (既定値と不足しているプロパティを設定します)。
ライタースキーマ (メッセージを書き込んだアプリケーション) とリーダースキーマ (受信アプリケーション) の違いを理解する必要があります。少し時間を取って Avro の用語 [Apache] (英語) を読み、プロセスを理解することをお勧めします。Spring Cloud Stream は常にライタースキーマをフェッチして、メッセージの読み取り方法を決定します。Avro のスキーマ進化サポートを機能させたい場合は、アプリケーションに対して readerSchema が適切に設定されていることを確認する必要があります。 |