TCP メッセージ相関

IP エンドポイントの 1 つのゴールは、Spring Integration アプリケーション以外のシステムとの通信を提供することです。このため、デフォルトではメッセージペイロードのみが送受信されます。3.0 以降、JSON、Java 直列化、カスタムシリアライザーとデシリアライザーを使用してヘッダーを転送できます。詳細については、ヘッダーの転送を参照してください。フレームワーク(ゲートウェイを使用する場合を除く)またはサーバー側のチャネルアダプターのコラボレーションでは、メッセージの相関は提供されません。このドキュメントの後半では、アプリケーションで利用可能なさまざまな相関手法について説明します。ほとんどの場合、これには、メッセージペイロードに自然な相関データ(オーダー番号など)が含まれている場合でも、メッセージの特定のアプリケーションレベルの相関が必要です。

ゲートウェイ

ゲートウェイはメッセージを自動的に関連付けます。ただし、比較的少量のアプリケーションには送信ゲートウェイを使用する必要があります。すべてのメッセージペアに対して単一の共有接続を使用するように接続ファクトリを構成すると('single-use="false" ')、一度に処理できるメッセージは 1 つだけです。新しいメッセージは、前のメッセージへの応答が受信されるまで待機する必要があります。新しいメッセージを使用して新しい接続を使用するように接続ファクトリを構成する場合('single-use="true" ')、この制限は適用されません。この設定は、共有接続環境よりも高いスループットを提供できますが、メッセージペアごとに新しい接続を開いたり閉じたりするオーバーヘッドが伴います。

大量のメッセージについては、チャネルアダプターのコラボレーションペアの使用を検討してください。ただし、そのためには、コラボレーションロジックを提供する必要があります。

Spring Integration 2.2 で導入された別のソリューションは、CachingClientConnectionFactory を使用することです。これにより、共有接続のプールを使用できます。

送信および受信チャネルアダプターのコラボレーション

大容量のスループットを実現するために(前述のようにゲートウェイを使用する場合の落とし穴を回避するために)、連携する送信および受信チャネルアダプターのペアを構成できます。また、完全に非同期の通信のために、コラボレーションアダプター(サーバー側またはクライアント側)を使用できます(リクエスト - 応答セマンティクスではなく)。サーバー側では、受信アダプターが応答メッセージの送信時に使用する接続を送信アダプターが決定できるヘッダーを追加するため、メッセージ相関はアダプターによって自動的に処理されます。

サーバー側では、ip_connectionId ヘッダーを設定する必要があります。これは、メッセージと接続を関連付けるために使用されるためです。受信アダプターから発信されるメッセージには、自動的にヘッダーが設定されます。送信する他のメッセージを作成する場合は、ヘッダーを設定する必要があります。受信メッセージからヘッダー値を取得できます。

クライアント側では、アプリケーションは必要に応じて独自の相関ロジックを提供する必要があります。これにはいくつかの方法があります。

メッセージペイロードに自然な相関データ(トランザクション ID やオーダー番号など)があり、元の送信メッセージからの情報(応答チャネルヘッダーなど)を保持する必要がない場合、相関は単純で、いずれにしても、アプリケーションレベルで行われます。

メッセージペイロードに自然な相関データ(トランザクション ID やオーダー番号など)が含まれているが、元の送信メッセージからの情報(応答チャネルヘッダーなど)を保持する必要がある場合、元の送信メッセージのコピーを保持できます。発信メッセージ(パブリッシュ / サブスクライブチャネルを使用するなど)を使用し、アグリゲーターを使用して必要なデータを再結合します。

前の 2 つのシナリオのいずれかで、ペイロードに自然な相関データがない場合、送信チャネルアダプターの上流にトランスフォーマーを提供して、そのようなデータでペイロードを強化できます。このようなトランスフォーマーは、元のペイロードを、元のペイロードとメッセージヘッダーのサブセットの両方を含む新しいオブジェクトに変換できます。もちろん、ヘッダーからのライブオブジェクト(応答チャネルなど)は、変換されたペイロードに含めることはできません。

このような戦略を選択する場合は、接続ファクトリに適切なシリアライザーとデシリアライザーのペアがあり、そのようなペイロードを処理する必要があります(java 直列化を使用する DefaultSerializer や DefaultDeserializer、またはカスタムシリアライザーとデシリアライザー)。デフォルト ByteArrayCrLfSerializer を含む TCP 接続ファクトリでメンションされた ByteArray*Serializer オプションは、変換されたペイロードが String または byte[] でない限り、そのようなペイロードをサポートしません。

2.2 リリースの前は、コラボレーションチャネルアダプターがクライアント接続ファクトリを使用していた場合、so-timeout 属性はデフォルトの応答タイムアウト(10 秒)にデフォルト設定されていました。これは、この時間内に受信アダプターがデータを受信しなかった場合、ソケットが閉じられたことを意味します。

このデフォルトの動作は、真の非同期環境では適切ではなかったため、デフォルトで無限タイムアウトになりました。クライアント接続ファクトリの so-timeout 属性を 10000 ミリ秒に設定することにより、以前のデフォルトの動作に戻すことができます。

バージョン 5.4 以降、複数の送信チャネルアダプターと 1 つの TcpInboundChannelAdapter が同じ接続ファクトリを共有できます。これにより、アプリケーションはリクエスト / 応答と任意のサーバー→クライアントメッセージングの両方をサポートできます。詳細については、TCP ゲートウェイを参照してください。

ヘッダーの転送

TCP はストリーミングプロトコルです。Serializers および Deserializers は、ストリーム内のメッセージを区別します。3.0 より前は、メッセージペイロード(String または byte[])のみが TCP を介して転送できました。3.0 からは、選択したヘッダーとペイロードを転送できます。ただし、replyChannel ヘッダーなどの「ライブ」オブジェクトは直列化できません。

TCP でヘッダー情報を送信するには、追加の構成が必要です。

最初のステップは、mapper 属性を使用する MessageConvertingTcpMessageMapper を ConnectionFactory に提供することです。このマッパーは、任意の MessageConverter 実装に委譲して、構成された serializer および deserializer によってシリアライズおよびデシリアライズできるオブジェクトとの間でメッセージを変換します。

Spring Integration は MapMessageConverter を提供します。これにより、ペイロードとともに Map オブジェクトに追加されるヘッダーのリストを指定できます。生成されたマップには、payload と headers の 2 つのエントリがあります。headers エントリ自体が Map であり、選択されたヘッダーが含まれています。

2 番目のステップは、Map と何らかのワイヤー形式の間で変換できるシリアライザーとデシリアライザーを提供することです。これは、ピアシステムが Spring Integration アプリケーションでない場合に通常必要となるカスタム Serializer または Deserializer にすることができます。

Spring Integration は、MapJsonSerializer を提供して、Map と JSON を相互に変換します。Spring Integration JsonObjectMapper を使用します。必要に応じて、カスタム JsonObjectMapper を提供できます。デフォルトでは、シリアライザーはオブジェクト間にラインフィード(0x0a)文字を挿入します。詳細については、Javadoc を参照してください。

JsonObjectMapper は、クラスパスにある Jackson のバージョンを使用します。

DefaultSerializer および DefaultDeserializer を使用して、Map の標準 Java 直列化を使用することもできます。

次の例は、JSON を使用して correlationIdsequenceNumbersequenceSize ヘッダーを転送する接続ファクトリの構成を示しています。

<int-ip:tcp-connection-factory id="client"
    type="client"
    host="localhost"
    port="12345"
    mapper="mapper"
    serializer="jsonSerializer"
    deserializer="jsonSerializer"/>

<bean id="mapper"
      class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
    <constructor-arg name="messageConverter">
        <bean class="o.sf.integration.support.converter.MapMessageConverter">
            <property name="headerNames">
                <list>
                    <value>correlationId</value>
                    <value>sequenceNumber</value>
                    <value>sequenceSize</value>
                </list>
            </property>
        </bean>
    </constructor-arg>
</bean>

<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />

上記の構成で送信され、「何か」のペイロードを持つメッセージは、次のようにワイヤに表示されます。

{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}