Reactive Streams サポート
Spring Integration は、フレームワークのいくつかの場所で、さまざまな側面から Reactive Streams (英語) 対話のサポートを提供します。ここでは、必要に応じて、詳細については対象の章への適切なリンクを使用して、それらのほとんどについて説明します。
序文
要約すると、Spring Integration は Spring プログラミングモデルを継承して、よく知られたエンタープライズ統合パターンをサポートします。Spring Integration は、Spring ベースのアプリケーション内での軽量メッセージングを可能にし、宣言型アダプターを介した外部システムとの統合をサポートします。Spring Integration の主なゴールは、エンタープライズ統合ソリューションを構築するためのシンプルなモデルを提供すると同時に、保守可能でテスト可能なコードを作成するために不可欠な関心の分離を維持することです。このゴールは、ターゲットアプリケーションで message
、channel
、endpoint
のような 第一級オブジェクトを使用して達成されます。これにより、(ほとんどの場合) 1 つのエンドポイントが別のエンドポイントによって消費されるチャネルにメッセージを生成する統合フロー (パイプライン) を構築できます。このようにして、統合相互作用モデルをターゲットビジネスロジックから区別します。ここで重要な部分は、その間のチャネルです。フローの動作は、エンドポイントをそのままにして、その実装に依存します。
一方、Reactive Streams は、ノンブロッキングバックプレッシャーを使用した非同期ストリーム処理の標準です。Reactive Streams の主なゴールは、受信側が任意の量のデータをバッファリングすることを強制されないようにしながら、非同期境界を越えたストリームデータの交換を管理することです(要素を別のスレッドまたはスレッドプールに渡すなど)。言い換えると、スレッド間を仲介するキューを制限できるようにするために、バックプレッシャはこのモデルの不可欠な部分です。プロジェクト Reactor (英語) などの Reactive Streams 実装の目的は、ストリームアプリケーションの処理グラフ全体にわたってこれらの利点と特性を維持することです。Reactive Streams ライブラリの最終的なゴールは、利用可能なプログラミング言語構造で可能な限り透過的かつスムーズなメソッドで、ターゲットアプリケーションの型、演算子のセット、サポート API を提供することですが、最終的なソリューションは通常の場合ほど必須ではありません。関数チェーンの呼び出し。それはフェーズに分けられます: 定義と実行は、最終的なリアクティブパブリッシャーへのサブスクリプション中にしばらくしてから発生し、データの需要は定義の下部から上部にプッシュされ、必要に応じてバックプレッシャーを適用します。多くをリクエストします。現時点で処理できるイベント。リアクティブアプリケーションは、"stream"
のように見えるか、Spring Integration 用語で慣れているように - "flow"
です。実際、Java 9 以降の Reactive Streams SPI は、java.util.concurrent.Flow
クラスで提供されます。
ここから、エンドポイントにいくつかのリアクティブフレームワークオペレーターを適用する場合、Spring Integration フローは Reactive Streams アプリケーションの作成に本当に適しているように見えますが、実際には問題がはるかに広く、すべてのエンドポイント(JdbcMessageHandler
など)ではないことに注意する必要があります。透過的に反応ストリームで処理できます。もちろん、Spring Integration での Reactive Streams サポートの主なゴールは、プロセス全体を完全に反応させ、要求に応じて開始し、バックプレッシャーに対応できるようにすることです。チャネルアダプターのターゲットプロトコルとシステムが Reactive Streams 対話モデルを提供するまで、これは不可能です。以下のセクションでは、統合フロー構造を保持するリアクティブ型アプリケーションを開発するために Spring Integration で提供されるコンポーネントとアプローチについて説明します。
Spring Integration のすべての Reactive Streams 相互作用は、Mono や Flux などのプロジェクト Reactor (英語) 型で実装されています。 |
メッセージングゲートウェイ
Reactive Streams との最も単純なインタラクションポイントは @MessagingGateway
です。ここでは、ゲートウェイメソッドの戻り値の型を Mono<?>
として作成します。ゲートウェイメソッド呼び出しの背後にある統合フロー全体は、返された Mono
インスタンスでサブスクリプションが発生したときに実行されます。詳細については、Reactor Mono
を参照してください。同様の Mono
-reply アプローチは、完全に Reactive Streams 互換プロトコルに基づく受信ゲートウェイのフレームワーク内部で使用されます (詳細については、以下のリアクティブチャネルアダプターを参照してください)。送受信操作は、replyChannel
ヘッダーからの応答評価が利用可能な場合は常に、Mono.defer()
にラップされます。このように、特定のリアクティブプロトコル (例: Netty) の受信コンポーネントは、Spring Integration で実行されるリアクティブフローのサブスクライバーおよびイニシエーターになります。リクエストペイロードがリアクティブ型の場合、イニシエーターサブスクリプションにプロセスを延期するリアクティブストリーム定義内で処理する方が適切です。この目的のために、ハンドラーメソッドもリアクティブ型を返す必要があります。詳細については、次のセクションを参照してください。
リアクティブ応答ペイロード
MessageHandler
を生成する応答が応答メッセージのリアクティブ型のペイロードを返すと、outputChannel
に提供される通常の MessageChannel
実装 (async
は true
に設定する必要があります) を使用して非同期的に処理され、出力チャネルがオンデマンドのサブスクリプションでフラット化されます。ReactiveStreamsSubscribableChannel
実装です (例: FluxMessageChannel
)。標準の命令型 MessageChannel
ユースケースでは、応答ペイロードが複数値パブリッシャー (詳細については ReactiveAdapter.isMultiValue()
を参照) の場合、Mono.just()
にラップされます。この結果、Mono
をダウンストリームで明示的にサブスクライブするか、FluxMessageChannel
ダウンストリームによってフラット化する必要があります。outputChannel
の ReactiveStreamsSubscribableChannel
を使用すると、戻り値の型とサブスクリプションについて心配する必要はありません。すべてが内部のフレームワークによってスムーズに処理されます。
詳細については、非同期サービスアクティベーターを参照してください。
詳細については、Kotlin コルーチンも参照してください。
FluxMessageChannel
および ReactiveStreamsConsumer
FluxMessageChannel
は、MessageChannel
と Publisher<Message<?>>
を組み合わせた実装です。ホットソースとしての Flux
は、send()
実装からの受信メッセージをシンクするために内部的に作成されます。Publisher.subscribe()
実装は、その内部 Flux
に委譲されます。また、オンデマンドのアップストリーム消費のために、FluxMessageChannel
は ReactiveStreamsSubscribableChannel
契約の実装を提供します。このチャネルに提供されるアップストリーム Publisher
(たとえば、以下のソースポーリングチャネルアダプターとスプリッタを参照)は、このチャネルのサブスクリプションの準備ができたときに自動サブスクライブされます。この委譲パブリッシャーからのイベントは、上記の内部 Flux
にシンクされます。
FluxMessageChannel
のコンシューマーは、Reactive Streams 契約を尊重するための org.reactivestreams.Subscriber
インスタンスである必要があります。幸い、Spring Integration のすべての MessageHandler
実装は、プロジェクト Reactor の CoreSubscriber
も実装します。そして、その間の ReactiveStreamsConsumer
実装のおかげで、統合フロー構成全体がターゲット開発者に透過的に残されます。この場合、フローの動作は命令型プッシュモデルからリアクティブ型プルモデルに変更されます。ReactiveStreamsConsumer
を使用して、IntegrationReactiveUtils
を使用して任意の MessageChannel
をリアクティブソースにして、積分フローを部分的にリアクティブにすることもできます。
詳細については、FluxMessageChannel
を参照してください。
バージョン 5.5 以降、ConsumerEndpointSpec
は reactive()
オプションを導入して、フロー内のエンドポイントを入力チャネルとは独立した ReactiveStreamsConsumer
として作成します。オプションの Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>>
を提供して、Flux.transform()
操作を介して入力チャネルからソース Flux
をカスタマイズできます (例: publishOn()
、doOnNext()
、retry()
)。この機能は、reactive()
属性を介して、すべてのメッセージングアノテーション(@ServiceActivator
、@Splitter
など)の @Reactive
サブアノテーションとして表されます。
ソースポーリングチャネルアダプター
通常、SourcePollingChannelAdapter
は TaskScheduler
によって開始されるタスクに依存しています。ポーリングトリガーは、提供されたオプションから構築され、データまたはイベントのターゲットソースをポーリングするタスクを定期的にスケジュールするために使用されます。outputChannel
が ReactiveStreamsSubscribableChannel
である場合、同じ Trigger
が次回の実行時間を決定するために使用されますが、タスクをスケジュールする代わりに、SourcePollingChannelAdapter
は nextExecutionTime
値の Flux.generate()
と前のステップからの期間の Mono.delay()
に基づいて Flux<Message<?>>
を作成します。次に Flux.flatMapMany()
を使用して maxMessagesPerPoll
をポーリングし、出力 Flux
にシンクします。このジェネレーター Flux
は、提供された ReactiveStreamsSubscribableChannel
によってサブスクライブされ、バックプレッシャーダウンストリームを尊重します。バージョン 5.5 以降、maxMessagesPerPoll == 0
の場合、ソースはまったく呼び出されず、maxMessagesPerPoll
が後でゼロ以外の値に変更されるまで、flatMapMany()
は Mono.empty()
の結果を介してすぐに完了します。コントロールバス経由。このようにして、任意の MessageSource
実装をリアクティブホットソースに変えることができます。
詳細については、ポーリングコンシューマーを参照してください。
イベント駆動型チャネルアダプター
MessageProducerSupport
は、イベント駆動型チャネルアダプターの基本クラスであり、通常、その sendMessage(Message<?>)
は、生成するドライバー API のリスナーコールバックとして使用されます。このコールバックは、メッセージプロデューサー実装がリスナーベースの機能の代わりにメッセージの Flux
を構築するときに、doOnNext()
Reactor オペレーターに簡単にプラグインすることもできます。実際、これはメッセージプロデューサーの outputChannel
が ReactiveStreamsSubscribableChannel
ではない場合にフレームワークで行われます。ただし、エンドユーザーエクスペリエンスを改善し、より多くのバックプレッシャー対応機能を可能にするために、MessageProducerSupport
は、Publisher<Message<?>>>
がターゲットシステムからのデータのソースである場合にターゲット実装で使用される subscribeToPublisher(Publisher<? extends Message<?>>)
API を提供します。通常、これは、ターゲットドライバー API がソースデータの Publisher
に対して呼び出されるときに、doStart()
実装から使用されます。オンデマンドサブスクリプションとダウンストリームのイベント消費のために、outputChannel
として FluxMessageChannel
とリアクティブ MessageProducerSupport
実装を組み合わせることをお勧めします。Publisher
へのサブスクリプションがキャンセルされると、チャネルアダプターは停止状態になります。このようなチャネルアダプターで stop()
を呼び出すと、ソース Publisher
からの生成が完了します。チャネルアダプターは、新しく作成されたソース Publisher
への自動サブスクリプションで再起動できます。
Reactive Streams へのメッセージソース
バージョン 5.3 から、ReactiveMessageSourceProducer
が提供されます。これは、提供された MessageSource
とイベント駆動型の本番を構成された outputChannel
に組み合わせたものです。内部的には、MessageSource
を繰り返し再サブスクライブされた Mono
にラップし、上記の subscribeToPublisher(Publisher<? extends Message<?>>)
でサブスクライブされる Flux<Message<?>>
を生成します。この Mono
のサブスクリプションは、ターゲット MessageSource
でブロックされる可能性を回避するために Schedulers.boundedElastic()
を使用して行われます。メッセージソースが null
(プルするデータがない) を返すと、Mono
は、サブスクライバーコンテキストからの IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY
Duration
エントリに基づく後続の再サブスクリプションのために delay
を使用して repeatWhenEmpty()
状態に変わります。デフォルトでは、1 秒です。MessageSource
がヘッダーに IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
情報を含むメッセージを生成する場合、(必要に応じて) 元の Mono
の doOnSuccess()
で確認応答され、ダウンストリームフローが拒否する失敗メッセージを含む MessagingException
をスローする場合、doOnError()
で拒否されます。この ReactiveMessageSourceProducer
は、ポーリングチャネルアダプターの機能を既存の MessageSource<?>
実装のリアクティブなオンデマンドソリューションに変える必要がある場合に、あらゆるユースケースに使用できます。
スプリッターとアグリゲーター
AbstractMessageSplitter
がそのロジック用の Publisher
を取得すると、プロセスは Publisher
内のアイテムを自然に処理し、outputChannel
に送信するためのメッセージにマップします。このチャネルが ReactiveStreamsSubscribableChannel
の場合、Publisher
の Flux
ラッパーはオンデマンドでそのチャネルからサブスクライブされ、このスプリッター動作は、受信イベントを複数値の出力 Publisher
にマップするとき、flatMap
Reactor オペレーターに似ています。統合フロー全体がスプリッターの前後に FluxMessageChannel
を使用して構築され、Spring Integration 構成を Reactive Streams 要件およびそのオペレーターのイベント処理に合わせて調整することが最も理にかなっています。通常のチャネルでは、Publisher
は Iterable
に変換され、標準の反復および生成の分割ロジックに使用されます。
FluxAggregatorMessageHandler
は、特定の Reactive Streams ロジック実装の別のサンプルであり、プロジェクト Reactor に関しては "reactive operator"
として扱うことができます。これは、Flux.groupBy()
および Flux.window()
(または buffer()
)演算子に基づいています。受信メッセージは、FluxAggregatorMessageHandler
が作成されたときに開始された Flux.create()
に埋め込まれ、ホットソースになります。この Flux
は、オンデマンドで ReactiveStreamsSubscribableChannel
によってサブスクライブされるか、outputChannel
がリアクティブでない場合は FluxAggregatorMessageHandler.start()
で直接サブスクライブされます。この MessageHandler
は、統合フロー全体がこのコンポーネントの前後に FluxMessageChannel
を使用して構築されている場合に威力を発揮し、ロジック全体のバックプレッシャーを準備します。
詳細については、ストリームと Flux の分割および Flux Aggregator を参照してください。
Java DSL
Java DSL の IntegrationFlow
は、任意の Publisher
インスタンスから開始できます(IntegrationFlow.from(Publisher<Message<T>>)
を参照)。また、IntegrationFlowBuilder.toReactivePublisher()
オペレーターを使用すると、IntegrationFlow
をリアクティブホットソースに変えることができます。FluxMessageChannel
は、どちらの場合も内部で使用されます。ReactiveStreamsSubscribableChannel
契約に従って受信 Publisher
にサブスクライブでき、ダウンストリームサブスクライバー用の Publisher<Message<?>>
自体です。動的な IntegrationFlow
登録を使用すると、Reactive Streams と Publisher
との間でブリッジするこの統合フローを組み合わせた強力なロジックを実装できます。
バージョン 5.5.6 以降、返された Publisher<Message<?>>
の背後にある IntegrationFlow
全体のライフサイクルを制御する toReactivePublisher(boolean autoStartOnSubscribe)
オペレーターバリアントが存在します。通常、リアクティブパブリッシャーからのサブスクリプションと消費は、リアクティブストリームの構成時や ApplicationContext
の起動時ではなく、後のランタイムフェーズで発生します。Publisher<Message<?>>
サブスクリプションポイントでの IntegrationFlow
のライフサイクル管理のボイラープレートコードを回避し、エンドユーザーエクスペリエンスを向上させるために、autoStartOnSubscribe
フラグを持つこの新しい演算子が導入されました。( true
の場合) IntegrationFlow
とそのコンポーネントを autoStartup = false
にマークするため、ApplicationContext
はフロー内のメッセージの生成と消費を自動的に開始しません。代わりに、IntegrationFlow
の start()
が内部 Flux.doOnSubscribe()
から開始されます。autoStartOnSubscribe
値とは無関係に、フローは Flux.doOnCancel()
および Flux.doOnTerminate()
から停止されます。消費するものが何もない場合、メッセージを生成しても意味がありません。
まったく逆のユースケースでは、IntegrationFlow
がリアクティブストリームを呼び出して完了後に続行する必要がある場合、fluxTransform()
演算子が IntegrationFlowDefinition
で提供されます。この時点でのフローは FluxMessageChannel
に変わり、Flux.transform()
オペレーターで実行される提供された fluxFunction
に伝搬されます。この関数の結果は、ダウンストリームフロー用に別の FluxMessageChannel
によってサブスクライブされる出力 Flux
へのフラットマッピングのために Mono<Message<?>>
にラップされます。
詳細については、Java DSL の章を参照してください。
ReactiveMessageHandler
バージョン 5.3 以降、ReactiveMessageHandler
はフレームワークでネイティブにサポートされています。この型のメッセージハンドラーは、低レベル操作の実行のためにオンデマンドサブスクリプションのリアクティブ型を返し、リアクティブストリーム構成を続行するための応答データを提供しないリアクティブクライアント用に設計されています。ReactiveMessageHandler
が命令型統合フローで使用される場合、handleMessage()
は戻り直後にサブスクライブされます。これは、このようなフローにバックプレッシャーを尊重するための反応ストリーム構成がないためです。この場合、フレームワークはこの ReactiveMessageHandler
を ReactiveMessageHandlerAdapter
( MessageHandler
の単純な実装) にラップします。ただし、フローに ReactiveStreamsConsumer
が含まれる場合 (たとえば、消費するチャネルが FluxMessageChannel
の場合)、そのような ReactiveMessageHandler
は、消費中のバックプレッシャーを尊重するために flatMap()
Reactor オペレーターを使用してリアクティブストリーム全体に構成されます。
すぐに使える ReactiveMessageHandler
実装の 1 つは、送信チャネルアダプター用の ReactiveMongoDbStoringMessageHandler
です。詳細については、MongoDB Reactive Channel Adapters を参照してください。
バージョン 6.1 以降、IntegrationFlowDefinition
は便利な handleReactive(ReactiveMessageHandler)
ターミナルオペレーターを公開します。この演算子には、任意の ReactiveMessageHandler
実装 (Mono
API を使用した単純なラムダでも) を使用できます。フレームワークは、返された Mono<Void>
を自動的にサブスクライブします。この演算子の可能な構成の簡単なサンプルを次に示します。
@Bean
public IntegrationFlow wireTapFlow1() {
return IntegrationFlow.from("tappedChannel1")
.wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
.handleReactive((message) -> Mono.just(message).log().then());
}
このオペレーターのオーバーロードされたバージョンは、Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>>
を受け入れて、提供された ReactiveMessageHandler
を中心にコンシューマーエンドポイントをカスタマイズします。
さらに、ReactiveMessageHandlerSpec
ベースのバリアントも提供されます。ほとんどの場合、これらはプロトコル固有のチャネルアダプターの実装に使用されます。それぞれのリアクティブチャネルアダプターを含むターゲットテクノロジへのリンクに続く次のセクションを参照してください。
リアクティブチャネルアダプター
統合のターゲットプロトコルが Reactive Streams ソリューションを提供する場合、Spring Integration にチャネルアダプターを実装するのは簡単です。
受信のイベントドリブンチャネルアダプターの実装は、リクエスト (必要な場合) を遅延 Mono
または Flux
にラップし、プロトコルコンポーネントがサーバーから返された Mono
へのサブスクリプションを開始した場合にのみ、送信を実行します (そして、応答があれば応答を生成します)。リスナーメソッド。このようにして、このコンポーネントに正確にカプセル化されたリアクティブストリームソリューションが得られます。もちろん、出力チャネルでサブスクライブされたダウンストリーム統合フローは、Reactive Streams 仕様を尊重し、オンデマンドでバックプレッシャー対応の方法で実行する必要があります。
これは、統合フローで使用される MessageHandler
プロセッサーの性質 (または現在の実装) により、常に利用できるとは限りません。この制限は、リアクティブな実装がない場合、統合エンドポイントの前後にスレッドプールとキューまたは FluxMessageChannel
(上記を参照) を使用して処理できます。
リアクティブなイベント駆動型受信チャネルアダプターの例:
public class CustomReactiveMessageProducer extends MessageProducerSupport {
private final CustomReactiveSource customReactiveSource;
public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
this.customReactiveSource = customReactiveSource;
}
@Override
protected void doStart() {
Flux<Message<?>> messageFlux =
this.customReactiveSource
.map(event - >
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
subscribeToPublisher(messageFlux);
}
}
使用箇所は次のようになります。
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.channel(outputChannel)
.get();
}
}
または宣言的な方法で:
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
.handle(outputChannel)
.get();
}
}
または、チャネルアダプターがなくても、次の方法でいつでも Java DSL を使用できます。
public class MainFlow {
@Bean
public IntegrationFlow buildFlow() {
Flux<Message<?>> myFlux = this.customReactiveSource
.map(event ->
MessageBuilder
.withPayload(event.getBody())
.setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
.build());
return IntegrationFlow.from(myFlux)
.handle(outputChannel)
.get();
}
}
リアクティブ送信 チャネルアダプターの実装は、ターゲットプロトコル用に提供されたリアクティブ API に従って、リアクティブストリームを開始 (または継続) して外部システムとやり取りすることに関するものです。受信 ペイロードは、それ自体がリアクティブ型である場合もあれば、上部のリアクティブストリームの一部である統合フロー全体のイベントである場合もあります。返されたリアクティブ型は、一方向のファイアアンドフォーゲットシナリオの場合はすぐにサブスクライブできます。または、さらなる統合フローまたはターゲットビジネスロジックでの明示的なサブスクリプションのためにダウンストリーム (リクエスト - 応答シナリオ) に伝播されますが、リアクティブストリームのセマンティクスを維持するまだ下流。
リアクティブな送信チャネルアダプターの例:
public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {
private final CustomEntityOperations customEntityOperations;
public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
this.customEntityOperations = customEntityOperations;
}
@Override
protected Mono<Void> handleMessageInternal(Message<?> message) {
return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
.flatMap(mode -> {
switch (mode) {
case INSERT:
return handleInsert(message);
case UPDATE:
return handleUpdate(message);
default:
return Mono.error(new IllegalArgumentException());
}
}).then();
}
private Mono<Void> handleInsert(Message<?> message) {
return this.customEntityOperations.insert(message.getPayload())
.then();
}
private Mono<Void> handleUpdate(Message<?> message) {
return this.r2dbcEntityOperations.update(message.getPayload())
.then();
}
public enum Type {
INSERT,
UPDATE,
}
}
両方のチャネルアダプターを使用できるようになります。
public class MainFlow {
@Autowired
private CustomReactiveMessageProducer customReactiveMessageProducer;
@Autowired
private CustomReactiveMessageHandler customReactiveMessageHandler;
@Bean
public IntegrationFlow buildFlow() {
return IntegrationFlow.from(customReactiveMessageProducer)
.transform(someOperation)
.handle(customReactiveMessageHandler)
.get();
}
}
現在、Spring Integration は、WebFlux、RSocket、MongoDb、R2DBC、ZeroMQ、GraphQL、Apache Cassandra 用のチャネルアダプター (またはゲートウェイ) 実装を提供しています。Redis ストリームチャネルアダプターもリアクティブで、Spring Data の ReactiveStreamOperations
を使用します。Spring for Apache Kafka の ReactiveKafkaProducerTemplate
および ReactiveKafkaConsumerTemplate
に基づく Kafka の Apache Kafka など、さらに多くのリアクティブチャネルアダプターが登場する予定です。その他の多くの非リアクティブチャネルアダプターでは、リアクティブストリーム処理中のブロックを回避するために、スレッドプールの使用が推奨されます。
命令的なコンテキストの伝播に反応する
コンテキストの伝播 [GitHub] (英語) ライブラリがクラスパスにある場合、プロジェクト Reactor は ThreadLocal
値 (例: Micrometer Observation (英語) または SecurityContextHolder
) を取得し、Subscriber
コンテキストに格納できます。逆の操作も可能です。トレース用にロギング MDC を設定する必要がある場合、またはスコープから観測を復元するためにリアクティブストリームからサービスを呼び出す必要がある場合です。コンテキスト伝播のための特別な演算子については、プロジェクト Reactor のドキュメント (英語) で詳細を参照してください。Subscriber
コンテキストはダウンストリームからコンポジションの先頭 (Flux
または Mono
) まで見えるため、ソリューション全体が単一のリアクティブストリームコンポジションである場合、コンテキストの保存と復元はスムーズに機能します。ただし、アプリケーションが異なる Flux
インスタンス間で切り替えたり、命令型処理に切り替えたり元に戻したりすると、Subscriber
に関連付けられたコンテキストが利用できない可能性があります。このようなユースケースのために、Spring Integration は、たとえば、直接 send()
操作を実行するときに、リアクティブストリームから生成された IntegrationMessageHeaderAccessor.REACTOR_CONTEXT
メッセージヘッダーに Reactor ContextView
を格納する追加機能 (バージョン 6.0.5
以降) を提供します。このヘッダーは、FluxMessageChannel.subscribeTo()
で使用され、このチャネルが発行する Message
の Reactor コンテキストを復元します。現在、このヘッダーは WebFluxInboundEndpoint
および RSocketInboundGateway
コンポーネントから取り込まれていますが、リアクティブ型から命令型への統合が実行される任意のソリューションで使用できます。このヘッダーに入力するロジックは次のようになります。
return requestMono
.flatMap((message) ->
Mono.deferContextual((context) ->
Mono.just(message)
.handle((messageToSend, sink) ->
send(messageWithReactorContextIfAny(messageToSend, context)))));
...
private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
if (!context.isEmpty()) {
return getMessageBuilderFactory()
.fromMessage(message)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
.build();
}
return message;
}
Reactor がコンテキストから ThreadLocal
値を復元できるようにするには、まだ handle()
演算子を使用する必要があることに注意してください。ヘッダーとして送信されたとしても、ダウンストリームで ThreadLocal
値に復元する場合、フレームワークは想定できません。
他の Flux
または Mono
コンポジションで Message
からコンテキストを復元するには、次のロジックを実行できます。
Mono.just(message)
.handle((messageToHandle, sink) -> ...)
.contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));