メッセージエンドポイント
この章の最初の部分では、いくつかの背景理論を取り上げ、Spring Integration のさまざまなメッセージングコンポーネントを駆動する基盤となる API についてかなり明らかにします。この情報は、バックグラウンドで何が起こっているのかを本当に理解したい場合に役立ちます。ただし、さまざまな要素の単純化された名前空間ベースの構成で起動して実行する場合は、当面はエンドポイント名前空間のサポートに進んでください。
概要で記述されていたように、メッセージエンドポイントは、さまざまなメッセージングコンポーネントをチャネルに接続するロールを果たします。次のいくつかの章では、メッセージを消費するさまざまなコンポーネントについて説明します。これらの一部は、返信メッセージを送信することもできます。メッセージの送信は非常に簡単です。メッセージチャンネルで前述したように、メッセージチャネルにメッセージを送信できます。ただし、受信はもう少し複雑です。主な理由は、ポーリングコンシューマー (英語) とイベント駆動型コンシューマー (英語) の 2 種類のコンシューマーがあることです。
2 つのうち、イベント駆動型のコンシューマーははるかに単純です。別のポーラースレッドを管理およびスケジュールする必要がない場合、これらは本質的にコールバックメソッドを持つリスナーです。Spring Integration のサブスクライブ可能なメッセージチャネルの 1 つに接続する場合、このシンプルなオプションは非常に効果的です。ただし、バッファリング可能なポーリング可能なメッセージチャネルに接続する場合、一部のコンポーネントはポーリングスレッドをスケジュールおよび管理する必要があります。Spring Integration は、これら 2 つの型のコンシューマーに対応するために、2 つの異なるエンドポイント実装を提供します。コンシューマー自体はコールバックインターフェースを実装するだけで済みます。ポーリングが必要な場合、エンドポイントはコンシューマーインスタンスのコンテナーとして機能します。利点は、メッセージ駆動型 Bean をホストするためにコンテナーを使用することと似ていますが、これらのコンシューマーは ApplicationContext
内で実行される Spring 管理のオブジェクトであるため、Spring 自身の MessageListener
コンテナーにより似ています。
メッセージハンドラー
Spring Integration の MessageHandler
インターフェースは、フレームワーク内の多くのコンポーネントによって実装されています。つまり、これはパブリック API の一部ではなく、通常、MessageHandler
を直接実装することはありません。それにもかかわらず、消費されたメッセージを実際に処理するためにメッセージコンシューマーによって使用されるため、この戦略インターフェースを認識することは、コンシューマーの全体的なロールを理解する上で役立ちます。インターフェースは次のように定義されます。
public interface MessageHandler {
void handleMessage(Message<?> message);
}
シンプルであるにもかかわらず、このインターフェースは、次の章で説明するほとんどのコンポーネント(ルーター、トランス、スプリッター、アグリゲーター、サービスアクティベーターなど)の基盤を提供します。これらのコンポーネントはそれぞれ、処理するメッセージに対して非常に異なる機能を実行しますが、実際にメッセージを受信するための要件は同じであり、ポーリングとイベント駆動型の動作の選択も同じです。Spring Integration は、これらのコールバックベースのハンドラーをホストし、メッセージチャンネルに接続できるようにする 2 つのエンドポイント実装を提供します。
イベント駆動型のコンシューマー
2 つの方が単純なので、最初にイベント駆動型のコンシューマーエンドポイントについて説明します。SubscribableChannel
インターフェースが subscribe()
メソッドを提供し、そのメソッドが MessageHandler
パラメーターを受け入れることを思い出してください(SubscribableChannel
に示すように)。次のリストは、subscribe
メソッドの定義を示しています。
subscribableChannel.subscribe(messageHandler);
チャネルにサブスクライブされたハンドラーはそのチャネルをアクティブにポーリングする必要がないため、これはイベント駆動型コンシューマーであり、Spring Integration によって提供される実装は、次の例に示すように SubscribableChannel
および MessageHandler
を受け入れます。
SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);
EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);
ポーリングコンシューマー
Spring Integration は PollingConsumer
も提供し、次の例に示すように、チャネルが PollableChannel
を実装する必要があることを除いて、同じ方法でインスタンス化できます。
PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);
PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
ポーリングコンシューマーの詳細については、ポーラーおよびチャンネルアダプターを参照してください。 |
ポーリングコンシューマーには、他にも多くの構成オプションがあります。例: トリガーは必須プロパティです。次の例は、トリガーを設定する方法を示しています。
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setTrigger(new IntervalTrigger(30, TimeUnit.SECONDS));
Spring Integration は現在、Trigger
インターフェースの 2 つの実装、IntervalTrigger
と CronTrigger
を提供しています。IntervalTrigger
は通常、単純な間隔(ミリ秒単位)で定義されますが、initialDelay
プロパティとブール fixedRate
プロパティもサポートします(デフォルトは false
— つまり、固定遅延はありません)。次の例では、両方のプロパティを設定します。
IntervalTrigger trigger = new IntervalTrigger(1000);
trigger.setInitialDelay(5000);
trigger.setFixedRate(true);
前の例の 3 つの設定の結果は、5 秒待機してから 1 秒ごとにトリガーするトリガーです。
CronTrigger
には有効な cron 式が必要です。詳細については、Javadoc を参照してください。次の例では、新しい CronTrigger
を設定します。
CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");
前の例で定義されたトリガーの結果は、月曜日から金曜日までの 10 秒ごとにトリガーするトリガーです。
トリガーに加えて、maxMessagesPerPoll
および receiveTimeout
の 2 つの他のポーリング関連の構成プロパティを指定できます。次の例は、これら 2 つのプロパティを設定する方法を示しています。
PollingConsumer consumer = new PollingConsumer(channel, handler);
consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);
maxMessagesPerPoll
プロパティは、特定のポーリング操作内で受信するメッセージの最大数を指定します。つまり、ポーラーは、null
が返されるか最大値に達するまで、待機することなく receive() を呼び出し続けます。例: ポーラーに 10 秒の間隔トリガーと 25
の maxMessagesPerPoll
設定があり、キューに 100 個のメッセージがあるチャネルをポーリングしている場合、100 個すべてのメッセージを 40 秒以内に取得できます。25 を取得し、10 秒間待機し、次の 25 を取得します。
receiveTimeout
プロパティは、受信操作を呼び出すときに使用可能なメッセージがない場合にポーラーが待機する時間を指定します。例: 表面上は似ているが実際にはまったく異なる 2 つのオプションを検討します。最初のオプションには 5 秒の間隔トリガーと 50 ミリ秒の受信タイムアウトがあり、2 番目のオプションには 50 ミリ秒の間隔トリガーと 5 の受信タイムアウトがあります。秒。最初のメッセージは、チャネルに到着してから最大 4950 ミリ秒後にメッセージを受信する場合があります(ポーリングコールの 1 つが返された直後にメッセージが到着した場合)。一方、2 番目の構成では、メッセージが 50 ミリ秒を超えることはありません。違いは、2 番目のオプションでは待機するスレッドが必要なことです。ただし、その結果、到着したメッセージにはるかに迅速に応答できます。「ロングポーリング」と呼ばれるこの手法は、ポーリングされたソースでイベント駆動型の動作をエミュレートするために使用できます。
次の例に示すように、ポーリングコンシューマーは Spring TaskExecutor
に委譲することもできます。
PollingConsumer consumer = new PollingConsumer(channel, handler);
TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);
さらに、PollingConsumer
には adviceChain
というプロパティがあります。このプロパティを使用すると、トランザクションを含む追加の横断的関心事を処理するための AOP アドバイスの List
を指定できます。これらのアドバイスは、doPoll()
メソッドに適用されます。詳細については、AOP アドバイスチェーンおよびエンドポイント名前空間のサポートのトランザクションサポートに関するセクションを参照してください。
前の例は、依存関係のルックアップを示しています。ただし、これらのコンシューマーはほとんどの場合 Spring Bean 定義として構成されていることに注意してください。実際、Spring Integration は、チャネルの型に基づいて適切なコンシューマー型を作成する ConsumerEndpointFactoryBean
と呼ばれる FactoryBean
も提供します。また、Spring Integration はこれらの詳細をさらに隠すために完全な XML 名前空間をサポートしています。このガイドでは、名前空間ベースの構成が各コンポーネント型の導入に合わせて取り上げられています。
MessageHandler 実装の多くは、応答メッセージを生成できます。前述のように、メッセージの送信はメッセージの受信と比較すると簡単です。それでも、送信される応答メッセージのタイミングと数は、ハンドラーの型によって異なります。例: アグリゲーターは、多数のメッセージの到着を待機し、多くの場合、スプリッターのダウンストリームコンシューマーとして構成されます。スプリッターは、処理する各メッセージに対して複数の応答を生成できます。名前空間の構成を使用する場合、すべての詳細を厳密に知る必要はありません。ただし、これらのコンポーネントのいくつかが共通の基本クラスである AbstractReplyProducingMessageHandler を共有し、setOutputChannel(..) メソッドを提供することを知っておく価値があるかもしれません。 |
エンドポイント名前空間のサポート
このリファレンスマニュアル全体で、ルーター、トランスフォーマー、サービスアクティベーターなどのエンドポイント要素の特定の構成例を見つけることができます。これらのほとんどは input-channel
属性をサポートし、多くは output-channel
属性をサポートします。解析後、これらのエンドポイント要素は、参照される input-channel
の型に応じて、それぞれ PollableChannel
または SubscribableChannel
の PollingConsumer
または EventDrivenConsumer
のいずれかのインスタンスを生成します。チャネルがポーリング可能な場合、ポーリングの動作はエンドポイント要素の poller
サブ要素とその属性に基づいています。
以下のリストは、poller
で使用可能なすべての構成オプションをリストしています。
<int:poller cron="" (1)
default="false" (2)
error-channel="" (3)
fixed-delay="" (4)
fixed-rate="" (5)
id="" (6)
max-messages-per-poll="" (7)
receive-timeout="" (8)
ref="" (9)
task-executor="" (10)
time-unit="MILLISECONDS" (11)
trigger=""> (12)
<int:advice-chain /> (13)
<int:transactional /> (14)
</int:poller>
1 | Cron 式を使用してポーラーを構成する機能を提供します。基礎となる実装は org.springframework.scheduling.support.CronTrigger を使用します。この属性が設定されている場合、次の属性のいずれも指定する必要はありません: fixed-delay 、trigger 、fixed-rate 、ref 。 |
2 | この属性を true に設定することにより、グローバルデフォルトポーラーを 1 つだけ定義できます。アプリケーションコンテキストで複数のデフォルトポーラーが定義されている場合、例外が発生します。PollableChannel (PollingConsumer )または明示的に構成されたポーラーを持たない SourcePollingChannelAdapter に接続されたエンドポイントは、グローバルデフォルトポーラーを使用します。デフォルトは false です。オプション。 |
3 | このポーラーの呼び出しで障害が発生した場合にエラーメッセージが送信されるチャネルを識別します。例外を完全に抑制するには、nullChannel への参照を提供できます。オプション。 |
4 | 固定遅延トリガーは、内部で PeriodicTrigger を使用します。time-unit 属性を使用しない場合、指定された値はミリ秒単位で表されます。この属性が設定されている場合、次の属性のいずれも指定する必要はありません: fixed-rate 、trigger 、cron 、ref 。 |
5 | 固定レートトリガーは、内部で PeriodicTrigger を使用します。time-unit 属性を使用しない場合、指定された値はミリ秒単位で表されます。この属性が設定されている場合、次の属性のいずれも指定する必要はありません: fixed-delay 、trigger 、cron 、ref 。 |
6 | 型 org.springframework.integration.scheduling.PollerMetadata の、ポーラーの基礎となる Bean 定義を参照する ID。id 属性は、デフォルトのポーラー(default="true" )でない限り、最上位のポーラー要素に必要です。 |
7 | 詳細については、受信チャネルアダプターの構成を参照してください。指定しない場合、デフォルト値はコンテキストによって異なります。PollingConsumer を使用する場合、この属性はデフォルトで -1 になります。ただし、SourcePollingChannelAdapter を使用する場合、max-messages-per-poll 属性はデフォルトで 1 になります。オプション。 |
8 | 基になるクラス PollerMetadata に値が設定されます。指定しない場合、デフォルトは 1000(ミリ秒)です。オプション。 |
9 | 別のトップレベルのポーラーへの Bean 参照。ref 属性は、最上位の poller 要素に存在してはなりません。ただし、この属性が設定されている場合は、次の属性を指定する必要はありません: fixed-rate 、trigger 、cron 、fixed-delay 。 |
10 | カスタムタスクエグゼキューターを参照する機能を提供します。詳細については、TaskExecutor サポートを参照してください。オプション。 |
11 | この属性は、基になる org.springframework.scheduling.support.PeriodicTrigger の java.util.concurrent.TimeUnit 列挙値を指定します。この属性は fixed-delay または fixed-rate 属性と組み合わせてのみ使用できます。cron または trigger 参照属性のいずれかと組み合わせると、エラーが発生します。PeriodicTrigger でサポートされる最小の粒度はミリ秒です。使用可能なオプションはミリ秒と秒のみです。この値が提供されない場合、fixed-delay または fixed-rate 値はミリ秒として解釈されます。基本的に、この列挙型は秒ベースの間隔トリガー値に便利です。時間ごと、日ごと、月ごとの設定では、代わりに cron トリガーを使用することをお勧めします。 |
12 | org.springframework.scheduling.Trigger インターフェースを実装する Spring 構成の Bean への参照。ただし、この属性が設定されている場合は、次の属性を指定する必要はありません: fixed-delay 、fixed-rate 、cron 、ref 。オプション。 |
13 | 追加の横断的関心事を処理するための追加の AOP アドバイスを指定できます。詳細については、トランザクションサポートを参照してください。オプション。 |
14 | ポーラーはトランザクション対応にすることができます。詳細については、AOP アドバイスチェーンを参照してください。オプション。 |
サンプル
1 秒間隔の単純な間隔ベースのポーラーは、次のように構成できます。
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller fixed-rate="1000"/>
</int:transformer>
fixed-rate
属性を使用する代わりに、fixed-delay
属性を使用することもできます。
Cron 式に基づくポーラーの場合、次の例に示すように、代わりに cron
属性を使用します。
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>
入力チャネルが PollableChannel
である場合、ポーラー構成が必要です。具体的には、前述のように、trigger
は PollingConsumer
クラスの必須プロパティです。ポーリングコンシューマーエンドポイントの構成の poller
サブ要素を省略すると、例外がスローされる場合があります。また、ポーリング不可能なチャネルに接続されている要素でポーラーを構成しようとすると、例外がスローされる場合があります。
トップレベルのポーラーを作成することもできます。この場合、次の例に示すように、ref
属性のみが必要です。
<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output">
<int:poller ref="weekdayPoller"/>
</int:transformer>
ref 属性は、内部ポーラー定義でのみ許可されます。最上位のポーラーでこの属性を定義すると、アプリケーションコンテキストの初期化中に構成例外がスローされます。 |
グローバルデフォルトポーラー
設定をさらに簡素化するために、グローバルデフォルトポーラーを定義できます。ApplicationContext
内の単一のトップレベルポーラーでは、default
属性が true
に設定されている場合があります。その場合、同じ ApplicationContext
内で定義され、明示的に構成された poller
サブ要素を持たない、その入力チャネルの PollableChannel
を持つエンドポイントは、そのデフォルトを使用します。次の例は、このようなポーラーとそれを使用するトランスフォーマーを示しています。
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-rate="3000"/>
<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
ref="transformer"
output-channel="output"/>
トランザクションサポート
Spring Integration は、ポーラーのトランザクションサポートも提供するため、各送受信操作をアトミックな作業単位として実行できます。ポーラーのトランザクションを構成するには、<transactional/>
サブ要素を追加します。次の例は、使用可能な属性を示しています。
<int:poller fixed-delay="1000">
<int:transactional transaction-manager="txManager"
propagation="REQUIRED"
isolation="REPEATABLE_READ"
timeout="10000"
read-only="false"/>
</int:poller>
詳しくは、ポーラートランザクションサポートを参照してください。
AOP アドバイスチェーン
Spring トランザクションサポートは、ポーラーによって開始されたメッセージフローのトランザクション動作を処理する TransactionInterceptor
(AOP アドバイス)を使用したプロキシメカニズムに依存するため、ポーラーに関連する他のクロスカット動作を処理するために追加のアドバイスを提供する必要がある場合があります。そのために、poller
は、MethodInterceptor
インターフェースを実装するクラスにさらにアドバイスを追加できる advice-chain
要素を定義します。次の例は、poller
の advice-chain
を定義する方法を示しています。
<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
method="good" output-channel="output">
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<int:advice-chain>
<ref bean="adviceA" />
<beans:bean class="org.something.SampleAdvice" />
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int:service-activator>
MethodInterceptor
インターフェースの実装方法の詳細については、Spring Framework リファレンスガイドの AOP セクションを参照してください。アドバイスチェーンは、トランザクション構成を持たないポーラーにも適用でき、ポーラーによって開始されたメッセージフローの動作を強化できます。
アドバイスチェーンを使用する場合、<transactional/> 子要素は指定できません。代わりに、<tx:advice/> Bean を宣言し、<advice-chain/> に追加します。構成の詳細については、ポーラートランザクションサポートを参照してください。 |
TaskExecutor サポート
ポーリングスレッドは、Spring の TaskExecutor
抽象化の任意のインスタンスによって実行できます。これにより、エンドポイントまたはエンドポイントのグループの同時実行が可能になります。Spring 3.0 以降、コア Spring Framework には task
名前空間があり、その <executor/>
要素は単純なスレッドプールエグゼキューターの作成をサポートしています。その要素は、プールサイズやキュー容量などの一般的な同時実行設定の属性を受け入れます。スレッドプーリングエグゼキューターを構成すると、負荷がかかった状態でのエンドポイントのパフォーマンスに大きな違いが生じます。エンドポイントのパフォーマンスは考慮すべき主要な要素の 1 つであるため、これらの設定は各エンドポイントで使用できます(他の主要な要素は、エンドポイントがサブスクライブするチャネルの予想ボリュームです)。XML 名前空間のサポートを使用して構成されたポーリングエンドポイントの同時実行を有効にするには、<poller/>
要素で task-executor
参照を提供し、次の例に示す 1 つ以上のプロパティを提供します。
<int:poller task-executor="pool" fixed-rate="1000"/>
<task:executor id="pool"
pool-size="5-25"
queue-capacity="20"
keep-alive="120"/>
タスクエグゼキューターを提供しない場合、コンシューマーのハンドラーは呼び出し元のスレッドで呼び出されます。通常、呼び出し元はデフォルトの TaskScheduler
です(タスクスケジューラの構成を参照)。また、task-executor
属性は、Bean 名を指定することにより、Spring の TaskExecutor
インターフェースの実装への参照を提供できることに留意してください。前に示した executor
要素は、便宜上提供されています。
コンシューマーのポーリングの背景セクションで前述したように、イベント駆動型の動作をエミュレートするような方法でポーリングコンシューマーを構成することもできます。長い receive-timeout
と短い interval-trigger
を使用すると、ポーリングされたメッセージソースであっても、受信メッセージに対する非常にタイムリーな反応を保証できます。これは、タイムアウト付きのブロッキング待機呼び出しがあるソースにのみ適用されることに注意してください。例: ファイルポーラーはブロックしません。各 receive() 呼び出しはすぐに戻り、新しいファイルが含まれているか含まれていません。ポーラーに長い receive-timeout
が含まれていても、そのようなシナリオではその値は使用されません。一方、Spring Integration 独自のキューベースのチャネルを使用する場合、タイムアウト値に参加する機会があります。次の例は、ポーリングコンシューマーがほぼ瞬時にメッセージを受信する方法を示しています。
<int:service-activator input-channel="someQueueChannel"
output-channel="output">
<int:poller receive-timeout="30000" fixed-rate="10"/>
</int:service-activator>
このアプローチを使用しても、オーバーヘッドはあまり多くありません。内部的には、待機スレッドに過ぎず、スラッシング、無限 while ループなどの CPU リソース使用量をほとんど必要としません。
実行時のポーリングレートの変更
fixed-delay
または fixed-rate
属性でポーラーを構成する場合、デフォルトの実装は PeriodicTrigger
インスタンスを使用します。PeriodicTrigger
は、コア Spring Framework の一部です。コンストラクター引数としてのみ間隔を受け入れます。実行時に変更することはできません。
ただし、org.springframework.scheduling.Trigger
インターフェースの独自の実装を定義できます。PeriodicTrigger
を出発点として使用することもできます。次に、間隔(期間)に setter を追加するか、トリガー自体に独自の調整ロジックを埋め込むこともできます。period
プロパティは、nextExecutionTime
への各呼び出しで使用され、次のポーリングをスケジュールします。ポーラー内でこのカスタムトリガーを使用するには、アプリケーションコンテキストでカスタムトリガーの Bean 定義を宣言し、カスタムトリガー Bean インスタンスを参照する trigger
属性を使用して、ポーラー構成に依存関係を注入します。これで、トリガー Bean への参照を取得し、ポーリング間のポーリング間隔を変更できます。
例については、Spring Integration サンプル [GitHub] (英語) プロジェクトを参照してください。dynamic-poller
というサンプルが含まれています。これは、カスタムトリガーを使用し、実行時にポーリング間隔を変更する機能を示しています。
このサンプルは、org.springframework.scheduling.Trigger
(Javadoc) インターフェースを実装するカスタムトリガーを提供します。サンプルのトリガーは、Spring の PeriodicTrigger
(Javadoc) 実装に基づいています。ただし、カスタムトリガーのフィールドは最終的なものではなく、プロパティには明示的な getter と setter があり、実行時にポーリング期間を動的に変更できます。
ただし、Trigger メソッドは nextExecutionTime() であるため、動的トリガーへの変更は、既存の構成に基づいて次のポーリングまで有効にならないことに注意することが重要です。現在設定されている次の実行時間の前にトリガーを強制的に起動することはできません。 |
ペイロード型変換
このリファレンスマニュアル全体を通して、メッセージまたは任意の Object
を入力パラメーターとして受け入れるさまざまなエンドポイントの特定の構成および実装例を確認することもできます。Object
の場合、このようなパラメーターは、メッセージペイロードまたはペイロードまたはヘッダーの一部にマップされます(Spring 式言語を使用する場合)。ただし、エンドポイントメソッドの入力パラメーターの型がペイロードまたはその部分の型と一致しない場合があります。このシナリオでは、型変換を実行する必要があります。Spring Integration は、integrationConversionService
という名前の変換サービス Bean の独自のインスタンス内に(Spring ConversionService
を使用して)型コンバーターを登録するための便利な方法を提供します。その Bean は、Spring Integration インフラストラクチャーを使用して最初のコンバーターが定義されるとすぐに自動的に作成されます。コンバーターを登録するには、org.springframework.core.convert.converter.Converter
、org.springframework.core.convert.converter.GenericConverter
、org.springframework.core.convert.converter.ConverterFactory
を実装できます。
Converter
実装は最も単純で、単一の型から別の型に変換します。クラス階層への変換など、より高度な処理を行うには、GenericConverter
および場合によっては ConditionalConverter
を実装できます。これらにより、from
および to
型記述子への完全なアクセスが可能になり、複雑な変換が可能になります。例: 変換のターゲットである Something
という抽象クラス(パラメーター型、チャネルデータ型など)がある場合、Thing1
および Thing
という 2 つの具体的な実装があり、どちらかに変換したい入力型に基づいて、GenericConverter
が適しています。詳細については、これらのインターフェースの Javadoc を参照してください。
コンバーターを実装したら、次の例に示すように、便利な名前空間サポートでコンバーターを登録できます。
<int:converter ref="sampleConverter"/>
<bean id="sampleConverter" class="foo.bar.TestConverter"/>
または、次の例に示すように、内部 Bean を使用できます。
<int:converter>
<bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>
Spring Integration 4.0 以降、次の例に示すように、アノテーションを使用して上記の構成を作成できます。
@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {
public Number convert(Boolean source) {
return source ? 1 : 0;
}
}
または、次の例に示すように、@Configuration
アノテーションを使用できます。
@Configuration
@EnableIntegration
public class ContextConfiguration {
@Bean
@IntegrationConverter
public SerializingConverter serializingConverter() {
return new SerializingConverter();
}
}
アプリケーションコンテキストを設定する場合、Spring Framework では 対照的に、 ただし、Spring
この場合、 |
コンテンツ型の変換
バージョン 5.0 以降、デフォルトでは、メソッド呼び出しメカニズムは org.springframework.messaging.handler.invocation.InvocableHandlerMethod
インフラストラクチャに基づいています。HandlerMethodArgumentResolver
実装(PayloadArgumentResolver
や MessageMethodArgumentResolver
など)は、MessageConverter
抽象化を使用して、受信 payload
をターゲットメソッドの引数型に変換できます。変換は、contentType
メッセージヘッダーに基づくことができます。この目的のために、Spring Integration は ConfigurableCompositeMessageConverter
を提供します。ConfigurableCompositeMessageConverter
は、登録されたコンバーターのリストに委譲され、そのうちの 1 つが非 null の結果を返すまで呼び出されます。デフォルトでは、このコンバーターは以下を提供します(厳密な順序で):
変換の目的と適切な contentType
値の詳細については、Javadoc(前のリストにリンクされています)を参照してください。ConfigurableCompositeMessageConverter
は、他の MessageConverter
実装(前述のデフォルトコンバーターを含むか除外する)で提供できるため、使用されます。また、次の例に示すように、アプリケーションコンテキストで適切な Bean として登録し、デフォルトコンバーターをオーバーライドすることもできます。
@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
List<MessageConverter> converters =
Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
new JavaSerializationMessageConverter());
return new ConfigurableCompositeMessageConverter(converters);
}
これらの 2 つの新しいコンバーターは、デフォルトの前にコンポジットに登録されます。ConfigurableCompositeMessageConverter
を使用することはできませんが、integrationArgumentResolverMessageConverter
という名前で Bean を登録することにより(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
プロパティを設定することにより)独自の MessageConverter
を提供することもできます。
SpZZ メソッド呼び出しを使用する場合、MessageConverter ベース(contentType ヘッダーを含む)変換は使用できません。この場合、上記のペイロード型変換での通常のクラスからクラスへの変換のみが使用可能です。 |
非同期ポーリング
ポーリングを非同期にしたい場合、ポーラーはオプションで、任意の TaskExecutor
Bean の既存のインスタンスを指す task-executor
属性を指定できます(Spring 3.0 は、task
名前空間を通じて便利な名前空間構成を提供します)。ただし、TaskExecutor
を使用してポーラーを構成する際に理解しておく必要がある特定の事項があります。
問題は、ポーラーと TaskExecutor
の 2 つの構成があることです。それらは互いに調和している必要があります。そうしないと、人為的なメモリリークが発生する可能性があります。
次の構成を検討してください。
<int:channel id="publishChannel">
<int:queue />
</int:channel>
<int:service-activator input-channel="publishChannel" ref="myService">
<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>
<task:executor id="taskExecutor" pool-size="20" />
上記の構成は、不一致の構成を示しています。
デフォルトでは、タスクエグゼキューターには無制限のタスクキューがあります。ポーラーは、すべてのスレッドがブロックされていても新しいタスクのスケジューリングを続け、新しいメッセージの到着またはタイムアウトの期限切れを待ちます。5 秒のタイムアウトでタスクを実行する 20 のスレッドがあるとすると、それらは 1 秒あたり 4 の割合で実行されます。ただし、新しいタスクは 1 秒あたり 20 のレートでスケジュールされているため、タスクエグゼキューターの内部キューは 1 秒あたり 16 のレートで増加するため(プロセスがアイドル状態のとき)、メモリリークが発生します。
これを処理する方法の 1 つは、タスクエグゼキューターの queue-capacity
属性を設定することです。0 でも妥当な値です。また、Task Executor の rejection-policy
属性を設定して(たとえば DISCARD
に)キューに入れられないメッセージの処理を指定することで管理できます。つまり、TaskExecutor
を構成する際に理解する必要がある特定の詳細があります。このテーマの詳細については、Spring リファレンスマニュアルの “タスクの実行とスケジューリング” を参照してください。
エンドポイント内部 Bean
多くのエンドポイントは複合 Bean です。これには、すべてのコンシューマーと、ポーリングされたすべての受信チャネルアダプターが含まれます。コンシューマー (ポーリングまたはイベント駆動) は、MessageHandler
に委譲します。ポーリングされたアダプターは、MessageSource
に委譲することでメッセージを取得します。多くの場合、実行時またはテスト時に構成を変更する場合など、委譲 Bean への参照を取得すると便利です。これらの Bean は、よく知られている名前で ApplicationContext
から取得できます。MessageHandler
インスタンスは、someConsumer.handler
に似た Bean ID でアプリケーションコンテキストに登録されます (「コンシューマー」はエンドポイントの id
属性の値です)。MessageSource
インスタンスは、somePolledAdapter.source
に似た Bean ID で登録されます ( "somePolledAdapter" はアダプターの ID です)。
上記は、フレームワークコンポーネント自体にのみ適用されます。次の例に示すように、代わりに内部 Bean 定義を使用できます。
<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
output-channel = "outChannel" method="foo">
<beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>
Bean は、宣言された内部 Bean のように扱われ、アプリケーションコンテキストに登録されません。他の方法でこの Bean にアクセスする場合は、id
を使用してトップレベルで宣言し、代わりに ref
属性を使用します。詳細については、Spring ドキュメントを参照してください。
エンドポイントのロール
バージョン 4.2 から、エンドポイントをロールに割り当てることができます。ロールを使用すると、エンドポイントをグループとして開始および停止できます。これは、リーダーシップの選択を使用する場合に特に役立ちます。リーダーシップの選択では、リーダーシップが許可または取り消されたときにエンドポイントのセットをそれぞれ開始または停止できます。このために、フレームワークは SmartLifecycleRoleController
Bean をアプリケーションコンテキストに IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER
という名前で登録します。ライフサイクルを制御する必要があるときはいつでも、この Bean を挿入するか @Autowired
できます:
<bean class="com.some.project.SomeLifecycleControl">
<property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>
XML、Java 構成、プログラムを使用して、エンドポイントをロールに割り当てることができます。次の例は、XML を使用してエンドポイントロールを設定する方法を示しています。
<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
auto-startup="false">
<int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>
次の例は、Java で作成された Bean のエンドポイントロールを構成する方法を示しています。
@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
return // some MessageHandler
}
次の例は、Java のメソッドでエンドポイントロールを構成する方法を示しています。
@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
return payload.toUpperCase();
}
次の例は、Java で SmartLifecycleRoleController
を使用してエンドポイントロールを設定する方法を示しています。
@Autowired
private SmartLifecycleRoleController roleController;
...
this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...
次の例は、Java で IntegrationFlow
を使用してエンドポイントロールを設定する方法を示しています。
IntegrationFlow flow -> flow
.handle(..., e -> e.role("cluster"));
これらはそれぞれ、エンドポイントを cluster
ロールに追加します。
roleController.startLifecyclesInRole("cluster")
および対応する stop…
メソッドを呼び出すと、エンドポイントが開始および停止します。
SmartLifecycle を実装するオブジェクトは、エンドポイントだけでなく、プログラムで追加できます。 |
SmartLifecycleRoleController
は ApplicationListener<AbstractLeaderEvent>
を実装し、リーダーシップが許可または取り消された場合(一部の Bean がそれぞれ OnGrantedEvent
または OnRevokedEvent
を発行した場合)、構成された SmartLifecycle
オブジェクトを自動的に開始および停止します。
リーダーシップの選択を使用してコンポーネントを起動および停止する場合、auto-startup XML 属性(autoStartup Bean プロパティ)を false に設定して、コンテキストの初期化中にアプリケーションコンテキストがコンポーネントを起動しないようにすることが重要です。 |
バージョン 4.3.8 以降、SmartLifecycleRoleController
はいくつかのステータスメソッドを提供します。
public Collection<String> getRoles() (1)
public boolean allEndpointsRunning(String role) (2)
public boolean noEndpointsRunning(String role) (3)
public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
1 | 管理されているロールのリストを返します。 |
2 | ロール内のすべてのエンドポイントが実行されている場合、true を返します。 |
3 | ロール内のエンドポイントがどれも実行されていない場合、true を返します。 |
4 | component name : running status のマップを返します。コンポーネント名は通常、Bean 名です。 |
リーダーシップイベントの取り扱い
エンドポイントのグループは、それぞれリーダーシップの付与または取り消しに基づいて開始および停止できます。これは、共有リソースを単一のインスタンスのみで使用する必要があるクラスター化されたシナリオで役立ちます。この例は、共有ディレクトリをポーリングしているファイル受信チャネルアダプターです。(ファイルを読むを参照)。
リーダーの選出に参加し、リーダーが選ばれたとき、リーダーシップが取り消されたとき、リーダーになるためのリソースを獲得できなかったときに通知を受けるために、アプリケーションは「リーダーイニシエーター」と呼ばれるアプリケーションコンテキストにコンポーネントを作成します。通常、リーダーイニシエーターは SmartLifecycle
であるため、コンテキストの開始時に(オプションで)開始し、リーダーシップが変化すると通知を発行します。障害が発生した場合に特定のアクションを実行したい場合は、publishFailedEvents
を true
(バージョン 5.0 以降)に設定することにより、障害通知を受け取ることもできます。慣例により、コールバックを受け取る Candidate
を提供する必要があります。また、フレームワークによって提供される Context
オブジェクトを介してリーダーシップを取り消すこともできます。コードは、o.s.i.leader.event.AbstractLeaderEvent
インスタンス(OnGrantedEvent
および OnRevokedEvent
のスーパークラス)をリッスンし、それに応じて応答することもできます(たとえば、SmartLifecycleRoleController
を使用して)。イベントには、Context
オブジェクトへの参照が含まれています。次のリストは、Context
インターフェースの定義を示しています。
public interface Context {
boolean isLeader();
void yield();
String getRole();
}
バージョン 5.0.6 以降、コンテキストは候補者のロールへの参照を提供します。
Spring Integration は、LockRegistry
抽象化に基づいたリーダーイニシエーターの基本的な実装を提供します。これを使用するには、次の例に示すように、Bean としてインスタンスを作成する必要があります。
@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
return new LockRegistryLeaderInitiator(locks);
}
ロックレジストリが正しく実装されている場合、リーダーは 1 人だけです。ロックレジストリが、有効期限が切れたり壊れたときに例外(理想的には InterruptedException
)をスローするロックも提供する場合、リーダーレス期間の期間は、ロック実装の固有の待機時間で許される限り短くすることができます。デフォルトでは、busyWaitMillis
プロパティは、ロックが不完全であり、ロックを再度取得しようとすると期限切れになることがわかっている(より一般的な)場合に CPU の枯渇を防ぐために、レイテンシを追加します。
Zookeeper を使用するリーダーの選出とイベントの詳細については、"Zookeeper リーダーシップイベントの取り扱い" を参照してください。