Reactive Streams サポート

Spring Integration は、フレームワークのいくつかの場所で、さまざまな側面から Reactive Streams (英語) 対話のサポートを提供します。ここでは、必要に応じて、詳細については対象の章への適切なリンクを使用して、それらのほとんどについて説明します。

序文

要約すると、Spring Integration は Spring プログラミングモデルを継承して、よく知られたエンタープライズ統合パターンをサポートします。Spring Integration は、Spring ベースのアプリケーション内での軽量メッセージングを可能にし、宣言型アダプターを介した外部システムとの統合をサポートします。Spring Integration の主なゴールは、エンタープライズ統合ソリューションを構築するためのシンプルなモデルを提供すると同時に、保守可能でテスト可能なコードを作成するために不可欠な関心の分離を維持することです。このゴールは、ターゲットアプリケーションで messagechannelendpoint のような 第一級オブジェクトを使用して達成されます。これにより、(ほとんどの場合) 1 つのエンドポイントが別のエンドポイントによって消費されるチャネルにメッセージを生成する統合フロー (パイプライン) を構築できます。このようにして、統合相互作用モデルをターゲットビジネスロジックから区別します。ここで重要な部分は、その間のチャネルです。フローの動作は、エンドポイントをそのままにして、その実装に依存します。

一方、Reactive Streams は、ノンブロッキングバックプレッシャーを使用した非同期ストリーム処理の標準です。Reactive Streams の主なゴールは、受信側が任意の量のデータをバッファリングすることを強制されないようにしながら、非同期境界を越えたストリームデータの交換を管理することです(要素を別のスレッドまたはスレッドプールに渡すなど)。言い換えると、スレッド間を仲介するキューを制限できるようにするために、バックプレッシャはこのモデルの不可欠な部分です。プロジェクト Reactor (英語) などの Reactive Streams 実装の目的は、ストリームアプリケーションの処理グラフ全体にわたってこれらの利点と特性を維持することです。Reactive Streams ライブラリの最終的なゴールは、利用可能なプログラミング言語構造で可能な限り透過的かつスムーズなメソッドで、ターゲットアプリケーションの型、演算子のセット、サポート API を提供することですが、最終的なソリューションは通常の場合ほど必須ではありません。関数チェーンの呼び出し。それはフェーズに分けられます: 定義と実行は、最終的なリアクティブパブリッシャーへのサブスクリプション中にしばらくしてから発生し、データの需要は定義の下部から上部にプッシュされ、必要に応じてバックプレッシャーを適用します。多くをリクエストします。現時点で処理できるイベント。リアクティブアプリケーションは、"stream" のように見えるか、Spring Integration 用語で慣れているように - "flow" です。実際、Java 9 以降の Reactive Streams SPI は、java.util.concurrent.Flow クラスで提供されます。

ここから、エンドポイントにいくつかのリアクティブフレームワークオペレーターを適用する場合、Spring Integration フローは Reactive Streams アプリケーションの作成に本当に適しているように見えますが、実際には問題がはるかに広く、すべてのエンドポイント(JdbcMessageHandler など)ではないことに注意する必要があります。透過的に反応ストリームで処理できます。もちろん、Spring Integration での Reactive Streams サポートの主なゴールは、プロセス全体を完全に反応させ、要求に応じて開始し、バックプレッシャーに対応できるようにすることです。チャネルアダプターのターゲットプロトコルとシステムが Reactive Streams 対話モデルを提供するまで、これは不可能です。以下のセクションでは、統合フロー構造を保持するリアクティブ型アプリケーションを開発するために Spring Integration で提供されるコンポーネントとアプローチについて説明します。

Spring Integration のすべての Reactive Streams 相互作用は、Mono や Flux などのプロジェクト Reactor (英語) 型で実装されています。

メッセージングゲートウェイ

Reactive Streams との最も単純なインタラクションポイントは @MessagingGateway です。ここでは、ゲートウェイメソッドの戻り値の型を Mono<?> として作成します。ゲートウェイメソッド呼び出しの背後にある統合フロー全体は、返された Mono インスタンスでサブスクリプションが発生したときに実行されます。詳細については、Reactor Mono を参照してください。同様の Mono -reply アプローチは、完全に Reactive Streams 互換プロトコルに基づく受信ゲートウェイのフレームワーク内部で使用されます (詳細については、以下のリアクティブチャネルアダプターを参照してください)。送受信操作は、replyChannel ヘッダーからの応答評価が利用可能な場合は常に、Mono.defer() にラップされます。このように、特定のリアクティブプロトコル (例: Netty) の受信コンポーネントは、Spring Integration で実行されるリアクティブフローのサブスクライバーおよびイニシエーターになります。リクエストペイロードがリアクティブ型の場合、イニシエーターサブスクリプションにプロセスを延期するリアクティブストリーム定義内で処理する方が適切です。この目的のために、ハンドラーメソッドもリアクティブ型を返す必要があります。詳細については、次のセクションを参照してください。

リアクティブ応答ペイロード

MessageHandler を生成する応答が応答メッセージのリアクティブ型のペイロードを返すと、outputChannel に提供される通常の MessageChannel 実装 (async は true に設定する必要があります) を使用して非同期的に処理され、出力チャネルがオンデマンドのサブスクリプションでフラット化されます。ReactiveStreamsSubscribableChannel 実装です (例: FluxMessageChannel)。標準の命令型 MessageChannel ユースケースでは、応答ペイロードが複数値パブリッシャー (詳細については ReactiveAdapter.isMultiValue() を参照) の場合、Mono.just() にラップされます。この結果、Mono をダウンストリームで明示的にサブスクライブするか、FluxMessageChannel ダウンストリームによってフラット化する必要があります。outputChannel の ReactiveStreamsSubscribableChannel を使用すると、戻り値の型とサブスクリプションについて心配する必要はありません。すべてが内部のフレームワークによってスムーズに処理されます。

詳細については、非同期サービスアクティベーターを参照してください。

詳細については、Kotlin コルーチンも参照してください。

FluxMessageChannel および ReactiveStreamsConsumer

FluxMessageChannel は、MessageChannel と Publisher<Message<?>> を組み合わせた実装です。ホットソースとしての Flux は、send() 実装からの受信メッセージをシンクするために内部的に作成されます。Publisher.subscribe() 実装は、その内部 Flux に委譲されます。また、オンデマンドのアップストリーム消費のために、FluxMessageChannel は ReactiveStreamsSubscribableChannel 契約の実装を提供します。このチャネルに提供されるアップストリーム Publisher (たとえば、以下のソースポーリングチャネルアダプターとスプリッタを参照)は、このチャネルのサブスクリプションの準備ができたときに自動サブスクライブされます。この委譲パブリッシャーからのイベントは、上記の内部 Flux にシンクされます。

FluxMessageChannel のコンシューマーは、Reactive Streams 契約を尊重するための org.reactivestreams.Subscriber インスタンスである必要があります。幸い、Spring Integration のすべての MessageHandler 実装は、プロジェクト Reactor の CoreSubscriber も実装します。そして、その間の ReactiveStreamsConsumer 実装のおかげで、統合フロー構成全体がターゲット開発者に透過的に残されます。この場合、フローの動作は命令型プッシュモデルからリアクティブ型プルモデルに変更されます。ReactiveStreamsConsumer を使用して、IntegrationReactiveUtils を使用して任意の MessageChannel をリアクティブソースにして、積分フローを部分的にリアクティブにすることもできます。

詳細については、FluxMessageChannel を参照してください。

バージョン 5.5 以降、ConsumerEndpointSpec は reactive() オプションを導入して、フロー内のエンドポイントを入力チャネルとは独立した ReactiveStreamsConsumer として作成します。オプションの Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> を提供して、Flux.transform() 操作を介して入力チャネルからソース Flux をカスタマイズできます (例: publishOn()doOnNext()retry() )。この機能は、reactive() 属性を介して、すべてのメッセージングアノテーション(@ServiceActivator@Splitter など)の @Reactive サブアノテーションとして表されます。

ソースポーリングチャネルアダプター

通常、SourcePollingChannelAdapter は TaskScheduler によって開始されるタスクに依存しています。ポーリングトリガーは、提供されたオプションから構築され、データまたはイベントのターゲットソースをポーリングするタスクを定期的にスケジュールするために使用されます。outputChannel が ReactiveStreamsSubscribableChannel である場合、同じ Trigger が次回の実行時間を決定するために使用されますが、タスクをスケジュールする代わりに、SourcePollingChannelAdapter は nextExecutionTime 値の Flux.generate() と前のステップからの期間の Mono.delay() に基づいて Flux<Message<?>> を作成します。次に Flux.flatMapMany() を使用して maxMessagesPerPoll をポーリングし、出力 Flux にシンクします。このジェネレーター Flux は、提供された ReactiveStreamsSubscribableChannel によってサブスクライブされ、バックプレッシャーダウンストリームを尊重します。バージョン 5.5 以降、maxMessagesPerPoll == 0 の場合、ソースはまったく呼び出されず、maxMessagesPerPoll が後でゼロ以外の値に変更されるまで、flatMapMany() は Mono.empty() の結果を介してすぐに完了します。コントロールバス経由。このようにして、任意の MessageSource 実装をリアクティブホットソースに変えることができます。

詳細については、ポーリングコンシューマーを参照してください。

イベント駆動型チャネルアダプター

MessageProducerSupport は、イベント駆動型チャネルアダプターの基本クラスであり、通常、その sendMessage(Message<?>) は、生成するドライバー API のリスナーコールバックとして使用されます。このコールバックは、メッセージプロデューサー実装がリスナーベースの機能の代わりにメッセージの Flux を構築するときに、doOnNext() Reactor オペレーターに簡単にプラグインすることもできます。実際、これはメッセージプロデューサーの outputChannel が ReactiveStreamsSubscribableChannel ではない場合にフレームワークで行われます。ただし、エンドユーザーエクスペリエンスを改善し、より多くのバックプレッシャー対応機能を可能にするために、MessageProducerSupport は、Publisher<Message<?>>> がターゲットシステムからのデータのソースである場合にターゲット実装で使用される subscribeToPublisher(Publisher<? extends Message<?>>) API を提供します。通常、これは、ターゲットドライバー API がソースデータの Publisher に対して呼び出されるときに、doStart() 実装から使用されます。オンデマンドサブスクリプションとダウンストリームのイベント消費のために、outputChannel として FluxMessageChannel とリアクティブ MessageProducerSupport 実装を組み合わせることをお勧めします。Publisher へのサブスクリプションがキャンセルされると、チャネルアダプターは停止状態になります。このようなチャネルアダプターで stop() を呼び出すと、ソース Publisher からの生成が完了します。チャネルアダプターは、新しく作成されたソース Publisher への自動サブスクリプションで再起動できます。

Reactive Streams へのメッセージソース

バージョン 5.3 から、ReactiveMessageSourceProducer が提供されます。これは、提供された MessageSource とイベント駆動型の本番を構成された outputChannel に組み合わせたものです。内部的には、MessageSource を繰り返し再サブスクライブされた Mono にラップし、上記の subscribeToPublisher(Publisher<? extends Message<?>>) でサブスクライブされる Flux<Message<?>> を生成します。この Mono のサブスクリプションは、ターゲット MessageSource でブロックされる可能性を回避するために Schedulers.boundedElastic() を使用して行われます。メッセージソースが null (プルするデータがない) を返すと、Mono は、サブスクライバーコンテキストからの IntegrationReactiveUtils.DELAY_WHEN_EMPTY_KEY Duration エントリに基づく後続の再サブスクリプションのために delay を使用して repeatWhenEmpty() 状態に変わります。デフォルトでは、1 秒です。MessageSource がヘッダーに IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 情報を含むメッセージを生成する場合、(必要に応じて) 元の Mono の doOnSuccess() で確認応答され、ダウンストリームフローが拒否する失敗メッセージを含む MessagingException をスローする場合、doOnError() で拒否されます。この ReactiveMessageSourceProducer は、ポーリングチャネルアダプターの機能を既存の MessageSource<?> 実装のリアクティブなオンデマンドソリューションに変える必要がある場合に、あらゆるユースケースに使用できます。

スプリッターとアグリゲーター

AbstractMessageSplitter がそのロジック用の Publisher を取得すると、プロセスは Publisher 内のアイテムを自然に処理し、outputChannel に送信するためのメッセージにマップします。このチャネルが ReactiveStreamsSubscribableChannel の場合、Publisher の Flux ラッパーはオンデマンドでそのチャネルからサブスクライブされ、このスプリッター動作は、受信イベントを複数値の出力 Publisher にマップするとき、flatMap Reactor オペレーターに似ています。統合フロー全体がスプリッターの前後に FluxMessageChannel を使用して構築され、Spring Integration 構成を Reactive Streams 要件およびそのオペレーターのイベント処理に合わせて調整することが最も理にかなっています。通常のチャネルでは、Publisher は Iterable に変換され、標準の反復および生成の分割ロジックに使用されます。

FluxAggregatorMessageHandler は、特定の Reactive Streams ロジック実装の別のサンプルであり、プロジェクト Reactor に関しては "reactive operator" として扱うことができます。これは、Flux.groupBy() および Flux.window() (または buffer())演算子に基づいています。受信メッセージは、FluxAggregatorMessageHandler が作成されたときに開始された Flux.create() に埋め込まれ、ホットソースになります。この Flux は、オンデマンドで ReactiveStreamsSubscribableChannel によってサブスクライブされるか、outputChannel がリアクティブでない場合は FluxAggregatorMessageHandler.start() で直接サブスクライブされます。この MessageHandler は、統合フロー全体がこのコンポーネントの前後に FluxMessageChannel を使用して構築されている場合に威力を発揮し、ロジック全体のバックプレッシャーを準備します。

詳細については、ストリームと Flux の分割および Flux Aggregator を参照してください。

Java DSL

Java DSL の IntegrationFlow は、任意の Publisher インスタンスから開始できます(IntegrationFlow.from(Publisher<Message<T>>) を参照)。また、IntegrationFlowBuilder.toReactivePublisher() オペレーターを使用すると、IntegrationFlow をリアクティブホットソースに変えることができます。FluxMessageChannel は、どちらの場合も内部で使用されます。ReactiveStreamsSubscribableChannel 契約に従って受信 Publisher にサブスクライブでき、ダウンストリームサブスクライバー用の Publisher<Message<?>> 自体です。動的な IntegrationFlow 登録を使用すると、Reactive Streams と Publisher との間でブリッジするこの統合フローを組み合わせた強力なロジックを実装できます。

バージョン 5.5.6 以降、返された Publisher<Message<?>> の背後にある IntegrationFlow 全体のライフサイクルを制御する toReactivePublisher(boolean autoStartOnSubscribe) オペレーターバリアントが存在します。通常、リアクティブパブリッシャーからのサブスクリプションと消費は、リアクティブストリームの構成時や ApplicationContext の起動時ではなく、後のランタイムフェーズで発生します。Publisher<Message<?>> サブスクリプションポイントでの IntegrationFlow のライフサイクル管理のボイラープレートコードを回避し、エンドユーザーエクスペリエンスを向上させるために、autoStartOnSubscribe フラグを持つこの新しい演算子が導入されました。( true の場合) IntegrationFlow とそのコンポーネントを autoStartup = false にマークするため、ApplicationContext はフロー内のメッセージの生成と消費を自動的に開始しません。代わりに、IntegrationFlow の start() が内部 Flux.doOnSubscribe() から開始されます。autoStartOnSubscribe 値とは無関係に、フローは Flux.doOnCancel() および Flux.doOnTerminate() から停止されます。消費するものが何もない場合、メッセージを生成しても意味がありません。

まったく逆のユースケースでは、IntegrationFlow がリアクティブストリームを呼び出して完了後に続行する必要がある場合、fluxTransform() 演算子が IntegrationFlowDefinition で提供されます。この時点でのフローは FluxMessageChannel に変わり、Flux.transform() オペレーターで実行される提供された fluxFunction に伝搬されます。この関数の結果は、ダウンストリームフロー用に別の FluxMessageChannel によってサブスクライブされる出力 Flux へのフラットマッピングのために Mono<Message<?>> にラップされます。

詳細については、Java DSL の章を参照してください。

ReactiveMessageHandler

バージョン 5.3 以降、ReactiveMessageHandler はフレームワークでネイティブにサポートされています。この型のメッセージハンドラーは、低レベル操作の実行のためにオンデマンドサブスクリプションのリアクティブ型を返し、リアクティブストリーム構成を続行するための応答データを提供しないリアクティブクライアント用に設計されています。ReactiveMessageHandler が命令型統合フローで使用される場合、handleMessage() は戻り直後にサブスクライブされます。これは、このようなフローにバックプレッシャーを尊重するための反応ストリーム構成がないためです。この場合、フレームワークはこの ReactiveMessageHandler を ReactiveMessageHandlerAdapter ( MessageHandler の単純な実装) にラップします。ただし、フローに ReactiveStreamsConsumer が含まれる場合 (たとえば、消費するチャネルが FluxMessageChannel の場合)、そのような ReactiveMessageHandler は、消費中のバックプレッシャーを尊重するために flatMap() Reactor オペレーターを使用してリアクティブストリーム全体に構成されます。

すぐに使える ReactiveMessageHandler 実装の 1 つは、送信チャネルアダプター用の ReactiveMongoDbStoringMessageHandler です。詳細については、MongoDB Reactive Channel Adapters を参照してください。

バージョン 6.1 以降、IntegrationFlowDefinition は便利な handleReactive(ReactiveMessageHandler) ターミナルオペレーターを公開します。この演算子には、任意の ReactiveMessageHandler 実装 (Mono API を使用した単純なラムダでも) を使用できます。フレームワークは、返された Mono<Void> を自動的にサブスクライブします。この演算子の可能な構成の簡単なサンプルを次に示します。

@Bean
public IntegrationFlow wireTapFlow1() {
    return IntegrationFlow.from("tappedChannel1")
            .wireTap("tapChannel", wt -> wt.selector(m -> m.getPayload().equals("foo")))
            .handleReactive((message) -> Mono.just(message).log().then());
}

このオペレーターのオーバーロードされたバージョンは、Consumer<GenericEndpointSpec<ReactiveMessageHandlerAdapter>> を受け入れて、提供された ReactiveMessageHandler を中心にコンシューマーエンドポイントをカスタマイズします。

さらに、ReactiveMessageHandlerSpec ベースのバリアントも提供されます。ほとんどの場合、これらはプロトコル固有のチャネルアダプターの実装に使用されます。それぞれのリアクティブチャネルアダプターを含むターゲットテクノロジへのリンクに続く次のセクションを参照してください。

リアクティブチャネルアダプター

統合のターゲットプロトコルが Reactive Streams ソリューションを提供する場合、Spring Integration にチャネルアダプターを実装するのは簡単です。

受信のイベントドリブンチャネルアダプターの実装は、リクエスト (必要な場合) を遅延 Mono または Flux にラップし、プロトコルコンポーネントがサーバーから返された Mono へのサブスクリプションを開始した場合にのみ、送信を実行します (そして、応答があれば応答を生成します)。リスナーメソッド。このようにして、このコンポーネントに正確にカプセル化されたリアクティブストリームソリューションが得られます。もちろん、出力チャネルでサブスクライブされたダウンストリーム統合フローは、Reactive Streams 仕様を尊重し、オンデマンドでバックプレッシャー対応の方法で実行する必要があります。

これは、統合フローで使用される MessageHandler プロセッサーの性質 (または現在の実装) により、常に利用できるとは限りません。この制限は、リアクティブな実装がない場合、統合エンドポイントの前後にスレッドプールとキューまたは FluxMessageChannel (上記を参照) を使用して処理できます。

リアクティブなイベント駆動型受信チャネルアダプターの例:

public class CustomReactiveMessageProducer extends MessageProducerSupport {

    private final CustomReactiveSource customReactiveSource;

    public CustomReactiveMessageProducer(CustomReactiveSource customReactiveSource) {
        Assert.notNull(customReactiveSource, "'customReactiveSource' must not be null");
        this.customReactiveSource = customReactiveSource;
    }

    @Override
    protected void doStart() {
        Flux<Message<?>> messageFlux =
            this.customReactiveSource
                .map(event - >
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());

        subscribeToPublisher(messageFlux);
    }
}

使用箇所は次のようになります。

public class MainFlow {
  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .channel(outputChannel)
        .get();
  }
}

または宣言的な方法で:

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(new CustomReactiveMessageProducer(new CustomReactiveSource()))
        .handle(outputChannel)
        .get();
  }
}

または、チャネルアダプターがなくても、次の方法でいつでも Java DSL を使用できます。

public class MainFlow {
  @Bean
  public IntegrationFlow buildFlow() {
    Flux<Message<?>> myFlux = this.customReactiveSource
                .map(event ->
                    MessageBuilder
                    .withPayload(event.getBody())
                    .setHeader(MyReactiveHeaders.SOURCE_NAME, event.getSourceName())
                    .build());
     return IntegrationFlow.from(myFlux)
        .handle(outputChannel)
        .get();
  }
}

リアクティブ送信 チャネルアダプターの実装は、ターゲットプロトコル用に提供されたリアクティブ API に従って、リアクティブストリームを開始 (または継続) して外部システムとやり取りすることに関するものです。受信 ペイロードは、それ自体がリアクティブ型である場合もあれば、上部のリアクティブストリームの一部である統合フロー全体のイベントである場合もあります。返されたリアクティブ型は、一方向のファイアアンドフォーゲットシナリオの場合はすぐにサブスクライブできます。または、さらなる統合フローまたはターゲットビジネスロジックでの明示的なサブスクリプションのためにダウンストリーム (リクエスト - 応答シナリオ) に伝播されますが、リアクティブストリームのセマンティクスを維持するまだ下流。

リアクティブな送信チャネルアダプターの例:

public class CustomReactiveMessageHandler extends AbstractReactiveMessageHandler {

    private final CustomEntityOperations customEntityOperations;

    public CustomReactiveMessageHandler(CustomEntityOperations customEntityOperations) {
        Assert.notNull(customEntityOperations, "'customEntityOperations' must not be null");
        this.customEntityOperations = customEntityOperations;
    }

    @Override
    protected Mono<Void> handleMessageInternal(Message<?> message) {
        return Mono.fromSupplier(() -> message.getHeaders().get("queryType", Type.class))
                .flatMap(mode -> {
                    switch (mode) {
                        case INSERT:
                            return handleInsert(message);
                        case UPDATE:
                            return handleUpdate(message);
                        default:
                            return Mono.error(new IllegalArgumentException());
                    }
                }).then();
    }

    private Mono<Void> handleInsert(Message<?> message) {
        return this.customEntityOperations.insert(message.getPayload())
                .then();
    }

    private Mono<Void> handleUpdate(Message<?> message) {
        return this.r2dbcEntityOperations.update(message.getPayload())
                .then();
    }

    public enum Type {
        INSERT,
        UPDATE,
    }
}

両方のチャネルアダプターを使用できるようになります。

public class MainFlow {

  @Autowired
  private CustomReactiveMessageProducer customReactiveMessageProducer;

  @Autowired
  private CustomReactiveMessageHandler customReactiveMessageHandler;

  @Bean
  public IntegrationFlow buildFlow() {
     return IntegrationFlow.from(customReactiveMessageProducer)
        .transform(someOperation)
        .handle(customReactiveMessageHandler)
        .get();
  }
}

現在、Spring Integration は、WebFluxRSocketMongoDbR2DBCZeroMQGraphQLApache Cassandra 用のチャネルアダプター (またはゲートウェイ) 実装を提供しています。Redis ストリームチャネルアダプターもリアクティブで、Spring Data の ReactiveStreamOperations を使用します。Spring for Apache Kafka の ReactiveKafkaProducerTemplate および ReactiveKafkaConsumerTemplate に基づく Kafka の Apache Kafka など、さらに多くのリアクティブチャネルアダプターが登場する予定です。その他の多くの非リアクティブチャネルアダプターでは、リアクティブストリーム処理中のブロックを回避するために、スレッドプールの使用が推奨されます。

命令的なコンテキストの伝播に反応する

コンテキストの伝播 [GitHub] (英語) ライブラリがクラスパスにある場合、プロジェクト Reactor は ThreadLocal 値 (例: Micrometer Observation (英語) または SecurityContextHolder) を取得し、Subscriber コンテキストに格納できます。逆の操作も可能です。トレース用にロギング MDC を設定する必要がある場合、またはスコープから観測を復元するためにリアクティブストリームからサービスを呼び出す必要がある場合です。コンテキスト伝播のための特別な演算子については、プロジェクト Reactor のドキュメント (英語) で詳細を参照してください。Subscriber コンテキストはダウンストリームからコンポジションの先頭 (Flux または Mono) まで見えるため、ソリューション全体が単一のリアクティブストリームコンポジションである場合、コンテキストの保存と復元はスムーズに機能します。ただし、アプリケーションが異なる Flux インスタンス間で切り替えたり、命令型処理に切り替えたり元に戻したりすると、Subscriber に関連付けられたコンテキストが利用できない可能性があります。このようなユースケースのために、Spring Integration は、たとえば、直接 send() 操作を実行するときに、リアクティブストリームから生成された IntegrationMessageHeaderAccessor.REACTOR_CONTEXT メッセージヘッダーに Reactor ContextView を格納する追加機能 (バージョン 6.0.5 以降) を提供します。このヘッダーは、FluxMessageChannel.subscribeTo() で使用され、このチャネルが発行する Message の Reactor コンテキストを復元します。現在、このヘッダーは WebFluxInboundEndpoint および RSocketInboundGateway コンポーネントから取り込まれていますが、リアクティブ型から命令型への統合が実行される任意のソリューションで使用できます。このヘッダーに入力するロジックは次のようになります。

return requestMono
        .flatMap((message) ->
                Mono.deferContextual((context) ->
                        Mono.just(message)
                                .handle((messageToSend, sink) ->
                                        send(messageWithReactorContextIfAny(messageToSend, context)))));
...

private Message<?> messageWithReactorContextIfAny(Message<?> message, ContextView context) {
    if (!context.isEmpty()) {
        return getMessageBuilderFactory()
                .fromMessage(message)
                .setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, context)
                .build();
    }
    return message;
}

Reactor がコンテキストから ThreadLocal 値を復元できるようにするには、まだ handle() 演算子を使用する必要があることに注意してください。ヘッダーとして送信されたとしても、ダウンストリームで ThreadLocal 値に復元する場合、フレームワークは想定できません。

他の Flux または Mono コンポジションで Message からコンテキストを復元するには、次のロジックを実行できます。

Mono.just(message)
        .handle((messageToHandle, sink) -> ...)
        .contextWrite(StaticMessageHeaderAccessor.getReactorContext(message)));