コルーチン

Kotlin コルーチン (英語) は、Kotlin 軽量スレッドであり、ノンブロッキングコードを命令的な方法で記述することができます。言語側では、サスペンド関数は非同期操作の抽象化を提供し、ライブラリ側では kotlinx.coroutines [GitHub] (英語) async { } (英語) などの関数や Flow (英語) などの型を提供します。

Spring Framework は、次のスコープでコルーチンのサポートを提供します。

依存関係

kotlinx-coroutines-core および kotlinx-coroutines-reactor の依存関係がクラスパスにある場合、コルーチンのサポートが有効になります。

build.gradle.kts

dependencies {

	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
	implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
}

バージョン 1.4.0 以上がサポートされています。

Reactive はコルーチンにどのように変換されますか?

戻り値の場合、Reactive API から Coroutines API への変換は次のとおりです。

  • fun handler(): Mono<Void> は suspend fun handler() になります

  • fun handler(): Mono<T> は、Mono を空にできるかどうかに応じて、suspend fun handler(): T または suspend fun handler(): T? になります。(より静的に型付けされるという利点がある)

  • fun handler(): Flux<T> は fun handler(): Flow<T> になります

入力パラメーターの場合:

  • 遅延が必要ない場合、値パラメーターを取得するために中断関数を呼び出すことができるため、fun handler(mono: Mono<T>) は fun handler(value: T) になります。

  • 怠 laz が必要な場合、fun handler(mono: Mono<T>) は fun handler(supplier: suspend () → T) または fun handler(supplier: suspend () → T?) になります

Flow (英語) は、コルーチンの世界で Flux に相当し、ホットストリームまたはコールドストリーム、有限ストリームまたは無限ストリームに適していますが、主な違いは次のとおりです。

コルーチンと並行してコードを実行する方法など、詳細については、Spring、コルーチン、Kotlin フローで反応する (英語) に関するこのブログ投稿を参照してください。

コントローラー

コルーチン @RestController の例を次に示します。

@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {

	@GetMapping("/suspend")
	suspend fun suspendingEndpoint(): Banner {
		delay(10)
		return banner
	}

	@GetMapping("/flow")
	fun flowEndpoint() = flow {
		delay(10)
		emit(banner)
		delay(10)
		emit(banner)
	}

	@GetMapping("/deferred")
	fun deferredEndpoint() = GlobalScope.async {
		delay(10)
		banner
	}

	@GetMapping("/sequential")
	suspend fun sequential(): List<Banner> {
		val banner1 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		val banner2 = client
				.get()
				.uri("/suspend")
				.accept(MediaType.APPLICATION_JSON)
				.awaitExchange()
				.awaitBody<Banner>()
		return listOf(banner1, banner2)
	}

	@GetMapping("/parallel")
	suspend fun parallel(): List<Banner> = coroutineScope {
		val deferredBanner1: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		val deferredBanner2: Deferred<Banner> = async {
			client
					.get()
					.uri("/suspend")
					.accept(MediaType.APPLICATION_JSON)
					.awaitExchange()
					.awaitBody<Banner>()
		}
		listOf(deferredBanner1.await(), deferredBanner2.await())
	}

	@GetMapping("/error")
	suspend fun error() {
		throw IllegalStateException()
	}

	@GetMapping("/cancel")
	suspend fun cancel() {
		throw CancellationException()
	}

}

@Controller によるビューレンダリングもサポートされています。

@Controller
class CoroutinesViewController(banner: Banner) {

	@GetMapping("/")
	suspend fun render(model: Model): String {
		delay(10)
		model["banner"] = banner
		return "index"
	}
}

WebFlux.fn

以下は、coRouter { } (英語) DSL および関連ハンドラーを介して定義されたコルーチンルーターの例です。

@Configuration
class RouterConfiguration {

	@Bean
	fun mainRouter(userHandler: UserHandler) = coRouter {
		GET("/", userHandler::listView)
		GET("/api/user", userHandler::listApi)
	}
}
class UserHandler(builder: WebClient.Builder) {

	private val client = builder.baseUrl("...").build()

	suspend fun listView(request: ServerRequest): ServerResponse =
			ServerResponse.ok().renderAndAwait("users", mapOf("users" to
			client.get().uri("...").awaitExchange().awaitBody<User>()))

	suspend fun listApi(request: ServerRequest): ServerResponse =
				ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
				client.get().uri("...").awaitExchange().awaitBody<User>())
}

トランザクション

コルーチンでのトランザクションは、Spring Framework 5.2 以降で提供されるリアクティブトランザクション管理のプログラムによるバリアントを介してサポートされます。

機能を中断するために、TransactionalOperator.executeAndAwait 拡張機能が提供されています。

import org.springframework.transaction.reactive.executeAndAwait

class PersonRepository(private val operator: TransactionalOperator) {

    suspend fun initDatabase() = operator.executeAndAwait {
        insertPerson1()
        insertPerson2()
    }

    private suspend fun insertPerson1() {
        // INSERT SQL statement
    }

    private suspend fun insertPerson2() {
        // INSERT SQL statement
    }
}

Kotlin Flow の場合、Flow<T>.transactional 拡張が提供されます。

import org.springframework.transaction.reactive.transactional

class PersonRepository(private val operator: TransactionalOperator) {

    fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)

    private fun findPeople(): Flow<Person> {
        // SELECT SQL statement
    }

    private suspend fun updatePerson(person: Person): Person {
        // UPDATE SQL statement
    }
}