メッセージングゲートウェイ
ゲートウェイは、Spring Integration によって提供されるメッセージング API を隠します。これにより、アプリケーションのビジネスロジックは Spring Integration API を認識できなくなります。汎用ゲートウェイを使用することにより、コードは単純なインターフェースのみと対話します。
GatewayProxyFactoryBean
を入力してください
前述のように、Spring Integration API(ゲートウェイクラスを含む)に依存しないことは素晴らしいことです。そのため、Spring Integration は GatewayProxyFactoryBean
を提供します。GatewayProxyFactoryBean
は、任意のインターフェースのプロキシを生成し、以下に示すゲートウェイメソッドを内部的に呼び出します。依存性注入を使用することにより、ビジネスメソッドにインターフェースを公開できます。
次の例は、Spring Integration との対話に使用できるインターフェースを示しています。
package org.cafeteria;
public interface Cafe {
void placeOrder(Order order);
}
ゲートウェイ XML 名前空間のサポート
名前空間のサポートも提供されます。次の例に示すように、インターフェースをサービスとして設定できます。
<int:gateway id="cafeService"
service-interface="org.cafeteria.Cafe"
default-request-channel="requestChannel"
default-reply-timeout="10000"
default-reply-channel="replyChannel"/>
この構成を定義すると、cafeService
を他の Bean に注入できるようになり、Cafe
インターフェースのプロキシされたインスタンスのメソッドを呼び出すコードは Spring Integration API を認識しなくなります。一般的なアプローチは、Spring Remoting(RMI、HttpInvoker など)のアプローチに似ています。(Cafe デモで) gateway
要素を使用する例については、“サンプル” 付録を参照してください。
上記の構成のデフォルトは、ゲートウェイインターフェースのすべてのメソッドに適用されます。応答タイムアウトが指定されていない場合、呼び出しスレッドは応答を無期限に待機します。レスポンスがない場合のゲートウェイの動作を参照してください。
デフォルトは、個々のメソッドに対してオーバーライドできます。アノテーションと XML を使用したゲートウェイ構成を参照してください。
デフォルトの応答チャネルの設定
通常、ゲートウェイは応答をリッスンする一時的な匿名応答チャネルを自動作成するため、default-reply-channel
を指定する必要はありません。ただし、default-reply-channel
(または HTTP、JMS などのアダプターゲートウェイを備えた reply-channel
)を定義するように求められる場合があります。
背景については、ゲートウェイの内部動作について簡単に説明します。ゲートウェイは、一時的なポイントツーポイント応答チャネルを作成します。これは匿名であり、名前 replyChannel
でメッセージヘッダーに追加されます。明示的な default-reply-channel
(リモートアダプターゲートウェイを備えた reply-channel
)を提供する場合、パブリッシュ / サブスクライブチャネルを指すことができます。このチャネルは、複数のサブスクライバを追加できるため、その名前が付けられています。内部的に、Spring Integration は一時的な replyChannel
と明示的に定義された default-reply-channel
の間にブリッジを作成します。
返信をゲートウェイだけでなく、他のコンシューマーにも送信するとします。この場合、次の 2 つが必要です。
サブスクライブできる名前付きチャンネル
パブリッシュ / サブスクライブチャネルになるチャネル
ヘッダーに追加された応答チャネルは匿名でポイントツーポイントであるため、ゲートウェイで使用されるデフォルトの戦略はこれらのニーズを満たしません。これは、他のサブスクライバーがそれへのハンドルを取得できないことを意味し、たとえそれが可能であっても、チャネルは 1 人のサブスクライバーのみがメッセージを取得するようなポイントツーポイントの動作を持ちます。default-reply-channel
を定義することにより、選択したチャンネルを指すことができます。この場合、publish-subscribe-channel
です。ゲートウェイは、そこから、ヘッダーに格納されている一時的な匿名応答チャネルへのブリッジを作成します。
また、インターセプター(盗聴など)を介してモニターまたは監査するための応答チャネルを明示的に提供することもできます。チャンネルインターセプターを設定するには、名前付きチャンネルが必要です。
バージョン 5.4 以降、ゲートウェイメソッドの戻り値の型が void の場合、フレームワークは、replyChannel ヘッダーが明示的に提供されていない場合、nullChannel Bean 参照として replyChannel ヘッダーを設定します。これにより、ダウンストリームフローからの可能性のあるすべての応答を破棄して、一方向ゲートウェイ契約を満たすことができます。 |
アノテーションと XML を使用したゲートウェイ構成
@Gateway
アノテーションを追加することにより、前の Cafe
インターフェースの例を拡張する次の例を検討してください。
public interface Cafe {
@Gateway(requestChannel="orders")
void placeOrder(Order order);
}
@Header
アノテーションを使用すると、次の例に示すように、メッセージヘッダーとして解釈される値を追加できます。
public interface FileWriter {
@Gateway(requestChannel="filesOut")
void write(byte[] content, @Header(FileHeaders.FILENAME) String filename);
}
ゲートウェイメソッドの構成に XML アプローチを好む場合、次の例に示すように、method
要素をゲートウェイ構成に追加できます。
<int:gateway id="myGateway" service-interface="org.foo.bar.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB"/>
<int:method name="echoViaDefault"/>
</int:gateway>
XML を使用して、メソッド呼び出しごとに個別のヘッダーを提供することもできます。これは、設定するヘッダーが本質的に静的であり、@Header
アノテーションを使用してゲートウェイのメソッドシグネチャーに埋め込みたくない場合に役立ちます。例: ローンブローカーの例では、開始されたリクエストの型(単一見積もりまたはすべての見積もり)に基づいて、ローン見積もりの集計がどのように行われるかに影響を与えたいと考えています。どのゲートウェイメソッドが呼び出されたかを評価することによってリクエストの型を決定することは、可能ではありますが、関心の分離パラダイムに違反します(メソッドは Java アーティファクトです)。ただし、メッセージヘッダーで意図(メタ情報)を表現することは、メッセージングアーキテクチャでは自然なことです。次の例は、2 つの方法のそれぞれに異なるメッセージヘッダーを追加する方法を示しています。
<int:gateway id="loanBrokerGateway"
service-interface="org.springframework.integration.loanbroker.LoanBrokerGateway">
<int:method name="getLoanQuote" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="BEST"/>
</int:method>
<int:method name="getAllLoanQuotes" request-channel="loanBrokerPreProcessingChannel">
<int:header name="RESPONSE_TYPE" value="ALL"/>
</int:method>
</int:gateway>
前の例では、ゲートウェイのメソッドに基づいて、"RESPONSE_TYPE" ヘッダーに異なる値が設定されています。
たとえば、<int:method/> および @Gateway アノテーションで requestChannel を指定すると、アノテーション値が優先されます。 |
引数なしのゲートウェイが XML で指定されており、インターフェースメソッドに @Payload アノテーションと @Gateway アノテーションの両方がある場合(<int:method/> 要素に payloadExpression または payload-expression がある場合)、@Payload 値は無視されます。 |
式と「グローバル」ヘッダー
<header/>
要素は、value
の代替として expression
をサポートします。SpEL 式が評価され、ヘッダーの値が決定されます。バージョン 5.2 以降、評価コンテキストの #root
オブジェクトは、getMethod()
および getArgs()
アクセサーを持つ MethodArgsHolder
です。
これらの 2 つの式評価コンテキスト変数は、バージョン 5.2 から非推奨です。
#args: メソッド引数を含む
Object[]
#gatewayMethod: 呼び出された
service-interface
のメソッドを表すオブジェクト(java.reflect.Method
から派生)。この変数を含むヘッダーは、フローの後半で使用できます(ルーティングなど)。例: 単純なメソッド名でルーティングする場合は、次の式でヘッダーを追加できます:#gatewayMethod.name
.
java.reflect.Method は直列化できません。後でメッセージを直列化すると、method の式を持つヘッダーは失われます。これらの場合は method.name または method.toString() を使用することをお勧めします。toString() メソッドは、パラメーターと戻り値の型を含むメソッドの String 表現を提供します。 |
バージョン 3.0 以降、呼び出されたメソッドに関係なく、<default-header/>
要素を定義して、ゲートウェイによって生成されたすべてのメッセージにヘッダーを追加できます。メソッドに定義された特定のヘッダーは、デフォルトのヘッダーよりも優先されます。ここでメソッドに定義された特定のヘッダーは、サービスインターフェースの @Header
アノテーションをオーバーライドします。ただし、デフォルトのヘッダーは、サービスインターフェースの @Header
アノテーションをオーバーライドしません。
ゲートウェイは、(オーバーライドされない限り)すべてのメソッドに適用される default-payload-expression
もサポートするようになりました。
メソッドへの引数のメッセージへのマッピング
前のセクションの構成手法を使用すると、メソッドの引数をメッセージ要素(ペイロードとヘッダー)にマップする方法を制御できます。明示的な構成が使用されない場合、特定の規則がマッピングの実行に使用されます。場合によっては、これらの規則では、どの引数がペイロードであり、どの引数をヘッダーにマップする必要があるかを判断できません。次の例を考えてみましょう。
public String send1(Object thing1, Map thing2);
public String send2(Map thing1, Map thing2);
最初の場合、慣例では、最初の引数をペイロードにマップし(Map
でない限り)、2 番目の引数の内容はヘッダーになります。
2 番目の場合(またはパラメーター thing1
の引数が Map
である場合の最初の場合)、フレームワークはどの引数をペイロードにするかを決定できません。その結果、マッピングは失敗します。通常、これは payload-expression
、@Payload
アノテーション、@Headers
アノテーションを使用して解決できます。
別の方法(および規則が破られるたび)では、メソッド呼び出しをメッセージにマッピングする責任をすべて負うことができます。そのためには、MethodArgsMessageMapper
を実装し、mapper
属性を使用して <gateway/>
に提供します。マッパーは MethodArgsHolder
をマッピングします。これは、java.reflect.Method
インスタンスと引数を含む Object[]
をラップする単純なクラスです。カスタムマッパーを提供する場合、default-payload-expression
属性と <default-header/>
要素はゲートウェイで許可されていません。同様に、payload-expression
属性および <header/>
要素は、<method/>
要素では許可されていません。
マッピングメソッドの引数
次の例は、メソッド引数をメッセージにマップする方法と、無効な構成の例を示しています。
public interface MyGateway {
void payloadAndHeaderMapWithoutAnnotations(String s, Map<String, Object> map);
void payloadAndHeaderMapWithAnnotations(@Payload String s, @Headers Map<String, Object> map);
void headerValuesAndPayloadWithAnnotations(@Header("k1") String x, @Payload String s, @Header("k2") String y);
void mapOnly(Map<String, Object> map); // the payload is the map and no custom headers are added
void twoMapsAndOneAnnotatedWithPayload(@Payload Map<String, Object> payload, Map<String, Object> headers);
@Payload("#args[0] + #args[1] + '!'")
void payloadAnnotationAtMethodLevel(String a, String b);
@Payload("@someBean.exclaim(#args[0])")
void payloadAnnotationAtMethodLevelUsingBeanResolver(String s);
void payloadAnnotationWithExpression(@Payload("toUpperCase()") String s);
void payloadAnnotationWithExpressionUsingBeanResolver(@Payload("@someBean.sum(#this)") String s); // (1)
// invalid
void twoMapsWithoutAnnotations(Map<String, Object> m1, Map<String, Object> m2);
// invalid
void twoPayloads(@Payload String s1, @Payload String s2);
// invalid
void payloadAndHeaderAnnotationsOnSameParameter(@Payload @Header("x") String s);
// invalid
void payloadAndHeadersAnnotationsOnSameParameter(@Payload @Headers Map<String, Object> map);
}
1 | この例では、SpEL 変数 #this が引数を参照していることに注意してください。この場合は s の値です。 |
メソッドの引数に #this
コンテキストがないため、XML の同等物は少し異なります。ただし、次の例に示すように、式は #args
変数を使用してメソッド引数を参照できます。
<int:gateway id="myGateway" service-interface="org.something.MyGateway">
<int:method name="send1" payload-expression="#args[0] + 'thing2'"/>
<int:method name="send2" payload-expression="@someBean.sum(#args[0])"/>
<int:method name="send3" payload-expression="#method"/>
<int:method name="send4">
<int:header name="thing1" expression="#args[2].toUpperCase()"/>
</int:method>
</int:gateway>
@MessagingGateway
アノテーション
バージョン 4.0 以降、ゲートウェイサービスインターフェースは、構成のために <gateway />
xml 要素の定義を要求する代わりに、@MessagingGateway
アノテーションでマークできます。次のペアの例では、同じゲートウェイを構成するための 2 つのアプローチを比較しています。
<int:gateway id="myGateway" service-interface="org.something.TestGateway"
default-request-channel="inputC">
<int:default-header name="calledMethod" expression="#gatewayMethod.name"/>
<int:method name="echo" request-channel="inputA" reply-timeout="2" request-timeout="200"/>
<int:method name="echoUpperCase" request-channel="inputB">
<int:header name="thing1" value="thing2"/>
</int:method>
<int:method name="echoViaDefault"/>
</int:gateway>
@MessagingGateway(name = "myGateway", defaultRequestChannel = "inputC",
defaultHeaders = @GatewayHeader(name = "calledMethod",
expression="#gatewayMethod.name"))
public interface TestGateway {
@Gateway(requestChannel = "inputA", replyTimeout = 2, requestTimeout = 200)
String echo(String payload);
@Gateway(requestChannel = "inputB", headers = @GatewayHeader(name = "thing1", value="thing2"))
String echoUpperCase(String payload);
String echoViaDefault(String payload);
}
XML バージョンと同様に、Spring Integration はコンポーネントスキャン中にこれらのアノテーションを検出すると、メッセージングインフラストラクチャを使用して proxy 実装を作成します。このスキャンを実行し、BeanDefinition をアプリケーションコンテキストに登録するには、@IntegrationComponentScan アノテーションを @Configuration クラスに追加します。標準の @ComponentScan インフラストラクチャはインターフェースを処理しません。そのため、カスタム @IntegrationComponentScan ロジックを導入して、インターフェース上の @MessagingGateway アノテーションを微調整し、GatewayProxyFactoryBean インスタンスを登録します。アノテーションサポートも参照してください。 |
@MessagingGateway
アノテーションとともに、@Profile
アノテーションでサービスインターフェースをマークして、そのようなプロファイルがアクティブでない場合、Bean の作成を回避できます。
XML 構成がない場合は、少なくとも 1 つの @Configuration クラスで @EnableIntegration アノテーションが必要です。詳細については、構成と @EnableIntegration を参照してください。 |
引数なしのメソッドの呼び出し
引数を持たない Gateway インターフェースでメソッドを呼び出す場合、デフォルトの動作は PollableChannel
から Message
を受け取ります。
ただし、引数のない SQL 呼び出しやストアドプロシージャのトリガーなど、ユーザーが提供するパラメーターを必要としないダウンストリームの他のコンポーネントと対話できるように、引数のないメソッドをトリガーしたい場合があります。
送受信のセマンティクスを実現するには、ペイロードを提供する必要があります。ペイロードを生成するために、インターフェースのメソッドパラメーターは必要ありません。@Payload
アノテーションまたは method
要素の XML の payload-expression
属性を使用できます。次のリストには、ペイロードの例をいくつか示します。
リテラル文字列
#gatewayMethod.name
新しい java.util.Date()
@someBean.someMethod() の戻り値
次の例は、@Payload
アノテーションの使用方法を示しています。
public interface Cafe {
@Payload("new java.util.Date()")
List<Order> retrieveOpenOrders();
}
@Gateway
アノテーションを使用することもできます。
public interface Cafe {
@Gateway(payloadExpression = "new java.util.Date()")
List<Order> retrieveOpenOrders();
}
両方のアノテーションが存在する場合(および payloadExpression が提供されている場合)、@Gateway が優先されます。 |
アノテーションと XML を使用したゲートウェイ構成も参照してください。
メソッドに引数および戻り値がなく、ペイロード式が含まれている場合、メソッドは送信専用操作として扱われます。
default
メソッドの呼び出し
ゲートウェイプロキシのインターフェースには default
メソッドも含まれる場合があり、バージョン 5.3 以降、フレームワークはプロキシに DefaultMethodInvokingMethodInterceptor
を注入して、プロキシの代わりに java.lang.invoke.MethodHandle
アプローチを使用して default
メソッドを呼び出します。java.util.function.Function
などの JDK からのインターフェースは引き続きゲートウェイプロキシに使用できますが、JDK クラスに対する MethodHandles.Lookup
インスタンス化の内部 Java セキュリティ上の理由により、それらの default
メソッドを呼び出すことができません。これらのメソッドは、メソッドの明示的な @Gateway
アノテーション、または @MessagingGateway
アノテーションまたは <gateway>
XML コンポーネントの proxyDefaultMethods
を使用して、プロキシ化(実装ロジックを失い、同時に以前のゲートウェイプロキシの動作を復元)することもできます。
エラー処理
ゲートウェイの呼び出しはエラーになる可能性があります。デフォルトでは、ダウンストリームで発生したエラーは、ゲートウェイのメソッド呼び出し時に「そのまま」再スローされます。例: 次の簡単なフローを検討してください。
gateway -> service-activator
サービスアクティベータによって呼び出されたサービスが MyException
をスローした場合(たとえば)、フレームワークはそれを MessagingException
にラップし、failedMessage
プロパティでサービスアクティベータに渡されたメッセージを添付します。そのため、フレームワークによって実行されるロギングには、障害の完全なコンテキストが含まれます。デフォルトでは、例外がゲートウェイによってキャッチされると、MyException
はラップ解除され、呼び出し元にスローされます。原因チェーンの特定の例外型と一致するように、ゲートウェイメソッド宣言で throws
句を構成できます。例: ダウンストリームエラーの理由のすべてのメッセージング情報で MessagingException
全体をキャッチする場合、次のようなゲートウェイメソッドが必要です。
public interface MyGateway {
void performProcess() throws MessagingException;
}
POJO プログラミングを推奨しているため、発信者をメッセージングインフラストラクチャに公開したくない場合があります。
ゲートウェイメソッドに throws
句がない場合、ゲートウェイは原因ツリーを走査し、MessagingException
ではない RuntimeException
を探します。何も見つからない場合、フレームワークは MessagingException
をスローします。前の説明の MyException
に SomeOtherException
とメソッド throws SomeOtherException
の原因がある場合、ゲートウェイはそれをさらにアンラップし、呼び出し元にスローします。
service-interface
なしでゲートウェイが宣言されると、内部フレームワークインターフェース RequestReplyExchanger
が使用されます。
次の例を考えてみましょう。
public interface RequestReplyExchanger {
Message<?> exchange(Message<?> request) throws MessagingException;
}
バージョン 5.0 より前は、この exchange
メソッドには throws
節がありませんでした。その結果、例外はラップ解除されました。このインターフェースを使用して以前のアンラップ動作を復元する場合は、代わりにカスタム service-interface
を使用するか、MessagingException
の cause
にアクセスしてください。
ただし、エラーを伝播するのではなくログに記録するか、例外を有効な応答として扱うことができます(呼び出し元が理解する「エラーメッセージ」規約に準拠するメッセージにマッピングすることにより)。これを実現するために、ゲートウェイは、error-channel
属性のサポートを含めることにより、エラー専用のメッセージチャネルのサポートを提供します。次の例では、「トランスフォーマー」が Exception
から応答 Message
を作成します。
<int:gateway id="sampleGateway"
default-request-channel="gatewayChannel"
service-interface="foo.bar.SimpleGateway"
error-channel="exceptionTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
exceptionTransformer
は、予期されるエラーレスポンスオブジェクトの作成方法を知っている単純な POJO である可能性があります。それが発呼者に送り返されるペイロードになります。必要に応じて、このような「エラーフロー」でより多くの詳細な処理を実行できます。ルーター(Spring Integration の ErrorMessageExceptionTypeRouter
を含む)、フィルターなどが含まれる場合があります。ただし、ほとんどの場合、単純な「トランス」で十分です。
または、例外のみをログに記録する(または非同期でどこかに送信する)こともできます。一方向のフローを提供する場合、発信者には何も返されません。例外を完全に抑制したい場合は、グローバル nullChannel
への参照を提供できます(本質的に /dev/null
アプローチ)。最後に、上記のように、error-channel
が定義されていない場合、例外は通常どおり伝播します。
@MessagingGateway
アノテーションを使用する場合(
)、@MessagingGateway
アノテーションを参照 errorChannel
属性を使用できます。
バージョン 5.0 以降、void
戻り値の型(一方向フロー)でゲートウェイメソッドを使用すると、error-channel
参照(提供されている場合)が各送信メッセージの標準 errorChannel
ヘッダーに取り込まれます。この機能により、標準の ExecutorChannel
構成(または QueueChannel
)に基づいたダウンストリーム非同期フローが、デフォルトのグローバル errorChannel
例外送信動作をオーバーライドできます。以前は、@GatewayHeader
アノテーションまたは <header>
要素で errorChannel
ヘッダーを手動で指定する必要がありました。error-channel
プロパティは、非同期フローを持つ void
メソッドでは無視されました。代わりに、エラーメッセージがデフォルトの errorChannel
に送信されました。
シンプルな POJI ゲートウェイを介してメッセージングシステムを公開することには利点がありますが、基盤となるメッセージングシステムの現実を「隠す」ことには代償が伴うため、考慮する必要がある特定の事項があります。Java メソッドができるだけ早く戻り、呼び出し側が戻るのを待機している間(void、戻り値、スローされた例外)無限にハングしないようにします。メッセージングシステムの前で通常のメソッドをプロキシとして使用する場合、基になるメッセージングの潜在的な非同期性を考慮する必要があります。これは、ゲートウェイによって開始されたメッセージがフィルターによってドロップされ、応答の生成を担当するコンポーネントに到達しない可能性があることを意味します。一部のサービスアクティベータメソッドでは例外が発生し、応答が返されない場合があります(null メッセージを生成しないため)。言い換えると、複数のシナリオが原因で、返信メッセージが届かないことがあります。これはメッセージングシステムではまったく自然なことです。ただし、ゲートウェイ方式への影響を考慮してください。ゲートウェイのメソッド入力引数はメッセージに組み込まれ、ダウンストリームに送信されました。応答メッセージは、ゲートウェイのメソッドの戻り値に変換されます。そのため、ゲートウェイコールごとに、必ず応答メッセージが存在するようにする必要があります。そうしないと、ゲートウェイメソッドが戻らず、無期限にハングする可能性があります。この状況を処理する 1 つの方法は、非同期ゲートウェイを使用することです(このセクションの後半で説明します)。それを処理する別の方法は、reply-timeout 属性を明示的に設定することです。そうすれば、ゲートウェイは reply-timeout で指定された時間より長くハングせず、タイムアウトが経過した場合に "null" を返します。最後に、サービスアクティベータで "requires-reply" などのダウンストリームフラグを設定したり、フィルターで "throw-exceptions-on-rejection" を設定することを検討できます。これらのオプションについては、この章の最後のセクションで詳しく説明します。 |
ダウンストリームフローが ErrorMessage を返す場合、その payload (Throwable )は通常のダウンストリームエラーとして扱われます。error-channel が構成されている場合、エラーフローに送信されます。それ以外の場合、ペイロードはゲートウェイの呼び出し元にスローされます。同様に、error-channel のエラーフローが ErrorMessage を返す場合、そのペイロードは呼び出し元にスローされます。同じことが、Throwable ペイロードを持つすべてのメッセージに当てはまります。これは、Exception を直接呼び出し元に伝搬する必要がある場合の非同期状況で役立ちます。これを行うには、Exception を返す(サービスから reply として)か、スローします。一般に、非同期フローの場合でも、フレームワークは、ダウンストリームフローによってゲートウェイにスローされる例外の伝播を処理します。TCP クライアントサーバーマルチプレックス [GitHub] (英語) サンプルは、呼び出し元に例外を返すための両方の手法を示しています。aggregator と group-timeout (アグリゲーターとグループのタイムアウトを参照)および破棄フローでの MessagingTimeoutException 応答を使用して、待機スレッドへのソケット IO エラーをエミュレートします。 |
ゲートウェイのタイムアウト
ゲートウェイには、requestTimeout
と replyTimeout
の 2 つのタイムアウトプロパティがあります。リクエストのタイムアウトは、チャネルがブロックできる場合にのみ適用されます(たとえば、境界のある QueueChannel
がいっぱいの場合)。replyTimeout
値は、ゲートウェイが応答を待つ時間または null
を返す時間です。デフォルトは無限です。
タイムアウトは、ゲートウェイ(defaultRequestTimeout
および defaultReplyTimeout
)または MessagingGateway
インターフェースアノテーションのすべてのメソッドのデフォルトとして設定できます。個々のメソッドは、これらのデフォルト(<method/>
子要素内)または @Gateway
アノテーションをオーバーライドできます。
バージョン 5.0 以降、次の例に示すように、タイムアウトは式として定義できます。
@Gateway(payloadExpression = "#args[0]", requestChannel = "someChannel",
requestTimeoutExpression = "#args[1]", replyTimeoutExpression = "#args[2]")
String lateReply(String payload, long requestTimeout, long replyTimeout);
評価コンテキストには BeanResolver
(他の Bean を参照するには @someBean
を使用)があり、#args
配列変数が使用可能です。
XML で設定する場合、タイムアウト属性は、次の例に示すように、長い値または SpEL 式にすることができます。
<method name="someMethod" request-channel="someRequestChannel"
payload-expression="#args[0]"
request-timeout="1000"
reply-timeout="#args[1]">
</method>
非同期ゲートウェイ
パターンとして、メッセージングゲートウェイは、メッセージングシステムの全機能を公開しながら、メッセージング固有のコードを非表示にする優れたメソッドを提供します。前述のように、GatewayProxyFactoryBean
は、サービスインターフェースを介してプロキシを公開する便利なメソッドを提供し、メッセージングシステムへの POJO ベースのアクセスを提供します(独自のドメイン内のオブジェクト、プリミティブ / 文字列、その他のオブジェクトに基づく)。ただし、値を返す単純な POJO メソッドを介してゲートウェイが公開されている場合、リクエストメッセージ(メソッドが呼び出されたときに生成された)ごとに、応答メッセージ(メソッドが返されたときに生成された)が必要です。メッセージングシステムは本来非同期であるため、「各リクエストに対して常に応答がある」という契約を常に保証できるとは限りません。Spring Integration 2.0 は非同期ゲートウェイのサポートを導入しました。これは、応答が予期されているかどうか、応答が到着するまでにかかる時間がわからない場合に、フローを開始する便利なメソッドを提供します。
これらの型のシナリオを処理するために、Spring Integration は java.util.concurrent.Future
インスタンスを使用して非同期ゲートウェイをサポートします。
次の例に示すように、XML 構成から変更はなく、通常のゲートウェイを定義するのと同じ方法で非同期ゲートウェイを定義します。
<int:gateway id="mathService"
service-interface="org.springframework.integration.sample.gateway.futures.MathServiceGateway"
default-request-channel="requestChannel"/>
ただし、ゲートウェイインターフェース(サービスインターフェース)は、次のように少し異なります。
public interface MathServiceGateway {
Future<Integer> multiplyByTwo(int i);
}
前の例が示すように、ゲートウェイメソッドの戻り値の型は Future
です。GatewayProxyFactoryBean
は、ゲートウェイメソッドの戻り値の型が Future
であることを確認すると、AsyncTaskExecutor
を使用して直ちに非同期モードに切り替えます。それが違いの範囲です。このようなメソッドの呼び出しは、常に Future
インスタンスですぐに返されます。その後、自分のペースで Future
を操作して、結果の取得やキャンセルなどを行うことができます。また、Future
インスタンスの他の使用と同様に、get()
を呼び出すと、タイムアウト、実行例外などが明らかになる場合があります。次の例は、非同期ゲートウェイから戻る Future
の使用方法を示しています。
MathServiceGateway mathService = ac.getBean("mathService", MathServiceGateway.class);
Future<Integer> result = mathService.multiplyByTwo(number);
// do something else here since the reply might take a moment
int finalResult = result.get(1000, TimeUnit.SECONDS);
より詳細な例については、Spring Integration サンプルの async-gateway [GitHub] (英語) サンプルを参照してください。
ListenableFuture
バージョン 4.1 以降、非同期ゲートウェイメソッドは ListenableFuture
(Spring Framework 4.0 で導入)を返すこともできます。これらの戻り値の型を使用すると、結果が利用可能になった(または例外が発生した)ときに呼び出されるコールバックを提供できます。ゲートウェイがこの戻り値の型を検出し、タスクエグゼキューターが AsyncListenableTaskExecutor
である場合、エグゼキューターの submitListenable()
メソッドが呼び出されます。次の例は、ListenableFuture
の使用方法を示しています。
ListenableFuture<String> result = this.asyncGateway.async("something");
result.addCallback(new ListenableFutureCallback<String>() {
@Override
public void onSuccess(String result) {
...
}
@Override
public void onFailure(Throwable t) {
...
}
});
AsyncTaskExecutor
デフォルトでは、GatewayProxyFactoryBean
は、戻り値の型が Future
であるゲートウェイメソッドの内部 AsyncInvocationTask
インスタンスを送信するときに org.springframework.core.task.SimpleAsyncTaskExecutor
を使用します。ただし、<gateway/>
要素の構成の async-executor
属性により、Spring アプリケーションコンテキスト内で利用可能な java.util.concurrent.Executor
の実装への参照を提供できます。
(デフォルト) SimpleAsyncTaskExecutor
は、Future
と ListenableFuture
の両方の戻り値の型をサポートし、それぞれ FutureTask
または ListenableFutureTask
を返します。CompletableFuture
を参照してください。デフォルトのエグゼキューターがありますが、次の例に示すように、ログでスレッドを識別できるように外部エグゼキューターを提供すると便利です(XML を使用する場合、スレッド名はエグゼキューターの Bean 名に基づきます)。
@Bean
public AsyncTaskExecutor exec() {
SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor();
simpleAsyncTaskExecutor.setThreadNamePrefix("exec-");
return simpleAsyncTaskExecutor;
}
@MessagingGateway(asyncExecutor = "exec")
public interface ExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
別の Future
実装を返す場合は、カスタムエグゼキューターを提供するか、エグゼキューターを完全に無効にして、ダウンストリームフローからの応答メッセージペイロードで Future
を返すことができます。エグゼキューターを無効にするには、GatewayProxyFactoryBean
で null
に設定します(setAsyncTaskExecutor(null)
を使用)。XML を使用してゲートウェイを構成する場合は、async-executor=""
を使用します。@MessagingGateway
アノテーションを使用して構成する場合、次のようなコードを使用します。
@MessagingGateway(asyncExecutor = AnnotationConstants.NULL)
public interface NoExecGateway {
@Gateway(requestChannel = "gatewayChannel")
Future<?> doAsync(String foo);
}
戻り値の型が特定の具体的な Future 実装または構成されたエグゼキューターでサポートされていない他のサブインターフェースである場合、フローは呼び出し側のスレッドで実行され、フローは応答メッセージペイロードで必要な型を返す必要があります |
CompletableFuture
バージョン 4.2 以降、ゲートウェイメソッドは CompletableFuture<?>
を返すことができるようになりました。この型を返す場合、2 つの操作モードがあります。
非同期エグゼキューターが提供され、戻り値の型が(サブクラスではなく)
CompletableFuture
である場合、フレームワークはエグゼキューターでタスクを実行し、すぐに呼び出し元にCompletableFuture
を返します。CompletableFuture.supplyAsync(Supplier<U> supplier, Executor executor)
は未来の創造に使用されます。非同期エグゼキューターが明示的に
null
に設定され、戻り値の型がCompletableFuture
であるか、戻り値の型がCompletableFuture
のサブクラスである場合、フローは呼び出し側のスレッドで呼び出されます。このシナリオでは、ダウンストリームフローは適切な型のCompletableFuture
を返すことが期待されています。
使用シナリオ
次のシナリオでは、ダウンストリームフローが(Invoice
オブジェクトを使用して)ゲートウェイに応答すると、呼び出し元スレッドはすぐに CompletableFuture<Invoice>
を返します。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="something.Service" default-request-channel="orders" />
次のシナリオでは、ダウンストリームフローがゲートウェイへの応答のペイロードとして提供する場合、呼び出し元スレッドは CompletableFuture<Invoice>
を返します。請求書の準備ができたら、他のプロセスで将来を完了する必要があります。
CompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders"
async-executor="" />
次のシナリオでは、ダウンストリームフローがゲートウェイへの応答のペイロードとして提供する場合、呼び出し元スレッドは CompletableFuture<Invoice>
を返します。請求書の準備ができたら、他のプロセスで将来を完了する必要があります。DEBUG
ロギングが有効になっている場合、ログエントリが出力され、このシナリオでは非同期エグゼキューターを使用できないことを示します。
MyCompletableFuture<Invoice> order(Order order);
<int:gateway service-interface="foo.Service" default-request-channel="orders" />
次の例に示すように、CompletableFuture
インスタンスを使用して、応答に対して追加の操作を実行できます。
CompletableFuture<String> process(String data);
...
CompletableFuture result = process("foo")
.thenApply(t -> t.toUpperCase());
...
String out = result.get(10, TimeUnit.SECONDS);
Reactor Mono
バージョン 5.0 以降、GatewayProxyFactoryBean
では、Mono<T>
[GitHub] (英語) 戻り型を使用して、ゲートウェイインターフェースメソッドでプロジェクト Reactor (英語) を使用できます。内部 AsyncInvocationTask
は Mono.fromCallable()
にラップされています。
Mono
を使用して、後で結果を取得することができます(Future<?>
と同様)。または、結果がゲートウェイに返されたときに Consumer
を呼び出すことにより、ディスパッチャーから結果を取得できます。
Mono は、フレームワークによってすぐにはフラッシュされません。基になるメッセージフローは、ゲートウェイメソッドが戻る前に開始されません(Future<?> Executor タスクの場合のように)。Mono がサブスクライブされると、フローが開始されます。あるいは、subscribe() が Flux 全体に関連している場合、Mono (「構成可能」である)は Reactor ストリームの一部である可能性があります。次の例は、Project Reactor を使用してゲートウェイを作成する方法を示しています。 |
@MessagingGateway
public static interface TestGateway {
@Gateway(requestChannel = "promiseChannel")
Mono<Integer> multiply(Integer value);
}
...
@ServiceActivator(inputChannel = "promiseChannel")
public Integer multiply(Integer value) {
return value * 2;
}
...
Flux.just("1", "2", "3", "4", "5")
.map(Integer::parseInt)
.flatMap(this.testGateway::multiply)
.collectList()
.subscribe(integers -> ...);
Project Reactor を使用する別の例は、次の例が示すように、単純なコールバックシナリオです。
Mono<Invoice> mono = service.process(myOrder);
mono.subscribe(invoice -> handleInvoice(invoice));
呼び出しスレッドは継続し、フローが完了すると handleInvoice()
が呼び出されます。
非同期型を返すダウンストリームフロー
上記の ListenableFuture
セクションで説明したように、ダウンストリームコンポーネントが非同期ペイロード(Future
、Mono
など)を含むメッセージを返すようにする場合は、非同期エグゼキューターを null
(または XML 構成を使用する場合は ""
)に明示的に設定する必要があります。次に、フローは呼び出し元のスレッドで呼び出され、結果を後で取得できます。
void
戻り値の型
前述の戻り値の型とは異なり、メソッドの戻り値の型が void
の場合、フレームワークは、ダウンストリームフローを非同期に実行することを暗黙的に決定することはできません。呼び出し元のスレッドはすぐに戻ります。この場合、次の例に示すように、インターフェースメソッドに @Async
でアノテーションを付ける必要があります。
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "sendAsyncChannel")
@Async
void sendAsync(String payload);
}
Future<?>
戻り型とは異なり、カスタム TaskExecutor
(ErrorHandlingTaskExecutor
など)が @Async
アノテーションに関連付けられていない限り、フローによって例外がスローされた場合に呼び出し元に通知する方法はありません。
レスポンスがない場合のゲートウェイの動作
前に説明したように、ゲートウェイは、POJO メソッド呼び出しを通じてメッセージングシステムと対話する便利な方法を提供します。ただし、通常は常に例外が返されると予想される一般的なメソッド呼び出しは、メッセージ交換に 1 対 1 を常にマップするとは限りません(たとえば、応答メッセージが到着しない場合があります。戻る)。
このセクションの残りの部分では、さまざまなシナリオと、ゲートウェイをより予測可能な動作にする方法について説明します。同期ゲートウェイの動作をより予測可能にするために特定の属性を構成できますが、一部の属性は期待どおりに動作しない場合があります。それらの 1 つは reply-timeout
(メソッドレベルまたはゲートウェイレベルの default-reply-timeout
)です。reply-timeout
属性を調べて、さまざまなシナリオで同期ゲートウェイの動作にどのように影響を与えることができるか、およびできないかを確認します。シングルスレッドシナリオ(ダウンストリームのすべてのコンポーネントが直接チャネルを介して接続されている)とマルチスレッドシナリオ(たとえば、ダウンストリームのどこかに、シングルスレッド境界を破るポーリング可能チャネルまたはエグゼキュータチャネルがある場合)を調べます。
長期実行プロセスダウンストリーム
- 同期ゲートウェイ、シングルスレッド
コンポーネントのダウンストリームがまだ実行されている場合(おそらく無限ループまたは遅いサービスのため)、
reply-timeout
の設定は効果がなく、ダウンストリームサービスが終了するまで(例外を返すかスローすることにより)ゲートウェイメソッド呼び出しは戻りません。- 同期ゲートウェイ、マルチスレッド
マルチスレッドのメッセージフローでコンポーネントのダウンストリームが(おそらく無限ループまたは遅いサービスのために)実行されている場合、
GatewayProxyFactoryBean
がタイムアウトに達するとゲートウェイメソッドの呼び出しが返されるようにすることでreply-timeout
を設定すると効果があります。応答チャネルでポーリングし、タイムアウトが期限切れになるまでメッセージを待機します。ただし、実際の応答が生成される前にタイムアウトに達すると、ゲートウェイメソッドから "null" が返される可能性があります。ゲートウェイメソッドの呼び出しが返された後、応答メッセージ(生成された場合)が応答チャネルに送信されることを理解する必要があります。
ダウンストリームコンポーネントが "null" を返す
- 同期ゲートウェイ — single-threaded
コンポーネントダウンストリームが "null" を返し、
reply-timeout
が設定されていない場合、reply-timeout
が設定されているか、requires-reply
属性がダウンストリームコンポーネント(サービスアクティベータなど)に設定されていない限り、ゲートウェイメソッド呼び出しは無期限にハングします 'null'。この場合、例外がスローされ、ゲートウェイに伝播されます。- 同期ゲートウェイ — multi-threaded
動作は前のケースと同じです。
ゲートウェイメソッドシグネチャーが非 void の場合、ダウンストリームコンポーネントのリターンシグネチャーは "void"
- 同期ゲートウェイ — single-threaded
コンポーネントダウンストリームが "void" を返し、
reply-timeout
が設定されていない場合、reply-timeout
が設定されていない限り、ゲートウェイメソッド呼び出しは無期限にハングします。- 同期ゲートウェイ — multi-threaded
動作は前のケースと同じです。
ダウンストリームコンポーネントがランタイム例外を引き起こす
- 同期ゲートウェイ — single-threaded
コンポーネントのダウンストリームがランタイム例外をスローした場合、例外はエラーメッセージを介してゲートウェイに伝播され、再スローされます。
- 同期ゲートウェイ — multi-threaded
動作は前のケースと同じです。
デフォルトでは、reply-timeout は無制限であることを理解する必要があります。そのため、reply-timeout を明示的に設定しないと、ゲートウェイメソッドの呼び出しが無期限にハングする機能があります。フローを確実に分析し、これらのシナリオの 1 つがリモートで発生する可能性がある場合でも、reply-timeout 属性を " 'safe'" 値に設定する必要があります。さらに良いことに、ダウンストリームコンポーネントの requires-reply 属性を 'true' に設定して、そのダウンストリームコンポーネントが内部で null を返すとすぐに例外をスローすることで生成されるタイムリーなレスポンスを確保できます。ただし、reply-timeout が役に立たないシナリオ(最初のシナリオを参照)があることも認識する必要があります。つまり、メッセージフローを分析し、非同期ゲートウェイではなく同期ゲートウェイをいつ使用するかを決定することも重要です。前述のように、後者の場合は、Future インスタンスを返すゲートウェイメソッドを定義する問題です。その後、その戻り値を受け取ることが保証され、呼び出しの結果をよりきめ細かく制御できます。また、ルーターを扱う場合、resolution-required 属性を "true" に設定すると、特定のチャネルを解決できない場合にルーターによって例外がスローされることに注意してください。同様に、フィルターを扱う場合、throw-exception-on-rejection 属性を設定できます。どちらの場合も、結果のフローは、'requires-reply' 属性のサービスアクティベーターを含むように動作します。つまり、ゲートウェイメソッドの呼び出しからタイムリーなレスポンスを保証できます。 |
reply-timeout は、<gateway/> エレメント(GatewayProxyFactoryBean によって作成された)に対して制限されていません。外部統合用の受信ゲートウェイ(WS、HTTP など)は、これらのゲートウェイと多くの特性と属性を共有しています。ただし、これらの受信ゲートウェイの場合、デフォルトの reply-timeout は 1000 ミリ秒(1 秒)です。別のスレッドへのダウンストリーム非同期ハンドオフが行われる場合、この属性を増やして、ゲートウェイがタイムアウトする前にフローが完了するのに十分な時間を確保する必要があります。 |
スレッドがゲートウェイに戻ると、つまりフローが完了するか、メッセージが別のスレッドに渡されると、タイマーが開始することを理解する必要があります。その時点で、呼び出しスレッドは応答の待機を開始します。フローが完全に同期されている場合、応答はすぐに利用可能です。非同期フローの場合、スレッドはこの時間まで待機します。 |
IntegrationFlows
を介してゲートウェイを定義するオプションについては、Java DSL の章のゲートウェイとしての IntegrationFlow
を参照してください。