R2DBC によるデータアクセス

R2DBC (英語) ("Reactive Relational Database Connectivity")は、リアクティブパターンを使用して SQL データベースへのアクセスを標準化するためのコミュニティ主導の仕様です。

パッケージ階層

Spring Framework の R2DBC 抽象化フレームワークは、2 つの異なるパッケージで構成されています。

  • coreorg.springframework.r2dbc.core パッケージには、DatabaseClient クラスとさまざまな関連クラスが含まれています。R2DBC コアクラスを使用した基本的な R2DBC 処理とエラー処理の制御を参照してください。

  • connectionorg.springframework.r2dbc.connection パッケージには、ConnectionFactory に簡単にアクセスできるユーティリティクラスと、変更されていない R2DBC のテストと実行に使用できるさまざまなシンプルな ConnectionFactory 実装が含まれています。データベース接続の制御を参照してください。

R2DBC コアクラスを使用した基本的な R2DBC 処理とエラー処理の制御

このセクションでは、R2DBC コアクラスを使用して、エラー処理を含む基本的な R2DBC 処理を制御する方法について説明します。次のトピックが含まれます。

DatabaseClient を使用する

DatabaseClient は、R2DBC コアパッケージの中心的なクラスです。リソースの作成と解放を処理します。これにより、接続を閉じるのを忘れるなどの一般的なエラーを回避できます。SQL を提供して結果を抽出するアプリケーションコードを残して、コア R2DBC ワークフローの基本的なタスク(ステートメントの作成や実行など)を実行します。DatabaseClient クラス:

  • SQL クエリを実行します

  • ステートメントとストアドプロシージャの呼び出しを更新する

  • Result インスタンスに対して反復を実行します

  • R2DBC 例外をキャッチし、org.springframework.dao パッケージで定義された一般的でより有益な例外階層に変換します。( 一貫した例外階層を参照してください。)

クライアントには、宣言型構成にリアクティブ型を使用する関数で流れるような API があります。

コードに DatabaseClient を使用する場合は、java.util.function インターフェースを実装するだけでよく、明確に定義された契約が提供されます。DatabaseClient クラスによって提供される Connection を指定すると、Function コールバックは Publisher を作成します。Row の結果を抽出するマッピング関数についても同様です。

ConnectionFactory 参照を使用して直接インスタンス化することにより、DAO 実装内で DatabaseClient を使用するか、Spring IoC コンテナーで構成して Bean 参照として DAO に渡すことができます。

DatabaseClient オブジェクトを作成する最も簡単な方法は、次のように静的ファクトリメソッドを使用することです。

  • Java

  • Kotlin

DatabaseClient client = DatabaseClient.create(connectionFactory);
val client = DatabaseClient.create(connectionFactory)
ConnectionFactory は、常に Spring IoC コンテナーで Bean として構成する必要があります。

上記の方法は、デフォルト設定で DatabaseClient を作成します。

DatabaseClient.builder() から Builder インスタンスを取得することもできます。次のメソッドを呼び出すことにより、クライアントをカスタマイズできます。

  • … .bindMarkers(…): 特定の BindMarkersFactory を指定して、名前付きパラメーターからデータベースバインドマーカーへの変換を構成します。

  • … .executeFunction(…)Statement オブジェクトの実行方法を ExecuteFunction に設定します。

  • … .namedParameters(false): 名前付きパラメーターの展開を無効にします。デフォルトで有効になっています。

ダイアレクトは、通常 ConnectionFactoryMetadata をインスペクションすることにより、ConnectionFactory から BindMarkersFactoryResolver (Javadoc) によって解決されます。
 org.springframework.r2dbc.core.binding.BindMarkersFactoryResolver$BindMarkerFactoryProvider 〜 META-INF/spring.factories を実装するクラスを登録することにより、Spring に BindMarkersFactory を自動検出させることができます。BindMarkersFactoryResolver は、Spring の SpringFactoriesLoader を使用して、クラスパスからバインドマーカープロバイダーの実装を検出します。

現在サポートされているデータベースは次のとおりです。

  • H2

  • MariaDB

  • Microsoft SQL Server

  • MySQL

  • Postgres

このクラスによって発行されたすべての SQL は、クライアントインスタンスの完全修飾クラス名(通常 DefaultDatabaseClient)に対応するカテゴリの DEBUG レベルでログに記録されます。さらに、各実行は、デバッグを支援するために反応シーケンスにチェックポイントを登録します。

以下のセクションでは、DatabaseClient の使用例をいくつか示します。これらの例は、DatabaseClient によって公開されたすべての機能の完全なリストではありません。詳細については、付随する javadoc を参照してください。

ステートメントの実行

DatabaseClient は、ステートメントを実行する基本機能を提供します。次の例は、新しいテーブルを作成する最小限で完全に機能するコードに含める必要があるものを示しています。

  • Java

  • Kotlin

Mono<Void> completion = client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .then();
client.sql("CREATE TABLE person (id VARCHAR(255) PRIMARY KEY, name VARCHAR(255), age INTEGER);")
        .await()

DatabaseClient は、便利で流れるように使用できるように設計されています。実行仕様の各段階で中間、継続、終了メソッドを公開します。上記の例では、then() を使用して、クエリ(または SQL クエリに複数のステートメントが含まれる場合はクエリ)が完了するとすぐに完了する完了 Publisher を返します。

execute(…) は、SQL クエリ文字列またはクエリ Supplier<String> のいずれかを受け入れて、実際のクエリの作成を実行まで延期します。

クエリ (SELECT)

SQL クエリは、Row オブジェクトまたは影響を受ける行の数を通じて値を返すことができます。DatabaseClient は、発行されたクエリに応じて、更新された行の数または行自体を返すことができます。

次のクエリは、テーブルから id 列と name 列を取得します。

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person")
        .fetch().awaitSingle()

次のクエリはバインド変数を使用します。

  • Java

  • Kotlin

Mono<Map<String, Object>> first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().first();
val first = client.sql("SELECT id, name FROM person WHERE first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitSingle()

上記の例で fetch() の使用に気づいたかもしれません。fetch() は、消費するデータ量を指定できる継続演算子です。

first() を呼び出すと、結果から最初の行が返され、残りの行が破棄されます。次の演算子を使用してデータを使用できます。

  • first() は、結果全体の最初の行を返します。その Kotlin コルーチンバリアントは、null 許容でない戻り値の場合は awaitSingle() と呼ばれ、値がオプションの場合は awaitSingleOrNull() と呼ばれます。

  • one() は正確に 1 つの結果を返し、結果にさらに行が含まれている場合は失敗します。Kotlin コルーチンを使用して、正確に 1 つの値に awaitOne() を使用するか、値が null の場合は awaitOneOrNull() を使用します。

  • all() は、結果のすべての行を返します。Kotlin コルーチンを使用する場合は、flow() を使用してください。

  • rowsUpdated() は、影響を受ける行の数(INSERT/UPDATE/DELETE カウント)を返します。その Kotlin コルーチンバリアントは awaitRowsUpdated() という名前です。

さらにマッピングの詳細を指定せずに、クエリは表形式の結果を Map として返します。そのキーは、列の値にマップされる大文字と小文字を区別しない列名です。

Row ごとに呼び出される Function<Row, T> を提供することで、結果のマッピングを制御できるため、任意の値 (特異値、コレクションとマップ、オブジェクト) を返すことができます。

次の例では、name 列を抽出し、その値を出力します。

  • Java

  • Kotlin

Flux<String> names = client.sql("SELECT name FROM person")
        .map(row -> row.get("name", String.class))
        .all();
val names = client.sql("SELECT name FROM person")
        .map{ row: Row -> row.get("name", String.class) }
        .flow()

あるいは、単一の値にマッピングするためのショートカットもあります。

	Flux<String> names = client.sql("SELECT name FROM person")
			.mapValue(String.class)
			.all();

または、Bean プロパティまたはレコードコンポーネントを使用して結果オブジェクトにマップすることもできます。

	// assuming a name property on Person
	Flux<Person> persons = client.sql("SELECT name FROM person")
			.mapProperties(Person.class)
			.all();
null は ?

リレーショナルデータベースの結果には、null 値を含めることができます。Reactive Streams 仕様は、null 値の発行を禁止しています。この要件により、抽出機能での適切な null 処理が義務付けられています。Row から null 値を取得できますが、null 値を発行しないでください。null 値をオブジェクトにラップして(たとえば、特異値の場合は Optional)、抽出機能によって null 値が直接戻されないようにする必要があります。

DatabaseClient で(INSERTUPDATEDELETE)を更新

ステートメントの変更の唯一の違いは、これらのステートメントは通常、表形式のデータを返さないため、rowsUpdated() を使用して結果を消費することです。

次の例は、更新された行の数を返す UPDATE ステートメントを示しています。

  • Java

  • Kotlin

Mono<Integer> affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().rowsUpdated();
val affectedRows = client.sql("UPDATE person SET first_name = :fn")
        .bind("fn", "Joe")
        .fetch().awaitRowsUpdated()

クエリへの値のバインド

一般的なアプリケーションでは、何らかの入力に従って行を選択または更新するために、パラメーター化された SQL ステートメントが必要です。これらは通常、WHERE 句によって制約された SELECT ステートメント、または入力パラメーターを受け入れる INSERT および UPDATE ステートメントです。パラメーター化されたステートメントは、パラメーターが適切にエスケープされない場合、SQL インジェクションのリスクを負います。DatabaseClient は R2DBC の bind API を利用して、クエリパラメーターの SQL インジェクションのリスクを排除します。execute(…) 演算子を使用してパラメーター化された SQL ステートメントを提供し、パラメーターを実際の Statement にバインドできます。次に、R2DBC ドライバーは、準備されたステートメントとパラメーター置換を使用してステートメントを実行します。

パラメーターバインディングは、2 つのバインディング戦略をサポートしています。

  • インデックスによる、ゼロベースのパラメーターインデックスの使用。

  • 名前別、プレースホルダー名を使用。

次の例は、クエリのパラメーターバインドを示しています。

    db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
	    	.bind("id", "joe")
	    	.bind("name", "Joe")
			.bind("age", 34);

あるいは、名前と値のマップを渡すこともできます。

	Map<String, Object> params = new LinkedHashMap<>();
	params.put("id", "joe");
	params.put("name", "Joe");
	params.put("age", 34);
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindValues(params);

または、Bean プロパティまたはレコードコンポーネントを含むパラメーターオブジェクトを渡すこともできます。

	// assuming id, name, age properties on Person
	db.sql("INSERT INTO person (id, name, age) VALUES(:id, :name, :age)")
			.bindProperties(new Person("joe", "Joe", 34);
R2DBC ネイティブバインドマーカー

R2DBC は、実際のデータベースベンダーに依存するデータベースネイティブバインドマーカーを使用します。例として、Postgres は $1$2$n などのインデックス付きマーカーを使用します。別の例は SQL Server で、これは @ で始まる名前付きバインドマーカーを使用します。

これは、バインドマーカーとして ? を必要とする JDBC とは異なります。JDBC では、実際のドライバーはステートメント実行の一部として ? バインドマーカーをデータベースネイティブマーカーに変換します。

Spring Framework の R2DBC サポートにより、:name 構文でネイティブバインドマーカーまたは名前付きバインドマーカーを使用できます。

名前付きパラメーターのサポートでは、BindMarkersFactory インスタンスを活用して、クエリ実行時に名前付きパラメーターをネイティブバインドマーカーに展開します。これにより、さまざまなデータベースベンダー間である程度のクエリの移植性が得られます。

クエリプリプロセッサーは、名前の Collection パラメーターを一連のバインドマーカーに展開し、引数の数に基づいて動的なクエリを作成する必要をなくします。ネストされたオブジェクト配列は、(たとえば)選択リストを使用できるように拡張されています。

次のクエリを検討してください。

SELECT id, name, state FROM table WHERE (name, age) IN (('John', 35), ('Ann', 50))

上記のクエリは、次のようにパラメーター化して実行できます。

  • Java

  • Kotlin

List<Object[]> tuples = new ArrayList<>();
tuples.add(new Object[] {"John", 35});
tuples.add(new Object[] {"Ann",  50});

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples);
val tuples: MutableList<Array<Any>> = ArrayList()
tuples.add(arrayOf("John", 35))
tuples.add(arrayOf("Ann", 50))

client.sql("SELECT id, name, state FROM table WHERE (name, age) IN (:tuples)")
	    .bind("tuples", tuples)
選択リストの使用はベンダーに依存します。

次の例は、IN 述語を使用したより単純なバリアントを示しています。

  • Java

  • Kotlin

client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", Arrays.asList(35, 50));
client.sql("SELECT id, name, state FROM table WHERE age IN (:ages)")
	    .bind("ages", arrayOf(35, 50))
R2DBC 自体は、コレクションのような値をサポートしていません。それでも、上記の例で特定の List を展開すると、Spring の R2DBC サポートの名前付きパラメーターに対して機能します。上記の IN 句で使用します。ただし、配列型の列を挿入または更新するには(Postgres など)、基盤となる R2DBC ドライバーでサポートされている配列型が必要です。通常は Java 配列です。text[] 列を更新するための String[]Collection<String> などを配列パラメーターとして渡さないでください。

ステートメントフィルター

場合によっては、実際の Statement を実行する前にオプションを微調整する必要があります。これを行うには、次の例に示すように、Statement フィルター (StatementFilterFunction) を DatabaseClient に登録して、実行中のステートメントをインターセプトして変更します。

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter((s, next) -> next.execute(s.returnGeneratedValues("id")))
	    .bind("name", …)
	    .bind("state", …);
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { s: Statement, next: ExecuteFunction -> next.execute(s.returnGeneratedValues("id")) }
		.bind("name", …)
		.bind("state", …)

DatabaseClient は、Function<Statement, Statement> を受け入れる単純化された filter(…) オーバーロードも公開します。

  • Java

  • Kotlin

client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter(statement -> s.returnGeneratedValues("id"));

client.sql("SELECT id, name, state FROM table")
	    .filter(statement -> s.fetchSize(25));
client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
	    .filter { statement -> s.returnGeneratedValues("id") }

client.sql("SELECT id, name, state FROM table")
	    .filter { statement -> s.fetchSize(25) }

StatementFilterFunction の実装により、Statement のフィルタリングと Result オブジェクトのフィルタリングが可能になります。

DatabaseClient ベストプラクティス

DatabaseClient クラスのインスタンスは、一度設定されるとスレッドセーフです。これは、DatabaseClient の単一インスタンスを構成し、この共有参照を複数の DAO(またはリポジトリ)に安全に挿入できることを意味するため、重要です。DatabaseClient は、ConnectionFactory への参照を維持するという点でステートフルですが、この状態は会話状態ではありません。

DatabaseClient クラスを使用する場合の一般的な方法は、Spring 構成ファイルで ConnectionFactory を構成してから、その共有 ConnectionFactory Bean を DAO クラスに依存性注入することです。DatabaseClient は、ConnectionFactory の setter で作成されます。これにより、次のような DAO が発生します。

  • Java

  • Kotlin

public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory);
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao {

	private val databaseClient = DatabaseClient.create(connectionFactory)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}

明示的な構成の代わりに、コンポーネントスキャンと依存性注入のアノテーションサポートを使用します。この場合、クラスに @Component (コンポーネントスキャンの候補になります)でアノテーションを付け、ConnectionFactory setter メソッドに @Autowired でアノテーションを付けることができます。次の例は、その方法を示しています。

  • Java

  • Kotlin

@Component (1)
public class R2dbcCorporateEventDao implements CorporateEventDao {

	private DatabaseClient databaseClient;

	@Autowired (2)
	public void setConnectionFactory(ConnectionFactory connectionFactory) {
		this.databaseClient = DatabaseClient.create(connectionFactory); (3)
	}

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1@Component でクラスにアノテーションを付けます。
2ConnectionFactory setter メソッドに @Autowired でアノテーションを付けます。
3ConnectionFactory で新しい DatabaseClient を作成します。
@Component (1)
class R2dbcCorporateEventDao(connectionFactory: ConnectionFactory) : CorporateEventDao { (2)

	private val databaseClient = DatabaseClient(connectionFactory) (3)

	// R2DBC-backed implementations of the methods on the CorporateEventDao follow...
}
1@Component でクラスにアノテーションを付けます。
2ConnectionFactory のコンストラクターインジェクション。
3ConnectionFactory で新しい DatabaseClient を作成します。

上記のテンプレート初期化スタイルのどちらを使用する(またはしない)かに関係なく、SQL を実行するたびに DatabaseClient クラスの新しいインスタンスを作成する必要はほとんどありません。一度構成すると、DatabaseClient インスタンスはスレッドセーフになります。アプリケーションが複数のデータベースにアクセスする場合、複数の DatabaseClient インスタンスが必要になる場合があります。これには、複数の ConnectionFactory が必要であり、その後、複数の異なる構成の DatabaseClient インスタンスが必要です。

自動生成されたキーの取得

INSERT ステートメントは、自動インクリメント列または ID 列を定義するテーブルに行を挿入するときにキーを生成する場合があります。生成する列名を完全に制御するには、目的の列に対して生成されたキーをリクエストする StatementFilterFunction を登録するだけです。

  • Java

  • Kotlin

Mono<Integer> generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter(statement -> s.returnGeneratedValues("id"))
		.map(row -> row.get("id", Integer.class))
		.first();

// generatedId emits the generated key once the INSERT statement has finished
val generatedId = client.sql("INSERT INTO table (name, state) VALUES(:name, :state)")
		.filter { statement -> s.returnGeneratedValues("id") }
		.map { row -> row.get("id", Integer.class) }
		.awaitOne()

// generatedId emits the generated key once the INSERT statement has finished

データベース接続の制御

このセクションでは以下について説明します。

ConnectionFactory を使用する

Spring は、ConnectionFactory を介してデータベースへの R2DBC 接続を取得します。ConnectionFactory は R2DBC 仕様の一部であり、ドライバーの一般的なエントリポイントです。コンテナーまたはフレームワークで、接続プールとトランザクション管理の課題をアプリケーションコードから隠すことができます。開発者は、データベースへの接続方法に関する詳細を知る必要はありません。これは、ConnectionFactory を設定する管理者の責任です。コードの開発とテストを行うときに両方のロールを果たす可能性がありますが、本番データソースがどのように構成されているかを必ずしも知っている必要はありません。

Spring の R2DBC レイヤーを使用すると、サードパーティが提供する接続プールの実装を使用して独自のレイヤーを構成できます。一般的な実装は R2DBC プール (r2dbc-pool) です。Spring ディストリビューションでの実装は、テストのみを目的としており、プーリングは提供しません。

ConnectionFactory を構成するには:

  1. 通常は R2DBC ConnectionFactory を取得するため、ConnectionFactory との接続を取得します。

  2. R2DBC URL を提供します(正しい値については、ドライバーのドキュメントを参照してください)。

次の例は、ConnectionFactory を構成する方法を示しています。

  • Java

  • Kotlin

ConnectionFactory factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");
val factory = ConnectionFactories.get("r2dbc:h2:mem:///test?options=DB_CLOSE_DELAY=-1;DB_CLOSE_ON_EXIT=FALSE");

ConnectionFactoryUtils を使用する

ConnectionFactoryUtils クラスは、ConnectionFactory から接続を取得し、(必要に応じて)接続を閉じるための static メソッドを提供する便利で強力なヘルパークラスです。

サブスクライバー Context -bound 接続、たとえば R2dbcTransactionManager をサポートします。

SingleConnectionFactory を使用する

SingleConnectionFactory クラスは、使用するたびに閉じられない単一の Connection をラップする DelegatingConnectionFactory インターフェースの実装です。

(永続化ツールを使用する場合のように)プールされた接続を想定してクライアントコードが close を呼び出す場合は、suppressClose プロパティを true に設定する必要があります。この設定は、物理接続をラップするクローズ抑制プロキシを返します。これをネイティブ Connection または同様のオブジェクトにキャストすることはできなくなったことに注意してください。

SingleConnectionFactory は主にテストクラスであり、R2DBC ドライバーで使用が許可されている場合は、パイプライン処理などの特定の要件に使用できます。プールされた ConnectionFactory とは対照的に、常に同じ接続を再利用し、物理接続の過度の作成を回避します。

TransactionAwareConnectionFactoryProxy を使用する

TransactionAwareConnectionFactoryProxy は、ターゲット ConnectionFactory のプロキシです。プロキシは、ConnectionFactory をターゲットとしてラップし、Spring が管理するトランザクションの認識を追加します。

Spring の R2DBC サポートと統合されていない R2DBC クライアントを使用する場合は、このクラスを使用する必要があります。この場合でも、このクライアントを使用すると同時に、このクライアントを Spring 管理対象トランザクションに参加させることができます。リソース管理のために、R2DBC クライアントを ConnectionFactoryUtils への適切なアクセスと統合することが一般的に望ましいです。

詳細については、TransactionAwareConnectionFactoryProxy javadoc を参照してください。

R2dbcTransactionManager を使用する

R2dbcTransactionManager クラスは、単一の R2DBC ConnectionFactory の ReactiveTransactionManager 実装です。これは、指定された ConnectionFactory からの R2DBC Connection をサブスクライバー Context にバインドし、潜在的に ConnectionFactory ごとに 1 つのサブスクライバー Connection を許可します。

R2DBC の標準 ConnectionFactory.create() ではなく、R2DBC Connection から ConnectionFactoryUtils.getConnection(ConnectionFactory) を取得するには、アプリケーションコードが必要です。すべてのフレームワーククラス ( DatabaseClient など) はこの戦略を暗黙的に使用します。トランザクションマネージャーで使用しない場合、検索戦略は ConnectionFactory.create() とまったく同じように動作するため、どのような場合でも使用できます。