スプリッター

スプリッターは、メッセージをいくつかの部分に分割し、結果のメッセージを送信して個別に処理するロールを持つコンポーネントです。非常に多くの場合、アグリゲーターを含むパイプラインの上流のプロデューサーです。

プログラミングモデル

分割を実行するための API は、1 つの基本クラス AbstractMessageSplitter で構成されています。これは、生成されるメッセージに適切なメッセージヘッダー(CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER)を入力するなど、スプリッターに共通の機能をカプセル化する MessageHandler 実装です。この入力により、メッセージとその処理結果を追跡できます(通常のシナリオでは、これらのヘッダーは、さまざまな変換エンドポイントによって生成されるメッセージにコピーされます)。値は、たとえば、構成されたメッセージプロセッサー (英語) によって使用できます。

次の例は、AbstractMessageSplitter からの抜粋を示しています。

public abstract class AbstractMessageSplitter
    extends AbstractReplyProducingMessageConsumer {
    ...
    protected abstract Object splitMessage(Message<?> message);

}

アプリケーションに特定のスプリッターを実装するには、AbstractMessageSplitter を継承し、メッセージを分割するためのロジックを含む splitMessage メソッドを実装します。戻り値は次のいずれかです。

  • Collection またはメッセージの配列、あるいはメッセージを反復処理する Iterable (または Iterator)。この場合、メッセージはメッセージとして送信されます(CORRELATION_IDSEQUENCE_SIZESEQUENCE_NUMBER が入力された後)。このアプローチを使用すると、より詳細な制御が可能になります。たとえば、分割プロセスの一部としてカスタムメッセージヘッダーにデータを入力できます。

  • Collection または非メッセージオブジェクトの配列、または非メッセージオブジェクトを反復処理する Iterable (または Iterator)。各コレクション要素がメッセージペイロードとして使用されることを除いて、前のケースと同様に機能します。このアプローチを使用すると、メッセージングシステムを考慮することなくドメインオブジェクトに集中でき、テストしやすいコードを生成できます。

  • Message または非メッセージオブジェクト(ただし、コレクションまたは配列ではありません)。単一のメッセージが送信されることを除いて、前のケースと同様に機能します。

Spring Integration では、単一の引数を受け入れ、戻り値を持つメソッドを定義していれば、どの POJO も分割アルゴリズムを実装できます。この場合、メソッドの戻り値は前述のように解釈されます。入力引数は、Message または単純な POJO のいずれかです。後者の場合、スプリッターは受信メッセージのペイロードを受信します。この方法は、Spring Integration API からコードを切り離し、通常はテストが簡単なので、この方法をお勧めします。

イテレータ

バージョン 4.1 から、AbstractMessageSplitter は value が分割する Iterator 型をサポートします。Iterator (または Iterable)の場合、基になるアイテムの数にアクセスできず、SEQUENCE_SIZE ヘッダーが 0 に設定されていることに注意してください。つまり、<aggregator> のデフォルト SequenceSizeReleaseStrategy は機能せず、splitter から CORRELATION_ID のグループは解放されません。incomplete のままになります。この場合、適切なカスタム ReleaseStrategy を使用するか、group-timeout または MessageGroupStoreReaper とともに send-partial-result-on-expiry に依存する必要があります。

バージョン 5.0 から、AbstractMessageSplitter は protected obtainSizeIfPossible() メソッドを提供し、可能であれば Iterable および Iterator オブジェクトのサイズを決定できます。たとえば、XPathMessageSplitter は、基になる NodeList オブジェクトのサイズを判別できます。また、バージョン 5.0.9 以降、このメソッドは com.fasterxml.jackson.core.TreeNode のサイズも適切に返します。

Iterator オブジェクトは、分割する前にメモリ内にコレクション全体を構築する必要を回避できます。例: 基礎となるアイテムが、反復またはストリームを使用して外部システム(例: DataBase または FTP MGET)から取り込まれた場合。

ストリームとフラックス

バージョン 5.0 以降、AbstractMessageSplitter は、value が分割する Java Stream および Reactive Streams Publisher 型をサポートします。この場合、ターゲット Iterator は反復機能に基づいて構築されます。

さらに、スプリッターの出力チャンネルが ReactiveStreamsSubscribableChannel のインスタンスである場合、AbstractMessageSplitter は Iterator の代わりに Flux の結果を生成し、出力チャンネルは、ダウンストリームフローデマンドでのバックプレッシャーベースの分割のためにこの Flux にサブスクライブされます。

バージョン 5.2 以降、スプリッターは、スプリット関数が空のコンテナー(コレクション、配列、ストリーム、Flux など)を返したリクエストメッセージを送信するための discardChannel オプションをサポートします。この場合、outputChannel に送信するために反復する項目はありません。null の分割結果は、フロー終了インジケータとして残ります。

XML を使用したスプリッターの構成

スプリッターは、次のように XML で構成できます。

<int:channel id="inputChannel"/>

<int:splitter id="splitter"           (1)
  ref="splitterBean"                  (2)
  method="split"                      (3)
  input-channel="inputChannel"        (4)
  output-channel="outputChannel"      (5)
  discard-channel="discardChannel" /> (6)

<int:channel id="outputChannel"/>

<beans:bean id="splitterBean" class="sample.PojoSplitter"/>
1 スプリッターの ID はオプションです。
2 アプリケーションコンテキストで定義された Bean への参照。Bean は、前のセクションで説明したように、分割ロジックを実装する必要があります。オプション。Bean への参照が提供されない場合、input-channel に到着したメッセージのペイロードは java.util.Collection の実装であり、デフォルトの分割ロジックがコレクションに適用され、個々の要素がメッセージに組み込まれて送信されると想定されます。output-channel へ。
3 分割ロジックを実装するメソッド(Bean で定義)。オプション。
4 スプリッターの入力チャンネル。必須。
5 スプリッターが受信メッセージを分割した結果を送信するチャネル。オプション(受信メッセージ自体が応答チャネルを指定できるため)。
6 分割結果が空の場合にリクエストメッセージが送信されるチャネル。オプション(null 結果の場合のように停止します)。

カスタムスプリッター実装を他の <splitter> 定義で参照できる場合は、ref 属性を使用することをお勧めします。ただし、カスタムスプリッターハンドラーの実装を <splitter> の単一の定義にスコープする必要がある場合、次の例のように、内部 Bean 定義を構成できます。

<int:splitter id="testSplitter" input-channel="inChannel" method="split"
                output-channel="outChannel">
  <beans:bean class="org.foo.TestSplitter"/>
</int:splitter>
同じ <int:splitter> 構成で ref 属性と内部ハンドラー定義の両方を使用することは許可されません。曖昧な状態を作成し、例外がスローされるためです。
ref 属性が AbstractMessageProducingHandler を継承する Bean を参照する場合(フレームワーク自体が提供するスプリッターなど)、構成は、出力チャネルをハンドラーに直接注入することにより最適化されます。この場合、各 ref は個別の Bean インスタンス(または prototype -scoped Bean)であるか、内部 <bean/> 構成型を使用する必要があります。ただし、この最適化は、スプリッター XML 定義でスプリッター固有の属性を指定しない場合にのみ適用されます。誤って複数の Bean から同じメッセージハンドラーを参照すると、構成例外が発生します。

アノテーション付きのスプリッターの構成

@Splitter アノテーションは、Message 型またはメッセージペイロード型のいずれかを期待するメソッドに適用可能であり、メソッドの戻り値は任意の型の Collection でなければなりません。返される値が実際の Message オブジェクトではない場合、各アイテムは Message のペイロードとして Message にラップされます。結果の各 Message は、@Splitter が定義されているエンドポイントの指定された出力チャネルに送信されます。

次の例は、@Splitter アノテーションを使用してスプリッターを構成する方法を示しています。

@Splitter
List<LineItem> extractItems(Order order) {
    return order.getItems()
}