Kotlin サポート
フレームワークも関数の Kotlin ラムダをサポートするように改善されているため、Kotlin 言語と Spring Integration フロー定義の組み合わせを使用できるようになりました。
@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
return { it.toUpperCase() }
}
@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
return { print(it) }
}
@Bean
@InboundChannelAdapter(value = "counterChannel",
poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
return { "baz" }
}
Kotlin コルーチン
バージョン 6.0 以降、Spring Integration は Kotlin コルーチン (英語) のサポートを提供します。suspend
関数と kotlinx.coroutines.Deferred
および kotlinx.coroutines.flow.Flow
の戻り値の型をサービスメソッドに使用できるようになりました。
@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()
@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
flow {
for (i in 1..3) {
emit("$payload #$i")
}
}
フレームワークはそれらを Reactive Streams 相互作用として扱い、ReactiveAdapterRegistry
を使用してそれぞれの Mono
および Flux
リアクター型に変換します。このような関数応答は、それが ReactiveStreamsSubscribableChannel
の場合は応答チャネルで処理されるか、それぞれのコールバックで CompletableFuture
の結果として処理されます。
Flow の結果を伴う関数は、@ServiceActivator のデフォルトでは async ではないため、Flow インスタンスが応答メッセージペイロードとして生成されます。このオブジェクトをコルーチンとして処理するか、Flux に変換するのは、ターゲットアプリケーションのロールです。 |
@MessagingGateway
インターフェースメソッドは、Kotlin で宣言するときに suspend
修飾子でマークすることもできます。フレームワークは、Mono
を内部で使用して、ダウンストリームフローを使用してリクエストと応答を実行します。このような Mono
の結果は、MonoKt.awaitSingleOrNull()
API によって内部的に処理され、ゲートウェイの呼び出された suspend
関数の kotlin.coroutines.Continuation
引数を満たします。
@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {
suspend fun suspendGateway(payload: String): String
}
このメソッドは、Kotlin 言語の要件に従って、コルーチンとして呼び出す必要があります。
@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway
fun someServiceMethod() {
runBlocking {
val reply = suspendFunGateway.suspendGateway("test suspend gateway")
}
}