Reactive Streams サポート

Spring Integration は、フレームワークのいくつかの場所で、さまざまな側面から Reactive Streams (英語) 対話のサポートを提供します。ここでは、必要に応じて、詳細については対象の章への適切なリンクを使用して、それらのほとんどについて説明します。

序文

要約すると、Spring Integration は Spring プログラミングモデルを継承して、よく知られているエンタープライズ統合パターンをサポートします。Spring Integration は、Spring ベースのアプリケーション内での軽量メッセージングを可能にし、宣言型アダプターを介した外部システムとの統合をサポートします。Spring Integration の主なゴールは、保守可能でテスト可能なコードの作成に不可欠な関心の分離を維持しながら、エンタープライズ統合ソリューションを構築するためのシンプルなモデルを提供することです。このゴールは、messagechannelendpoint のような第一級オブジェクトを使用するターゲットアプリケーションで達成されます。これにより、統合フロー(パイプライン)を構築できます。統合フロー(パイプライン)では、(ほとんどの場合)1 つのエンドポイントがメッセージをチャネルに生成して別のエンドポイントで消費します。このようにして、統合相互作用モデルをターゲットのビジネスロジックと区別します。ここで重要な部分は、その間のチャネルです。フローの動作は、エンドポイントを変更せずに実装に依存します。

一方、Reactive Streams は、ノンブロッキングバックプレッシャーを使用した非同期ストリーム処理の標準です。Reactive Streams の主なゴールは、受信側が任意の量のデータをバッファリングすることを強制されないようにしながら、非同期境界を越えたストリームデータの交換を管理することです(要素を別のスレッドまたはスレッドプールに渡すなど)。言い換えると、スレッド間を仲介するキューを制限できるようにするために、バックプレッシャはこのモデルの不可欠な部分です。プロジェクト Reactor (英語) などの Reactive Streams 実装の目的は、ストリームアプリケーションの処理グラフ全体にわたってこれらの利点と特性を維持することです。Reactive Streams ライブラリの最終的なゴールは、利用可能なプログラミング言語構造で可能な限り透過的かつスムーズなメソッドで、ターゲットアプリケーションの型、演算子のセット、サポート API を提供することですが、最終的なソリューションは通常の場合ほど必須ではありません。関数チェーンの呼び出し。それはフェーズに分けられます: 定義と実行は、最終的なリアクティブパブリッシャーへのサブスクリプション中にしばらくしてから発生し、データの需要は定義の下部から上部にプッシュされ、必要に応じてバックプレッシャーを適用します。多くをリクエストします。現時点で処理できるイベント。リアクティブアプリケーションは、"stream" のように見えるか、Spring Integration 用語で慣れているように - "flow" です。実際、Java 9 以降の Reactive Streams SPI は、java.util.concurrent.Flow クラスで提供されます。

ここから、エンドポイントにいくつかのリアクティブフレームワークオペレーターを適用する場合、Spring Integration フローは Reactive Streams アプリケーションの作成に本当に適しているように見えますが、実際には問題がはるかに広く、すべてのエンドポイント(JdbcMessageHandler など)ではないことに注意する必要があります。透過的に反応ストリームで処理できます。もちろん、Spring Integration での Reactive Streams サポートの主なゴールは、プロセス全体を完全に反応させ、要求に応じて開始し、バックプレッシャーに対応できるようにすることです。チャネルアダプターのターゲットプロトコルとシステムが Reactive Streams 対話モデルを提供するまで、これは不可能です。以下のセクションでは、統合フロー構造を保持するリアクティブ型アプリケーションを開発するために Spring Integration で提供されるコンポーネントとアプローチについて説明します。

Spring Integration のすべての Reactive Streams 相互作用は、Mono や Flux などのプロジェクト Reactor (英語) 型で実装されています。

メッセージングゲートウェイ

Reactive Streams との対話の最も単純なポイントは、@MessagingGateway であり、ゲートウェイメソッドの戻り型を Mono<?> として作成します。返された Mono インスタンスでサブスクリプションが発生すると、ゲートウェイメソッド呼び出しの背後にある統合フロー全体が実行されます。詳細については、Reactor Mono を参照してください。同様の Mono -reply アプローチは、Reactive Streams 互換プロトコルに完全に基づいている受信ゲートウェイのフレームワークで内部的に使用されます(詳細については、以下のリアクティブチャネルアダプターを参照してください)。送受信操作は、replyChannel ヘッダーからの応答評価が使用可能な場合は常にそれを連鎖させて、Mono.deffer() にラップされます。このように、特定のリアクティブプロトコル(Netty など)の受信コンポーネントは、Spring Integration で実行されるリアクティブフローのサブスクライバーおよびイニシエーターになります。リクエストペイロードがリアクティブ型である場合、プロセスをイニシエーターサブスクリプションに延期するリアクティブストリーム定義を使用してそれを処理することをお勧めします。この目的のために、ハンドラーメソッドもリアクティブ型を返す必要があります。詳細については、次のセクションを参照してください。

リアクティブ応答ペイロード

応答を生成する MessageHandler が応答メッセージのためのリアクティブ型ペイロードを返すとき、outputChannel のために提供される通常の MessageChannel 実装と非同期の方法で処理され、出力チャネルが ReactiveStreamsSubscribableChannel 実装、たとえば FluxMessageChannel では、オンデマンドサブスクリプションで平坦化されます。標準命令 MessageChannel ユースケースを使用し、応答ペイロードが複数値パブリッシャー (詳細については、ReactiveAdapter.isMultiValue() を参照) の場合は、Mono.just() にラップされます。この結果、Mono は明示的にダウンストリームにサブスクライブするか、FluxMessageChannel ダウンストリームによってフラット化する必要があります。outputChannel 用の ReactiveStreamsSubscribableChannel を使用すれば、戻り値の型やサブスクリプションについて心配する必要はありません。すべてがフレームワーク内部でスムーズに処理されます。

詳細については、非同期サービスアクティベーターを参照してください。

FluxMessageChannel および ReactiveStreamsConsumer

FluxMessageChannel は、MessageChannel と Publisher<Message<?>> を組み合わせた実装です。ホットソースとしての Flux は、send() 実装からの受信メッセージをシンクするために内部的に作成されます。Publisher.subscribe() 実装は、その内部 Flux に委譲されます。また、オンデマンドのアップストリーム消費のために、FluxMessageChannel は ReactiveStreamsSubscribableChannel 契約の実装を提供します。このチャネルに提供されるアップストリーム Publisher (たとえば、以下のソースポーリングチャネルアダプターとスプリッタを参照)は、このチャネルのサブスクリプションの準備ができたときに自動サブスクライブされます。この委譲パブリッシャーからのイベントは、上記の内部 Flux にシンクされます。

FluxMessageChannel のコンシューマーは、Reactive Streams 契約を尊重するための org.reactivestreams.Subscriber インスタンスである必要があります。幸い、Spring Integration のすべての MessageHandler 実装は、プロジェクト Reactor の CoreSubscriber も実装します。そして、その間の ReactiveStreamsConsumer 実装のおかげで、統合フロー構成全体がターゲット開発者に透過的に残されます。この場合、フローの動作は命令型プッシュモデルからリアクティブ型プルモデルに変更されます。ReactiveStreamsConsumer を使用して、IntegrationReactiveUtils を使用して任意の MessageChannel をリアクティブソースにして、積分フローを部分的にリアクティブにすることもできます。

詳細については、FluxMessageChannel を参照してください。

バージョン 5.5 以降、ConsumerEndpointSpec は reactive() オプションを導入して、フロー内のエンドポイントを入力チャネルとは独立した ReactiveStreamsConsumer として作成します。オプションの Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> を提供して、Flux.transform() 操作を介して入力チャネルからソース Flux をカスタマイズできます (例: publishOn()doOnNext()retry() )。この機能は、reactive() 属性を介して、すべてのメッセージングアノテーション(@ServiceActivator@Splitter など)の @Reactive サブアノテーションとして表されます。

ソースポーリングチャネルアダプター

通常、SourcePollingChannelAdapter は TaskScheduler によって開始されるタスクに依存しています。ポーリングトリガーは、提供されたオプションから構築され、データまたはイベントのターゲットソースをポーリングするタスクを定期的にスケジュールするために使用されます。outputChannel が ReactiveStreamsSubscribableChannel である場合、同じ Trigger が次回の実行時間を決定するために使用されますが、タスクをスケジュールする代わりに、SourcePollingChannelAdapter は nextExecutionTime 値の Flux.generate() と前のステップからの期間の Mono.delay() に基づいて Flux<Message<?>> を作成します。次に Flux.flatMapMany() を使用して maxMessagesPerPoll をポーリングし、出力 Flux にシンクします。このジェネレーター Flux は、提供された ReactiveStreamsSubscribableChannel によってサブスクライブされ、バックプレッシャーダウンストリームを尊重します。バージョン 5.5 以降、maxMessagesPerPoll == 0 の場合、ソースはまったく呼び出されず、maxMessagesPerPoll が後でゼロ以外の値に変更されるまで、flatMapMany() は Mono.empty() の結果を介してすぐに完了します。コントロールバス経由。このようにして、任意の MessageSource 実装をリアクティブホットソースに変えることができます。

詳細については、ポーリングコンシューマーを参照してください。

イベント駆動型チャネルアダプター

MessageProducerSupport は、イベント駆動型チャネルアダプターの基本クラスであり、通常、その sendMessage(Message<?>) は、生成するドライバー API のリスナーコールバックとして使用されます。このコールバックは、メッセージプロデューサー実装がリスナーベースの機能の代わりにメッセージの Flux を構築するときに、doOnNext() Reactor オペレーターに簡単にプラグインすることもできます。実際、これはメッセージプロデューサーの outputChannel が ReactiveStreamsSubscribableChannel ではない場合にフレームワークで行われます。ただし、エンドユーザーエクスペリエンスを改善し、より多くのバックプレッシャー対応機能を可能にするために、MessageProducerSupport は、Publisher<Message<?>>> がターゲットシステムからのデータのソースである場合にターゲット実装で使用される subscribeToPublisher(Publisher<? extends Message<?>>) API を提供します。通常、これは、ターゲットドライバー API がソースデータの Publisher に対して呼び出されるときに、doStart() 実装から使用されます。オンデマンドサブスクリプションとダウンストリームのイベント消費のために、outputChannel として FluxMessageChannel とリアクティブ MessageProducerSupport 実装を組み合わせることをお勧めします。Publisher へのサブスクリプションがキャンセルされると、チャネルアダプターは停止状態になります。このようなチャネルアダプターで stop() を呼び出すと、ソース Publisher からの生成が完了します。チャネルアダプターは、新しく作成されたソース Publisher への自動サブスクリプションで再起動できます。

Reactive Streams へのメッセージソース

バージョン 5.3 以降、ReactiveMessageSourceProducer が提供されます。これは、提供された MessageSource とイベント駆動型の本番を組み合わせて構成された outputChannel にします。内部的には、MessageSource を繰り返し再サブスクライブされた Mono にラップし、上記の subscribeToPublisher(Publisher<? extends Message<?>>) でサブスクライブされる Flux<Message<?>> を生成します。この Mono のサブスクリプションは、ターゲット MessageSource でブロックされる可能性を回避するために、Schedulers.boundedElastic() を使用して実行されます。メッセージソースが null (プルするデータがない)を返すと、Mono は repeatWhenEmpty() 状態になり、サブスクライバーコンテキストからの IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEYDuration エントリに基づいて後続の再サブスクリプションのために delay が付けられます。デフォルトでは 1 秒です。MessageSource がヘッダーに IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 情報を含むメッセージを生成する場合、元の Mono の doOnSuccess() で確認され(必要な場合)、ダウンストリームフローが拒否するメッセージを含む MessagingException をスローすると、doOnError() で拒否されます。この ReactiveMessageSourceProducer は、ポーリングチャネルアダプターの機能を既存の MessageSource<?> 実装のリアクティブなオンデマンドソリューションに変える必要がある場合のあらゆるユースケースに使用できます。

スプリッターとアグリゲーター

AbstractMessageSplitter がそのロジック用の Publisher を取得すると、プロセスは Publisher 内のアイテムを自然に処理し、outputChannel に送信するためのメッセージにマップします。このチャネルが ReactiveStreamsSubscribableChannel の場合、Publisher の Flux ラッパーはオンデマンドでそのチャネルからサブスクライブされ、このスプリッター動作は、受信イベントを複数値の出力 Publisher にマップするとき、flatMap Reactor オペレーターに似ています。統合フロー全体がスプリッターの前後に FluxMessageChannel を使用して構築され、Spring Integration 構成を Reactive Streams 要件およびそのオペレーターのイベント処理に合わせて調整することが最も理にかなっています。通常のチャネルでは、Publisher は Iterable に変換され、標準の反復および生成の分割ロジックに使用されます。

FluxAggregatorMessageHandler は、特定の Reactive Streams ロジック実装の別のサンプルであり、プロジェクト Reactor に関しては "reactive operator" として扱うことができます。これは、Flux.groupBy() および Flux.window() (または buffer())演算子に基づいています。受信メッセージは、FluxAggregatorMessageHandler が作成されたときに開始された Flux.create() に埋め込まれ、ホットソースになります。この Flux は、オンデマンドで ReactiveStreamsSubscribableChannel によってサブスクライブされるか、outputChannel がリアクティブでない場合は FluxAggregatorMessageHandler.start() で直接サブスクライブされます。この MessageHandler は、統合フロー全体がこのコンポーネントの前後に FluxMessageChannel を使用して構築されている場合に威力を発揮し、ロジック全体のバックプレッシャーを準備します。

詳細については、ストリームとフラックスの分割および Flux Aggregator を参照してください。

Java DSL

Java DSL の IntegrationFlow は、任意の Publisher インスタンスから開始できます(IntegrationFlows.from(Publisher<Message<T>>) を参照)。また、IntegrationFlowBuilder.toReactivePublisher() オペレーターを使用すると、IntegrationFlow をリアクティブホットソースに変えることができます。FluxMessageChannel は、どちらの場合も内部で使用されます。ReactiveStreamsSubscribableChannel 契約に従って受信 Publisher にサブスクライブでき、ダウンストリームサブスクライバー用の Publisher<Message<?>> 自体です。動的な IntegrationFlow 登録を使用すると、Reactive Streams と Publisher との間でブリッジするこの統合フローを組み合わせた強力なロジックを実装できます。

バージョン 5.5.6 以降、toReactivePublisher(boolean autoStartOnSubscribe) オペレーターバリアントが存在し、返された Publisher<Message<?>> の背後にある IntegrationFlow 全体のライフサイクルを制御します。通常、リアクティブパブリッシャーからのサブスクリプションと消費は、リアクティブストリームの構成中、または ApplicationContext の起動中ではなく、後のランタイムフェーズで発生します。Publisher<Message<?>> サブスクリプションポイントでの IntegrationFlow のライフサイクル管理の定型コードを回避し、エンドユーザーエクスペリエンスを向上させるために、autoStartOnSubscribe フラグを備えたこの新しいオペレーターが導入されました。IntegrationFlow とそのコンポーネントを autoStartup = false 用にマーク(true の場合)するため、ApplicationContext はフロー内のメッセージの生成と消費を自動的に開始しません。代わりに、IntegrationFlow の start() は、内部 Flux.doOnSubscribe() から開始されます。autoStartOnSubscribe 値とは関係なく、フローは Flux.doOnCancel() および Flux.doOnTerminate() から停止されます。メッセージを消費するものがない場合、メッセージを生成することは意味がありません。

まったく逆のユースケースでは、IntegrationFlow がリアクティブストリームを呼び出して完了後に続行する必要がある場合、fluxTransform() 演算子が IntegrationFlowDefinition で提供されます。この時点でのフローは FluxMessageChannel に変わり、Flux.transform() オペレーターで実行される提供された fluxFunction に伝搬されます。この関数の結果は、ダウンストリームフロー用に別の FluxMessageChannel によってサブスクライブされる出力 Flux へのフラットマッピングのために Mono<Message<?>> にラップされます。

詳細については、Java DSL の章を参照してください。

ReactiveMessageHandler

バージョン 5.3 以降、ReactiveMessageHandler はフレームワークでネイティブにサポートされています。この型のメッセージハンドラーは、低レベルの操作を実行するためのオンデマンドサブスクリプションのリアクティブ型を返し、リアクティブストリーム構成を続行するための応答データを提供しないリアクティブクライアント用に設計されています。命令型統合フローで ReactiveMessageHandler が使用されている場合、handleMessage() は返送直後にサブスクライブされます。これは、そのようなフローにバックプレッシャーを尊重する反応ストリーム構成がないためです。この場合、フレームワークはこの ReactiveMessageHandler を ReactiveMessageHandlerAdapter (MessageHandler の単純な実装)にラップします。ただし、ReactiveStreamsConsumer がフローに関与している場合(たとえば、消費するチャネルが FluxMessageChannel の場合)、そのような ReactiveMessageHandler は、flatMap() Reactor オペレーターを使用して反応ストリーム全体で構成され、消費中のバックプレッシャーを尊重します。

すぐに使える ReactiveMessageHandler 実装の 1 つは、送信チャネルアダプター用の ReactiveMongoDbStoringMessageHandler です。詳細については、MongoDB Reactive Channel Adapters を参照してください。

リアクティブチャネルアダプター

統合のターゲットプロトコルが Reactive Streams ソリューションを提供する場合、Spring Integration にチャネルアダプターを実装するのは簡単です。

受信のイベント駆動型チャネルアダプターの実装では、リクエストが遅延 Mono または Flux にラップされ、プロトコルコンポーネントが Mono へのサブスクリプションを開始したときにのみ送信(および応答がある場合は生成)が返されます。リスナーメソッド。このようにして、このコンポーネントに正確にカプセル化されたリアクティブストリームソリューションがあります。もちろん、出力チャネルでサブスクライブされたダウンストリーム統合フローは、Reactive Streams 仕様に準拠し、オンデマンドでバックプレッシャ対応の方法で実行する必要があります。これは、統合フローで使用される MessageHandler プロセッサーの性質(または現在の実装)によって常に利用できるとは限りません。この制限は、リアクティブ実装がない場合、統合エンドポイントの前後でスレッドプールとキューまたは FluxMessageChannel (上記を参照)を使用して処理できます。

リアクティブ送信チャネルアダプターの実装は、ターゲットプロトコル用に提供されたリアクティブ API に応じて、リアクティブストリームの開始(または継続)と外部システムとの対話に関するものです。受信ペイロードは、それ自体がリアクティブ型であるか、トップのリアクティブストリームの一部である統合フロー全体のイベントである場合があります。返されたリアクティブ型は、一方向の消火忘れシナリオにある場合はすぐにサブスクライブできます。または、ターゲットビジネスロジックでのさらなる統合フローまたは明示的なサブスクリプションのためにダウンストリームに伝播されます(リクエスト - 応答シナリオ)。リアクティブストリームセマンティクスを保持するダウンストリーム。

現在、Spring Integration は、WebFluxRSocketMongoDbR2DBC 用のチャネルアダプター (またはゲートウェイ) 実装を提供しています。Redis ストリームチャネルアダプターもリアクティブであり、Spring Data の ReactiveStreamOperations を使用します。また、Apache Cassandra 拡張 [GitHub] (英語) は、Cassandra リアクティブドライバー用の MessageHandler 実装を提供します。Spring for Apache Kafka の ReactiveKafkaProducerTemplate および ReactiveKafkaConsumerTemplate に基づく Kafka の Apache Kafka など、さらに多くのリアクティブチャネルアダプターが登場する予定です。その他の多くの非リアクティブチャネルアダプターでは、リアクティブストリーム処理中のブロックを回避するために、スレッドプールの使用が推奨されます。