Kotlin サポート

フレームワークも関数の Kotlin ラムダをサポートするように改善されているため、Kotlin 言語と Spring Integration フロー定義の組み合わせを使用できるようになりました。

@Bean
@Transformer(inputChannel = "functionServiceChannel")
fun kotlinFunction(): (String) -> String {
    return { it.toUpperCase() }
}

@Bean
@ServiceActivator(inputChannel = "messageConsumerServiceChannel")
fun kotlinConsumer(): (Message<Any>) -> Unit {
    return { print(it) }
}

@Bean
@InboundChannelAdapter(value = "counterChannel",
        poller = Poller(fixedRate = "10", maxMessagesPerPoll = "1"))
fun kotlinSupplier(): () -> String {
    return { "baz" }
}

Kotlin コルーチン

バージョン 6.0 以降、Spring Integration は Kotlin コルーチン (英語) のサポートを提供します。suspend 関数と kotlinx.coroutines.Deferred および kotlinx.coroutines.flow.Flow の戻り値の型をサービスメソッドに使用できるようになりました。

@ServiceActivator(inputChannel = "suspendServiceChannel", outputChannel = "resultChannel")
suspend fun suspendServiceFunction(payload: String) = payload.uppercase()

@ServiceActivator(inputChannel = "flowServiceChannel", outputChannel = "resultChannel", async = "true")
fun flowServiceFunction(payload: String) =
    flow {
        for (i in 1..3) {
            emit("$payload #$i")
        }
    }

フレームワークはそれらを Reactive Streams 相互作用として扱い、ReactiveAdapterRegistry を使用してそれぞれの Mono および Flux リアクター型に変換します。このような関数応答は、それが ReactiveStreamsSubscribableChannel の場合は応答チャネルで処理されるか、それぞれのコールバックで CompletableFuture の結果として処理されます。

Flow の結果を伴う関数は、@ServiceActivator のデフォルトでは async ではないため、Flow インスタンスが応答メッセージペイロードとして生成されます。このオブジェクトをコルーチンとして処理するか、Flux に変換するのは、ターゲットアプリケーションのロールです。

@MessagingGateway インターフェースメソッドは、Kotlin で宣言するときに suspend 修飾子でマークすることもできます。フレームワークは、Mono を内部で使用して、ダウンストリームフローを使用してリクエストと応答を実行します。このような Mono の結果は、MonoKt.awaitSingleOrNull() API によって内部的に処理され、ゲートウェイの呼び出された suspend 関数の kotlin.coroutines.Continuation 引数を満たします。

@MessagingGateway(defaultRequestChannel = "suspendRequestChannel")
interface SuspendFunGateway {

    suspend fun suspendGateway(payload: String): String

}

このメソッドは、Kotlin 言語の要件に従って、コルーチンとして呼び出す必要があります。

@Autowired
private lateinit var suspendFunGateway: SuspendFunGateway

fun someServiceMethod() {
    runBlocking {
        val reply = suspendFunGateway.suspendGateway("test suspend gateway")
    }
}