コルーチン
Kotlin コルーチン (英語) は、Kotlin 軽量スレッドであり、ノンブロッキングコードを命令的な方法で記述することができます。言語側では、サスペンド関数は非同期操作の抽象化を提供し、ライブラリ側では kotlinx.coroutines [GitHub] (英語) は async { } (英語) などの関数や Flow (英語) などの型を提供します。
Spring Framework は、次のスコープでコルーチンのサポートを提供します。
Spring MVC および WebFlux アノテーション付き
@Controllerでの据え置きおよびフロー (英語) 戻り値のサポートSpring MVC および WebFlux アノテーション付き
@Controllerでの一時停止機能のサポートWebFlux クライアント (英語) およびサーバー (英語) 関数 API の拡張。
WebFlux.fn coRouter { } (英語) DSL
WebFlux
CoWebFilter(英語)RSocket
@MessageMappingアノテーション付きメソッドでの中断機能とFlowサポートRSocketRequester(英語) の拡張機能
依存関係
kotlinx-coroutines-core および kotlinx-coroutines-reactor の依存関係がクラスパスにある場合、コルーチンのサポートが有効になります。
build.gradle.kts
dependencies {
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:${coroutinesVersion}")
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:${coroutinesVersion}")
} バージョン 1.4.0 以上がサポートされています。
Reactive はコルーチンにどのように変換されますか?
戻り値の場合、Reactive API から Coroutines API への変換は次のとおりです。
fun handler(): Mono<Void>はsuspend fun handler()になりますfun handler(): Mono<T>は、Monoを空にできるかどうかに応じて、suspend fun handler(): Tまたはsuspend fun handler(): T?になります。(より静的に型付けされるという利点がある)fun handler(): Flux<T>はfun handler(): Flow<T>になります
入力パラメーターの場合:
遅延が必要ない場合、値パラメーターを取得するために中断関数を呼び出すことができるため、
fun handler(mono: Mono<T>)はfun handler(value: T)になります。怠 laz が必要な場合、
fun handler(mono: Mono<T>)はfun handler(supplier: suspend () → T)またはfun handler(supplier: suspend () → T?)になります
Flow (英語) は、コルーチンの世界で Flux に相当し、ホットストリームまたはコールドストリーム、有限ストリームまたは無限ストリームに適していますが、主な違いは次のとおりです。
Flowはプッシュベースで、Fluxはプッシュプルハイブリッドですバックプレッシャーは、サスペンド機能を介して実装されます
Flowには単一の中断collectメソッド (英語) のみがあり、オペレーターは拡張機能 (英語) として実装されますコルーチンのおかげでオペレーターは簡単に実装できます [GitHub] (英語)
拡張により、
Flowにカスタム演算子を追加できます収集操作は機能を中断しています
mapオペレーター (英語) は、中断機能パラメーターをとるため、非同期操作をサポートします(flatMapは不要)。
コルーチンと並行してコードを実行する方法など、詳細については、Spring、コルーチン、Kotlin フローで反応する (英語) に関するこのブログ投稿を参照してください。
コントローラー
コルーチン @RestController の例を次に示します。
@RestController
class CoroutinesRestController(client: WebClient, banner: Banner) {
@GetMapping("/suspend")
suspend fun suspendingEndpoint(): Banner {
delay(10)
return banner
}
@GetMapping("/flow")
fun flowEndpoint() = flow {
delay(10)
emit(banner)
delay(10)
emit(banner)
}
@GetMapping("/deferred")
fun deferredEndpoint() = GlobalScope.async {
delay(10)
banner
}
@GetMapping("/sequential")
suspend fun sequential(): List<Banner> {
val banner1 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
val banner2 = client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
return listOf(banner1, banner2)
}
@GetMapping("/parallel")
suspend fun parallel(): List<Banner> = coroutineScope {
val deferredBanner1: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
val deferredBanner2: Deferred<Banner> = async {
client
.get()
.uri("/suspend")
.accept(MediaType.APPLICATION_JSON)
.awaitExchange()
.awaitBody<Banner>()
}
listOf(deferredBanner1.await(), deferredBanner2.await())
}
@GetMapping("/error")
suspend fun error() {
throw IllegalStateException()
}
@GetMapping("/cancel")
suspend fun cancel() {
throw CancellationException()
}
}@Controller によるビューレンダリングもサポートされています。
@Controller
class CoroutinesViewController(banner: Banner) {
@GetMapping("/")
suspend fun render(model: Model): String {
delay(10)
model["banner"] = banner
return "index"
}
}WebFlux.fn
以下は、coRouter { } (英語) DSL および関連ハンドラーを介して定義されたコルーチンルーターの例です。
@Configuration
class RouterConfiguration {
@Bean
fun mainRouter(userHandler: UserHandler) = coRouter {
GET("/", userHandler::listView)
GET("/api/user", userHandler::listApi)
}
}class UserHandler(builder: WebClient.Builder) {
private val client = builder.baseUrl("...").build()
suspend fun listView(request: ServerRequest): ServerResponse =
ServerResponse.ok().renderAndAwait("users", mapOf("users" to
client.get().uri("...").awaitExchange().awaitBody<User>()))
suspend fun listApi(request: ServerRequest): ServerResponse =
ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).bodyAndAwait(
client.get().uri("...").awaitExchange().awaitBody<User>())
}トランザクション
コルーチン上のトランザクションは、リアクティブトランザクション管理のプログラムバリアントを介してサポートされます。
機能を中断するために、TransactionalOperator.executeAndAwait 拡張機能が提供されています。
import org.springframework.transaction.reactive.executeAndAwait
class PersonRepository(private val operator: TransactionalOperator) {
suspend fun initDatabase() = operator.executeAndAwait {
insertPerson1()
insertPerson2()
}
private suspend fun insertPerson1() {
// INSERT SQL statement
}
private suspend fun insertPerson2() {
// INSERT SQL statement
}
}Kotlin Flow の場合、Flow<T>.transactional 拡張が提供されます。
import org.springframework.transaction.reactive.transactional
class PersonRepository(private val operator: TransactionalOperator) {
fun updatePeople() = findPeople().map(::updatePerson).transactional(operator)
private fun findPeople(): Flow<Person> {
// SELECT SQL statement
}
private suspend fun updatePerson(person: Person): Person {
// UPDATE SQL statement
}
}コンテキストの伝播
Spring アプリケーションは、可観測性サポートのために Micrometer でインストルメントされています。トレーシングサポートのために、現在の観測情報は、ブロッキングコードの場合は ThreadLocal、リアクティブパイプラインの場合は Reactor の Context を介して伝播されます。ただし、現在の観測情報は、中断された関数の実行コンテキストでも利用可能である必要があります。そうしないと、コルーチンからログに記録されるステートメントの先頭に現在の "traceId" が自動的に追加されません。
PropagationContextElement (英語) 演算子は通常、Micrometer コンテキスト伝播ライブラリ (英語) が Kotlin コルーチンで動作することを保証します。
io.micrometer:context-propagation 依存関係と、オプションで org.jetbrains.kotlinx:kotlinx-coroutines-reactor 依存関係が必要です。CoroutinesUtils#invokeSuspendingFunction (Spring がコルーチンを Reactor、Flux、Mono に適合させるために使用)による自動コンテキスト伝播は、Hooks.enableAutomaticContextPropagation() を呼び出すことで有効にできます。
Applications can also use PropagationContextElement explicitly to augment the CoroutineContext with the context propagation mechanism:
fun main() {
runBlocking(Dispatchers.IO + PropagationContextElement()) {
waitAndLog()
}
}
suspend fun waitAndLog() {
delay(10)
logger.info("Suspending function with traceId")
}ここで、Micrometer トレースが構成されていると仮定すると、結果のログステートメントには現在の "traceId" が表示され、アプリケーションの監視性が向上します。