メッセージングゲートウェイ
ゲートウェイは、Spring Integration によって提供されるメッセージング API を隠します。これにより、アプリケーションのビジネスロジックは Spring Integration API を認識できなくなります。汎用ゲートウェイを使用することにより、コードは単純なインターフェースのみと対話します。
GatewayProxyFactoryBean を入力してください
前述のように、Spring Integration API(ゲートウェイクラスを含む)に依存しないことは素晴らしいことです。そのため、Spring Integration は GatewayProxyFactoryBean を提供します。GatewayProxyFactoryBean は、任意のインターフェースのプロキシを生成し、以下に示すゲートウェイメソッドを内部的に呼び出します。依存性注入を使用することにより、ビジネスメソッドにインターフェースを公開できます。
次の例は、Spring Integration との対話に使用できるインターフェースを示しています。
public interface Cafe {
void placeOrder(Order order);
}ゲートウェイ XML 名前空間のサポート
名前空間のサポートも提供されます。次の例に示すように、インターフェースをサービスとして設定できます。
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/> この設定を定義すると、cafeService を他の Bean に注入できるようになり、Cafe インターフェースのプロキシインスタンスのメソッドを呼び出すコードは Spring Integration API を意識する必要がなくなります。gateway 要素を使用する例(Caf éデモ内)については、“サンプル” の付録を参照してください。
前述の設定のデフォルトは、ゲートウェイインターフェース上のすべてのメソッドに適用されます。応答タイムアウトが指定されていない場合、呼び出しスレッドは 30 秒間応答を待ちます。レスポンスがない場合のゲートウェイの動作を参照してください。
デフォルトは、個々のメソッドに対してオーバーライドできます。アノテーションと XML を使用したゲートウェイ構成を参照してください。
デフォルトの応答チャネルの設定
通常、ゲートウェイは応答をリッスンする一時的な匿名応答チャネルを自動作成するため、default-reply-channel を指定する必要はありません。ただし、default-reply-channel (または HTTP、JMS などのアダプターゲートウェイを備えた reply-channel)を定義するように求められる場合があります。
背景として、ゲートウェイの内部動作について簡単に説明します。ゲートウェイは、一時的なポイントツーポイント応答チャネルを作成します。これは匿名であり、replyChannel という名前でメッセージヘッダーに追加されます。明示的な default-reply-channel (リモートアダプターゲートウェイを使用した reply-channel ) を提供する場合、publish-subscribe チャネルを指定できます。これは、複数のサブスクライバーを追加できるため、この名前が付けられています。内部的に、Spring Integration は、一時的な replyChannel と明示的に定義された default-reply-channel の間にブリッジを作成します。
返信をゲートウェイだけでなく、他のコンシューマーにも送信するとします。この場合、次の 2 つが必要です。
サブスクライブできる名前付きチャンネル
パブリッシュ / サブスクライブチャネルになるチャネル
ゲートウェイが使用するデフォルトの戦略では、ヘッダーに追加される応答チャネルが匿名かつポイントツーポイントであるため、これらのニーズを満たすことができません。つまり、他のサブスクライバーは応答チャネルをハンドルできず、たとえハンドルできたとしても、チャネルはポイントツーポイントの動作となり、1 つのサブスクライバーのみがメッセージを取得します。default-reply-channel を定義することで、任意のチャネルを指定できます。この場合、publish-subscribe-channel が該当します。ゲートウェイは、このチャネルから、ヘッダーに格納されている一時的な匿名応答チャネルへのブリッジを作成します。
また、インターセプター ( wiretap など) を介して監視または監査するための応答チャネルを明示的に提供することもできます。チャネルインターセプタを構成するには、名前付きチャネルが必要です。
バージョン 5.4 以降、ゲートウェイメソッドの戻り値の型が void の場合、フレームワークは、replyChannel ヘッダーが明示的に提供されていない場合、nullChannel Bean 参照として replyChannel ヘッダーを設定します。これにより、ダウンストリームフローからの可能性のあるすべての応答を破棄して、一方向ゲートウェイ契約を満たすことができます。 |
アノテーションと XML を使用したゲートウェイ構成
@Gateway アノテーションを追加することにより、前の Cafe インターフェースの例を拡張する次の例を検討してください。
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}@Header アノテーションを使用すると、次の例に示すように、メッセージヘッダーとして解釈される値を追加できます。
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
} ゲートウェイメソッドの構成に XML アプローチを好む場合、次の例に示すように、method 要素をゲートウェイ構成に追加できます。
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>XML を使用して、各メソッド呼び出しに個別のヘッダーを提供することもできます。これは、設定するヘッダーが本質的に静的であり、@Header アノテーションを使用してゲートウェイのメソッドシグネチャーに埋め込みたくない場合に便利です。例: ローンブローカーの例では、開始されたリクエストの型 (一重引用符またはすべて引用符) に基づいて、ローン見積りの集約方法に影響を与えたいと考えています。どのゲートウェイメソッドが呼び出されたかを評価することによってリクエストの型を判別することは可能ですが、関心の分離パラダイム (メソッドは Java 成果物です) に違反します。ただし、メッセージヘッダーで意図 (メタ情報) を表現することは、メッセージングアーキテクチャでは自然なことです。次の例は、2 つのメソッドごとに異なるメッセージヘッダーを追加する方法を示しています。
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>前の例では、ゲートウェイのメソッドに基づいて、"RESPONSE_TYPE" ヘッダーに異なる値が設定されています。
たとえば、<int:method/> および @Gateway アノテーションで requestChannel を指定すると、アノテーション値が優先されます。 |
引数なしのゲートウェイが XML で指定されており、インターフェースメソッドに @Payload アノテーションと @Gateway アノテーションの両方がある場合(<int:method/> 要素に payloadExpression または payload-expression がある場合)、@Payload 値は無視されます。 |
式と「グローバル」ヘッダー
<header/> 要素は、value の代替として expression をサポートします。SpEL 式が評価され、ヘッダーの値が決定されます。バージョン 5.2 以降、評価コンテキストの #root オブジェクトは、getMethod() および getArgs() アクセサーを持つ MethodArgsHolder です。例: 単純なメソッド名でルーティングする場合は、次の式でヘッダーを追加できます: method.name。
java.reflect.Method は直列化できません。後でメッセージを直列化すると、method の式を持つヘッダーは失われます。これらの場合は method.name または method.toString() を使用することをお勧めします。toString() メソッドは、パラメーターと戻り値の型を含むメソッドの String 表現を提供します。 |
バージョン 3.0 以降、呼び出されたメソッドに関係なく、<default-header/> 要素を定義して、ゲートウェイによって生成されたすべてのメッセージにヘッダーを追加できます。メソッドに定義された特定のヘッダーは、デフォルトのヘッダーよりも優先されます。ここでメソッドに定義された特定のヘッダーは、サービスインターフェースの @Header アノテーションをオーバーライドします。ただし、デフォルトのヘッダーは、サービスインターフェースの @Header アノテーションをオーバーライドしません。
ゲートウェイは、(オーバーライドされない限り)すべてのメソッドに適用される default-payload-expression もサポートするようになりました。
メソッドへの引数のメッセージへのマッピング
前のセクションの構成手法を使用すると、メソッドの引数をメッセージ要素(ペイロードとヘッダー)にマップする方法を制御できます。明示的な構成が使用されない場合、特定の規則がマッピングの実行に使用されます。場合によっては、これらの規則では、どの引数がペイロードであり、どの引数をヘッダーにマップする必要があるかを判断できません。次の例を考えてみましょう。
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2); 最初の場合、慣例では、最初の引数をペイロードにマップし(Map でない限り)、2 番目の引数の内容はヘッダーになります。
2 番目の場合(またはパラメーター thing1 の引数が Map である場合の最初の場合)、フレームワークはどの引数をペイロードにするかを決定できません。その結果、マッピングは失敗します。通常、これは payload-expression、@Payload アノテーション、@Headers アノテーションを使用して解決できます。
別の方法(および規則が破られるたび)では、メソッド呼び出しをメッセージにマッピングする責任をすべて負うことができます。そのためには、MethodArgsMessageMapper を実装し、mapper 属性を使用して <gateway/> に提供します。マッパーは MethodArgsHolder をマッピングします。これは、java.reflect.Method インスタンスと引数を含む Object[] をラップする単純なクラスです。カスタムマッパーを提供する場合、default-payload-expression 属性と <default-header/> 要素はゲートウェイで許可されていません。同様に、payload-expression 属性および <header/> 要素は、<method/> 要素では許可されていません。
マッピングメソッドの引数
次の例は、メソッド引数をメッセージにマップする方法と、無効な構成の例を示しています。
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("args[0] + args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}| 1 | この例では、SpEL 変数 #this が引数を参照していることに注意してください。この場合は s の値です。 |
メソッド引数の #this コンテキストがないため、同等の XML は少し異なります。ただし、式は、次の例に示すように、MethodArgsHolder ルートオブジェクトの args プロパティを使用してメソッド引数を参照できます (詳細については、式と「グローバル」ヘッダーを参照してください)。
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(args[0])"/>
<int:method name="send3" payload-expression="method"/>
<int:method name="send4">
<int:header name="thing1" expression="args[2].toUpperCase()"/>
</int:method>
</int:gateway>@MessagingGateway アノテーション
バージョン 4.0 以降、ゲートウェイサービスインターフェースは、構成のために <gateway /> xml 要素の定義を要求する代わりに、@MessagingGateway アノテーションでマークできます。次のペアの例では、同じゲートウェイを構成するための 2 つのアプローチを比較しています。
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}XML バージョンと同様に、コンポーネントスキャン中に Spring Integration がこれらのアノテーションを検出すると、メッセージングインフラストラクチャを使用して proxy 実装を作成します。このスキャンを実行して BeanDefinition をアプリケーションコンテキストに登録するには、@IntegrationComponentScan アノテーションを @Configuration クラスに追加します。標準の @ComponentScan インフラストラクチャはインターフェースを処理しません。そのため、カスタム @IntegrationComponentScan ロジックを導入して、インターフェースで @MessagingGateway アノテーションを検索し、それらの GatewayProxyFactoryBean インスタンスを登録しました。アノテーションサポートも参照してください。 |
@MessagingGateway アノテーションとともに、@Profile アノテーションでサービスインターフェースをマークして、そのようなプロファイルがアクティブでない場合、Bean の作成を回避できます。
バージョン 6.0 から、@MessagingGateway とのインターフェースは、Spring @Component 定義で可能なように、それぞれの構成ロジックの @Primary アノテーションでマークすることもできます。
バージョン 6.0 以降では、@MessagingGateway インターフェースを標準の Spring @Import 構成で使用できます。これは、@IntegrationComponentScan または手動の AnnotationGatewayProxyFactoryBean Bean 定義の代替として使用できます。
@MessagingGateway は、バージョン 6.0 から @MessageEndpoint でメタアノテーションが付けられ、name() 属性は基本的に @Compnent.value() にエイリアスされます。このようにして、ゲートウェイプロキシの Bean 名生成戦略は、スキャンおよびインポートされたコンポーネントの標準 Spring アノテーション構成と再調整されます。デフォルトの AnnotationBeanNameGenerator は、AnnotationConfigUtils.CONFIGURATION_BEAN_NAME_GENERATOR を介して、または @IntegrationComponentScan.nameGenerator() 属性としてグローバルにオーバーライドできます。
XML 構成がない場合は、少なくとも 1 つの @Configuration クラスで @EnableIntegration アノテーションが必要です。詳細については、構成と @EnableIntegration を参照してください。 |
引数なしのメソッドの呼び出し
引数を持たない Gateway インターフェースでメソッドを呼び出す場合、デフォルトの動作は PollableChannel から Message を受け取ります。
ただし、引数のない SQL 呼び出しやストアドプロシージャのトリガーなど、ユーザーが提供するパラメーターを必要としないダウンストリームの他のコンポーネントと対話できるように、引数のないメソッドをトリガーしたい場合があります。
送受信のセマンティクスを実現するには、ペイロードを提供する必要があります。ペイロードを生成するために、インターフェースのメソッドパラメーターは必要ありません。@Payload アノテーションまたは method 要素の XML の payload-expression 属性を使用できます。次のリストには、ペイロードの例をいくつか示します。
リテラル文字列
#gatewayMethod.name
新しい java.util.Date()
@someBean.someMethod() の戻り値
次の例は、@Payload アノテーションの使用方法を示しています。
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}@Gateway アノテーションを使用することもできます。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
} 両方のアノテーションが存在する場合(および payloadExpression が提供されている場合)、@Gateway が優先されます。 |
アノテーションと XML を使用したゲートウェイ構成も参照してください。
メソッドに引数および戻り値がなく、ペイロード式が含まれている場合、メソッドは送信専用操作として扱われます。
default メソッドの呼び出し
ゲートウェイプロキシのインターフェースには default メソッドも含まれる場合があり、バージョン 5.3 以降、フレームワークはプロキシに DefaultMethodInvokingMethodInterceptor を注入して、プロキシの代わりに java.lang.invoke.MethodHandle アプローチを使用して default メソッドを呼び出します。java.util.function.Function などの JDK からのインターフェースは引き続きゲートウェイプロキシに使用できますが、JDK クラスに対する MethodHandles.Lookup インスタンス化の内部 Java セキュリティ上の理由により、それらの default メソッドを呼び出すことができません。これらのメソッドは、メソッドの明示的な @Gateway アノテーション、または @MessagingGateway アノテーションまたは <gateway> XML コンポーネントの proxyDefaultMethods を使用して、プロキシ化(実装ロジックを失い、同時に以前のゲートウェイプロキシの動作を復元)することもできます。
エラー処理
ゲートウェイの呼び出しはエラーになる可能性があります。デフォルトでは、ダウンストリームで発生したエラーは、ゲートウェイのメソッド呼び出し時に「そのまま」再スローされます。例: 次の簡単なフローを検討してください。
gateway -> service-activator サービスアクティベータによって呼び出されたサービスが MyException をスローした場合(たとえば)、フレームワークはそれを MessagingException にラップし、failedMessage プロパティでサービスアクティベータに渡されたメッセージを添付します。そのため、フレームワークによって実行されるロギングには、障害の完全なコンテキストが含まれます。デフォルトでは、例外がゲートウェイによってキャッチされると、MyException はラップ解除され、呼び出し元にスローされます。原因チェーンの特定の例外型と一致するように、ゲートウェイメソッド宣言で throws 句を構成できます。例: ダウンストリームエラーの理由のすべてのメッセージング情報で MessagingException 全体をキャッチする場合、次のようなゲートウェイメソッドが必要です。
public interface MyGateway {
void performProcess() throws MessagingException;
}POJO プログラミングを推奨しているため、呼び出し元をメッセージングインフラストラクチャに公開したくない場合があります。
ゲートウェイメソッドに throws 句がない場合、ゲートウェイは原因ツリーを走査し、MessagingException ではない RuntimeException を探します。何も見つからない場合、フレームワークは MessagingException をスローします。前の説明の MyException に SomeOtherException とメソッド throws SomeOtherException の原因がある場合、ゲートウェイはそれをさらにアンラップし、呼び出し元にスローします。
service-interface なしでゲートウェイが宣言されると、内部フレームワークインターフェース RequestReplyExchanger が使用されます。
次の例を考えてみましょう。
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
} バージョン 5.0 より前は、この exchange メソッドには throws 節がありませんでした。その結果、例外はラップ解除されました。このインターフェースを使用して以前のアンラップ動作を復元する場合は、代わりにカスタム service-interface を使用するか、MessagingException の cause にアクセスしてください。
ただし、エラーを伝播するのではなくログに記録したい場合や、例外を有効な応答として扱いたい場合があります (呼び出し元が理解できる「エラーメッセージ」契約に準拠するメッセージにマッピングすることによって)。これを実現するために、ゲートウェイは、error-channel 属性のサポートを含めることによって、エラー専用のメッセージチャネルのサポートを提供します。次の例では、「トランスフォーマー」が Exception から応答 Message を作成します。
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>exceptionTransformer は、予期されるエラーレスポンスオブジェクトの作成方法を知っている単純な POJO である可能性があります。それが発呼者に送り返されるペイロードになります。必要に応じて、このような「エラーフロー」でより多くの詳細な処理を実行できます。ルーター(Spring Integration の ErrorMessageExceptionTypeRouter を含む)、フィルターなどが含まれる場合があります。ただし、ほとんどの場合、単純な「トランス」で十分です。
または、例外のみをログに記録する(または非同期でどこかに送信する)こともできます。一方向のフローを提供する場合、呼び出し元には何も返されません。例外を完全に抑制したい場合は、グローバル nullChannel への参照を提供できます(本質的に /dev/null アプローチ)。最後に、上記のように、error-channel が定義されていない場合、例外は通常どおり伝播します。
@MessagingGateway アノテーション (@MessagingGateway` アノテーション ), you can use an `errorChannel 属性を参照) を使用する場合。
バージョン 5.0 以降、void 戻り値の型(一方向フロー)でゲートウェイメソッドを使用すると、error-channel 参照(提供されている場合)が各送信メッセージの標準 errorChannel ヘッダーに取り込まれます。この機能により、標準の ExecutorChannel 構成(または QueueChannel)に基づいたダウンストリーム非同期フローが、デフォルトのグローバル errorChannel 例外送信動作をオーバーライドできます。以前は、@GatewayHeader アノテーションまたは <header> 要素で errorChannel ヘッダーを手動で指定する必要がありました。error-channel プロパティは、非同期フローを持つ void メソッドでは無視されました。代わりに、エラーメッセージがデフォルトの errorChannel に送信されました。
シンプルな POJI ゲートウェイを通じてメッセージングシステムを公開すると利点がありますが、基礎となるメッセージングシステムの現実を「隠す」ことには代償が伴うため、考慮すべき点がいくつかあります。Java メソッドができるだけ早く戻るようにし、呼び出し側が (void、戻り値、スローされた例外のいずれであっても) 戻りを待っている間、無期限にハングしないようにしたいと考えています。通常のメソッドがメッセージングシステムの前でプロキシとして使用される場合、基礎となるメッセージングの非同期の性質を考慮する必要があります。これは、ゲートウェイによって開始されたメッセージがフィルターによってドロップされ、応答を生成するコンポーネントに到達しない可能性があることを意味します。一部のサービスアクティベーターメソッドでは例外が発生し、応答が返されない場合があります (null メッセージは生成されないため)。つまり、複数のシナリオにより、応答メッセージが受信されない可能性があります。これはメッセージングシステムではごく自然なことです。ただし、ゲートウェイ方式への影響について考えてください。ゲートウェイのメソッド入力引数はメッセージに組み込まれ、ダウンストリームに送信されました。応答メッセージはゲートウェイのメソッドの戻り値に変換されます。そのため、ゲートウェイ呼び出しごとに常に応答メッセージがあることを確認したい場合があります。そうしないと、reply-timeout が負の値に設定されている場合、ゲートウェイメソッドが戻らず、無期限にハングする可能性があります。この状況に対処する 1 つの方法は、非同期ゲートウェイを使用することです (このセクションで後ほど説明します)。これを処理する別の方法は、デフォルトの reply-timeout を 30 秒として利用することです。こうすることで、ゲートウェイは reply-timeout で指定された時間を超えてハングすることはなく、そのタイムアウトが経過すると "null" を返します。最後に、サービスアクティベーターに "requires-reply"、フィルターに "throw-Exceptions-on-rejection" などのダウンストリームフラグを設定することを検討することをお勧めします。これらのオプションについては、この章の最後のセクションで詳しく説明します。 |
ダウンストリームフローが ErrorMessage を返す場合、その payload (Throwable) は通常のダウンストリームエラーとして扱われます。error-channel が構成されている場合は、エラーフローに送信されます。それ以外の場合、ペイロードはゲートウェイの呼び出し元にスローされます。同様に、error-channel のエラーフローが ErrorMessage を返す場合、そのペイロードは呼び出し元にスローされます。同じことが、Throwable ペイロードを持つすべてのメッセージに適用されます。これは、Exception を呼び出し元に直接伝搬する必要がある非同期の状況で役立ちます。これを行うには、Exception を (一部のサービスからの reply として) 返すか、スローします。一般に、非同期フローの場合でも、フレームワークは、ダウンストリームフローによってスローされた例外をゲートウェイに伝播する処理を行います。TCP クライアントサーバーマルチプレックス [GitHub] (英語) サンプルは、呼び出し元に例外を返す両方の手法を示しています。aggregator と group-timeout ( アグリゲーターとグループのタイムアウトを参照) を使用し、破棄フローで MessagingTimeoutException 応答を使用して、待機スレッドへのソケット IO エラーをエミュレートします。 |
ゲートウェイのタイムアウト
ゲートウェイには、requestTimeout と replyTimeout という 2 つのタイムアウトプロパティがあります。リクエストタイムアウトは、チャネルがブロックできる場合にのみ適用されます(たとえば、制限付き QueueChannel がいっぱいの場合など)。replyTimeout の値は、ゲートウェイが応答を待機するか、null を返す時間です。デフォルトは無限大です。
タイムアウトは、ゲートウェイ(defaultRequestTimeout および defaultReplyTimeout)または MessagingGateway インターフェースアノテーションのすべてのメソッドのデフォルトとして設定できます。個々のメソッドは、これらのデフォルト(<method/> 子要素内)または @Gateway アノテーションをオーバーライドできます。
バージョン 5.0 以降、次の例に示すように、タイムアウトは式として定義できます。
@Gateway(payloadExpression = "args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "args[1]", replyTimeoutExpression = "args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout); 評価コンテキストには BeanResolver (他の Bean を参照するために @someBean を使用) があり、#root オブジェクトからの args 配列プロパティが使用可能です。このルートオブジェクトの詳細については、式と「グローバル」ヘッダーを参照してください。XML で構成する場合、次の例に示すように、タイムアウト属性は long 値または SpEL 式にすることができます。
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="args[0]"
request-timeout="1000"
reply-timeout="args[1]">
</method>非同期ゲートウェイ
パターンとして、メッセージングゲートウェイは、メッセージングシステムの全機能を公開しながら、メッセージング固有のコードを隠す優れたメソッドを提供します。前述のように、GatewayProxyFactoryBean はサービスインターフェースを介してプロキシを公開する便利なメソッドを提供し、POJO ベースのメッセージングシステムへのアクセスを提供します (独自のドメイン内のオブジェクト、プリミティブ / 文字列、その他のオブジェクトに基づく)。ただし、値を返す単純な POJO メソッドを介してゲートウェイが公開される場合、(メソッドが呼び出されたときに生成される) リクエストメッセージごとに、(メソッドが返されたときに生成される) 応答メッセージが存在する必要があることを意味します。メッセージングシステムは当然のことながら非同期であるため、「リクエストごとに必ず応答がある」という契約を常に保証できるとは限りません。Spring Integration 2.0 は非同期ゲートウェイのサポートを導入しました。これは、応答が予想されるかどうか、応答が到着するまでにかかる時間がわからない場合に、フローを開始する便利なメソッドを提供します。
これらの型のシナリオを処理するために、Spring Integration は java.util.concurrent.Future インスタンスを使用して非同期ゲートウェイをサポートします。
次の例に示すように、XML 構成から変更はなく、通常のゲートウェイを定義するのと同じ方法で非同期ゲートウェイを定義します。
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>ただし、ゲートウェイインターフェース(サービスインターフェース)は、次のように少し異なります。
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
} 前の例が示すように、ゲートウェイメソッドの戻り値の型は Future です。GatewayProxyFactoryBean は、ゲートウェイメソッドの戻り値の型が Future であることを確認すると、AsyncTaskExecutor を使用して直ちに非同期モードに切り替えます。それが違いの範囲です。このようなメソッドの呼び出しは、常に Future インスタンスですぐに返されます。その後、自分のペースで Future を操作して、結果の取得やキャンセルなどを行うことができます。また、Future インスタンスの他の使用と同様に、get() を呼び出すと、タイムアウト、実行例外などが明らかになる場合があります。次の例は、非同期ゲートウェイから戻る Future の使用方法を示しています。
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);より詳細な例については、Spring Integration サンプルの async-gateway [GitHub] (英語) サンプルを参照してください。
また、バージョン 6.5 以降、Java DSL の gateway() 演算子は async(true) の動作を完全にサポートしています。内部的には、GatewayProxyFactoryBean 用の AsyncRequestReplyExchanger サービスインターフェースが提供されています。また、AsyncRequestReplyExchanger 契約は CompletableFuture<Message<?>> であるため、リクエストと応答全体が非同期で実行されます。この動作は、たとえば、スプリッターとアグリゲータのシナリオで、各アイテムに対して別のフローを呼び出す必要がある場合に役立ちます。ただし、順序は重要ではなく、すべての処理が完了した後にグループがアグリゲータに集まることだけが重要です。
AsyncTaskExecutor
デフォルトでは、GatewayProxyFactoryBean は、戻り値の型が Future であるゲートウェイメソッドの内部 AsyncInvocationTask インスタンスを送信するときに org.springframework.core.task.SimpleAsyncTaskExecutor を使用します。ただし、<gateway/> 要素の構成の async-executor 属性により、Spring アプリケーションコンテキスト内で利用可能な java.util.concurrent.Executor の実装への参照を提供できます。
(デフォルト) SimpleAsyncTaskExecutor は、Future と CompletableFuture の両方の戻り型をサポートします。CompletableFuture を参照してください。デフォルトのエグゼキュータがありますが、次の例に示すように、ログでそのスレッドを識別できるように、外部エグゼキュータを提供すると便利なことがよくあります (XML を使用する場合、スレッド名はエグゼキュータの Bean 名に基づいています)。
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
} 別の Future 実装を返す場合は、カスタムエグゼキューターを提供するか、エグゼキューターを完全に無効にして、ダウンストリームフローからの応答メッセージペイロードで Future を返すことができます。エグゼキューターを無効にするには、GatewayProxyFactoryBean で null に設定します(setAsyncTaskExecutor(null) を使用)。XML を使用してゲートウェイを構成する場合は、async-executor="" を使用します。@MessagingGateway アノテーションを使用して構成する場合、次のようなコードを使用します。
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
} 戻り値の型が特定の具体的な Future 実装または構成されたエグゼキューターでサポートされていない他のサブインターフェースである場合、フローは呼び出し側のスレッドで実行され、フローは応答メッセージペイロードで必要な型を返す必要があります |
CompletableFuture
バージョン 4.2 以降、ゲートウェイメソッドは CompletableFuture<?> を返すことができるようになりました。この型を返す場合、2 つの操作モードがあります。
非同期エグゼキューターが提供され、戻り値の型が(サブクラスではなく)
CompletableFutureである場合、フレームワークはエグゼキューターでタスクを実行し、すぐに呼び出し元にCompletableFutureを返します。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)は未来の創造に使用されます。非同期エグゼキューターが明示的に
nullに設定され、戻り値の型がCompletableFutureであるか、戻り値の型がCompletableFutureのサブクラスである場合、フローは呼び出し側のスレッドで呼び出されます。このシナリオでは、ダウンストリームフローは適切な型のCompletableFutureを返すことが期待されています。
使用シナリオ
次のシナリオでは、ダウンストリームフローが(Invoice オブジェクトを使用して)ゲートウェイに応答すると、呼び出し元スレッドはすぐに CompletableFuture<Invoice> を返します。
CompletableFuture<Invoice> order(Order order);<int:gateway service-interface="something.Service" default-request-channel="orders" /> 以下のシナリオでは、下流フローがゲートウェイへの応答のペイロードとして CompletableFuture<Invoice> を提供すると、呼び出し元スレッドは CompletableFuture<Invoice> を返します。請求書の準備が整うまで、他の何らかの処理を完了する必要があります。
CompletableFuture<Invoice> order(Order order);<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" /> 以下のシナリオでは、下流フローがゲートウェイへの応答のペイロードとして CompletableFuture<Invoice> を提供すると、呼び出し元スレッドは CompletableFuture<Invoice> を返します。請求書の準備が整うまで、他のプロセスを完了する必要があります。DEBUG ログが有効になっている場合、このシナリオでは非同期エグゼキュータを使用できないことを示すログエントリが生成されます。
MyCompletableFuture<Invoice> order(Order order);<int:gateway service-interface="foo.Service" default-request-channel="orders" /> 次の例に示すように、CompletableFuture インスタンスを使用して、応答に対して追加の操作を実行できます。
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);Reactor Mono
バージョン 5.0 以降、GatewayProxyFactoryBean では、Mono<T> [GitHub] (英語) 戻り型を使用して、ゲートウェイインターフェースメソッドでプロジェクト Reactor (英語) を使用できます。内部 AsyncInvocationTask は Mono.fromCallable() にラップされています。
Mono は、後で結果を取得するために使用できます ( Future<?> と同様)。また、結果がゲートウェイに返されたときに Consumer を呼び出して、ディスパッチャーでそれを使用することもできます。
Mono は、フレームワークによってすぐにはフラッシュされません。基になるメッセージフローは、ゲートウェイメソッドが戻る前に開始されません(Future<?> Executor タスクの場合のように)。Mono がサブスクライブされると、フローが開始されます。あるいは、subscribe() が Flux 全体に関連している場合、Mono (「構成可能」である)は Reactor ストリームの一部である可能性があります。次の例は、Project Reactor を使用してゲートウェイを作成する方法を示しています。 |
@MessagingGateway
public interface TestGateway {
@Gateway(requestChannel = "multiplyChannel")
Mono<Integer> multiply(Integer value);
}
@ServiceActivator(inputChannel = "multiplyChannel")
public Integer multiply(Integer value) {
return value * 2;
} このようなゲートウェイは、データの Flux を処理するサービスで使用できます。
@Autowired
TestGateway testGateway;
public void handleFlux() {
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(System.out::println);
}Project Reactor を使用する別の例は、次の例が示すように、単純なコールバックシナリオです。
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice)); 呼び出しスレッドは継続し、フローが完了すると handleInvoice() が呼び出されます。
詳細については、Kotlin コルーチンも参照してください。
非同期型を返すダウンストリームフロー
上記の AsyncTaskExecutor セクションで記述されていたように、下流コンポーネントから非同期ペイロード(Future、Mono など)を含むメッセージを返したい場合は、非同期エグゼキュータを明示的に null (XML 設定の場合は "")に設定する必要があります。これにより、フローは呼び出し元スレッドで呼び出され、後で結果を取得できるようになります。
非同期 void 戻り型
メッセージングゲートウェイメソッドは、次のように宣言できます。
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
} ただし、ダウンストリームの例外が呼び出し元に伝播されることはありません。バージョン 6.0 以降、ダウンストリームフローの呼び出しと呼び出し元への例外の伝播の非同期動作を保証するために、フレームワークは Future<Void> と Mono<Void> の戻り値の型をサポートします。ユースケースは、単純な void 戻り型について前述した send-and-forget 動作に似ていますが、フロー実行が非同期で行われ、返された Future (または Mono) が null で完了するか、send 操作結果に従って例外的に完了するという違いがあります。
Future<Void> が正確なダウンストリームフロー応答である場合、ゲートウェイの asyncExecutor オプションを null (@MessagingGateway 構成の場合は AnnotationConstants.NULL ) に設定する必要があり、send 部分はプロデューサースレッドで実行されます。応答 1 は、ダウンストリームフローの構成によって異なります。このようにして、ターゲットアプリケーションが Future<Void> 応答を正しく生成します。Mono ユースケースは、すでにフレームワークのスレッド制御から外れているため、asyncExecutor を null に設定しても意味がありません。リクエスト応答ゲートウェイ操作の結果としての Mono<Void> は、ゲートウェイメソッドの Mono<?> 戻り値の型として構成する必要があります。 |
レスポンスがない場合のゲートウェイの動作
前に説明したように、ゲートウェイは、POJO メソッド呼び出しを通じてメッセージングシステムと対話する便利な方法を提供します。ただし、通常は常に例外が返されると予想される一般的なメソッド呼び出しは、メッセージ交換に 1 対 1 を常にマップするとは限りません(たとえば、応答メッセージが到着しない場合があります。戻る)。
このセクションの残りの部分では、様々なシナリオと、ゲートウェイの動作をより予測可能にする方法について説明します。同期ゲートウェイの動作をより予測可能にするために特定の属性を設定できますが、その一部は必ずしも期待どおりに動作するとは限りません。その 1 つが reply-timeout (メソッドレベルでは reply-timeout、ゲートウェイレベルでは default-reply-timeout)です。ここでは、reply-timeout 属性を調べ、様々なシナリオにおいて同期ゲートウェイの動作にどのような影響を与えるか、また影響を与えないかを確認します。シングルスレッドのシナリオ(下流のすべてのコンポーネントが直接チャネルを介して接続されている)とマルチスレッドのシナリオ(たとえば、下流のどこかにシングルスレッドの境界を破るポーリング可能なチャネルまたはエグゼキュータチャネルが存在する可能性がある)を調べます。
長期実行プロセスダウンストリーム
- 同期ゲートウェイ、シングルスレッド
コンポーネントのダウンストリームがまだ実行されている場合(おそらく無限ループまたは遅いサービスのため)、
reply-timeoutの設定は効果がなく、ダウンストリームサービスが終了するまで(例外を返すかスローすることにより)ゲートウェイメソッド呼び出しは戻りません。- 同期ゲートウェイ、マルチスレッド
マルチスレッドメッセージフローにおいて、下流のコンポーネントが(無限ループやサービスの低速化などにより)まだ実行中の場合、
reply-timeoutを設定すると、タイムアウトに達した後でもゲートウェイメソッド呼び出しが返されるようになります。これは、GatewayProxyFactoryBeanが応答チャネルをポーリングし、タイムアウトになるまでメッセージを待機するためです。ただし、実際の応答が生成される前にタイムアウトに達した場合は、ゲートウェイメソッドから "null" が返される可能性があります。応答メッセージが生成された場合、ゲートウェイメソッド呼び出しが返された後に応答チャネルに送信されることに注意してください。そのため、フローを設計する際には、この点に留意する必要があります。
タイムアウトが発生したときに null を返す代わりに MessageTimeoutException をスローする errorOnTimeout プロパティも参照してください。
ダウンストリームコンポーネントが "null" を返す
- 同期ゲートウェイ — single-threaded
ダウンストリームのコンポーネントが "null" を返し、
reply-timeoutが負の値に構成されている場合、"null" を返す可能性があるダウンストリームコンポーネント (サービスアクティベーターなど) にrequires-reply属性が設定されていない限り、ゲートウェイメソッド呼び出しは無期限にハングします。この場合、例外がスローされてゲートウェイに伝播されます。- 同期ゲートウェイ — マルチスレッド
動作は前のケースと同じです。
ゲートウェイメソッドシグネチャーが非 void の場合、ダウンストリームコンポーネントのリターンシグネチャーは "void"
- 同期ゲートウェイ — single-threaded
ダウンストリームのコンポーネントが "void" を返し、
reply-timeoutが負の値に構成されている場合、ゲートウェイメソッド呼び出しは無期限にハングします。- 同期ゲートウェイ — マルチスレッド
動作は前のケースと同じです。
ダウンストリームコンポーネントがランタイム例外を引き起こす
- 同期ゲートウェイ — single-threaded
コンポーネントのダウンストリームがランタイム例外をスローした場合、例外はエラーメッセージを介してゲートウェイに伝播され、再スローされます。
- 同期ゲートウェイ — マルチスレッド
動作は前のケースと同じです。
デフォルトでは reply-timeout は無制限であることを理解しておく必要があります。そのため、reply-timeout を負の値に設定すると、ゲートウェイメソッドの呼び出しが無期限にハングする機能があります。そのため、フローを分析し、これらのシナリオのいずれかが発生する可能性が少しでもある場合は、reply-timeout 属性を「安全な」値に設定する必要があります。デフォルトでは 30 秒です。さらに良い方法として、下流コンポーネントの requires-reply 属性を "true" に設定することで、下流コンポーネントが内部的に null を返すとすぐに例外をスローし、タイムリーなレスポンスを保証することができます。ただし、reply-timeout が役に立たないシナリオもあることを認識しておく必要があります(最初の例を参照)。つまり、メッセージフローを分析し、非同期ゲートウェイではなく同期ゲートウェイを使用するタイミングを判断することも重要です。前述のように、後者の場合は、Future インスタンスを返すゲートウェイメソッドを定義する必要があります。こうすることで、戻り値が確実に受け取れるようになり、呼び出しの結果をより細かく制御できるようになります。また、ルーターを扱う場合、resolution-required 属性を "true" に設定すると、ルーターが特定のチャネルを解決できない場合に例外がスローされることに注意してください。同様に、フィルターを扱う場合は、throw-exception-on-rejection 属性を設定できます。どちらの場合も、結果として得られるフローは、'requires-reply' 属性を含むサービスアクティベーターが含まれているかのように動作します。つまり、ゲートウェイメソッド呼び出しからのタイムリーなレスポンスを保証できます。 |
| スレッドがゲートウェイに戻ると、つまりフローが完了するか、メッセージが別のスレッドに渡されると、タイマーが開始することを理解する必要があります。その時点で、呼び出しスレッドは応答の待機を開始します。フローが完全に同期されている場合、応答はすぐに利用可能です。非同期フローの場合、スレッドはこの時間まで待機します。 |
バージョン 6.2 以降、MessagingGatewaySupport の内部拡張である MethodInvocationGateway の errorOnTimeout プロパティが @MessagingGateway および GatewayEndpointSpec で公開されるようになりました。このオプションは、エンドポイントの概要の章の最後で説明されている受信ゲートウェイと全く同じ意味を持ちます。つまり、このオプションを true に設定すると、受信タイムアウトに達した際に、送受信ゲートウェイ操作から null が返されるのではなく、MessageTimeoutException がスローされます。
IntegrationFlow を介してゲートウェイを定義するオプションについては、Java DSL の章のゲートウェイとしての IntegrationFlow を参照してください。