スプリッター
スプリッターは、メッセージをいくつかの部分に分割し、結果のメッセージを送信して個別に処理するロールを持つコンポーネントです。非常に多くの場合、アグリゲーターを含むパイプラインの上流のプロデューサーです。
プログラミングモデル
分割を実行するための API は、1 つの基本クラス AbstractMessageSplitter で構成されています。これは、生成されるメッセージに適切なメッセージヘッダー(CORRELATION_ID、SEQUENCE_SIZE、SEQUENCE_NUMBER)を入力するなど、スプリッターに共通の機能をカプセル化する MessageHandler 実装です。この入力により、メッセージとその処理結果を追跡できます(通常のシナリオでは、これらのヘッダーは、さまざまな変換エンドポイントによって生成されるメッセージにコピーされます)。値は、たとえば、構成されたメッセージプロセッサー (英語) によって使用できます。
次の例は、AbstractMessageSplitter からの抜粋を示しています。
public abstract class AbstractMessageSplitter
extends AbstractReplyProducingMessageConsumer {
...
protected abstract Object splitMessage(Message<?> message);
} アプリケーションに特定のスプリッターを実装するには、AbstractMessageSplitter を継承し、メッセージを分割するためのロジックを含む splitMessage メソッドを実装します。戻り値は次のいずれかです。
Collectionまたはメッセージの配列、あるいはメッセージを反復処理するIterable(またはIterator)。この場合、メッセージはメッセージとして送信されます(CORRELATION_ID、SEQUENCE_SIZE、SEQUENCE_NUMBERが入力された後)。このアプローチを使用すると、より詳細な制御が可能になります。たとえば、分割プロセスの一部としてカスタムメッセージヘッダーにデータを入力できます。Collectionまたは非メッセージオブジェクトの配列、または非メッセージオブジェクトを反復処理するIterable(またはIterator)。各コレクション要素がメッセージペイロードとして使用されることを除いて、前のケースと同様に機能します。このアプローチを使用すると、メッセージングシステムを考慮することなくドメインオブジェクトに集中でき、テストしやすいコードを生成できます。Messageまたは非メッセージオブジェクト(ただし、コレクションまたは配列ではありません)。単一のメッセージが送信されることを除いて、前のケースと同様に機能します。
Spring Integration では、単一の引数を受け取り、戻り値を持つメソッドを定義していれば、任意の POJO で分割アルゴリズムを実装できます。この場合、メソッドの戻り値は前述のように解釈されます。入力引数は Message または単純な POJO のいずれかです。後者の場合、スプリッターは受信メッセージのペイロードを受け取ります。このアプローチはコードを Spring Integration API から分離し、一般的にテストが容易になるため、推奨されます。
イテレータ
バージョン 4.1 から、AbstractMessageSplitter は value が分割する Iterator 型をサポートします。Iterator (または Iterable)の場合、基になるアイテムの数にアクセスできず、SEQUENCE_SIZE ヘッダーが 0 に設定されていることに注意してください。つまり、<aggregator> のデフォルト SequenceSizeReleaseStrategy は機能せず、splitter から CORRELATION_ID のグループは解放されません。incomplete のままになります。この場合、適切なカスタム ReleaseStrategy を使用するか、group-timeout または MessageGroupStoreReaper とともに send-partial-result-on-expiry に依存する必要があります。
バージョン 5.0 から、AbstractMessageSplitter は protected obtainSizeIfPossible() メソッドを提供し、可能であれば Iterable および Iterator オブジェクトのサイズを決定できます。たとえば、XPathMessageSplitter は、基になる NodeList オブジェクトのサイズを判別できます。また、バージョン 5.0.9 以降、このメソッドは com.fasterxml.jackson.core.TreeNode のサイズも適切に返します。
Iterator オブジェクトは、分割する前にメモリ内にコレクション全体を構築する必要を回避できます。例: 基礎となるアイテムが、反復またはストリームを使用して外部システム(例: DataBase または FTP MGET)から取り込まれた場合。
ストリームと Flux
バージョン 5.0 以降、AbstractMessageSplitter は、value が分割する Java Stream および Reactive Streams Publisher 型をサポートします。この場合、ターゲット Iterator は反復機能に基づいて構築されます。
さらに、スプリッターの出力チャンネルが ReactiveStreamsSubscribableChannel のインスタンスである場合、AbstractMessageSplitter は Iterator の代わりに Flux の結果を生成し、出力チャンネルは、ダウンストリームフローデマンドでのバックプレッシャーベースの分割のためにこの Flux にサブスクライブされます。
バージョン 5.2 以降、スプリッターは、分割関数が空のコンテナー(コレクション、配列、ストリーム、Flux など)を返したリクエストメッセージを送信するための discardChannel オプションをサポートします。この場合、outputChannel への送信のために反復処理するアイテムが存在しないことになります。null の分割結果は、フローの終了を示す指標として残ります。
Java、Groovy、Kotlin DSL を使用したスプリッターの構成
Message とその DSL 構成の反復可能なペイロードに基づく単純なスプリッターの例:
Java DSL
Kotlin DSL
Groovy DSL
@Bean
public IntegrationFlow someFlow() {
return f -> f.split(Message.class, Message::getPayload);
}
@Bean
fun someFlow() =
integrationFlow {
split<Message<*>> { it.payload }
}
@Bean
someFlow() {
integrationFlow {
splitWith {
expectedType Message<?>
function { it.payload }
}
}
}
DSL の詳細については、次の各章を参照してください。
XML を使用したスプリッターの構成
スプリッターは、次のように XML で構成できます。
<int:channel id="inputChannel"/>
<int:splitter id="splitter" (1)
ref="splitterBean" (2)
method="split" (3)
input-channel="inputChannel" (4)
output-channel="outputChannel" (5)
discard-channel="discardChannel" /> (6)
<int:channel id="outputChannel"/>
<beans:bean id="splitterBean" class="sample.PojoSplitter"/>| 1 | スプリッターの ID はオプションです。 |
| 2 | アプリケーションコンテキストで定義された Bean への参照。Bean は、前のセクションで説明した分割ロジックを実装する必要があります。オプション。Bean への参照が指定されていない場合、input-channel に到着したメッセージのペイロードは java.util.Collection の実装であると想定され、コレクションにはデフォルトの分割ロジックが適用され、個々の要素がメッセージに組み入れられて output-channel に送信されます。 |
| 3 | 分割ロジックを実装するメソッド(Bean で定義)。オプション。 |
| 4 | スプリッターの入力チャンネル。必須。 |
| 5 | スプリッターが受信メッセージを分割した結果を送信するチャネル。オプション(受信メッセージ自体が応答チャネルを指定できるため)。 |
| 6 | 分割結果が空の場合にリクエストメッセージを送信するチャネル。オプション(null の場合と同様に停止します)。 |
カスタムスプリッターの実装を他の <splitter> 定義で参照できる場合は、ref 属性を使用することをお勧めします。ただし、カスタムスプリッターハンドラーの実装のスコープを <splitter> の単一の定義に限定する必要がある場合は、次の例のように内部 Bean 定義を構成できます。
<int:splitter id="testSplitter" input-channel="inChannel" method="split"
output-channel="outChannel">
<beans:bean class="org.foo.TestSplitter"/>
</int:splitter> 同じ <int:splitter> 構成で ref 属性と内部ハンドラー定義の両方を使用することは許可されません。曖昧な状態を作成し、例外がスローされるためです。 |
ref 属性が AbstractMessageProducingHandler を継承する Bean を参照する場合(フレームワーク自体が提供するスプリッターなど)、構成は、出力チャネルをハンドラーに直接注入することにより最適化されます。この場合、各 ref は個別の Bean インスタンス(または prototype -scoped Bean)であるか、内部 <bean/> 構成型を使用する必要があります。ただし、この最適化は、スプリッター XML 定義でスプリッター固有の属性を指定しない場合にのみ適用されます。誤って複数の Bean から同じメッセージハンドラーを参照すると、構成例外が発生します。 |
アノテーション付きのスプリッターの構成
@Splitter アノテーションは、Message 型またはメッセージペイロード型のいずれかを期待するメソッドに適用可能であり、メソッドの戻り値は任意の型の Collection でなければなりません。返される値が実際の Message オブジェクトではない場合、各アイテムは Message のペイロードとして Message にラップされます。結果の各 Message は、@Splitter が定義されているエンドポイントの指定された出力チャネルに送信されます。
次の例は、@Splitter アノテーションを使用してスプリッターを構成する方法を示しています。
@Splitter
List<LineItem> extractItems(Order order) {
return order.getItems()
}アノテーションを使用したエンドポイントへのアドバイスおよびファイル分割も参照してください。