reactive() エンドポイント

バージョン 5.5 以降、ConsumerEndpointSpec は、オプションのカスタマイザー Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> を備えた reactive() 構成プロパティを提供します。このオプションは、IntegrationReactiveUtils.messageChannelToFlux() を介して Flux に変換される入力チャネル型とは関係なく、ターゲットエンドポイントを ReactiveStreamsConsumer インスタンスとして構成します。提供された関数は、Flux.transform() オペレーターから使用され、入力チャネルからのリアクティブストリームソースをカスタマイズします(publishOn()log()doOnNext() など)。

次の例は、最終的なサブスクライバーとプロデューサーに関係なく、公開スレッドを入力チャネルからその DirectChannel に変更する方法を示しています。

@Bean
public IntegrationFlow reactiveEndpointFlow() {
    return IntegrationFlow
            .from("inputChannel")
            .transformWith(t -> t
                              .<String, Integer>transformer(Integer::parseInt)
                              .reactive(flux -> flux.publishOn(Schedulers.parallel()))
            )
            .get();
}

詳細については、Reactive Streams サポートを参照してください。