メッセージングゲートウェイ

ゲートウェイは、Spring Integration によって提供されるメッセージング API を隠します。これにより、アプリケーションのビジネスロジックは Spring Integration API を認識できなくなります。汎用ゲートウェイを使用することにより、コードは単純なインターフェースのみと対話します。

GatewayProxyFactoryBean を入力してください

前述のように、Spring Integration API(ゲートウェイクラスを含む)に依存しないことは素晴らしいことです。そのため、Spring Integration は GatewayProxyFactoryBean を提供します。GatewayProxyFactoryBean は、任意のインターフェースのプロキシを生成し、以下に示すゲートウェイメソッドを内部的に呼び出します。依存性注入を使用することにより、ビジネスメソッドにインターフェースを公開できます。

次の例は、Spring Integration との対話に使用できるインターフェースを示しています。

public interface Cafe {

    void placeOrder(Order order);

}

ゲートウェイ XML 名前空間のサポート

名前空間のサポートも提供されます。次の例に示すように、インターフェースをサービスとして設定できます。

<int:gateway id="cafeService"
         service-interface="org.cafeteria.Cafe"
         default-request-channel="requestChannel"
         default-reply-timeout="10000"
         default-reply-channel="replyChannel"/>

この構成を定義すると、cafeService を他の Bean に注入できるようになり、Cafe インターフェースのプロキシされたインスタンスでメソッドを呼び出すコードは Spring Integration API を認識しません。gateway 要素を使用する例 (Cafe デモ) については、“サンプル” 付録を参照してください。

前述の設定のデフォルトは、ゲートウェイインターフェース上のすべてのメソッドに適用されます。応答タイムアウトが指定されていない場合、呼び出しスレッドは 30 秒間応答を待ちます。レスポンスがない場合のゲートウェイの動作を参照してください。

デフォルトは、個々のメソッドに対してオーバーライドできます。アノテーションと XML を使用したゲートウェイ構成を参照してください。

デフォルトの応答チャネルの設定

通常、ゲートウェイは応答をリッスンする一時的な匿名応答チャネルを自動作成するため、default-reply-channel を指定する必要はありません。ただし、default-reply-channel (または HTTP、JMS などのアダプターゲートウェイを備えた reply-channel)を定義するように求められる場合があります。

背景として、ゲートウェイの内部動作について簡単に説明します。ゲートウェイは、一時的なポイントツーポイント応答チャネルを作成します。これは匿名であり、replyChannel という名前でメッセージヘッダーに追加されます。明示的な default-reply-channel (リモートアダプターゲートウェイを使用した reply-channel ) を提供する場合、publish-subscribe チャネルを指定できます。これは、複数のサブスクライバーを追加できるため、この名前が付けられています。内部的に、Spring Integration は、一時的な replyChannel と明示的に定義された default-reply-channel の間にブリッジを作成します。

返信をゲートウェイだけでなく、他のコンシューマーにも送信するとします。この場合、次の 2 つが必要です。

  • サブスクライブできる名前付きチャンネル

  • パブリッシュ / サブスクライブチャネルになるチャネル

ヘッダーに追加された応答チャネルは匿名でポイントツーポイントであるため、ゲートウェイで使用されるデフォルトの戦略はこれらのニーズを満たしません。これは、他のサブスクライバーがそれへのハンドルを取得できないことを意味し、たとえそれが可能であっても、チャネルは 1 人のサブスクライバーのみがメッセージを取得するようなポイントツーポイントの動作を持ちます。default-reply-channel を定義することにより、選択したチャンネルを指すことができます。この場合、publish-subscribe-channel です。ゲートウェイは、そこから、ヘッダーに格納されている一時的な匿名応答チャネルへのブリッジを作成します。

また、インターセプター ( wiretap など) を介して監視または監査するための応答チャネルを明示的に提供することもできます。チャネルインターセプタを構成するには、名前付きチャネルが必要です。

バージョン 5.4 以降、ゲートウェイメソッドの戻り値の型が void の場合、フレームワークは、replyChannel ヘッダーが明示的に提供されていない場合、nullChannel Bean 参照として replyChannel ヘッダーを設定します。これにより、ダウンストリームフローからの可能性のあるすべての応答を破棄して、一方向ゲートウェイ契約を満たすことができます。

アノテーションと XML を使用したゲートウェイ構成

@Gateway アノテーションを追加することにより、前の Cafe インターフェースの例を拡張する次の例を検討してください。

public interface Cafe {

    @Gateway(requestChannel="orders")
    void placeOrder(Order order);

}

@Header アノテーションを使用すると、次の例に示すように、メッセージヘッダーとして解釈される値を追加できます。

public interface FileWriter {

    @Gateway(requestChannel="filesOut")
    void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);

}

ゲートウェイメソッドの構成に XML アプローチを好む場合、次の例に示すように、method 要素をゲートウェイ構成に追加できます。

<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB"/>
  <int:method name="echoViaDefault"/>
</int:gateway>

XML を使用して、メソッド呼び出しごとに個別のヘッダーを提供することもできます。これは、設定するヘッダーが本質的に静的であり、@Header アノテーションを使用してゲートウェイのメソッドシグネチャーにヘッダーを埋め込みたくない場合に役立ちます。例: ローンブローカーの例では、どの型のリクエストが開始されたか (単一の引用またはすべての引用) に基づいて、ローンの引用の集計方法に影響を与えたいと考えています。どのゲートウェイメソッドが呼び出されたかを評価してリクエストの型を決定することは可能ですが、関心の分離パラダイムに違反します (メソッドは Java アーティファクトです)。ただし、メッセージヘッダーで意図 (メタ情報) を表現することは、メッセージングアーキテクチャでは自然なことです。次の例は、2 つのメソッドのそれぞれに異なるメッセージヘッダーを追加する方法を示しています。

<int:gateway id="loanBrokerGateway"
         service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
  <int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="BEST"/>
  </int:method>
  <int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
    <int:header name="RESPONSE_TYPE" value="ALL"/>
  </int:method>
</int:gateway>

前の例では、ゲートウェイのメソッドに基づいて、"RESPONSE_TYPE" ヘッダーに異なる値が設定されています。

たとえば、<int:method/> および @Gateway アノテーションで requestChannel を指定すると、アノテーション値が優先されます。
引数なしのゲートウェイが XML で指定されており、インターフェースメソッドに @Payload アノテーションと @Gateway アノテーションの両方がある場合(<int:method/> 要素に payloadExpression または payload-expression がある場合)、@Payload 値は無視されます。

式と「グローバル」ヘッダー

<header/> 要素は、value の代替として expression をサポートします。SpEL 式が評価され、ヘッダーの値が決定されます。バージョン 5.2 以降、評価コンテキストの #root オブジェクトは、getMethod() および getArgs() アクセサーを持つ MethodArgsHolder です。例: 単純なメソッド名でルーティングする場合は、次の式でヘッダーを追加できます: method.name

java.reflect.Method は直列化できません。後でメッセージを直列化すると、method の式を持つヘッダーは失われます。これらの場合は method.name または method.toString() を使用することをお勧めします。toString() メソッドは、パラメーターと戻り値の型を含むメソッドの String 表現を提供します。

バージョン 3.0 以降、呼び出されたメソッドに関係なく、<default-header/> 要素を定義して、ゲートウェイによって生成されたすべてのメッセージにヘッダーを追加できます。メソッドに定義された特定のヘッダーは、デフォルトのヘッダーよりも優先されます。ここでメソッドに定義された特定のヘッダーは、サービスインターフェースの @Header アノテーションをオーバーライドします。ただし、デフォルトのヘッダーは、サービスインターフェースの @Header アノテーションをオーバーライドしません。

ゲートウェイは、(オーバーライドされない限り)すべてのメソッドに適用される default-payload-expression もサポートするようになりました。

メソッドへの引数のメッセージへのマッピング

前のセクションの構成手法を使用すると、メソッドの引数をメッセージ要素(ペイロードとヘッダー)にマップする方法を制御できます。明示的な構成が使用されない場合、特定の規則がマッピングの実行に使用されます。場合によっては、これらの規則では、どの引数がペイロードであり、どの引数をヘッダーにマップする必要があるかを判断できません。次の例を考えてみましょう。

public String send1(Object thing1, Map thing2);

public String send2(Map thing1, Map thing2);

最初の場合、慣例では、最初の引数をペイロードにマップし(Map でない限り)、2 番目の引数の内容はヘッダーになります。

2 番目の場合(またはパラメーター thing1 の引数が Map である場合の最初の場合)、フレームワークはどの引数をペイロードにするかを決定できません。その結果、マッピングは失敗します。通常、これは payload-expression@Payload アノテーション、@Headers アノテーションを使用して解決できます。

別の方法(および規則が破られるたび)では、メソッド呼び出しをメッセージにマッピングする責任をすべて負うことができます。そのためには、MethodArgsMessageMapper を実装し、mapper 属性を使用して <gateway/> に提供します。マッパーは MethodArgsHolder をマッピングします。これは、java.reflect.Method インスタンスと引数を含む Object[] をラップする単純なクラスです。カスタムマッパーを提供する場合、default-payload-expression 属性と <default-header/> 要素はゲートウェイで許可されていません。同様に、payload-expression 属性および <header/> 要素は、<method/> 要素では許可されていません。

マッピングメソッドの引数

次の例は、メソッド引数をメッセージにマップする方法と、無効な構成の例を示しています。

public interface MyGateway {

    void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);

    void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);

    void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);

    void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added

    void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);

    @Payload("args[0] + args[1] + '!'")
    void payloadAnnotationAtMethodLevel(String a, String b);

    @Payload("@someBean.exclaim(args[0])")
    void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);

    void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);

    void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); //  (1)

    // invalid
    void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);

    // invalid
    void twoPayloads(@Payload String s1, @Payload String s2);

    // invalid
    void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);

    // invalid
    void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);

}
1 この例では、SpEL 変数 #this が引数を参照していることに注意してください。この場合は s の値です。

メソッド引数の #this コンテキストがないため、同等の XML は少し異なります。ただし、式は、次の例に示すように、MethodArgsHolder ルートオブジェクトの args プロパティを使用してメソッド引数を参照できます (詳細については、式と「グローバル」ヘッダーを参照してください)。

<int:gateway id="myGateway" service-interface="org.something.MyGateway">
  <int:method name="send1" payload-expression="args[0] + 'thing2'"/>
  <int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
  <int:method name="send3" payload-expression="method"/>
  <int:method name="send4">
    <int:header name="thing1" expression="args[2].toUpperCase()"/>
  </int:method>
</int:gateway>

@MessagingGateway アノテーション

バージョン 4.0 以降、ゲートウェイサービスインターフェースは、構成のために <gateway /> xml 要素の定義を要求する代わりに、@MessagingGateway アノテーションでマークできます。次のペアの例では、同じゲートウェイを構成するための 2 つのアプローチを比較しています。

<int:gateway id="myGateway" service-interface="org.something.TestGateway"
      default-request-channel="inputC">
  <int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
  <int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
  <int:method name="echoUpperCase" request-channel="inputB">
    <int:header name="thing1" value="thing2"/>
  </int:method>
  <int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
		  defaultHeaders = @GatewayHeader(name = "calledMethod",
		                           expression="#gatewayMethod.name"))
public interface TestGateway {

   @Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
   String echo(String payload);

   @Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
   String echoUpperCase(String payload);

   String echoViaDefault(String payload);

}
XML バージョンと同様に、コンポーネントスキャン中に Spring Integration がこれらのアノテーションを検出すると、メッセージングインフラストラクチャを使用して proxy 実装を作成します。このスキャンを実行して BeanDefinition をアプリケーションコンテキストに登録するには、@IntegrationComponentScan アノテーションを @Configuration クラスに追加します。標準の @ComponentScan インフラストラクチャはインターフェースを処理しません。そのため、カスタム @IntegrationComponentScan ロジックを導入して、インターフェースで @MessagingGateway アノテーションを検索し、それらの GatewayProxyFactoryBean インスタンスを登録しました。アノテーションサポートも参照してください。

@MessagingGateway アノテーションとともに、@Profile アノテーションでサービスインターフェースをマークして、そのようなプロファイルがアクティブでない場合、Bean の作成を回避できます。

バージョン 6.0 から、@MessagingGateway とのインターフェースは、Spring @Component 定義で可能なように、それぞれの構成ロジックの @Primary アノテーションでマークすることもできます。

バージョン 6.0 以降では、@MessagingGateway インターフェースを標準の Spring @Import 構成で使用できます。これは、@IntegrationComponentScan または手動の AnnotationGatewayProxyFactoryBean Bean 定義の代替として使用できます。

@MessagingGateway は、バージョン 6.0 から @MessageEndpoint でメタアノテーションが付けられ、name() 属性は基本的に @Compnent.value() にエイリアスされます。このようにして、ゲートウェイプロキシの Bean 名生成戦略は、スキャンおよびインポートされたコンポーネントの標準 Spring アノテーション構成と再調整されます。デフォルトの AnnotationBeanNameGenerator は、AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR を介して、または @IntegrationComponentScan.nameGenerator() 属性としてグローバルにオーバーライドできます。

XML 構成がない場合は、少なくとも 1 つの @Configuration クラスで @EnableIntegration アノテーションが必要です。詳細については、構成と @EnableIntegration を参照してください。

引数なしのメソッドの呼び出し

引数を持たない Gateway インターフェースでメソッドを呼び出す場合、デフォルトの動作は PollableChannel から Message を受け取ります。

ただし、引数のない SQL 呼び出しやストアドプロシージャのトリガーなど、ユーザーが提供するパラメーターを必要としないダウンストリームの他のコンポーネントと対話できるように、引数のないメソッドをトリガーしたい場合があります。

送受信のセマンティクスを実現するには、ペイロードを提供する必要があります。ペイロードを生成するために、インターフェースのメソッドパラメーターは必要ありません。@Payload アノテーションまたは method 要素の XML の payload-expression 属性を使用できます。次のリストには、ペイロードの例をいくつか示します。

  • リテラル文字列

  • #gatewayMethod.name

  • 新しい java.util.Date()

  • @someBean.someMethod() の戻り値

次の例は、@Payload アノテーションの使用方法を示しています。

public interface Cafe {

    @Payload("new java.util.Date()")
    List<Order> retrieveOpenOrders();

}

@Gateway アノテーションを使用することもできます。

public interface Cafe {

    @Gateway(payloadExpression = "new java.util.Date()")
    List<Order> retrieveOpenOrders();

}
両方のアノテーションが存在する場合(および payloadExpression が提供されている場合)、@Gateway が優先されます。

メソッドに引数および戻り値がなく、ペイロード式が含まれている場合、メソッドは送信専用操作として扱われます。

default メソッドの呼び出し

ゲートウェイプロキシのインターフェースには default メソッドも含まれる場合があり、バージョン 5.3 以降、フレームワークはプロキシに DefaultMethodInvokingMethodInterceptor を注入して、プロキシの代わりに java.lang.invoke.MethodHandle アプローチを使用して default メソッドを呼び出します。java.util.function.Function などの JDK からのインターフェースは引き続きゲートウェイプロキシに使用できますが、JDK クラスに対する MethodHandles.Lookup インスタンス化の内部 Java セキュリティ上の理由により、それらの default メソッドを呼び出すことができません。これらのメソッドは、メソッドの明示的な @Gateway アノテーション、または @MessagingGateway アノテーションまたは <gateway> XML コンポーネントの proxyDefaultMethods を使用して、プロキシ化(実装ロジックを失い、同時に以前のゲートウェイプロキシの動作を復元)することもできます。

エラー処理

ゲートウェイの呼び出しはエラーになる可能性があります。デフォルトでは、ダウンストリームで発生したエラーは、ゲートウェイのメソッド呼び出し時に「そのまま」再スローされます。例: 次の簡単なフローを検討してください。

gateway -> service-activator

サービスアクティベータによって呼び出されたサービスが MyException をスローした場合(たとえば)、フレームワークはそれを MessagingException にラップし、failedMessage プロパティでサービスアクティベータに渡されたメッセージを添付します。そのため、フレームワークによって実行されるロギングには、障害の完全なコンテキストが含まれます。デフォルトでは、例外がゲートウェイによってキャッチされると、MyException はラップ解除され、呼び出し元にスローされます。原因チェーンの特定の例外型と一致するように、ゲートウェイメソッド宣言で throws 句を構成できます。例: ダウンストリームエラーの理由のすべてのメッセージング情報で MessagingException 全体をキャッチする場合、次のようなゲートウェイメソッドが必要です。

public interface MyGateway {

    void performProcess() throws MessagingException;

}

POJO プログラミングを推奨しているため、発信者をメッセージングインフラストラクチャに公開したくない場合があります。

ゲートウェイメソッドに throws 句がない場合、ゲートウェイは原因ツリーを走査し、MessagingException ではない RuntimeException を探します。何も見つからない場合、フレームワークは MessagingException をスローします。前の説明の MyException に SomeOtherException とメソッド throws SomeOtherException の原因がある場合、ゲートウェイはそれをさらにアンラップし、呼び出し元にスローします。

service-interface なしでゲートウェイが宣言されると、内部フレームワークインターフェース RequestReplyExchanger が使用されます。

次の例を考えてみましょう。

public interface RequestReplyExchanger {

	Message<?> exchange(Message<?> request) throws MessagingException;

}

バージョン 5.0 より前は、この exchange メソッドには throws 節がありませんでした。その結果、例外はラップ解除されました。このインターフェースを使用して以前のアンラップ動作を復元する場合は、代わりにカスタム service-interface を使用するか、MessagingException の cause にアクセスしてください。

ただし、エラーを伝播するのではなくログに記録したい場合や、例外を有効な応答として扱いたい場合があります (呼び出し元が理解できる「エラーメッセージ」契約に準拠するメッセージにマッピングすることによって)。これを実現するために、ゲートウェイは、error-channel 属性のサポートを含めることによって、エラー専用のメッセージチャネルのサポートを提供します。次の例では、「トランスフォーマー」が Exception から応答 Message を作成します。

<int:gateway id="sampleGateway"
    default-request-channel="gatewayChannel"
    service-interface="foo.bar.SimpleGateway"
    error-channel="exceptionTransformationChannel"/>

<int:transformer input-channel="exceptionTransformationChannel"
        ref="exceptionTransformer" method="createErrorResponse"/>

exceptionTransformer は、予期されるエラーレスポンスオブジェクトの作成方法を知っている単純な POJO である可能性があります。それが発呼者に送り返されるペイロードになります。必要に応じて、このような「エラーフロー」でより多くの詳細な処理を実行できます。ルーター(Spring Integration の ErrorMessageExceptionTypeRouter を含む)、フィルターなどが含まれる場合があります。ただし、ほとんどの場合、単純な「トランス」で十分です。

または、例外のみをログに記録する(または非同期でどこかに送信する)こともできます。一方向のフローを提供する場合、発信者には何も返されません。例外を完全に抑制したい場合は、グローバル nullChannel への参照を提供できます(本質的に /dev/null アプローチ)。最後に、上記のように、error-channel が定義されていない場合、例外は通常どおり伝播します。

@MessagingGateway アノテーション (@MessagingGateway` アノテーション ), you can use an `errorChannel 属性を参照) を使用する場合。

バージョン 5.0 以降、void 戻り値の型(一方向フロー)でゲートウェイメソッドを使用すると、error-channel 参照(提供されている場合)が各送信メッセージの標準 errorChannel ヘッダーに取り込まれます。この機能により、標準の ExecutorChannel 構成(または QueueChannel)に基づいたダウンストリーム非同期フローが、デフォルトのグローバル errorChannel 例外送信動作をオーバーライドできます。以前は、@GatewayHeader アノテーションまたは <header> 要素で errorChannel ヘッダーを手動で指定する必要がありました。error-channel プロパティは、非同期フローを持つ void メソッドでは無視されました。代わりに、エラーメッセージがデフォルトの errorChannel に送信されました。

シンプルな POJI ゲートウェイを通じてメッセージングシステムを公開すると利点がありますが、基礎となるメッセージングシステムの現実を「隠す」ことには代償が伴うため、考慮すべき点がいくつかあります。Java メソッドができるだけ早く戻るようにし、呼び出し側が (void、戻り値、スローされた例外のいずれであっても) 戻りを待っている間、無期限にハングしないようにしたいと考えています。通常のメソッドがメッセージングシステムの前でプロキシとして使用される場合、基礎となるメッセージングの非同期の性質を考慮する必要があります。これは、ゲートウェイによって開始されたメッセージがフィルターによってドロップされ、応答を生成するコンポーネントに到達しない可能性があることを意味します。一部のサービスアクティベーターメソッドでは例外が発生し、応答が返されない場合があります (null メッセージは生成されないため)。つまり、複数のシナリオにより、応答メッセージが受信されない可能性があります。これはメッセージングシステムではごく自然なことです。ただし、ゲートウェイ方式への影響について考えてください。ゲートウェイのメソッド入力引数はメッセージに組み込まれ、ダウンストリームに送信されました。応答メッセージはゲートウェイのメソッドの戻り値に変換されます。そのため、ゲートウェイ呼び出しごとに常に応答メッセージがあることを確認したい場合があります。そうしないと、reply-timeout が負の値に設定されている場合、ゲートウェイメソッドが戻らず、無期限にハングする可能性があります。この状況に対処する 1 つの方法は、非同期ゲートウェイを使用することです (このセクションで後ほど説明します)。これを処理する別の方法は、デフォルトの reply-timeout を 30 秒として利用することです。こうすることで、ゲートウェイは reply-timeout で指定された時間を超えてハングすることはなく、そのタイムアウトが経過すると "null" を返します。最後に、サービスアクティベーターに "requires-reply"、フィルターに "throw-Exceptions-on-rejection" などのダウンストリームフラグを設定することを検討することをお勧めします。これらのオプションについては、この章の最後のセクションで詳しく説明します。
ダウンストリームフローが ErrorMessage を返す場合、その payload (Throwable) は通常のダウンストリームエラーとして扱われます。error-channel が構成されている場合は、エラーフローに送信されます。それ以外の場合、ペイロードはゲートウェイの呼び出し元にスローされます。同様に、error-channel のエラーフローが ErrorMessage を返す場合、そのペイロードは呼び出し元にスローされます。同じことが、Throwable ペイロードを持つすべてのメッセージに適用されます。これは、Exception を呼び出し元に直接伝搬する必要がある非同期の状況で役立ちます。これを行うには、Exception を (一部のサービスからの reply として) 返すか、スローします。一般に、非同期フローの場合でも、フレームワークは、ダウンストリームフローによってスローされた例外をゲートウェイに伝播する処理を行います。TCP クライアントサーバーマルチプレックス [GitHub] (英語) サンプルは、呼び出し元に例外を返す両方の手法を示しています。aggregator と group-timeout ( アグリゲーターとグループのタイムアウトを参照) を使用し、破棄フローで MessagingTimeoutException 応答を使用して、待機スレッドへのソケット IO エラーをエミュレートします。

ゲートウェイのタイムアウト

ゲートウェイには、requestTimeout と replyTimeout の 2 つのタイムアウトプロパティがあります。リクエストのタイムアウトは、チャネルがブロックできる場合にのみ適用されます(たとえば、境界のある QueueChannel がいっぱいの場合)。replyTimeout 値は、ゲートウェイが応答を待つ時間または null を返す時間です。デフォルトは無限です。

タイムアウトは、ゲートウェイ(defaultRequestTimeout および defaultReplyTimeout)または MessagingGateway インターフェースアノテーションのすべてのメソッドのデフォルトとして設定できます。個々のメソッドは、これらのデフォルト(<method/> 子要素内)または @Gateway アノテーションをオーバーライドできます。

バージョン 5.0 以降、次の例に示すように、タイムアウトは式として定義できます。

@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
        requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);

評価コンテキストには BeanResolver (他の Bean を参照するために @someBean を使用) があり、#root オブジェクトからの args 配列プロパティが使用可能です。このルートオブジェクトの詳細については、式と「グローバル」ヘッダーを参照してください。XML で構成する場合、次の例に示すように、タイムアウト属性は long 値または SpEL 式にすることができます。

<method name="someMethod" request-channel="someRequestChannel"
                      payload-expression="args[0]"
                      request-timeout="1000"
                      reply-timeout="args[1]">
</method>

非同期ゲートウェイ

パターンとして、メッセージングゲートウェイは、メッセージングシステムの全機能を公開しながら、メッセージング固有のコードを隠す優れたメソッドを提供します。前述のように、GatewayProxyFactoryBean はサービスインターフェースを介してプロキシを公開する便利なメソッドを提供し、POJO ベースのメッセージングシステムへのアクセスを提供します (独自のドメイン内のオブジェクト、プリミティブ / 文字列、その他のオブジェクトに基づく)。ただし、値を返す単純な POJO メソッドを介してゲートウェイが公開される場合、(メソッドが呼び出されたときに生成される) リクエストメッセージごとに、(メソッドが返されたときに生成される) 応答メッセージが存在する必要があることを意味します。メッセージングシステムは当然のことながら非同期であるため、「リクエストごとに必ず応答がある」という契約を常に保証できるとは限りません。Spring Integration 2.0 は非同期ゲートウェイのサポートを導入しました。これは、応答が予想されるかどうか、応答が到着するまでにかかる時間がわからない場合に、フローを開始する便利なメソッドを提供します。

これらの型のシナリオを処理するために、Spring Integration は java.util.concurrent.Future インスタンスを使用して非同期ゲートウェイをサポートします。

次の例に示すように、XML 構成から変更はなく、通常のゲートウェイを定義するのと同じ方法で非同期ゲートウェイを定義します。

<int:gateway id="mathService"
     service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
     default-request-channel="requestChannel"/>

ただし、ゲートウェイインターフェース(サービスインターフェース)は、次のように少し異なります。

public interface MathServiceGateway {

  Future<Integer> multiplyByTwo(int i);

}

前の例が示すように、ゲートウェイメソッドの戻り値の型は Future です。GatewayProxyFactoryBean は、ゲートウェイメソッドの戻り値の型が Future であることを確認すると、AsyncTaskExecutor を使用して直ちに非同期モードに切り替えます。それが違いの範囲です。このようなメソッドの呼び出しは、常に Future インスタンスですぐに返されます。その後、自分のペースで Future を操作して、結果の取得やキャンセルなどを行うことができます。また、Future インスタンスの他の使用と同様に、get() を呼び出すと、タイムアウト、実行例外などが明らかになる場合があります。次の例は、非同期ゲートウェイから戻る Future の使用方法を示しています。

MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult =  result.get(1000, TimeUnit.SECONDS);

より詳細な例については、Spring Integration サンプルの async-gateway [GitHub] (英語) サンプルを参照してください。

AsyncTaskExecutor

デフォルトでは、GatewayProxyFactoryBean は、戻り値の型が Future であるゲートウェイメソッドの内部 AsyncInvocationTask インスタンスを送信するときに org.springframework.core.task.SimpleAsyncTaskExecutor を使用します。ただし、<gateway/> 要素の構成の async-executor 属性により、Spring アプリケーションコンテキスト内で利用可能な java.util.concurrent.Executor の実装への参照を提供できます。

(デフォルト) SimpleAsyncTaskExecutor は、Future と CompletableFuture の両方の戻り型をサポートします。CompletableFuture を参照してください。デフォルトのエグゼキュータがありますが、次の例に示すように、ログでそのスレッドを識別できるように、外部エグゼキュータを提供すると便利なことがよくあります (XML を使用する場合、スレッド名はエグゼキュータの Bean 名に基づいています)。

@Bean
public AsyncTaskExecutor exec() {
    SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
    simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
    return simpleAsyncTaskExecutor;
}

@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}

別の Future 実装を返す場合は、カスタムエグゼキューターを提供するか、エグゼキューターを完全に無効にして、ダウンストリームフローからの応答メッセージペイロードで Future を返すことができます。エグゼキューターを無効にするには、GatewayProxyFactoryBean で null に設定します(setAsyncTaskExecutor(null) を使用)。XML を使用してゲートウェイを構成する場合は、async-executor="" を使用します。@MessagingGateway アノテーションを使用して構成する場合、次のようなコードを使用します。

@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {

    @Gateway(requestChannel = "gatewayChannel")
    Future<?> doAsync(String foo);

}
戻り値の型が特定の具体的な Future 実装または構成されたエグゼキューターでサポートされていない他のサブインターフェースである場合、フローは呼び出し側のスレッドで実行され、フローは応答メッセージペイロードで必要な型を返す必要があります

CompletableFuture

バージョン 4.2 以降、ゲートウェイメソッドは CompletableFuture<?> を返すことができるようになりました。この型を返す場合、2 つの操作モードがあります。

  • 非同期エグゼキューターが提供され、戻り値の型が(サブクラスではなく) CompletableFuture である場合、フレームワークはエグゼキューターでタスクを実行し、すぐに呼び出し元に CompletableFuture を返します。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor) は未来の創造に使用されます。

  • 非同期エグゼキューターが明示的に null に設定され、戻り値の型が CompletableFuture であるか、戻り値の型が CompletableFuture のサブクラスである場合、フローは呼び出し側のスレッドで呼び出されます。このシナリオでは、ダウンストリームフローは適切な型の CompletableFuture を返すことが期待されています。

使用シナリオ

次のシナリオでは、ダウンストリームフローが(Invoice オブジェクトを使用して)ゲートウェイに応答すると、呼び出し元スレッドはすぐに CompletableFuture<Invoice> を返します。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />

次のシナリオでは、ダウンストリームフローがゲートウェイへの応答のペイロードとして提供する場合、呼び出し元スレッドは CompletableFuture<Invoice> を返します。請求書の準備ができたら、他のプロセスで将来を完了する必要があります。

CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
    async-executor="" />

次のシナリオでは、ダウンストリームフローがゲートウェイへの応答のペイロードとして提供する場合、呼び出し元スレッドは CompletableFuture<Invoice> を返します。請求書の準備ができたら、他のプロセスで将来を完了する必要があります。DEBUG ロギングが有効になっている場合、ログエントリが出力され、このシナリオでは非同期エグゼキューターを使用できないことを示します。

MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />

次の例に示すように、CompletableFuture インスタンスを使用して、応答に対して追加の操作を実行できます。

CompletableFuture<String> process(String data);

...

CompletableFuture result = process("foo")
    .thenApply(t -> t.toUpperCase());

...

String out = result.get(10, TimeUnit.SECONDS);

Reactor Mono

バージョン 5.0 以降、GatewayProxyFactoryBean では、Mono<T> [GitHub] (英語) 戻り型を使用して、ゲートウェイインターフェースメソッドでプロジェクト Reactor (英語) を使用できます。内部 AsyncInvocationTask は Mono.fromCallable() にラップされています。

Mono を使用して、後で結果を取得することができます(Future<?> と同様)。または、結果がゲートウェイに返されたときに Consumer を呼び出すことにより、ディスパッチャーから結果を取得できます。

Mono は、フレームワークによってすぐにはフラッシュされません。基になるメッセージフローは、ゲートウェイメソッドが戻る前に開始されません(Future<?> Executor タスクの場合のように)。Mono がサブスクライブされると、フローが開始されます。あるいは、subscribe() が Flux 全体に関連している場合、Mono (「構成可能」である)は Reactor ストリームの一部である可能性があります。次の例は、Project Reactor を使用してゲートウェイを作成する方法を示しています。
@MessagingGateway
public interface TestGateway {

    @Gateway(requestChannel = "multiplyChannel")
    Mono<Integer> multiply(Integer value);

}

@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
    return value * 2;
}

このようなゲートウェイは、データの Flux を処理するサービスで使用できます。

@Autowired
TestGateway testGateway;

public void hadnleFlux() {
    Flux.just("1", "2", "3", "4", "5")
            .map(Integer::parseInt)
            .flatMap(this.testGateway::multiply)
            .collectList()
            .subscribe(System.out::println);
}

Project Reactor を使用する別の例は、次の例が示すように、単純なコールバックシナリオです。

Mono<Invoice> mono = service.process(myOrder);

mono.subscribe(invoice -> handleInvoice(invoice));

呼び出しスレッドは継続し、フローが完了すると handleInvoice() が呼び出されます。

詳細については、Kotlin コルーチンも参照してください。

非同期型を返すダウンストリームフロー

上記の AsyncTaskExecutor セクションで記述されていたように、ダウンストリームコンポーネントが非同期ペイロード (FutureMono など) を含むメッセージを返すようにする場合は、非同期エグゼキューターを明示的に null (または XML 構成を使用する場合は "" ) に設定する必要があります。次に、呼び出し元スレッドでフローが呼び出され、後で結果を取得できます。

非同期 void 戻り型

メッセージングゲートウェイメソッドは、次のように宣言できます。

@MessagingGateway
public interface MyGateway {

    @Gateway(requestChannel = "sendAsyncChannel")
    @Async
    void sendAsync(String payload);

}

ただし、ダウンストリームの例外が呼び出し元に伝播されることはありません。バージョン 6.0 以降、ダウンストリームフローの呼び出しと呼び出し元への例外の伝播の非同期動作を保証するために、フレームワークは Future<Void> と Mono<Void> の戻り値の型をサポートします。ユースケースは、単純な void 戻り型について前述した send-and-forget 動作に似ていますが、フロー実行が非同期で行われ、返された Future (または Mono) が null で完了するか、send 操作結果に従って例外的に完了するという違いがあります。

Future<Void> が正確なダウンストリームフロー応答である場合、ゲートウェイの asyncExecutor オプションを null (@MessagingGateway 構成の場合は AnnotationConstants.NULL ) に設定する必要があり、send 部分はプロデューサースレッドで実行されます。応答 1 は、ダウンストリームフローの構成によって異なります。このようにして、ターゲットアプリケーションが Future<Void> 応答を正しく生成します。Mono ユースケースは、すでにフレームワークのスレッド制御から外れているため、asyncExecutor を null に設定しても意味がありません。リクエスト応答ゲートウェイ操作の結果としての Mono<Void> は、ゲートウェイメソッドの Mono<?> 戻り値の型として構成する必要があります。

レスポンスがない場合のゲートウェイの動作

前に説明したように、ゲートウェイは、POJO メソッド呼び出しを通じてメッセージングシステムと対話する便利な方法を提供します。ただし、通常は常に例外が返されると予想される一般的なメソッド呼び出しは、メッセージ交換に 1 対 1 を常にマップするとは限りません(たとえば、応答メッセージが到着しない場合があります。戻る)。

このセクションの残りの部分では、さまざまなシナリオと、ゲートウェイをより予測可能な動作にする方法について説明します。同期ゲートウェイの動作をより予測可能にするために特定の属性を構成できますが、一部の属性は期待どおりに動作しない場合があります。それらの 1 つは reply-timeout (メソッドレベルまたはゲートウェイレベルの default-reply-timeout)です。reply-timeout 属性を調べて、さまざまなシナリオで同期ゲートウェイの動作にどのように影響を与えることができるか、およびできないかを確認します。シングルスレッドシナリオ(ダウンストリームのすべてのコンポーネントが直接チャネルを介して接続されている)とマルチスレッドシナリオ(たとえば、ダウンストリームのどこかに、シングルスレッド境界を破るポーリング可能チャネルまたはエグゼキュータチャネルがある場合)を調べます。

長期実行プロセスダウンストリーム

同期ゲートウェイ、シングルスレッド

コンポーネントのダウンストリームがまだ実行されている場合(おそらく無限ループまたは遅いサービスのため)、reply-timeout の設定は効果がなく、ダウンストリームサービスが終了するまで(例外を返すかスローすることにより)ゲートウェイメソッド呼び出しは戻りません。

同期ゲートウェイ、マルチスレッド

マルチスレッドのメッセージフローでコンポーネントのダウンストリームが(おそらく無限ループまたは遅いサービスのために)実行されている場合、GatewayProxyFactoryBean がタイムアウトに達するとゲートウェイメソッドの呼び出しが返されるようにすることで reply-timeout を設定すると効果があります。応答チャネルでポーリングし、タイムアウトが期限切れになるまでメッセージを待機します。ただし、実際の応答が生成される前にタイムアウトに達すると、ゲートウェイメソッドから "null" が返される可能性があります。ゲートウェイメソッドの呼び出しが返された後、応答メッセージ(生成された場合)が応答チャネルに送信されることを理解する必要があります。

タイムアウトが発生したときに null を返す代わりに MessageTimeoutException をスローする errorOnTimeout プロパティも参照してください。

ダウンストリームコンポーネントが "null" を返す

同期ゲートウェイ — single-threaded

ダウンストリームのコンポーネントが "null" を返し、reply-timeout が負の値に構成されている場合、"null" を返す可能性があるダウンストリームコンポーネント (サービスアクティベーターなど) に requires-reply 属性が設定されていない限り、ゲートウェイメソッド呼び出しは無期限にハングします。この場合、例外がスローされてゲートウェイに伝播されます。

同期ゲートウェイ — multi-threaded

動作は前のケースと同じです。

ゲートウェイメソッドシグネチャーが非 void の場合、ダウンストリームコンポーネントのリターンシグネチャーは "void"

同期ゲートウェイ — single-threaded

ダウンストリームのコンポーネントが "void" を返し、reply-timeout が負の値に構成されている場合、ゲートウェイメソッド呼び出しは無期限にハングします。

同期ゲートウェイ — multi-threaded

動作は前のケースと同じです。

ダウンストリームコンポーネントがランタイム例外を引き起こす

同期ゲートウェイ — single-threaded

コンポーネントのダウンストリームがランタイム例外をスローした場合、例外はエラーメッセージを介してゲートウェイに伝播され、再スローされます。

同期ゲートウェイ — multi-threaded

動作は前のケースと同じです。

デフォルトでは、reply-timeout は無制限であることを理解する必要があります。reply-timeout を負の値に設定すると、ゲートウェイメソッドの呼び出しが無期限にハングする機能があります。フローを確実に分析し、リモートでこれらのシナリオのいずれかが発生する可能性がある場合は、reply-timeout 属性を「安全な」値に設定する必要があります。デフォルトでは 30 秒です。さらに良いのは、ダウンストリームコンポーネントの requires-reply 属性を "true" に設定して、ダウンストリームコンポーネントが内部で null を返した直後に例外をスローすることによってタイムリーなレスポンスを確実に行うことができることです。ただし、reply-timeout が役に立たないシナリオ ( 最初のシナリオを参照) があることも認識する必要があります。つまり、メッセージフローを分析し、非同期ゲートウェイではなく同期ゲートウェイをいつ使用するかを決定することも重要です。前に説明したように、後者の場合は、Future インスタンスを返すゲートウェイメソッドを定義する必要があります。そうすれば、その戻り値を受け取ることが保証され、呼び出しの結果をより詳細に制御できるようになります。また、ルーターを扱うときは、resolution-required 属性を "true" に設定すると、ルーターが特定のチャネルを解決できない場合に例外がスローされることに注意してください。同様に、フィルターを処理する場合は、throw-exception-on-rejection 属性を設定できます。これらのどちらの場合でも、結果として生じるフローは、'requires-reply' 属性 を持つサービスアクティベーターが含まれているかのように動作します。言い換えれば、ゲートウェイメソッド呼び出しからのタイムリーなレスポンスを確保できます。
スレッドがゲートウェイに戻ると、つまりフローが完了するか、メッセージが別のスレッドに渡されると、タイマーが開始することを理解する必要があります。その時点で、呼び出しスレッドは応答の待機を開始します。フローが完全に同期されている場合、応答はすぐに利用可能です。非同期フローの場合、スレッドはこの時間まで待機します。

バージョン 6.2 以降、MessagingGatewaySupport の内部 MethodInvocationGateway 拡張機能の errorOnTimeout プロパティが @MessagingGateway および GatewayEndpointSpec で公開されます。このオプションは、エンドポイントの概要の章の最後で説明した受信ゲートウェイの場合とまったく同じ意味を持ちます。つまり、このオプションを true に設定すると、受信タイムアウトが切れたときに null を返すのではなく、送受信ゲートウェイ操作から MessageTimeoutException がスローされることになります。

IntegrationFlow を介してゲートウェイを定義するオプションについては、Java DSL の章のゲートウェイとしての IntegrationFlow  を参照してください。