@MessageEndpoint public class MessageChannelPartitionHandler extends java.lang.Object implements PartitionHandler, org.springframework.beans.factory.InitializingBean
MessageChannel
インスタンスを使用してリモートワーカーに命令を送信し、それらのレスポンスを受信する PartitionHandler
。MessageChannel
は優れた抽象化を提供するため、ワーカーの場所と、それらとの通信に使用されるトランスポートを実行時に変更できます。リモートとの通信ワーカーは、トランザクションである必要はなく、配信が保証されている必要もありません。そのため、ローカルスレッドプールベースの実装は、リモート Web サービスまたは JMS 実装と同様に機能します。リモートワーカーが失敗した場合、ジョブは失敗し、再起動して欠落しているメッセージをピックアップして処理することができます。リモートワーカーは、Spring Batch JobRepository
にアクセスして、これらの再起動全体の共有状態を一元的に管理できるようにする必要があります。MessageChannel
はワーカーにリクエストを送信するために使用されますが、ワーカーのレスポンスは次の 2 つの方法のいずれかで取得できます。コンストラクターと説明 |
---|
MessageChannelPartitionHandler() |
修飾子と型 | メソッドと説明 |
---|---|
void | afterPropertiesSet() |
java.util.List<?> | aggregate(java.util.List<?> messages) |
java.util.Collection<StepExecution> | handle(StepExecutionSplitter stepExecutionSplitter, StepExecution masterStepExecution) StepExecutionRequest オブジェクトを MessagingTemplate のリクエストチャネルに送信し、その結果を応答チャネルで StepExecution のリストとして受信します。 |
void | setDataSource(javax.sql.DataSource dataSource) ジョブリポジトリを指す DataSource |
void | setGridSize(int gridSize) handle(StepExecutionSplitter, StepExecution) メソッドで StepExecutionSplitter に渡され、理想的には必要な StepExecution インスタンスの数を指示します。 |
void | setJobExplorer(JobExplorer jobExplorer) ジョブリポジトリの照会に使用する JobExplorer 。 |
void | setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway) リモートワーカーとメッセージを送受信するための事前設定されたゲートウェイ。 |
void | setPollInterval(long pollInterval) ワーカーのステータスについてジョブリポジトリをポーリングする頻度。 |
void | setReplyChannel(org.springframework.messaging.PollableChannel replyChannel) |
void | setStepName(java.lang.String stepName) パーティション化された StepExecution を実行するために使用される Step の名前。 |
void | setTimeout(long timeout) ジョブリポジトリのポーリングを使用する場合、待機する時間制限。 |
public void afterPropertiesSet() throws java.lang.Exception
org.springframework.beans.factory.InitializingBean
の afterPropertiesSet
java.lang.Exception
public void setTimeout(long timeout)
timeout
- 待機するミリ秒。デフォルトは -1(タイムアウトなし)です。public void setJobExplorer(JobExplorer jobExplorer)
JobExplorer
。ジョブリポジトリのポーリングを使用する場合は、これまたは DataSource
のいずれかが必要です。jobExplorer
- ルックアップに使用する JobExplorer
public void setPollInterval(long pollInterval)
pollInterval
- ポーリング間のミリ秒、デフォルトは 10000 (10 秒)。public void setDataSource(javax.sql.DataSource dataSource)
DataSource
dataSource
- ジョブリポジトリのストアを指す DataSource
public void setMessagingOperations(org.springframework.integration.core.MessagingTemplate messagingGateway)
StepExecutionRequest
ペイロードを受け入れることができるリクエストチャネル StepExecution
結果のリストを返す返信チャネルmessagingGateway
- 設定する MessagingTemplate
public void setGridSize(int gridSize)
handle(StepExecutionSplitter, StepExecution)
メソッドで StepExecutionSplitter
に渡され、理想的には必要な StepExecution
インスタンスの数を指示します。StepExecutionSplitter
は、入力データパーティションを保持する必要があるため、再起動の場合にグリッドサイズを無視できます。gridSize
- 作成されるステップ実行の数 public void setStepName(java.lang.String stepName)
StepExecution
を実行するために使用される Step
の名前。これは通常の Spring Batch ステップであり、StepExecution
コンテキストの入力パラメーターに基づいて実行を完了するために必要なすべてのビジネスロジックが含まれています。名前は、リモートワーカーによって Step
インスタンスに変換されます。stepName
- ビジネスロジックを実行する Step
インスタンスの名前 @Aggregator(sendPartialResultsOnExpiry="true") public java.util.List<?> aggregate(@Payloads java.util.List<?> messages)
messages
- 集約されるメッセージ public void setReplyChannel(org.springframework.messaging.PollableChannel replyChannel)
public java.util.Collection<StepExecution> handle(StepExecutionSplitter stepExecutionSplitter, StepExecution masterStepExecution) throws java.lang.Exception
StepExecutionRequest
オブジェクトを MessagingTemplate
のリクエストチャネルに送信し、その結果を応答チャネルで StepExecution
のリストとして受信します。個々のリモート応答のアグリゲーターとして aggregate(List)
メソッドを使用します。すべての作業が行われる可能性が高くなるように、受信タイムアウトは MessagingTemplate
と アグリゲーターで現実的に設定する必要があります。PartitionHandler
の handle
stepExecutionSplitter
- StepExecution
インスタンスのコレクションを生成するための戦略 masterStepExecution
- パーティション全体のマスターステップの実行 StepExecution
インスタンスのコレクション java.lang.Exception
- 何かがうまくいかない場合。これにより、実装は自由になり、必要に応じて呼び出し元が例外をステップ障害に変換することに依存することができます。PartitionHandler.handle(StepExecutionSplitter, StepExecution)