Reactive Streams サポート
Spring Integration は、フレームワークのいくつかの場所で、さまざまな側面から Reactive Streams (英語) 対話のサポートを提供します。ここでは、必要に応じて、詳細については対象の章への適切なリンクを使用して、それらのほとんどについて説明します。
序文
要約すると、Spring Integration は Spring プログラミングモデルを継承して、よく知られているエンタープライズ統合パターンをサポートします。Spring Integration は、Spring ベースのアプリケーション内での軽量メッセージングを可能にし、宣言型アダプターを介した外部システムとの統合をサポートします。Spring Integration の主なゴールは、保守可能でテスト可能なコードの作成に不可欠な関心の分離を維持しながら、エンタープライズ統合ソリューションを構築するためのシンプルなモデルを提供することです。このゴールは、message
、channel
、endpoint
のような第一級オブジェクトを使用するターゲットアプリケーションで達成されます。これにより、統合フロー(パイプライン)を構築できます。統合フロー(パイプライン)では、(ほとんどの場合)1 つのエンドポイントがメッセージをチャネルに生成して別のエンドポイントで消費します。このようにして、統合相互作用モデルをターゲットのビジネスロジックと区別します。ここで重要な部分は、その間のチャネルです。フローの動作は、エンドポイントを変更せずに実装に依存します。
一方、Reactive Streams は、ノンブロッキングバックプレッシャーを使用した非同期ストリーム処理の標準です。Reactive Streams の主なゴールは、受信側が任意の量のデータをバッファリングすることを強制されないようにしながら、非同期境界を越えたストリームデータの交換を管理することです(要素を別のスレッドまたはスレッドプールに渡すなど)。言い換えると、スレッド間を仲介するキューを制限できるようにするために、バックプレッシャはこのモデルの不可欠な部分です。プロジェクト Reactor (英語) などの Reactive Streams 実装の目的は、ストリームアプリケーションの処理グラフ全体にわたってこれらの利点と特性を維持することです。Reactive Streams ライブラリの最終的なゴールは、利用可能なプログラミング言語構造で可能な限り透過的かつスムーズなメソッドで、ターゲットアプリケーションの型、演算子のセット、サポート API を提供することですが、最終的なソリューションは通常の場合ほど必須ではありません。関数チェーンの呼び出し。それはフェーズに分けられます: 定義と実行は、最終的なリアクティブパブリッシャーへのサブスクリプション中にしばらくしてから発生し、データの需要は定義の下部から上部にプッシュされ、必要に応じてバックプレッシャーを適用します。多くをリクエストします。現時点で処理できるイベント。リアクティブアプリケーションは、"stream"
のように見えるか、Spring Integration 用語で慣れているように - "flow"
です。実際、Java 9 以降の Reactive Streams SPI は、java.util.concurrent.Flow
クラスで提供されます。
ここから、エンドポイントにいくつかのリアクティブフレームワークオペレーターを適用する場合、Spring Integration フローは Reactive Streams アプリケーションの作成に本当に適しているように見えますが、実際には問題がはるかに広く、すべてのエンドポイント(JdbcMessageHandler
など)ではないことに注意する必要があります。透過的に反応ストリームで処理できます。もちろん、Spring Integration での Reactive Streams サポートの主なゴールは、プロセス全体を完全に反応させ、要求に応じて開始し、バックプレッシャーに対応できるようにすることです。チャネルアダプターのターゲットプロトコルとシステムが Reactive Streams 対話モデルを提供するまで、これは不可能です。以下のセクションでは、統合フロー構造を保持するリアクティブ型アプリケーションを開発するために Spring Integration で提供されるコンポーネントとアプローチについて説明します。
Spring Integration のすべての Reactive Streams 相互作用は、Mono や Flux などのプロジェクト Reactor (英語) 型で実装されています。 |
メッセージングゲートウェイ
Reactive Streams との対話の最も単純なポイントは、@MessagingGateway
であり、ゲートウェイメソッドの戻り型を Mono<?>
として作成します。返された Mono
インスタンスでサブスクリプションが発生すると、ゲートウェイメソッド呼び出しの背後にある統合フロー全体が実行されます。詳細については、Reactor Mono
を参照してください。同様の Mono
-reply アプローチは、Reactive Streams 互換プロトコルに完全に基づいている受信ゲートウェイのフレームワークで内部的に使用されます(詳細については、以下のリアクティブチャネルアダプターを参照してください)。送受信操作は、replyChannel
ヘッダーからの応答評価が使用可能な場合は常にそれを連鎖させて、Mono.deffer()
にラップされます。このように、特定のリアクティブプロトコル(Netty など)の受信コンポーネントは、Spring Integration で実行されるリアクティブフローのサブスクライバーおよびイニシエーターになります。リクエストペイロードがリアクティブ型である場合、プロセスをイニシエーターサブスクリプションに延期するリアクティブストリーム定義を使用してそれを処理することをお勧めします。この目的のために、ハンドラーメソッドもリアクティブ型を返す必要があります。詳細については、次のセクションを参照してください。
リアクティブ応答ペイロード
応答を生成する MessageHandler
が応答メッセージのためのリアクティブ型ペイロードを返すとき、outputChannel
のために提供される通常の MessageChannel
実装と非同期の方法で処理され、出力チャネルが ReactiveStreamsSubscribableChannel
実装、たとえば FluxMessageChannel
では、オンデマンドサブスクリプションで平坦化されます。標準命令 MessageChannel
ユースケースを使用し、応答ペイロードが複数値パブリッシャー (詳細については、ReactiveAdapter.isMultiValue()
を参照) の場合は、Mono.just()
にラップされます。この結果、Mono
は明示的にダウンストリームにサブスクライブするか、FluxMessageChannel
ダウンストリームによってフラット化する必要があります。outputChannel
用の ReactiveStreamsSubscribableChannel
を使用すれば、戻り値の型やサブスクリプションについて心配する必要はありません。すべてがフレームワーク内部でスムーズに処理されます。
詳細については、非同期サービスアクティベーターを参照してください。
FluxMessageChannel
および ReactiveStreamsConsumer
FluxMessageChannel
は、MessageChannel
と Publisher<Message<?>>
を組み合わせた実装です。ホットソースとしての Flux
は、send()
実装からの受信メッセージをシンクするために内部的に作成されます。Publisher.subscribe()
実装は、その内部 Flux
に委譲されます。また、オンデマンドのアップストリーム消費のために、FluxMessageChannel
は ReactiveStreamsSubscribableChannel
契約の実装を提供します。このチャネルに提供されるアップストリーム Publisher
(たとえば、以下のソースポーリングチャネルアダプターとスプリッタを参照)は、このチャネルのサブスクリプションの準備ができたときに自動サブスクライブされます。この委譲パブリッシャーからのイベントは、上記の内部 Flux
にシンクされます。
FluxMessageChannel
のコンシューマーは、Reactive Streams 契約を尊重するための org.reactivestreams.Subscriber
インスタンスである必要があります。幸い、Spring Integration のすべての MessageHandler
実装は、プロジェクト Reactor の CoreSubscriber
も実装します。そして、その間の ReactiveStreamsConsumer
実装のおかげで、統合フロー構成全体がターゲット開発者に透過的に残されます。この場合、フローの動作は命令型プッシュモデルからリアクティブ型プルモデルに変更されます。ReactiveStreamsConsumer
を使用して、IntegrationReactiveUtils
を使用して任意の MessageChannel
をリアクティブソースにして、積分フローを部分的にリアクティブにすることもできます。
詳細については、FluxMessageChannel
を参照してください。
バージョン 5.5 以降、ConsumerEndpointSpec
は reactive()
オプションを導入して、フロー内のエンドポイントを入力チャネルとは独立した ReactiveStreamsConsumer
として作成します。オプションの Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
を提供して、Flux.transform()
操作を介して入力チャネルからソース Flux
をカスタマイズできます (例: publishOn()
、doOnNext()
、retry()
)。この機能は、reactive()
属性を介して、すべてのメッセージングアノテーション(@ServiceActivator
、@Splitter
など)の @Reactive
サブアノテーションとして表されます。
ソースポーリングチャネルアダプター
通常、SourcePollingChannelAdapter
は TaskScheduler
によって開始されるタスクに依存しています。ポーリングトリガーは、提供されたオプションから構築され、データまたはイベントのターゲットソースをポーリングするタスクを定期的にスケジュールするために使用されます。outputChannel
が ReactiveStreamsSubscribableChannel
である場合、同じ Trigger
が次回の実行時間を決定するために使用されますが、タスクをスケジュールする代わりに、SourcePollingChannelAdapter
は nextExecutionTime
値の Flux.generate()
と前のステップからの期間の Mono.delay()
に基づいて Flux<Message<?>>
を作成します。次に Flux.flatMapMany()
を使用して maxMessagesPerPoll
をポーリングし、出力 Flux
にシンクします。このジェネレーター Flux
は、提供された ReactiveStreamsSubscribableChannel
によってサブスクライブされ、バックプレッシャーダウンストリームを尊重します。バージョン 5.5 以降、maxMessagesPerPoll == 0
の場合、ソースはまったく呼び出されず、maxMessagesPerPoll
が後でゼロ以外の値に変更されるまで、flatMapMany()
は Mono.empty()
の結果を介してすぐに完了します。コントロールバス経由。このようにして、任意の MessageSource
実装をリアクティブホットソースに変えることができます。
詳細については、ポーリングコンシューマーを参照してください。
イベント駆動型チャネルアダプター
MessageProducerSupport
は、イベント駆動型チャネルアダプターの基本クラスであり、通常、その sendMessage(Message<?>)
は、生成するドライバー API のリスナーコールバックとして使用されます。このコールバックは、メッセージプロデューサー実装がリスナーベースの機能の代わりにメッセージの Flux
を構築するときに、doOnNext()
Reactor オペレーターに簡単にプラグインすることもできます。実際、これはメッセージプロデューサーの outputChannel
が ReactiveStreamsSubscribableChannel
ではない場合にフレームワークで行われます。ただし、エンドユーザーエクスペリエンスを改善し、より多くのバックプレッシャー対応機能を可能にするために、MessageProducerSupport
は、Publisher<Message<?>>>
がターゲットシステムからのデータのソースである場合にターゲット実装で使用される subscribeToPublisher(Publisher<? extends Message<?>>)
API を提供します。通常、これは、ターゲットドライバー API がソースデータの Publisher
に対して呼び出されるときに、doStart()
実装から使用されます。オンデマンドサブスクリプションとダウンストリームのイベント消費のために、outputChannel
として FluxMessageChannel
とリアクティブ MessageProducerSupport
実装を組み合わせることをお勧めします。Publisher
へのサブスクリプションがキャンセルされると、チャネルアダプターは停止状態になります。このようなチャネルアダプターで stop()
を呼び出すと、ソース Publisher
からの生成が完了します。チャネルアダプターは、新しく作成されたソース Publisher
への自動サブスクリプションで再起動できます。
Reactive Streams へのメッセージソース
バージョン 5.3 以降、ReactiveMessageSourceProducer
が提供されます。これは、提供された MessageSource
とイベント駆動型の本番を組み合わせて構成された outputChannel
にします。内部的には、MessageSource
を繰り返し再サブスクライブされた Mono
にラップし、上記の subscribeToPublisher(Publisher<? extends Message<?>>)
でサブスクライブされる Flux<Message<?>>
を生成します。この Mono
のサブスクリプションは、ターゲット MessageSource
でブロックされる可能性を回避するために、Schedulers.boundedElastic()
を使用して実行されます。メッセージソースが null
(プルするデータがない)を返すと、Mono
は repeatWhenEmpty()
状態になり、サブスクライバーコンテキストからの IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
エントリに基づいて後続の再サブスクリプションのために delay
が付けられます。デフォルトでは 1 秒です。MessageSource
がヘッダーに IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
情報を含むメッセージを生成する場合、元の Mono
の doOnSuccess()
で確認され(必要な場合)、ダウンストリームフローが拒否するメッセージを含む MessagingException
をスローすると、doOnError()
で拒否されます。この ReactiveMessageSourceProducer
は、ポーリングチャネルアダプターの機能を既存の MessageSource<?>
実装のリアクティブなオンデマンドソリューションに変える必要がある場合のあらゆるユースケースに使用できます。
スプリッターとアグリゲーター
AbstractMessageSplitter
がそのロジック用の Publisher
を取得すると、プロセスは Publisher
内のアイテムを自然に処理し、outputChannel
に送信するためのメッセージにマップします。このチャネルが ReactiveStreamsSubscribableChannel
の場合、Publisher
の Flux
ラッパーはオンデマンドでそのチャネルからサブスクライブされ、このスプリッター動作は、受信イベントを複数値の出力 Publisher
にマップするとき、flatMap
Reactor オペレーターに似ています。統合フロー全体がスプリッターの前後に FluxMessageChannel
を使用して構築され、Spring Integration 構成を Reactive Streams 要件およびそのオペレーターのイベント処理に合わせて調整することが最も理にかなっています。通常のチャネルでは、Publisher
は Iterable
に変換され、標準の反復および生成の分割ロジックに使用されます。
FluxAggregatorMessageHandler
は、特定の Reactive Streams ロジック実装の別のサンプルであり、プロジェクト Reactor に関しては "reactive operator"
として扱うことができます。これは、Flux.groupBy()
および Flux.window()
(または buffer()
)演算子に基づいています。受信メッセージは、FluxAggregatorMessageHandler
が作成されたときに開始された Flux.create()
に埋め込まれ、ホットソースになります。この Flux
は、オンデマンドで ReactiveStreamsSubscribableChannel
によってサブスクライブされるか、outputChannel
がリアクティブでない場合は FluxAggregatorMessageHandler.start()
で直接サブスクライブされます。この MessageHandler
は、統合フロー全体がこのコンポーネントの前後に FluxMessageChannel
を使用して構築されている場合に威力を発揮し、ロジック全体のバックプレッシャーを準備します。
詳細については、ストリームとフラックスの分割および Flux Aggregator を参照してください。
Java DSL
Java DSL の IntegrationFlow
は、任意の Publisher
インスタンスから開始できます(IntegrationFlows.from(Publisher<Message<T>>)
を参照)。また、IntegrationFlowBuilder.toReactivePublisher()
オペレーターを使用すると、IntegrationFlow
をリアクティブホットソースに変えることができます。FluxMessageChannel
は、どちらの場合も内部で使用されます。ReactiveStreamsSubscribableChannel
契約に従って受信 Publisher
にサブスクライブでき、ダウンストリームサブスクライバー用の Publisher<Message<?>>
自体です。動的な IntegrationFlow
登録を使用すると、Reactive Streams と Publisher
との間でブリッジするこの統合フローを組み合わせた強力なロジックを実装できます。
バージョン 5.5.6 以降、toReactivePublisher(boolean autoStartOnSubscribe)
オペレーターバリアントが存在し、返された Publisher<Message<?>>
の背後にある IntegrationFlow
全体のライフサイクルを制御します。通常、リアクティブパブリッシャーからのサブスクリプションと消費は、リアクティブストリームの構成中、または ApplicationContext
の起動中ではなく、後のランタイムフェーズで発生します。Publisher<Message<?>>
サブスクリプションポイントでの IntegrationFlow
のライフサイクル管理の定型コードを回避し、エンドユーザーエクスペリエンスを向上させるために、autoStartOnSubscribe
フラグを備えたこの新しいオペレーターが導入されました。IntegrationFlow
とそのコンポーネントを autoStartup = false
用にマーク(true
の場合)するため、ApplicationContext
はフロー内のメッセージの生成と消費を自動的に開始しません。代わりに、IntegrationFlow
の start()
は、内部 Flux.doOnSubscribe()
から開始されます。autoStartOnSubscribe
値とは関係なく、フローは Flux.doOnCancel()
および Flux.doOnTerminate()
から停止されます。メッセージを消費するものがない場合、メッセージを生成することは意味がありません。
まったく逆のユースケースでは、IntegrationFlow
がリアクティブストリームを呼び出して完了後に続行する必要がある場合、fluxTransform()
演算子が IntegrationFlowDefinition
で提供されます。この時点でのフローは FluxMessageChannel
に変わり、Flux.transform()
オペレーターで実行される提供された fluxFunction
に伝搬されます。この関数の結果は、ダウンストリームフロー用に別の FluxMessageChannel
によってサブスクライブされる出力 Flux
へのフラットマッピングのために Mono<Message<?>>
にラップされます。
詳細については、Java DSL の章を参照してください。
ReactiveMessageHandler
バージョン 5.3 以降、ReactiveMessageHandler
はフレームワークでネイティブにサポートされています。この型のメッセージハンドラーは、低レベルの操作を実行するためのオンデマンドサブスクリプションのリアクティブ型を返し、リアクティブストリーム構成を続行するための応答データを提供しないリアクティブクライアント用に設計されています。命令型統合フローで ReactiveMessageHandler
が使用されている場合、handleMessage()
は返送直後にサブスクライブされます。これは、そのようなフローにバックプレッシャーを尊重する反応ストリーム構成がないためです。この場合、フレームワークはこの ReactiveMessageHandler
を ReactiveMessageHandlerAdapter
(MessageHandler
の単純な実装)にラップします。ただし、ReactiveStreamsConsumer
がフローに関与している場合(たとえば、消費するチャネルが FluxMessageChannel
の場合)、そのような ReactiveMessageHandler
は、flatMap()
Reactor オペレーターを使用して反応ストリーム全体で構成され、消費中のバックプレッシャーを尊重します。
すぐに使える ReactiveMessageHandler
実装の 1 つは、送信チャネルアダプター用の ReactiveMongoDbStoringMessageHandler
です。詳細については、MongoDB Reactive Channel Adapters を参照してください。
リアクティブチャネルアダプター
統合のターゲットプロトコルが Reactive Streams ソリューションを提供する場合、Spring Integration にチャネルアダプターを実装するのは簡単です。
受信のイベント駆動型チャネルアダプターの実装では、リクエストが遅延 Mono
または Flux
にラップされ、プロトコルコンポーネントが Mono
へのサブスクリプションを開始したときにのみ送信(および応答がある場合は生成)が返されます。リスナーメソッド。このようにして、このコンポーネントに正確にカプセル化されたリアクティブストリームソリューションがあります。もちろん、出力チャネルでサブスクライブされたダウンストリーム統合フローは、Reactive Streams 仕様に準拠し、オンデマンドでバックプレッシャ対応の方法で実行する必要があります。これは、統合フローで使用される MessageHandler
プロセッサーの性質(または現在の実装)によって常に利用できるとは限りません。この制限は、リアクティブ実装がない場合、統合エンドポイントの前後でスレッドプールとキューまたは FluxMessageChannel
(上記を参照)を使用して処理できます。
リアクティブ送信チャネルアダプターの実装は、ターゲットプロトコル用に提供されたリアクティブ API に応じて、リアクティブストリームの開始(または継続)と外部システムとの対話に関するものです。受信ペイロードは、それ自体がリアクティブ型であるか、トップのリアクティブストリームの一部である統合フロー全体のイベントである場合があります。返されたリアクティブ型は、一方向の消火忘れシナリオにある場合はすぐにサブスクライブできます。または、ターゲットビジネスロジックでのさらなる統合フローまたは明示的なサブスクリプションのためにダウンストリームに伝播されます(リクエスト - 応答シナリオ)。リアクティブストリームセマンティクスを保持するダウンストリーム。
現在、Spring Integration は、WebFlux、RSocket、MongoDb、R2DBC 用のチャネルアダプター (またはゲートウェイ) 実装を提供しています。Redis ストリームチャネルアダプターもリアクティブであり、Spring Data の ReactiveStreamOperations
を使用します。また、Apache Cassandra 拡張 [GitHub] (英語) は、Cassandra リアクティブドライバー用の MessageHandler
実装を提供します。Spring for Apache Kafka の ReactiveKafkaProducerTemplate
および ReactiveKafkaConsumerTemplate
に基づく Kafka の Apache Kafka など、さらに多くのリアクティブチャネルアダプターが登場する予定です。その他の多くの非リアクティブチャネルアダプターでは、リアクティブストリーム処理中のブロックを回避するために、スレッドプールの使用が推奨されます。