スプリッター
スプリッターは、メッセージをいくつかの部分に分割し、結果のメッセージを送信して個別に処理するロールを持つコンポーネントです。非常に多くの場合、アグリゲーターを含むパイプラインの上流のプロデューサーです。
プログラミングモデル
分割を実行するための API は、1 つの基本クラス AbstractMessageSplitter
で構成されています。これは、生成されるメッセージに適切なメッセージヘッダー(CORRELATION_ID
、SEQUENCE_SIZE
、SEQUENCE_NUMBER
)を入力するなど、スプリッターに共通の機能をカプセル化する MessageHandler
実装です。この入力により、メッセージとその処理結果を追跡できます(通常のシナリオでは、これらのヘッダーは、さまざまな変換エンドポイントによって生成されるメッセージにコピーされます)。値は、たとえば、構成されたメッセージプロセッサー (英語) によって使用できます。
次の例は、AbstractMessageSplitter
からの抜粋を示しています。
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
}
アプリケーションに特定のスプリッターを実装するには、AbstractMessageSplitter
を継承し、メッセージを分割するためのロジックを含む splitMessage
メソッドを実装します。戻り値は次のいずれかです。
Collection
またはメッセージの配列、あるいはメッセージを反復処理するIterable
(またはIterator
)。この場合、メッセージはメッセージとして送信されます(CORRELATION_ID
、SEQUENCE_SIZE
、SEQUENCE_NUMBER
が入力された後)。このアプローチを使用すると、より詳細な制御が可能になります。たとえば、分割プロセスの一部としてカスタムメッセージヘッダーにデータを入力できます。Collection
または非メッセージオブジェクトの配列、または非メッセージオブジェクトを反復処理するIterable
(またはIterator
)。各コレクション要素がメッセージペイロードとして使用されることを除いて、前のケースと同様に機能します。このアプローチを使用すると、メッセージングシステムを考慮することなくドメインオブジェクトに集中でき、テストしやすいコードを生成できます。Message
または非メッセージオブジェクト(ただし、コレクションまたは配列ではありません)。単一のメッセージが送信されることを除いて、前のケースと同様に機能します。
Spring Integration では、単一の引数を受け入れ、戻り値を持つメソッドを定義していれば、どの POJO も分割アルゴリズムを実装できます。この場合、メソッドの戻り値は前述のように解釈されます。入力引数は、Message
または単純な POJO のいずれかです。後者の場合、スプリッターは受信メッセージのペイロードを受信します。この方法は、Spring Integration API からコードを切り離し、通常はテストが簡単なので、この方法をお勧めします。
イテレータ
バージョン 4.1 から、AbstractMessageSplitter
は value
が分割する Iterator
型をサポートします。Iterator
(または Iterable
)の場合、基になるアイテムの数にアクセスできず、SEQUENCE_SIZE
ヘッダーが 0
に設定されていることに注意してください。つまり、<aggregator>
のデフォルト SequenceSizeReleaseStrategy
は機能せず、splitter
から CORRELATION_ID
のグループは解放されません。incomplete
のままになります。この場合、適切なカスタム ReleaseStrategy
を使用するか、group-timeout
または MessageGroupStoreReaper
とともに send-partial-result-on-expiry
に依存する必要があります。
バージョン 5.0 から、AbstractMessageSplitter
は protected obtainSizeIfPossible()
メソッドを提供し、可能であれば Iterable
および Iterator
オブジェクトのサイズを決定できます。たとえば、XPathMessageSplitter
は、基になる NodeList
オブジェクトのサイズを判別できます。また、バージョン 5.0.9 以降、このメソッドは com.fasterxml.jackson.core.TreeNode
のサイズも適切に返します。
Iterator
オブジェクトは、分割する前にメモリ内にコレクション全体を構築する必要を回避できます。例: 基礎となるアイテムが、反復またはストリームを使用して外部システム(例: DataBase または FTP MGET
)から取り込まれた場合。
ストリームと Flux
バージョン 5.0 以降、AbstractMessageSplitter
は、value
が分割する Java Stream
および Reactive Streams Publisher
型をサポートします。この場合、ターゲット Iterator
は反復機能に基づいて構築されます。
さらに、スプリッターの出力チャンネルが ReactiveStreamsSubscribableChannel
のインスタンスである場合、AbstractMessageSplitter
は Iterator
の代わりに Flux
の結果を生成し、出力チャンネルは、ダウンストリームフローデマンドでのバックプレッシャーベースの分割のためにこの Flux
にサブスクライブされます。
バージョン 5.2 以降、スプリッターは、スプリット関数が空のコンテナー(コレクション、配列、ストリーム、Flux
など)を返したリクエストメッセージを送信するための discardChannel
オプションをサポートします。この場合、outputChannel
に送信するために反復する項目はありません。null
の分割結果は、フロー終了インジケータとして残ります。
Java、Groovy、Kotlin DSL を使用したスプリッターの構成
Message
とその DSL 構成の反復可能なペイロードに基づく単純なスプリッターの例:
Java DSL
Kotlin DSL
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
DSL の詳細については、次の各章を参照してください。
XML を使用したスプリッターの構成
スプリッターは、次のように XML で構成できます。
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 | スプリッターの ID はオプションです。 |
2 | アプリケーションコンテキストで定義された Bean への参照。Bean は、前のセクションで説明したように、分割ロジックを実装する必要があります。オプション。Bean への参照が提供されない場合、input-channel に到着したメッセージのペイロードは java.util.Collection の実装であり、デフォルトの分割ロジックがコレクションに適用され、個々の要素がメッセージに組み込まれて送信されると想定されます。output-channel へ。 |
3 | 分割ロジックを実装するメソッド(Bean で定義)。オプション。 |
4 | スプリッターの入力チャンネル。必須。 |
5 | スプリッターが受信メッセージを分割した結果を送信するチャネル。オプション(受信メッセージ自体が応答チャネルを指定できるため)。 |
6 | 空の分割結果の場合にリクエストメッセージが送信されるチャネル。オプション (null 結果の場合と同様に停止します)。 |
カスタムスプリッターの実装を他の <splitter>
定義で参照できる場合は、ref
属性を使用することをお勧めします。ただし、カスタムスプリッターハンドラーの実装のスコープを <splitter>
の単一の定義に限定する必要がある場合は、次の例のように内部 Bean 定義を構成できます。
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
同じ <int:splitter> 構成で ref 属性と内部ハンドラー定義の両方を使用することは許可されません。曖昧な状態を作成し、例外がスローされるためです。 |
ref 属性が AbstractMessageProducingHandler を継承する Bean を参照する場合(フレームワーク自体が提供するスプリッターなど)、構成は、出力チャネルをハンドラーに直接注入することにより最適化されます。この場合、各 ref は個別の Bean インスタンス(または prototype -scoped Bean)であるか、内部 <bean/> 構成型を使用する必要があります。ただし、この最適化は、スプリッター XML 定義でスプリッター固有の属性を指定しない場合にのみ適用されます。誤って複数の Bean から同じメッセージハンドラーを参照すると、構成例外が発生します。 |
アノテーション付きのスプリッターの構成
@Splitter
アノテーションは、Message
型またはメッセージペイロード型のいずれかを期待するメソッドに適用可能であり、メソッドの戻り値は任意の型の Collection
でなければなりません。返される値が実際の Message
オブジェクトではない場合、各アイテムは Message
のペイロードとして Message
にラップされます。結果の各 Message
は、@Splitter
が定義されているエンドポイントの指定された出力チャネルに送信されます。
次の例は、@Splitter
アノテーションを使用してスプリッターを構成する方法を示しています。
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}
アノテーションを使用したエンドポイントへのアドバイスおよびファイル分割も参照してください。