reactive()
エンドポイント
バージョン 5.5 以降、ConsumerEndpointSpec
は、オプションのカスタマイザー Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
を備えた reactive()
構成プロパティを提供します。このオプションは、IntegrationReactiveUtils.messageChannelToFlux()
を介して Flux
に変換される入力チャネル型とは関係なく、ターゲットエンドポイントを ReactiveStreamsConsumer
インスタンスとして構成します。提供された関数は、Flux.transform()
オペレーターから使用され、入力チャネルからのリアクティブストリームソースをカスタマイズします(publishOn()
、log()
、doOnNext()
など)。
次の例は、最終的なサブスクライバーとプロデューサーに関係なく、公開スレッドを入力チャネルからその DirectChannel
に変更する方法を示しています。
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}
詳細については、Reactive Streams サポートを参照してください。