TCP 接続ファクトリ

概要

TCP の場合、基礎となる接続の構成は、接続ファクトリを使用して提供されます。クライアント接続ファクトリとサーバー接続ファクトリの 2 種類の接続ファクトリが提供されます。クライアント接続ファクトリは、発信接続を確立します。サーバー接続ファクトリは、受信接続をリッスンします。

送信チャネルアダプターはクライアント接続ファクトリを使用しますが、クライアント接続ファクトリへの参照を受信チャネルアダプターに提供することもできます。そのアダプターは、送信アダプターによって作成された接続で受信される受信メッセージを受信します。

受信チャネルアダプターまたはゲートウェイは、サーバー接続ファクトリを使用します。(実際、接続ファクトリは接続ファクトリなしでは機能できません)。サーバー接続ファクトリへの参照を送信アダプターに提供することもできます。その後、そのアダプターを使用して、同じ接続で受信メッセージに返信を送信できます。

返信メッセージは、接続ファクトリによって元のメッセージに挿入された ip_connectionId ヘッダーが返信に含まれている場合にのみ、接続にルーティングされます。
これは、受信アダプターと送信アダプター間で接続ファクトリを共有するときに実行されるメッセージ相関の範囲です。このような共有により、TCP を介した非同期の双方向通信が可能になります。デフォルトでは、ペイロード情報のみが TCP を使用して転送されます。メッセージの関連付けは、アグリゲーターまたはその他のエンドポイントなどのダウンストリームコンポーネントによって実行する必要があります。選択されたヘッダーの転送のサポートは、バージョン 3.0 で導入されました。詳細については、TCP メッセージ相関を参照してください。

接続ファクトリへの参照を、各型の最大 1 つのアダプターに提供できます。

Spring Integration は、java.net.Socket および java.nio.channel.SocketChannel を使用する接続ファクトリを提供します。

次の例は、java.net.Socket 接続を使用する単純なサーバー接続ファクトリを示しています。

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"/>

次の例は、java.nio.channel.SocketChannel 接続を使用する単純なサーバー接続ファクトリを示しています。

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    using-nio="true"/>
Spring Integration バージョン 4.2 以降、サーバーが(ポートを 0 に設定することにより)ランダムポートでリッスンするように構成されている場合、getPort() を使用して OS が選択した実際のポートを取得できます。また、getServerSocketAddress() を使用すると、完全な SocketAddress を取得できます。詳細については、TcpServerConnectionFactory インターフェースの Javadoc を参照してください。
<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"/>

次の例は、java.net.Socket 接続を使用し、各メッセージに新しい接続を作成するクライアント接続ファクトリを示しています。

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="1234"
    single-use="true"
    so-timeout="10000"
    using-nio=true/>

バージョン 5.2 以降、クライアント接続ファクトリは、秒単位で指定されるプロパティ connectTimeout をサポートします。デフォルトは 60 です。

メッセージ境界 (シリアライザーとデシリアライザー)

TCP はストリーミングプロトコルです。これは、受信者がデータを個別のメッセージに区分できるように、TCP で転送されるデータに何らかの構造を提供する必要があることを意味します。接続ファクトリは、シリアライザーとデシリアライザーを使用して、メッセージペイロードと TCP を介して送信されるビットの間で変換するように構成されます。これは、受信メッセージと送信メッセージにそれぞれデシリアライザーとシリアライザーを提供することで実現されます。Spring Integration は、多くの標準シリアライザーとデシリアライザーを提供します。

ByteArrayCrlfSerializer* は、バイト配列をバイトストリームに変換し、その後にキャリッジリターンおよびラインフィード文字(\r\n)が続きます。これはデフォルトのシリアライザー(およびデシリアライザー)であり、クライアントとして telnet で(たとえば)使用できます。

ByteArraySingleTerminatorSerializer* は、バイト配列を、単一の終了文字が続くバイトのストリームに変換します(デフォルトは 0x00 です)。

ByteArrayLfSerializer* は、バイト配列をバイトのストリームに変換し、その後に単一の改行文字(0x0a)が続きます。

ByteArrayStxEtxSerializer* は、バイト配列を、STX(0x02)が先行し、ETX(0x03)が後続するバイトのストリームに変換します。

ByteArrayLengthHeaderSerializer は、バイト配列を、ネットワークバイト順のバイナリ長(ビッグエンディアン)が前に付くバイトのストリームに変換します。これは、終了文字シーケンスを探すためにすべてのバイトを解析する必要がないため、効率的なデシリアライザーです。バイナリデータを含むペイロードにも使用できます。上記のシリアライザーは、ペイロード内のテキストのみをサポートします。長さヘッダーのデフォルトサイズは 4 バイト(整数)で、最大(2 ^ 31-1)バイトのメッセージを許可します。ただし、length ヘッダーは、255 バイトまでのメッセージの場合は 1 バイト(符号なし)、(2 ^ 16-1)バイトまでのメッセージの場合は符号なしショート(2 バイト)にすることができます。ヘッダーに他の形式が必要な場合は、ByteArrayLengthHeaderSerializer をサブクラス化し、readHeader メソッドと writeHeader メソッドの実装を提供できます。絶対最大データサイズは(2 ^ 31-1)バイトです。バージョン 5.2 以降、ヘッダー値には、ペイロードに加えてヘッダーの長さを含めることができます。inclusive プロパティを設定して、そのメカニズムを有効にします(プロデューサーとコンシューマーで同じに設定する必要があります)。

ByteArrayRawSerializer* は、バイト配列をバイトのストリームに変換し、追加のメッセージ境界データを追加しません。このシリアライザ(およびデシリアライザ)を使用すると、メッセージの終わりは、クライアントが規則的にソケットを閉じることで示されます。このシリアライザーを使用すると、クライアントがソケットを閉じるかタイムアウトが発生するまで、メッセージの受信がハングします。タイムアウトはメッセージになりません。このシリアライザーが使用され、クライアントが Spring Integration アプリケーションである場合、クライアントは single-use="true" で構成された接続ファクトリを使用する必要があります。これを行うと、アダプターはメッセージの送信後にソケットを閉じます。シリアライザーは、それ自体では接続を閉じません。このシリアライザーは、チャネルアダプター(ゲートウェイではなく)で使用される接続ファクトリでのみ使用する必要があり、接続ファクトリは、受信または送信アダプターのいずれかで使用する必要があります。このセクションで後述する ByteArrayElasticRawDeserializer も参照してください。ただし、バージョン 5.2 以降、送信ゲートウェイには新しいプロパティ closeStreamAfterSend があります。これにより、応答を受信するために接続を開いたまま、EOF がサーバーに通知されるため、生のシリアライザー / デシリアライザーを使用できます。

バージョン 4.2.2 より前、ノンブロッキング I/O(NIO)を使用する場合、このシリアライザーは(読み取り中の)タイムアウトをファイルの終わりとして扱い、これまでに読み取られたデータはメッセージとして出力されました。これは信頼性が低く、メッセージの区切りには使用しないでください。現在では、このような条件を例外として扱います。この方法で使用することはほとんどありませんが、treatTimeoutAsEndOfMessage コンストラクター引数を true に設定することにより、以前の動作を復元できます。

これらはそれぞれ AbstractByteArraySerializer のサブクラスであり、org.springframework.core.serializer.Serializer と org.springframework.core.serializer.Deserializer の両方を実装しています。後方互換性のために、シリアライゼーションに AbstractByteArraySerializer のサブクラスを使用する接続も、最初にバイト配列に変換される String を受け入れます。これらのシリアライザーとデシリアライザーはそれぞれ、対応する形式を含む入力ストリームをバイト配列ペイロードに変換します。

動作が不適切なクライアント(構成されたシリアライザーのプロトコルに準拠していないクライアント)によるメモリの枯渇を回避するために、これらのシリアライザーは最大メッセージサイズを課します。受信メッセージがこのサイズを超えると、例外がスローされます。デフォルトの最大メッセージサイズは 2048 バイトです。maxMessageSize プロパティを設定することにより、これを増やすことができます。デフォルトのシリアライザーまたはデシリアライザーを使用して最大メッセージサイズを増やしたい場合、maxMessageSize プロパティセットを使用して明示的な Bean として最大メッセージサイズを宣言し、その Bean を使用するように接続ファクトリを構成する必要があります。

このセクションで前述の * マークが付いたクラスは、中間バッファーを使用し、デコードされたデータを正しいサイズの最終バッファーにコピーします。バージョン 4.3 から、poolSize プロパティを設定してこれらのバッファを設定できます。これらのバッファは、デフォルトの動作である各メッセージに割り当てられて破棄されるのではなく、再利用されます。プロパティを負の値に設定すると、境界のないプールが作成されます。プールが制限されている場合は、poolWaitTimeout プロパティ(ミリ秒単位)を設定することもできます。その後、使用可能なバッファーがなくなると例外がスローされます。デフォルトは無限です。このような例外により、ソケットが閉じられます。

カスタムデシリアライザーで同じメカニズムを使用する場合は、(スーパークラス AbstractByteArraySerializer の代わりに) AbstractPooledBufferByteArraySerializer を継承し、deserialize() の代わりに doDeserialize() を実装できます。バッファは自動的にプールに返されます。AbstractPooledBufferByteArraySerializer は、便利なユーティリティメソッド copyToSizedArray() も提供します。

バージョン 5.0 は ByteArrayElasticRawDeserializer を追加しました。これは、上記の ByteArrayRawSerializer のデシリアライザー側に似ていますが、maxMessageSize を設定する必要がない点が異なります。内部的には、必要に応じてバッファーを拡張できる ByteArrayOutputStream を使用します。クライアントは、メッセージの終わりを通知するために、適切な方法でソケットを閉じる必要があります。

このデシリアライザは、ピアが信頼されている場合にのみ使用してください。メモリ不足の状態により、DoS 接続の影響を受けやすくなります。

MapJsonSerializer は Jackson ObjectMapper を使用して、Map と JSON の間の変換を行います。このシリアライザーを MessageConvertingTcpMessageMapper および MapMessageConverter と組み合わせて使用して、選択したヘッダーとペイロードを JSON で転送できます。

Jackson ObjectMapper は、ストリーム内のメッセージを区別できません。MapJsonSerializer は、メッセージの境界を処理するために別のシリアライザーまたはデシリアライザーに委譲する必要があります。デフォルトでは、ByteArrayLfSerializer が使用され、その結果、メッセージは <json><LF> の形式でワイヤ上に表示されますが、他のメッセージを代わりに使用するように構成できます。(次の例は、その方法を示しています。)

最終的な標準シリアライザーは org.springframework.core.serializer.DefaultSerializer です。これを使用して、Java シリアライゼーションでシリアライズ可能オブジェクトを変換できます。org.springframework.core.serializer.DefaultDeserializer は、シリアライズ可能なオブジェクトを含むストリームの受信デシリアライゼーションのために提供されています。

デフォルトのシリアライザーおよびデシリアライザー(ByteArrayCrLfSerializer)を使用したくない場合は、接続ファクトリで serializer および deserializer 属性を設定する必要があります。次の例は、その方法を示しています。

<bean id="javaSerializer"
      class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
      class="org.springframework.core.serializer.DefaultDeserializer" />

<int-ip:tcp-connection-factory id="server"
    type="server"
    port="1234"
    deserializer="javaDeserializer"
    serializer="javaSerializer"/>

java.net.Socket 接続を使用し、ワイヤー上で Java 直列化を使用するサーバー接続ファクトリ。

接続ファクトリで使用できる属性の詳細については、このセクションの最後にあるリファレンスを参照してください。

デフォルトでは、逆引き DNS ルックアップは受信 パケットに対して実行されません。DNS が構成されていない環境 (例: Docker コンテナー) では、これにより接続遅延が発生する可能性があります。メッセージヘッダーで使用するために IP アドレスをホスト名に変換するには、lookup-host 属性を true に設定することで、デフォルトの動作をオーバーライドできます。

ソケットおよびソケットファクトリの属性を変更することもできます。詳細については、SSL/TLS サポートを参照してください。そこに記載されているように、SSL が使用されているかどうかにかかわらず、このような変更は可能です。

ホスト検証

バージョン 5.1.0 以降では、セキュリティ強化のため、ホスト検証がデフォルトで有効になっています。この機能により、TCP 接続中にサーバーの ID が検証されます。

ホスト検証を無効にする必要があるシナリオが発生した場合 (推奨されません)、tcp-connection-factory で socket-support 属性を構成できます。

<int-ip:tcp-connection-factory id="client"
                                type="client"
                                host="localhost"
                                port="0"
                                socket-support="customSocketSupport"
                                single-use="true"
                                so-timeout="10000"/>

<bean id="customSocketSupport" class="org.springframework.integration.ip.tcp.connection.DefaultTcpSocketSupport">
	<constructor-arg value="false" />
</bean>

カスタムシリアライザーとデシリアライザー

データが標準のデシリアライザのいずれかでサポートされている形式でない場合、独自のデシリアライザを実装できます。カスタムシリアライザーを実装することもできます。

カスタムシリアライザーとデシリアライザーのペアを実装するには、org.springframework.core.serializer.Deserializer および org.springframework.core.serializer.Serializer インターフェースを実装します。

デシリアライザは、メッセージ間で閉じた入力ストリームを検出すると、SoftEndOfStreamException をスローする必要があります。これは、クローズが「正常」だったことを示すフレームワークへのシグナルです。メッセージのデコード中にストリームが閉じられた場合、代わりに他の例外がスローされます。

バージョン 5.2 から、SoftEndOfStreamException は IOException を継承する代わりに RuntimeException になりました。

TCP キャッシングクライアント接続ファクトリ

前述したように、TCP ソケットは「使い捨て」 (1 つのリクエストまたはレスポンス) または共有することができます。共有ソケットは、一度に 1 つのリクエストまたはレスポンスしか処理できないため、大容量環境の送信ゲートウェイでは適切なパフォーマンスを発揮しません。

パフォーマンスを向上させるために、ゲートウェイの代わりに共同チャネルアダプターを使用できますが、これにはアプリケーションレベルのメッセージ相関が必要です。詳細については、TCP メッセージ相関を参照してください。

Spring Integration 2.2 は、共有ソケットのプールを使用するキャッシングクライアント接続ファクトリを導入しました。これにより、ゲートウェイは共有接続のプールを使用して複数の同時リクエストを処理できます。

TCP フェールオーバークライアント接続ファクトリ

1 つ以上の他のサーバーへのフェイルオーバーをサポートする TCP 接続ファクトリを構成できます。メッセージを送信する場合、ファクトリは、メッセージが送信されるか接続が見つからなくなるまで、構成されているすべてのファクトリを反復処理します。最初に、構成済みリストの最初のファクトリが使用されます。その後接続が失敗すると、次のファクトリが現在のファクトリになります。次の例は、フェールオーバークライアント接続ファクトリを構成する方法を示しています。

<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
    <constructor-arg>
        <list>
            <ref bean="clientFactory1"/>
            <ref bean="clientFactory2"/>
        </list>
    </constructor-arg>
</bean>
フェイルオーバー接続ファクトリを使用する場合、singleUse プロパティは、ファクトリ自体と、使用するように構成されているファクトリのリストの間で一貫している必要があります。

接続ファクトリには、共有接続(singleUse=false)で使用した場合のフェイルバックに関連する 2 つのプロパティがあります。

  • refreshSharedInterval

  • closeOnRefresh

上記の構成に基づいて、次のシナリオを検討してください。clientFactory1 は接続を確立できないが、clientFactory2 は接続を確立できるとします。refreshSharedInterval が通過した後に failCF getConnection() メソッドが呼び出されると、clientFactory1 を使用して接続を再試行します。成功すると、clientFactory2 への接続が閉じられます。closeOnRefresh が false の場合、「古い」接続は開いたままになり、最初のファクトリがもう一度失敗した場合に将来再利用される可能性があります。

refreshSharedInterval を設定して、その時間が経過した後、最初のファクトリとの再接続のみを試みるようにします。現在の接続が失敗したときに最初のファクトリにのみフェールバックする場合は、Long.MAX_VALUE (デフォルト)に設定します。

リフレッシュによって実際に新しい接続が作成された後、「古い」接続を閉じるように closeOnRefresh を設定します。

これらのプロパティは、デリゲートファクトリのいずれかが CachingClientConnectionFactory である場合は適用されません。これは、接続キャッシュがそこで処理されるためです。その場合、接続を取得するために接続ファクトリのリストが常に参照されます。

バージョン 5.3 以降、これらはデフォルトで Long.MAX_VALUE および true になるため、ファクトリは現在の接続が失敗した場合にのみフェールバックを試みます。以前のバージョンのデフォルトの動作に戻すには、0 および false に設定します。

接続をテストも参照してください。

TCP スレッドアフィニティ接続ファクトリ

Spring Integration バージョン 5.0 は、この接続ファクトリを導入しました。接続は呼び出しスレッドにバインドされ、その接続はスレッドがメッセージを送信するたびに再利用されます。これは、接続が(サーバーまたはネットワークによって)閉じられるか、スレッドが releaseConnection() メソッドを呼び出すまで続きます。接続自体は、別のクライアントファクトリ実装によって提供されます。これは、各スレッドが接続を取得できるように、非共有(単一使用)接続を提供するように構成する必要があります。

次の例は、TCP スレッドアフィニティ接続ファクトリを構成する方法を示しています。

@Bean
public TcpNetClientConnectionFactory cf() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
            Integer.parseInt(System.getProperty(PORT)));
    cf.setSingleUse(true);
    return cf;
}

@Bean
public ThreadAffinityClientConnectionFactory tacf() {
    return new ThreadAffinityClientConnectionFactory(cf());
}

@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
    TcpOutboundGateway outGate = new TcpOutboundGateway();
    outGate.setConnectionFactory(tacf());
    outGate.setReplyChannelName("toString");
    return outGate;
}