このバージョンはまだ開発中であり、まだ安定しているとは見なされていません。最新の安定バージョンについては、Spring Integration 6.5.3 を使用してください! |
アグリゲーター
基本的にスプリッターの鏡像であるアグリゲーターは、複数のメッセージを受け取り、単一のメッセージに結合するメッセージハンドラーの一種です。実際、アグリゲーターは多くの場合、スプリッターを含むパイプラインのダウンストリームコンシューマーです。
技術的には、アグリゲータはステートフルであるため、スプリッタよりも複雑です。アグリゲータは、集約するメッセージを保持し、メッセージグループ全体を集約する準備が整ったかどうかを判断しなければなりません。そのためには、MessageStore が必要です。
機能性
アグリゲーターは、グループが完了したと見なされるまで、関連するメッセージのグループを関連付けて保管することにより、グループを結合します。その時点で、アグリゲーターはグループ全体を処理して単一のメッセージを作成し、集約されたメッセージを出力として送信します。
アグリゲーターを実装するには、集約を実行するロジックを提供する必要があります(つまり、多数からの単一メッセージの作成)。関連する 2 つの概念は、相関とリリースです。
相関により、メッセージを集約用にグループ化する方法が決まります。Spring Integration では、IntegrationMessageHeaderAccessor.CORRELATION_ID メッセージヘッダーに基づいて、デフォルトで相関が行われます。同じ IntegrationMessageHeaderAccessor.CORRELATION_ID を持つメッセージはグループ化されます。ただし、相関ストラテジをカスタマイズして、メッセージをグループ化する方法を指定する他の方法を許可できます。そのためには、CorrelationStrategy (この章で後述)を実装できます。
メッセージのグループを処理する準備が整った時点を判断するために、ReleaseStrategy が参照されます。アグリゲーターのデフォルトのリリース戦略は、IntegrationMessageHeaderAccessor.SEQUENCE_SIZE ヘッダーに基づいて、シーケンスに含まれるすべてのメッセージが存在するときにグループをリリースします。カスタム ReleaseStrategy 実装への参照を提供することにより、このデフォルト戦略をオーバーライドできます。
プログラミングモデル
Aggregation API は、いくつかのクラスで構成されています。
インターフェース
MessageGroupProcessorおよびそのサブクラス:MethodInvokingAggregatingMessageGroupProcessorおよびExpressionEvaluatingMessageGroupProcessorReleaseStrategyインターフェースとそのデフォルト実装:SimpleSequenceSizeReleaseStrategyCorrelationStrategyインターフェースとそのデフォルト実装:HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
AggregatingMessageHandler (AbstractCorrelatingMessageHandler のサブクラス)は MessageHandler 実装であり、アグリゲーター(および他の相関するユースケース)の共通機能をカプセル化します。
集約するグループにメッセージを関連付ける
グループが解放されるまで
MessageStoreでこれらのメッセージを維持するグループをいつリリースできるかを決定する
リリースされたグループを単一のメッセージに集約する
期限切れのグループの認識と対応
メッセージをグループ化する方法を決定する責任は、CorrelationStrategy インスタンスに委譲されます。メッセージグループを解放できるかどうかを決定する責任は、ReleaseStrategy インスタンスに委譲されます。
次のリストは、ベース AbstractAggregatingMessageGroupProcessor の簡単なハイライトを示しています(aggregatePayloads メソッドを実装する責任は開発者に任されています)。
public abstract class AbstractAggregatingMessageGroupProcessor
implements MessageGroupProcessor {
protected Map<String, Object> aggregateHeaders(MessageGroup group) {
// default implementation exists
}
protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}AbstractAggregatingMessageGroupProcessor のすぐに使用可能な実装として DefaultAggregatingMessageGroupProcessor、ExpressionEvaluatingMessageGroupProcessor、MethodInvokingMessageGroupProcessor を参照してください。
バージョン 5.2 以降、Function<MessageGroup, Map<String, Object>> 戦略は、AbstractAggregatingMessageGroupProcessor が出力メッセージのヘッダーをマージおよび計算(集約)するために使用可能です。DefaultAggregateHeadersFunction 実装は、グループ間で競合のないすべてのヘッダーを返すロジックで使用できます。グループ内の 1 つ以上のメッセージにヘッダーがない場合、競合とは見なされません。競合するヘッダーは省略されます。新しく導入された DelegatingMessageGroupProcessor とともに、この関数は任意の(AbstractAggregatingMessageGroupProcessor 以外の) MessageGroupProcessor 実装に使用されます。基本的に、フレームワークは提供された関数を AbstractAggregatingMessageGroupProcessor インスタンスに注入し、他のすべての実装を DelegatingMessageGroupProcessor にラップします。AbstractAggregatingMessageGroupProcessor と DelegatingMessageGroupProcessor のロジックの違い。後者は、デリゲート戦略を呼び出す前にヘッダーを事前に計算せず、デリゲートが Message または AbstractIntegrationMessageBuilder を返す場合、関数を呼び出しません。その場合、フレームワークは、ターゲット実装が返された結果に含まれる適切なヘッダーセットを生成することを考慮したと想定します。Function<MessageGroup, Map<String, Object>> ストラテジーは、XML 構成の headers-function 参照属性、Java DSL の AggregatorSpec.headersFunction() オプション、プレーン Java 構成の AggregatorFactoryBean.setHeadersFunction() として利用可能です。
CorrelationStrategy は AbstractCorrelatingMessageHandler が所有し、次の例に示すように、IntegrationMessageHeaderAccessor.CORRELATION_ID メッセージヘッダーに基づいたデフォルト値を持っています。
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
...
this.correlationStrategy = correlationStrategy == null ?
new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
...
} メッセージグループの実際の処理に関しては、デフォルトの実装は DefaultAggregatingMessageGroupProcessor です。ペイロードが特定のグループに対して受信されたペイロードの List である単一の Message を作成します。これは、スプリッター、パブリッシュ / サブスクライブチャネル、アップストリームの受信者リストルーターを使用した単純なスキャッターギャザーの実装に適しています。
この型のシナリオでパブリッシュ / サブスクライブチャネルまたは受信者リストルーターを使用する場合は、必ず apply-sequence フラグを有効にしてください。そうすることで、必要なヘッダー CORRELATION_ID、SEQUENCE_NUMBER、SEQUENCE_SIZE が追加されます。この動作は、Spring Integration のスプリッターではデフォルトで有効になっていますが、パブリッシュ / サブスクライブチャネルや受信者リストルーターでは有効になっていません。これらのコンポーネントは、これらのヘッダーが不要なさまざまなコンテキストで使用される可能性があるためです。 |
アプリケーションに特定のアグリゲーター戦略を実装する場合、AbstractAggregatingMessageGroupProcessor を継承して aggregatePayloads メソッドを実装できます。ただし、XML またはアノテーションを使用して構成できる集約ロジックを実装するための、API とはあまり関係のない、より優れたソリューションがあります。
一般に、POJO は、単一の java.util.List を引数として受け入れるメソッドを提供する場合、集約アルゴリズムを実装できます(パラメーター化されたリストもサポートされます)。このメソッドは、次のようにメッセージを集約するために呼び出されます。
引数が
java.util.Collection<T>であり、パラメーター型 T がMessageに割り当て可能な場合、集約のために蓄積されたメッセージのリスト全体が集約器に送信されます。引数がパラメーター化されていない
java.util.Collectionであるか、パラメーター型がMessageに割り当て可能でない場合、メソッドは蓄積されたメッセージのペイロードを受け取ります。戻り値の型が
Messageに割り当て可能でない場合、フレームワークによって自動的に作成されるMessageのペイロードとして扱われます。
| コードをシンプルにし、低カップリング、テスト容易性などのベストプラクティスを促進するために、集約ロジックを実装する推奨する方法は、POJO を使用し、XML またはアノテーションサポートを使用してアプリケーションで構成することです。 |
Starting with version 5.3, after processing a message group, an AbstractCorrelatingMessageHandler performs a MessageBuilder.popSequenceDetails() message headers modification for the proper splitter-aggregator scenario with several nested levels. It is done only if the message group release result is not a collection of messages. In that case a target MessageGroupProcessor is responsible for the MessageBuilder.popSequenceDetails() call while building those messages.
MessageGroupProcessor が Message を返す場合、sequenceDetails がグループ内の最初のメッセージと一致する場合にのみ、出力メッセージに対して MessageBuilder.popSequenceDetails() が実行されます。(以前は、プレーンペイロードまたは AbstractIntegrationMessageBuilder が MessageGroupProcessor から返された場合にのみ、これが行われていました。)
この機能は、新しい popSequence boolean プロパティによって制御できるため、相関の詳細が標準スプリッターによって入力されていないシナリオでは、MessageBuilder.popSequenceDetails() を無効にすることができます。このプロパティは、基本的に、AbstractMessageSplitter 内の最も近いアップストリーム applySequence = true によって実行されたことを元に戻します。詳細については、スプリッターを参照してください。
The SimpleMessageGroup.getMessages() method returns an unmodifiableCollection. Therefore, if an aggregating POJO method has a Collection<Message> parameter, the argument passed in is exactly that Collection instance and, when you use a SimpleMessageStore for the aggregator, that original Collection<Message> is cleared after releasing the group. Consequently, the Collection<Message> variable in the POJO is cleared too, if it is passed out of the aggregator. If you wish to simply release that collection as-is for further processing, you must build a new Collection (for example, new ArrayList<Message>(messages)). Starting with version 4.3, the framework no longer copies the messages to a new collection to avoid undesired extra object creation. |
バージョン 4.2 より前では、XML 構成を使用して MessageGroupProcessor を提供することはできませんでした。集計には POJO メソッドのみを使用できました。ここで、参照された (または内部の) Bean が MessageProcessor を実装していることをフレームワークが検出すると、それがアグリゲーターの出力プロセッサーとして使用されます。
カスタム MessageGroupProcessor からオブジェクトのコレクションをメッセージのペイロードとしてリリースする場合、クラスは AbstractAggregatingMessageGroupProcessor を継承し、aggregatePayloads() を実装する必要があります。
また、バージョン 4.2 以降、SimpleMessageGroupProcessor が提供されています。グループからメッセージのコレクションを返します。これにより、前述のように、リリースされたメッセージが個別に送信されます。
これにより、アグリゲーターはメッセージバリアとして機能し、リリース戦略が実行され、グループが個々のメッセージのシーケンスとしてリリースされるまで、到着したメッセージが保持されます。
バージョン 6.0 以降、上記の分割動作は、グループプロセッサーが SimpleMessageGroupProcessor の場合にのみ機能します。それ以外の場合、Collection<Message> を返す他の MessageGroupProcessor 実装では、メッセージのコレクション全体をペイロードとして単一の応答メッセージのみが発行されます。このようなロジックは、アグリゲーターの標準的な目的 (キーによってリクエストメッセージを収集し、単一のグループ化されたメッセージを生成する) によって決定されます。
Prior to version 6.5, if a MessageGroupProcessor (usually lambda from DSL) returns a collection of payloads, the AbstractCorrelatingMessageHandler has failed with the IllegalArgumentException stating that only collection of messages is possible. From now on such a restriction is eliminated, and a returned collection of payloads is emitted as a single reply message from the aggregator with just headers from the last request message. If header aggregation is required alongside a collection of payloads, an AbstractAggregatingMessageGroupProcessor implementations are recommended to be used instead of plain MessageGroupProcessor functional interface.
ReleaseStrategy
ReleaseStrategy インターフェースは次のように定義されます。
public interface ReleaseStrategy {
boolean canRelease(MessageGroup group);
}In general, any POJO can implement the completion decision logic if it provides a method that accepts a single java.util.List as an argument (parameterized lists are supported as well) and returns a boolean value. This method is invoked after the arrival of each new message to decide whether the group is complete or not, as follows:
引数が
java.util.List<T>であり、パラメーター型TがMessageに割り当て可能な場合、グループに蓄積されたメッセージのリスト全体がメソッドに送信されます。引数がパラメーター化されていない
java.util.Listであるか、パラメーター型がMessageに割り当て可能でない場合、メソッドは蓄積されたメッセージのペイロードを受け取ります。このメソッドは、メッセージグループが集約の準備ができている場合は
trueを返し、それ以外の場合は false を返す必要があります。
次の例は、型 Message の List に @ReleaseStrategy アノテーションを使用する方法を示しています。
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<Message<?>>) {...}
} 次の例は、型 String の List に @ReleaseStrategy アノテーションを使用する方法を示しています。
public class MyReleaseStrategy {
@ReleaseStrategy
public boolean canMessagesBeReleased(List<String>) {...}
} 前の 2 つの例の署名に基づいて、POJO ベースのリリース戦略には、まだリリースされていないメッセージの Collection ( Message 全体へのアクセスが必要な場合) またはペイロードオブジェクトの Collection (型パラメーターが何かの場合) が渡されます。Message 以外)。これは、ほとんどのユースケースを満たします。ただし、何らかの理由で MessageGroup に完全にアクセスする必要がある場合は、ReleaseStrategy インターフェースの実装を提供する必要があります。
グループがリリースされる前にリリース戦略が複数回呼び出される可能性があるため、潜在的に大きなグループを処理する場合、これらのメソッドがどのように呼び出されるかを理解する必要があります。最も効率的なのは、アグリゲーターが直接呼び出すことができるため、 これらの理由から、大規模なグループでは、 |
When the group is released for aggregation, all its not-yet-released messages are processed and removed from the group. If the group is also complete, (that is, if all messages from a sequence have arrived or if there is no sequence defined), then the group is marked as complete. Any new messages for this group are sent to the discard channel (if defined). Setting expire-groups-upon-completion to true (the default is false) removes the entire group, and any new messages (with the same correlation ID as the removed group) form a new group. You can release partial sequences by using a MessageGroupStoreReaper together with send-partial-result-on-expiry being set to true.
バージョン 6.5 以降では、相関ハンドラーを discardIndividuallyOnExpiry オプションで構成して、グループ全体を 1 つのメッセージとして破棄することもできます。基本的に、このメッセージのペイロードは、期限切れのグループからのメッセージのリストです。sendPartialResultOnExpiry が false (デフォルト) に設定され、dicardChannel が提供されている場合にのみ機能します。
To facilitate discarding of late-arriving messages, the aggregator must maintain the state about the group after it has been released. This can eventually cause out-of-memory conditions. To avoid such situations, you should consider configuring a MessageGroupStoreReaper to remove the group metadata. The expiry parameters should be set to expire groups once a point has been reached, after which late messages are not expected to arrive. For information about configuring a reaper, see アグリゲーターでの状態の管理: MessageGroupStore。 |
Spring Integration は、ReleaseStrategy の実装を提供します: SimpleSequenceSizeReleaseStrategy。この実装は、到着する各メッセージの SEQUENCE_NUMBER および SEQUENCE_SIZE ヘッダーを調べて、メッセージグループが完了し、集約の準備ができたときを判断します。前に示したように、これはデフォルトの戦略でもあります。
バージョン 5.0 より前のデフォルトのリリース戦略は SequenceSizeReleaseStrategy でしたが、これは大規模なグループではうまく機能しません。この戦略では、重複したシーケンス番号が検出され、拒否されます。この操作は高負荷になる可能性があります。 |
大きなグループを集約する場合、部分的なグループを解放する必要はなく、重複したシーケンスを検出 / 拒否する必要はありません。代わりに SimpleSequenceSizeReleaseStrategy を使用することを検討してください。これらのユースケースでははるかに効率的で、デフォルトです部分的なグループリリースが指定されていない場合、バージョン 5.0 以降。
大規模グループの集約
The 4.3 release changed the default Collection for messages in a SimpleMessageGroup to HashSet (it was previously a BlockingQueue). This was expensive when removing individual messages from large groups (an O(n) linear scan was required). Although the hash set is generally much faster to remove, it can be expensive for large messages because the hash has to be calculated on both inserts and removes. If you have messages that are expensive to hash, consider using some other collection type. As discussed in MessageGroupFactory を使用する , a SimpleMessageGroupFactory is provided so that you can select the Collection that best suits your needs. You can also provide your own factory implementation to create some other Collection<Message<?>>.
次の例は、以前の実装と SimpleSequenceSizeReleaseStrategy でアグリゲーターを構成する方法を示しています。
<int:aggregator input-channel="aggregate"
output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
<property name="messageGroupFactory">
<bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
<constructor-arg value="BLOCKING_QUEUE"/>
</bean>
</property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" /> フィルターエンドポイントがアグリゲータの上流のフローに関与している場合、シーケンスからの一部のメッセージがフィルターによって破棄される可能性があるため、シーケンスサイズリリース戦略(固定または sequenceSize ヘッダーに基づく)はその目的を果たしません。この場合、別の ReleaseStrategy を選択するか、コンテンツに一部の情報を含む破棄サブフローから送信された補正メッセージを使用して、カスタム完全グループ関数でスキップすることをお勧めします。詳細については、フィルターを参照してください。 |
相関戦略
CorrelationStrategy インターフェースは次のように定義されます。
public interface CorrelationStrategy {
Object getCorrelationKey(Message<?> message);
} このメソッドは、メッセージをメッセージグループに関連付けるために使用される相関キーを表す Object を返します。キーは、equals() および hashCode() の実装に関して Map のキーに使用される条件を満たさなければなりません。
一般に、どの POJO も相関ロジックを実装でき、メッセージをメソッドの引数にマッピングするルールは ServiceActivator と同じです(@Header アノテーションのサポートを含む)。メソッドは値を返す必要があり、値は null であってはなりません。
Spring Integration は、CorrelationStrategy の実装を提供します: HeaderAttributeCorrelationStrategy。この実装は、メッセージヘッダーの 1 つ(コンストラクター引数で名前が指定されている)の値を相関キーとして返します。デフォルトでは、相関戦略は CORRELATION_ID ヘッダー属性の値を返す HeaderAttributeCorrelationStrategy です。相関に使用するカスタムヘッダー名がある場合は、HeaderAttributeCorrelationStrategy のインスタンスで構成して、アグリゲーターの相関戦略の参照として提供できます。
レジストリをロック
グループへの変更はスレッドセーフです。同じ相関 ID のメッセージを同時に送信すると、そのうちの 1 つのみがアグリゲーターで処理され、メッセージグループごとにシングルスレッドとして効果的に処理されます。LockRegistry は、解決された相関 ID のロックを取得するために使用されます。DefaultLockRegistry はデフォルトで使用されます(メモリ内)。共有 MessageGroupStore が使用されているサーバー間で更新を同期するには、共有ロックレジストリを構成する必要があります。
デッドロックの回避
As discussed above, when message groups are mutated (messages added or released), a lock is held.
次のフローを検討してください。
...->aggregator1-> ... ->aggregator2-> ... 複数のスレッドがあり、アグリゲーターが共通のロックレジストリを共有している場合、デッドロックが発生する可能性があります。これにより、スレッドがハングし、jstack <pid> は次のような結果を表示する場合があります。
Found one Java-level deadlock:
=============================
"t2":
waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t1"
"t1":
waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
which is held by "t2"この問題を回避するには、いくつかの方法があります。
各アグリゲーターに独自のロックレジストリがあることを確認します (this can be a shared registry across application instances, but two or more aggregators in the flow must each have a distinct registry)
アグリゲータの出力チャネルとして
ExecutorChannelまたはQueueChannelを使用して、ダウンストリームフローが新しいスレッドで実行されるようにしますバージョン 5.1.1 以降、
releaseLockBeforeSendアグリゲータープロパティをtrueに設定します。
| この問題は、何らかの理由で単一のアグリゲーターの出力が最終的に同じアグリゲーターにルーティングされる場合にも発生する可能性があります。もちろん、この場合、上記の最初の解決策は適用されません。 |
Java DSL でのアグリゲーターの構成
Java DSL でアグリゲーターを構成する方法については、アグリゲーターとリシーケンサーを参照してください。
XML を使用したアグリゲーターの構成
Spring Integration は、<aggregator/> 要素を介した XML を使用したアグリゲーターの構成をサポートします。次の例は、アグリゲーターの例を示しています。
<channel id="inputChannel"/>
<int:aggregator id="myAggregator" (1)
auto-startup="true" (2)
input-channel="inputChannel" (3)
output-channel="outputChannel" (4)
discard-channel="throwAwayChannel" (5)
message-store="persistentMessageStore" (6)
order="1" (7)
send-partial-result-on-expiry="false" (8)
send-timeout="1000" (9)
correlation-strategy="correlationStrategyBean" (10)
correlation-strategy-method="correlate" (11)
correlation-strategy-expression="headers['foo']" (12)
ref="aggregatorBean" (13)
method="aggregate" (14)
release-strategy="releaseStrategyBean" (15)
release-strategy-method="release" (16)
release-strategy-expression="size() == 5" (17)
expire-groups-upon-completion="false" (18)
empty-group-min-timeout="60000" (19)
lock-registry="lockRegistry" (20)
group-timeout="60000" (21)
group-timeout-expression="size() ge 2 ? 100 : -1" (22)
expire-groups-upon-timeout="true" (23)
scheduler="taskScheduler" > (24)
<expire-transactional/> (25)
<expire-advice-chain/> (26)
</aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
<constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>| 1 | アグリゲーターの ID はオプションです。 |
| 2 | アプリケーションコンテキストの起動中にアグリゲーターを起動する必要があるかどうかを示すライフサイクル属性。オプション(デフォルトは "true" )。 |
| 3 | アグリゲーターがメッセージを受信するチャンネル。必須。 |
| 4 | アグリゲータが集約結果を送信するチャネル。オプション (受信メッセージ自体が 'replyChannel' メッセージヘッダーで応答チャネルを指定できるため)。 |
| 5 | アグリゲーターがタイムアウトしたメッセージを送信するチャネル(send-partial-result-on-expiry が false の場合)。オプション。 |
| 6 | 完了するまで相関キーにメッセージのグループを保管するために使用される MessageGroupStore への参照。オプション。デフォルトでは、揮発性のメモリ内ストアです。詳細については、メッセージストアを参照してください。 |
| 7 | 複数のハンドルが同じ DirectChannel にサブスクライブされるときのこのアグリゲーターの順序(負荷分散の目的で使用)。オプション。 |
| 8 | 期限切れのメッセージは、それを含む MessageGroup の有効期限が切れたら ( MessageGroupStore.expireMessageGroups(long) (Javadoc) を参照)、集約されて「出力チャネル」または "replyChannel" に送信されることを示します。MessageGroup を期限切れにする 1 つの方法は、MessageGroupStoreReaper を構成することです。ただし、代わりに MessageGroupStore.expireMessageGroups(timeout) を呼び出して MessageGroup を期限切れにすることもできます。これは、コントロールバス操作を通じて、または MessageGroupStore インスタンスへの参照がある場合は expireMessageGroups(timeout) を呼び出すことによって実行できます。それ以外の場合、この属性自体は何も行いません。期限切れになりそうな MessageGroup に残っているメッセージを破棄するか、出力チャネルまたは応答チャネルに送信するかを示すインジケータとしてのみ機能します。オプション (デフォルトは false)。注: expire-groups-upon-timeout が false に設定されている場合、グループは実際には期限切れにならない可能性があるため、この属性は send-partial-result-on-timeout と呼ぶ方が適切です。 |
| 9 | The timeout interval to wait when sending a reply Message to the output-channel or discard-channel. Defaults to 30 seconds. It is applied only if the output channel has some 'sending' limitations, such as a QueueChannel with a fixed 'capacity'. In this case, a MessageDeliveryException is thrown. For AbstractSubscribableChannel implementations, the send-timeout is ignored. For group-timeout(-expression), the MessageDeliveryException from the scheduled expiring task leads this task to be rescheduled. Optional. |
| 10 | メッセージ相関(グループ化)アルゴリズムを実装する Bean への参照。Bean は、CorrelationStrategy インターフェースまたは POJO の実装にすることができます。後者の場合、correlation-strategy-method 属性も定義する必要があります。オプション(デフォルトでは、アグリゲーターは IntegrationMessageHeaderAccessor.CORRELATION_ID ヘッダーを使用します)。 |
| 11 | correlation-strategy によって参照される Bean で定義されたメソッド。相関決定アルゴリズムを実装します。オプション、制限付き(correlation-strategy が存在する必要があります)。 |
| 12 | 相関戦略を表す SpEL 式。例: "headers['something']" correlation-strategy または correlation-strategy-expression のいずれかのみが許可されます。 |
| 13 | アプリケーションコンテキストで定義された Bean への参照。Bean は、前述のように集約ロジックを実装する必要があります。オプション(デフォルトでは、集約されたメッセージのリストは出力メッセージのペイロードになります)。 |
| 14 | ref 属性によって参照される Bean で定義されたメソッド。メッセージ集約アルゴリズムを実装します。オプション(定義される ref 属性に依存)。 |
| 15 | リリース戦略を実装する Bean への参照。Bean は、ReleaseStrategy インターフェースまたは POJO の実装にすることができます。後者の場合、release-strategy-method 属性も定義する必要があります。オプション(デフォルトでは、アグリゲーターは IntegrationMessageHeaderAccessor.SEQUENCE_SIZE ヘッダー属性を使用します)。 |
| 16 | release-strategy 属性によって参照される Bean で定義されたメソッド。完了決定アルゴリズムを実装します。オプション、制限付き(release-strategy が存在する必要があります)。 |
| 17 | リリース戦略を表す SpEL 式。式のルートオブジェクトは MessageGroup です。例: "size() == 5" release-strategy または release-strategy-expression のいずれかのみが許可されます。 |
| 18 | true (デフォルトは false)に設定すると、完了したグループがメッセージストアから削除され、同じ相関を持つ後続のメッセージが新しいグループを形成します。デフォルトの動作では、完了したグループと同じ相関関係を持つメッセージを discard-channel に送信します。 |
| 19 | MessageGroupStoreReaper が <aggregator> の MessageStore 用に構成されている場合にのみ適用されます。デフォルトでは、MessageGroupStoreReaper が部分グループを期限切れにするように構成されている場合、空のグループも削除されます。グループが通常解放された後、空のグループが存在します。空のグループにより、後着メッセージの検出と破棄が可能になります。部分的なグループの期限切れよりも長いスケジュールで空のグループを期限切れにする場合は、このプロパティを設定します。空のグループは、少なくともこのミリ秒数の間変更されない限り、MessageStore から削除されません。空のグループの有効期限が切れる実際の時間は、リーパーの timeout プロパティの影響も受けます。この値にタイムアウトを加えた値になる可能性があることに注意してください。 |
| 20 | org.springframework.integration.util.LockRegistry Bean への参照。以前は、MessageGroup での同時操作のために groupId に基づいて Lock を取得していました。デフォルトでは、内部 DefaultLockRegistry が使用されます。ZookeeperLockRegistry などの分散 LockRegistry を使用すると、アグリゲーターの 1 つのインスタンスのみがグループで同時に操作できるようになります。詳細については、Redis ロックレジストリまたは Zookeeper ロックレジストリを参照してください。 |
| 21 | A timeout (in milliseconds) to force the MessageGroup complete when the ReleaseStrategy does not release the group when the current message arrives. This attribute provides a built-in time-based release strategy for the aggregator when there is a need to emit a partial result (or discard the group) if a new message does not arrive for the MessageGroup within the timeout which counts from the time the last message arrived. To set up a timeout which counts from the time the MessageGroup was created see group-timeout-expression information. When a new message arrives at the aggregator, any existing ScheduledFuture<?> for its MessageGroup is canceled. If the ReleaseStrategy returns false (meaning do not release) and groupTimeout > 0, a new task is scheduled to expire the group. We do not advise setting this attribute to zero (or a negative value). Doing so effectively disables the aggregator because every message group is immediately completed. You can, however, conditionally set it to zero (or a negative value) by using an expression. See group-timeout-expression for information. The action taken during the completion depends on the ReleaseStrategy and the send-partial-group-on-expiry attribute. See アグリゲーターとグループのタイムアウト for more information. It is mutually exclusive with group-timeout-expression attribute. |
| 22 | The SpEL expression that evaluates to a groupTimeout with the MessageGroup as the #root evaluation context object. Used for scheduling the MessageGroup to be forced complete. If the expression evaluates to null, the completion is not scheduled. If it evaluates to zero, the group is completed immediately on the current thread. In effect, this provides a dynamic group-timeout property. As an example, if you wish to forcibly complete a MessageGroup after 10 seconds have elapsed since the time the group was created you might consider using the following SpEL expression: timestamp + 10000 - T(System).currentTimeMillis() where timestamp is provided by MessageGroup.getTimestamp() as the MessageGroup here is the #root evaluation context object. Bear in mind however that the group creation time might differ from the time of the first arrived message depending on other group expiration properties' configuration. See group-timeout for more information. Mutually exclusive with group-timeout attribute. |
| 23 | タイムアウトにより(または MessageGroupStoreReaper により)グループが完了すると、グループはデフォルトで期限切れ(完全に削除)になります。遅れて到着するメッセージは新しいグループを開始します。これを false に設定してグループを完成させることができますが、そのメタデータを残しておくと、遅れて到着するメッセージが破棄されます。空のグループは、empty-group-min-timeout 属性と一緒に MessageGroupStoreReaper を使用して、後で期限切れにすることができます。デフォルトは "true" です。 |
| 24 | groupTimeout 内の MessageGroup に新しいメッセージが到着しない場合、MessageGroup を強制的に完了するようにスケジュールする TaskScheduler Bean 参照。指定しない場合、ApplicationContext (ThreadPoolTaskScheduler)に登録されているデフォルトのスケジューラー(taskScheduler)が使用されます。group-timeout または group-timeout-expression が指定されていない場合、この属性は適用されません。 |
| 25 | Since version 4.1. It lets a transaction be started for the forceComplete operation. It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add、release、discard operations. Only this subelement or <expire-advice-chain/> is allowed. |
| 26 | Since version 4.1. It allows the configuration of any Advice for the forceComplete operation. It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add、release、discard operations. Only this subelement or <expire-transactional/> is allowed. A transaction Advice can also be configured here by using the Spring tx namespace. |
期限切れのグループ 期限切れ(完全に削除)のグループに関連する 2 つの属性があります。グループの有効期限が切れると、そのレコードはありません。また、同じ相関で新しいメッセージが到着すると、新しいグループが開始されます。グループが(有効期限なしで)完了すると、空のグループが残り、遅れて到着したメッセージは破棄されます。空のグループは、
グループが正常に完了せず、タイムアウトのために解放または破棄された場合、グループは通常期限切れになります。バージョン 4.1 以降では、
バージョン 5.0 以降、空のグループも Starting with version 5.4, the aggregator (and resequencer) can be configured to expire orphaned groups (those in a persistent message store that might not otherwise be released). The |
通常、カスタムアグリゲーターハンドラー実装が他の <aggregator> 定義で参照される可能性がある場合は、ref 属性を使用することをお勧めします。ただし、カスタムアグリゲーターの実装が <aggregator> の単一の定義でのみ使用されている場合、次の例に示すように、<aggregator> エレメント内の集約 POJO を構成するために、内部 Bean 定義(バージョン 1.0.3 以降)を使用できます。
<aggregator input-channel="input" method="sum" output-channel="output">
<beans:bean class="org.foo.PojoAggregator"/>
</aggregator> 同じ <aggregator> 構成で ref 属性と内部 Bean 定義の両方を使用することは、あいまいな条件を作成するため許可されません。そのような場合、例外がスローされます。 |
次の例は、アグリゲーター Bean の実装を示しています。
public class PojoAggregator {
public Long add(List<Long> results) {
long total = 0l;
for (long partialResult: results) {
total += partialResult;
}
return total;
}
}前述の例の完了戦略 Bean の実装は、次のとおりです。
public class PojoReleaseStrategy {
...
public boolean canRelease(List<Long> numbers) {
int sum = 0;
for (long number: numbers) {
sum += number;
}
return sum >= maxValue;
}
}| 意味がある場合はいつでも、リリース戦略メソッドとアグリゲーターメソッドを単一の Bean に組み合わせることができます。 |
上記の例の相関戦略 Bean の実装は、次のようになります。
public class PojoCorrelationStrategy {
...
public Long groupNumbersByLastDigit(Long number) {
return number % 10;
}
}前の例のアグリゲーターは、ある基準(この場合は 10 で割った後の残り)によって番号をグループ化し、ペイロードによって提供される番号の合計が特定の値を超えるまでグループを保持します。
| そうすることが理にかなっている場合はいつでも、リリース戦略メソッド、相関戦略メソッド、アグリゲーターメソッドを単一の Bean に組み合わせることができます。(実際、それらのすべてまたは任意の 2 つを組み合わせることができます。) |
アグリゲーターと Spring 式言語 (SpEL)
Spring Integration 2.0 以降、さまざまな戦略(相関、リリース、集約)を SpEL で処理できるようになりました。これは、そのようなリリース戦略の背後にあるロジックが比較的単純な場合に推奨されます。オブジェクトの配列を受け取るように設計されたレガシーコンポーネントがあるとします。デフォルトのリリース戦略では、List 内のすべての集約メッセージが組み立てられることがわかっています。ここで 2 つの問題があります。まず、リストから個々のメッセージを抽出する必要があります。次に、各メッセージのペイロードを抽出し、オブジェクトの配列を組み立てる必要があります。次の例は、両方の問題を解決します。
public String[] processRelease(List<Message<String>> messages){
List<String> stringList = new ArrayList<>();
for (Message<String> message : messages) {
stringList.add(message.getPayload());
}
return stringList.toArray(new String[]{});
}ただし、SpEL では、このような要件は実際には 1 行の式で比較的簡単に処理できるため、カスタムクラスを記述して Bean として構成する必要はありません。次の例は、その方法を示しています。
<int:aggregator input-channel="aggChannel"
output-channel="replyChannel"
expression="#this.![payload].toArray()"/>前述の構成では、コレクション射影式を使用して、リスト内のすべてのメッセージのペイロードから新しいコレクションを組み立て、それを配列に変換します。これにより、以前の Java コードと同じ結果が得られます。
カスタムリリースおよび相関戦略を扱うときに、同じ式ベースのアプローチを適用できます。
次の例に示すように、correlation-strategy 属性でカスタム CorrelationStrategy の Bean を定義する代わりに、単純な相関ロジックを SpEL 式として実装し、correlation-strategy-expression 属性で構成できます。
correlation-strategy-expression="payload.person.id" 前述の例では、ペイロードに person 属性と id があり、メッセージの関連付けに使用されると想定しています。
同様に、ReleaseStrategy の場合、リリースロジックを SpEL 式として実装し、release-strategy-expression 属性で構成できます。評価コンテキストのルートオブジェクトは MessageGroup 自体です。メッセージの List は、式内のグループの message プロパティを使用して参照できます。
前の例が示すように、バージョン 5.0 より前のリリースでは、ルートオブジェクトは Message<?> のコレクションでした。 |
release-strategy-expression="!messages.?[payload==5].empty" 上記の例では、SpEL 評価コンテキストのルートオブジェクトは MessageGroup 自体であり、このグループに 5 のペイロードを持つメッセージがあるとすぐに、グループを解放する必要があると述べています。
アグリゲーターとグループのタイムアウト
バージョン 4.0 以降、相互に排他的な 2 つの新しい属性 group-timeout と group-timeout-expression が導入されました。XML を使用したアグリゲーターの構成を参照してください。場合によっては、現在のメッセージが到着したときに ReleaseStrategy が解放されない場合、タイムアウト後にアグリゲーターの結果を出力する(またはグループを破棄する)必要があります。この目的のために、次の例に示すように、groupTimeout オプションを使用すると、MessageGroup のスケジューリングを強制的に完了することができます。
<aggregator input-channel="input" output-channel="output"
send-partial-result-on-expiry="true"
group-timeout-expression="size() ge 2 ? 10000 : -1"
release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/> この例では、アグリゲーターが release-strategy-expression で定義された最後のメッセージを順番に受信した場合、通常のリリースが可能です。その特定のメッセージが到着しない場合、groupTimeout は、グループに少なくとも 2 つのメッセージが含まれている限り、10 秒後にグループを強制的に完了させます。
The results of forcing the group to complete depend on the ReleaseStrategy and the send-partial-result-on-expiry. First, the release strategy is again consulted to see if a normal release is to be made. While the group has not changed, the ReleaseStrategy can decide to release the group at this time. If the release strategy still does not release the group, it is expired. If send-partial-result-on-expiry is true, existing messages in the (partial) MessageGroup are released as a normal aggregator reply message to the output-channel. Otherwise, it is discarded.
groupTimeout の動作と MessageGroupStoreReaper には違いがあります (XML を使用したアグリゲーターの構成を参照)。リーパーは、定期的に MessageGroupStore 内のすべての MessageGroup の強制完了を開始します。groupTimeout は、groupTimeout 中に新しいメッセージが到着しない場合、各 MessageGroup に対して個別にこれを実行します。また、リーパーを使用して空のグループ (expire-groups-upon-completion が false の場合に遅延メッセージを破棄するために保持されるグループ) を削除することもできます。
バージョン 5.5 以降、groupTimeoutExpression は java.util.Date インスタンスに対して評価できます。これは、groupTimeoutExpression が long に評価されるときに計算されるため、現在のメッセージ到着ではなく、グループ作成時間(MessageGroup.getTimestamp())に基づいてスケジュールされたタスクの瞬間を決定する場合に役立ちます。
group-timeout-expression="size() ge 2 ? new java.util.Date(timestamp + 200) : null"アノテーション付きのアグリゲーターの構成
次の例は、アノテーションを使用して構成されたアグリゲーターを示しています。
public class Waiter {
...
@Aggregator (1)
public Delivery aggregatingMethod(List<OrderItem> items) {
...
}
@ReleaseStrategy (2)
public boolean releaseChecker(List<Message<?>> messages) {
...
}
@CorrelationStrategy (3)
public String correlateBy(OrderItem item) {
...
}
}| 1 | このメソッドをアグリゲーターとして使用する必要があることを示すアノテーション。このクラスをアグリゲーターとして使用する場合は指定する必要があります。 |
| 2 | このメソッドがアグリゲーターのリリース戦略として使用されることを示すアノテーション。どのメソッドにも存在しない場合、アグリゲーターは SimpleSequenceSizeReleaseStrategy を使用します。 |
| 3 | このメソッドをアグリゲーターの相関戦略として使用する必要があることを示すアノテーション。相関戦略が示されていない場合、アグリゲーターは CORRELATION_ID に基づいて HeaderAttributeCorrelationStrategy を使用します。 |
XML 要素によって提供されるすべての構成オプションは、@Aggregator アノテーションでも使用できます。
アグリゲーターは、XML から明示的に参照するか、クラスで @MessageEndpoint が定義されている場合、クラスパススキャンによって自動的に検出されます。
Aggregator コンポーネントのアノテーション構成(@Aggregator など)は、ほとんどのデフォルトオプションで十分な単純なユースケースのみを対象としています。アノテーション構成を使用するときにこれらのオプションをさらに制御する必要がある場合は、次の例に示すように、AggregatingMessageHandler の @Bean 定義の使用を検討し、@Bean メソッドを @ServiceActivator でマークします。
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
AggregatingMessageHandler aggregator =
new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
jdbcMessageGroupStore);
aggregator.setOutputChannel(resultsChannel());
aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
aggregator.setTaskScheduler(this.taskScheduler);
return aggregator;
} 詳細については、プログラミングモデルおよび @Bean メソッドのアノテーションを参照してください。
バージョン 4.2 以降、AggregatorFactoryBean は、AggregatingMessageHandler の Java 構成を簡素化するために使用可能です。 |
アグリゲーターでの状態の管理: MessageGroupStore
Aggregator (and some other patterns in Spring Integration) is a stateful pattern that requires decisions to be made based on a group of messages that have arrived over a period of time, all with the same correlation key. The design of the interfaces in the stateful patterns (such as ReleaseStrategy) is driven by the principle that the components (whether defined by the framework or by a user) should be able to remain stateless. All states are carried by the MessageGroup and its management is delegated to the MessageGroupStore. The MessageGroupStore interface is defined as follows:
public interface MessageGroupStore {
int getMessageCountForAllMessageGroups();
int getMarkedMessageCountForAllMessageGroups();
int getMessageGroupCount();
MessageGroup getMessageGroup(Object groupId);
MessageGroup addMessageToGroup(Object groupId, Message<?> message);
MessageGroup markMessageGroup(MessageGroup group);
MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
void removeMessageGroup(Object groupId);
void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
int expireMessageGroups(long timeout);
}詳しくは、Javadoc を参照してください。
MessageGroupStore は、リリース戦略がトリガーされるのを待っている間、MessageGroups に状態情報を蓄積しますが、そのイベントは発生しません。そのため、古いメッセージが残るのを防ぎ、揮発性ストアがアプリケーションのシャットダウン時にクリーンアップのフックを提供するために、MessageGroupStore では、MessageGroups が期限切れになったときに適用するコールバックを登録できます。次のように、インターフェースは非常に簡単です。
public interface MessageGroupCallback {
void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}コールバックはストアとメッセージグループに直接アクセスできるため、永続的な状態を管理できます(たとえば、グループをストアから完全に削除するなど)。
MessageGroupStore は、これらのコールバックのリストを維持します。これは、タイムスタンプがパラメーターとして指定された時刻よりも前のすべてのメッセージにオンデマンドで適用されます(前述の registerMessageGroupExpiryCallback(..) および expireMessageGroups(..) メソッドを参照)。
expireMessageGroups 機能に依存する場合、異なるアグリゲーターコンポーネントで同じ MessageGroupStore インスタンスを使用しないことが重要です。すべての AbstractCorrelatingMessageHandler は、forceComplete() コールバックに基づいて独自の MessageGroupCallback を登録します。この方法では、有効期限の各グループが間違ったアグリゲーターによって完了または破棄される可能性があります。バージョン 5.0.10 から、MessageGroupStore の登録コールバックに UniqueExpiryCallback が AbstractCorrelatingMessageHandler から使用されます。MessageGroupStore は、このクラスのインスタンスが存在するかどうかを確認し、コールバックセットにすでに存在する場合は適切なメッセージでエラーを記録します。このように、フレームワークは、異なるアグリゲーター / リシーケンサーでの MessageGroupStore インスタンスの使用を禁止して、特定の相関ハンドラーによって作成されていないグループの期限切れの前述の副作用を回避します。 |
タイムアウト値を指定して expireMessageGroups メソッドを呼び出すことができます。現在の時刻からこの値を引いたものより古いメッセージは期限切れになり、コールバックが適用されます。メッセージグループの「有効期限」の意味を定義するのはストアのユーザーです。
次の例に示すように、ユーザーの利便性のために、Spring Integration は MessageGroupStoreReaper の形式でメッセージの有効期限のラッパーを提供します。
<bean id="reaper" class="org...MessageGroupStoreReaper">
<property name="messageGroupStore" ref="messageStore"/>
<property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
<task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks> リーパーは Runnable です。上記の例では、メッセージグループストアの expire メソッドは 10 秒ごとに呼び出されます。タイムアウト自体は 30 秒です。
MessageGroupStoreReaper の "timeout" プロパティは概算値であり、タスクスケジューラのレートの影響を受けることを理解することが重要です。このプロパティは、次にスケジュールされた MessageGroupStoreReaper タスクの実行時にのみチェックされるためです。例: タイムアウトが 10 分間に設定されているが、MessageGroupStoreReaper タスクが 1 時間ごとに実行されるようにスケジュールされ、MessageGroupStoreReaper タスクの最後の実行がタイムアウトの 1 分前に行われた場合、MessageGroup は次の 59 分間期限切れになりません。レートを少なくともタイムアウトの値以下に設定することをお勧めします。 |
リーパーに加えて、アプリケーションが AbstractCorrelatingMessageHandler のライフサイクルコールバックを介してシャットダウンすると、有効期限コールバックが呼び出されます。
AbstractCorrelatingMessageHandler は独自の有効期限コールバックを登録します。これは、アグリゲーターの XML 構成内のブールフラグ send-partial-result-on-expiry とのリンクです。フラグが true に設定されている場合、有効期限コールバックが呼び出されると、まだリリースされていないグループ内のマークされていないメッセージを出力チャネルに送信できます。
MessageGroupStoreReaper はスケジュールされたタスクから呼び出され、ダウンストリーム統合フローへのメッセージ(sendPartialResultOnExpiry オプションによって異なります)が生成される可能性があるため、errorChannel を介して例外を処理するために MessagePublishingErrorHandler を含むカスタム TaskScheduler を提供することをお勧めします。通常のアグリゲーターリリース機能で期待される場合があります。同じロジックが、TaskScheduler にも依存するグループタイムアウト機能にも適用されます。詳細については、エラー処理を参照してください。 |
When a shared 一部の
|
Flux Aggregator
バージョン 5.2 では、FluxAggregatorMessageHandler コンポーネントが導入されました。これは、Project Reactor Flux.groupBy() および Flux.window() オペレーターに基づいています。受信メッセージは、このコンポーネントのコンストラクターで Flux.create() によって開始された FluxSink に送信されます。outputChannel が提供されていない場合、または ReactiveStreamsSubscribableChannel のインスタンスではない場合、メインの Flux へのサブスクリプションは Lifecycle.start() 実装から行われます。それ以外の場合は、ReactiveStreamsSubscribableChannel 実装によって行われるサブスクリプションに延期されます。メッセージは、グループキーに CorrelationStrategy を使用して Flux.groupBy() によってグループ化されます。デフォルトでは、メッセージの IntegrationMessageHeaderAccessor.CORRELATION_ID ヘッダーが参照されます。
デフォルトでは、閉じられたウィンドウはすべて、作成するメッセージのペイロードで Flux として解放されます。このメッセージには、ウィンドウ内の最初のメッセージのすべてのヘッダーが含まれています。出力メッセージペイロード内のこの Flux は、サブスクライブしてダウンストリームで処理する必要があります。このようなロジックは、FluxAggregatorMessageHandler の setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) 構成オプションによってカスタマイズ (または置き換え) できます。例: 最終メッセージにペイロードの List が必要な場合は、次のように Flux.collectList() を構成できます。
fluxAggregatorMessageHandler.setCombineFunction(
(messageFlux) ->
messageFlux
.map(Message::getPayload)
.collectList()
.map(GenericMessage::new));FluxAggregatorMessageHandler には、適切なウィンドウ戦略を選択するためのいくつかのオプションがあります。
setBoundaryTrigger(Predicate<Message<?>>)- is propagated to theFlux.windowUntil()operator. See its Javadocs for more information. Has a precedence over all other window options.setWindowSize(int)およびsetWindowSizeFunction(Function<Message<?>, Integer>)-Flux.window(int)またはwindowTimeout(int, Duration)に伝搬されます。デフォルトでは、ウィンドウサイズはグループ内の最初のメッセージとそのIntegrationMessageHeaderAccessor.SEQUENCE_SIZEヘッダーから計算されます。setWindowTimespan(Duration)- ウィンドウサイズの設定に応じて、Flux.window(Duration)またはwindowTimeout(int, Duration)に伝播されます。setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>)- 公開されたオプションでカバーされていないカスタムウィンドウ操作に対して、グループ化されたフラックスに変換を適用する関数。
このコンポーネントは MessageHandler 実装であるため、@ServiceActivator メッセージングアノテーションとともに @Bean 定義として単純に使用できます。Java DSL を使用すると、.handle() EIP メソッドから使用できます。以下のサンプルは、実行時に IntegrationFlow を登録する方法と、FluxAggregatorMessageHandler をアップストリームのスプリッターと相関させる方法を示しています。
IntegrationFlow fluxFlow =
(flow) -> flow
.split()
.channel(MessageChannels.flux())
.handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
this.integrationFlowContext.registration(fluxFlow)
.register();
Flux<Message<?>> window =
registration.getMessagingTemplate()
.convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);メッセージグループの条件
Starting with version 5.5, an AbstractCorrelatingMessageHandler (including its Java & XML DSLs) exposes a groupConditionSupplier option of the BiFunction<Message<?>, String, String> implementation. This function is used on each message added to the group, and a result condition sentence is stored into the group for future consideration. The ReleaseStrategy may consult this condition instead of iterating over all the messages in the group. See GroupConditionProvider JavaDocs and メッセージグループの状態 for more information.
ファイルアグリゲーターも参照してください。