ノンブロッキング I/O について (NIO)
NIO(IP 設定属性の using-nio
を参照)を使用すると、各ソケットから読み取るためのスレッド専用になりません。少数のソケットの場合、NIO を使用せずに(QueueChannel
などへの)非同期ハンドオフを実行すると、NIO を使用した場合と同等以上のパフォーマンスが得られる可能性があります。
多数の接続を処理する場合は、NIO の使用を検討する必要があります。ただし、NIO の使用には、他にもいくつかの影響があります。(タスクエグゼキューター内の) スレッドのプールは、すべてのソケットで共有されます。各受信メッセージは組み立てられ、そのプールから選択されたスレッドの個別の作業単位として構成されたチャネルに送信されます。同じソケットに到着する 2 つの連続したメッセージは、異なるスレッドによって処理される場合があります。これは、メッセージがチャネルに送信される順序が不確定であることを意味します。ソケットに到着するメッセージの厳密な順序は維持されません。
一部のアプリケーションでは、これは課題ではありません。他の人にとっては、課題です。厳密な順序付けが必要な場合は、using-nio
を false
に設定し、非同期ハンドオフを使用することを検討してください。
または、受信エンドポイントの下流にリシーケンサーを挿入して、メッセージを適切な順序に戻すこともできます。接続ファクトリで apply-sequence
を true
に設定すると、TCP 接続で到着するメッセージには sequenceNumber
および correlationId
ヘッダーが設定されます。リシーケンサーはこれらのヘッダーを使用して、メッセージを適切な順序に戻します。
バージョン 5.1.4 以降、既存の接続からの読み取りよりも新しい接続の受け入れが優先されます。一般に、新しい受信接続の割合が非常に高い場合を除き、これはほとんど影響を与えません。読み取り優先度を与える以前の動作に戻したい場合は、TcpNioServerConnectionFactory の multiAccept プロパティを false に設定します。 |
プールサイズ
プールサイズ属性は使用されなくなりました。以前は、タスクエグゼキューターが指定されていない場合、デフォルトのスレッドプールのサイズを指定していました。サーバーソケットの接続バックログを設定するためにも使用されました。最初の関数は不要になりました(次の段落を参照)。2 番目の関数は、backlog
属性に置き換えられます。
以前は、NIO で固定スレッドプールタスクエグゼキューター(デフォルト)を使用すると、デッドロックが発生して処理が停止することがありました。この問題は、バッファがいっぱいで、ソケットから読み取っているスレッドがバッファにさらにデータを追加しようとしており、バッファ内にスペースを作成するためのスレッドがなかったときに発生しました。これは、プールサイズが非常に小さい場合にのみ発生しましたが、極端な条件下では可能です。2.2 以降、2 つの変更によりこの問題が解消されました。まず、デフォルトのタスクエグゼキューターはキャッシュスレッドプールエグゼキューターです。次に、デッドロック検出ロジックが追加され、デッドロックの代わりにスレッド不足が発生した場合、例外がスローされ、デッドロックされたリソースが解放されます。
デフォルトのタスクエグゼキューターは制限されていないため、メッセージ処理に時間がかかる場合、高いレートの受信メッセージでメモリ不足状態が発生する可能性があります。アプリケーションがこの型の動作を示す場合、適切なプールサイズのプールされたタスクエグゼキューターを使用する必要がありますが、次のセクションを参照してください。 |
CALLER_RUNS
ポリシーを使用したスレッドプールタスクエグゼキュータ
CallerRunsPolicy
(<task/>
名前空間を使用する場合は CALLER_RUNS
)で固定スレッドプールを使用し、キュー容量が小さい場合、いくつかの重要な考慮事項に留意する必要があります。
固定スレッドプールを使用しない場合、以下は適用されません。
NIO 接続では、3 つの異なるタスク型があります。I/O セレクター処理は、1 つの専用スレッドで実行されます(イベントの検出、新しい接続の受け入れ、タスクエグゼキューターを使用した他のスレッドへの I/O 読み取り操作のディスパッチ)。I/O リーダースレッド(読み取り操作のディスパッチ先)がデータを読み取ると、別のスレッドに渡されて受信メッセージを組み立てます。大きなメッセージは、完了するまでに数回の読み取りが必要になる場合があります。これらの「アセンブラ」スレッドは、データを待っている間ブロックできます。新しい読み取りイベントが発生すると、リーダーはこのソケットにすでにアセンブラがあるかどうかを判断し、ない場合は新しいものを実行します。アセンブリプロセスが完了すると、アセンブラスレッドがプールに返されます。
これにより、プールが使い果たされ、CALLER_RUNS
拒否ポリシーが使用され、タスクキューがいっぱいになると、デッドロックが発生する可能性があります。プールが空でキューに空きがない場合、IO セレクタースレッドは OP_READ
イベントを受け取り、executor を使用して読み取りをディスパッチします。キューがいっぱいなので、セレクタースレッド自体が読み取りプロセスを開始します。現在、このソケットにアセンブラーがないことを検出し、読み取りを行う前にアセンブラーを起動します。再び、キューがいっぱいになり、セレクタースレッドがアセンブラーになります。これでアセンブラーはブロックされ、データの読み取りを待機しますが、読み取りは行われません。セレクタスレッドは新しいイベントを処理できないため、接続ファクトリはデッドロックされています。
このデッドロックを回避するには、アセンブリ(タスク)を実行するセレクター(またはリーダー)スレッドを回避する必要があります。IO 操作とアセンブリ操作に別々のプールを使用する必要があります。
フレームワークは CompositeExecutor
を提供します。これにより、2 つの異なるエグゼキューターの構成が可能になります。1 つは IO 操作を実行し、もう 1 つはメッセージアセンブリ用です。この環境では、IO スレッドがアセンブラスレッドになることはなく、デッドロックは発生しません。
さらに、タスク実行プログラムは AbortPolicy
(<task>
を使用する場合は ABORT
)を使用するように構成する必要があります。I/O タスクが完了できない場合、短時間延期され、完了してアセンブラーが割り当てられるまで継続的に再試行されます。デフォルトでは、遅延は 100 ミリ秒ですが、接続ファクトリで readDelay
プロパティを設定することで変更できます(XML 名前空間で構成する場合は read-delay
)。
次の 3 つの例は、複合エグゼキューターの構成方法を示しています。
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>