最新の安定バージョンについては、Spring Integration 6.5.3 を使用してください! |
reactive() エンドポイント
バージョン 5.5 以降、ConsumerEndpointSpec は、オプションのカスタマイザー Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> を備えた reactive() 構成プロパティを提供します。このオプションは、IntegrationReactiveUtils.messageChannelToFlux() を介して Flux に変換される入力チャネル型とは関係なく、ターゲットエンドポイントを ReactiveStreamsConsumer インスタンスとして構成します。提供された関数は、Flux.transform() オペレーターから使用され、入力チャネルからのリアクティブストリームソースをカスタマイズします(publishOn()、log()、doOnNext() など)。
次の例は、最終的なサブスクライバーとプロデューサーに関係なく、公開スレッドを入力チャネルからその DirectChannel に変更する方法を示しています。
@Bean
public IntegrationFlow reactiveEndpointFlow() {
return IntegrationFlow
.from("inputChannel")
.transformWith(t -> t
.<String, Integer>transformer(Integer::parseInt)
.reactive(flux -> flux.publishOn(Schedulers.parallel()))
)
.get();
}詳細については、Reactive Streams サポートを参照してください。