メッセージエンドポイント

この章の最初の部分では、いくつかの背景理論を取り上げ、Spring Integration のさまざまなメッセージングコンポーネントを駆動する基盤となる API についてかなり明らかにします。この情報は、バックグラウンドで何が起こっているのかを本当に理解したい場合に役立ちます。ただし、さまざまな要素の単純化された名前空間ベースの構成で起動して実行する場合は、当面はエンドポイント名前空間のサポートに進んでください。

概要で記述されていたように、メッセージエンドポイントは、さまざまなメッセージングコンポーネントをチャネルに接続するロールを果たします。次のいくつかの章では、メッセージを消費するさまざまなコンポーネントについて説明します。これらの一部は、返信メッセージを送信することもできます。メッセージの送信は非常に簡単です。メッセージチャンネルで前述したように、メッセージチャネルにメッセージを送信できます。ただし、受信はもう少し複雑です。主な理由は、ポーリングコンシューマー (英語) イベント駆動型コンシューマー (英語) の 2 種類のコンシューマーがあることです。

2 つのうち、イベント駆動型のコンシューマーははるかに単純です。別のポーラースレッドを管理およびスケジュールする必要がない場合、これらは本質的にコールバックメソッドを持つリスナーです。Spring Integration のサブスクライブ可能なメッセージチャネルの 1 つに接続する場合、このシンプルなオプションは非常に効果的です。ただし、バッファリング可能なポーリング可能なメッセージチャネルに接続する場合、一部のコンポーネントはポーリングスレッドをスケジュールおよび管理する必要があります。Spring Integration は、これら 2 つの型のコンシューマーに対応するために、2 つの異なるエンドポイント実装を提供します。コンシューマー自体はコールバックインターフェースを実装するだけで済みます。ポーリングが必要な場合、エンドポイントはコンシューマーインスタンスのコンテナーとして機能します。利点は、メッセージ駆動型 Bean をホストするためにコンテナーを使用することと似ていますが、これらのコンシューマーは ApplicationContext 内で実行される Spring 管理のオブジェクトであるため、Spring 自身の MessageListener コンテナーにより似ています。

メッセージハンドラー

Spring Integration の MessageHandler インターフェースは、フレームワーク内の多くのコンポーネントによって実装されています。つまり、これはパブリック API の一部ではなく、通常、MessageHandler を直接実装することはありません。それにもかかわらず、消費されたメッセージを実際に処理するためにメッセージコンシューマーによって使用されるため、この戦略インターフェースを認識することは、コンシューマーの全体的なロールを理解する上で役立ちます。インターフェースは次のように定義されます。

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

シンプルであるにもかかわらず、このインターフェースは、次の章で説明するほとんどのコンポーネント(ルーター、トランス、スプリッター、アグリゲーター、サービスアクティベーターなど)の基盤を提供します。これらのコンポーネントはそれぞれ、処理するメッセージに対して非常に異なる機能を実行しますが、実際にメッセージを受信するための要件は同じであり、ポーリングとイベント駆動型の動作の選択も同じです。Spring Integration は、これらのコールバックベースのハンドラーをホストし、メッセージチャンネルに接続できるようにする 2 つのエンドポイント実装を提供します。

イベント駆動型のコンシューマー

2 つの方が単純なので、最初にイベント駆動型のコンシューマーエンドポイントについて説明します。SubscribableChannel インターフェースが subscribe() メソッドを提供し、そのメソッドが MessageHandler パラメーターを受け入れることを思い出してください(SubscribableChannel に示すように)。次のリストは、subscribe メソッドの定義を示しています。

subscribableChannel.subscribe(messageHandler);

チャネルにサブスクライブされたハンドラーはそのチャネルをアクティブにポーリングする必要がないため、これはイベント駆動型コンシューマーであり、Spring Integration によって提供される実装は、次の例に示すように SubscribableChannel および MessageHandler を受け入れます。

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

ポーリングコンシューマー

Spring Integration は PollingConsumer も提供し、次の例に示すように、チャネルが PollableChannel を実装する必要があることを除いて、同じ方法でインスタンス化できます。

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
ポーリングコンシューマーの詳細については、チャンネルアダプターおよびチャンネルアダプターを参照してください。

ポーリングコンシューマーには他にも多くの構成オプションがあります。次の例は、トリガーを設定する方法を示しています。

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(Duration.ofSeconds(30)));

PeriodicTrigger は通常、単純な間隔 (Duration) で定義されますが、initialDelay プロパティとブール値の fixedRate プロパティもサポートします (デフォルトは false です。つまり、固定遅延はありません)。次の例では、両方のプロパティを設定します。

PeriodicTrigger trigger = new PeriodicTrigger(Duration.ofSeconds(1));
trigger.setInitialDelay(Duration.ofSeconds(5));
trigger.setFixedRate(true);

前の例の 3 つの設定の結果は、5 秒待機してから 1 秒ごとにトリガーするトリガーです。

CronTrigger には有効な cron 式が必要です。詳細については、Javadoc を参照してください。次の例では、新しい CronTrigger を設定します。

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

前の例で定義されたトリガーの結果は、月曜日から金曜日までの 10 秒ごとにトリガーするトリガーです。

ポーリングエンドポイントのデフォルトのトリガーは、1 秒の固定遅延期間を持つ PeriodicTrigger インスタンスです。

トリガーに加えて、maxMessagesPerPoll および receiveTimeout の 2 つの他のポーリング関連の構成プロパティを指定できます。次の例は、これら 2 つのプロパティを設定する方法を示しています。

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll プロパティは、特定のポーリング操作内で受信するメッセージの最大数を指定します。つまり、ポーラーは、null が返されるか、最大値に達するまで、待機せずに receive() を呼び出し続けます。例: ポーラーに 10 秒間隔のトリガーがあり、maxMessagesPerPoll 設定が 25 で、キューに 100 件のメッセージがあるチャネルをポーリングしている場合、100 件のメッセージすべてを 40 秒以内に取得できます。25 件を取得し、10 秒待機してから次の 25 件を取得する、というように繰り返します。maxMessagesPerPoll が負の値で構成されている場合は、null を返すまで、1 回のポーリングサイクル内で MessageSource.receive() が呼び出されます。バージョン 5.5 以降、0 値には特別な意味があります。MessageSource.receive() 呼び出しを完全にスキップします。これは、後で maxMessagesPerPoll が制御バスなどを介してゼロ以外の値に変更されるまで、このポーリングエンドポイントを一時停止するものと見なすことができます。

receiveTimeout プロパティは、ポーラーが受信操作を呼び出すときに使用可能なメッセージがない場合に待機する時間を指定します。例: 表面的には似ているように見えますが、実際にはまったく異なる 2 つのオプションを考えてみましょう。最初のオプションの間隔トリガーは 5 秒、受信タイムアウトは 50 ミリ秒ですが、2 番目のオプションの間隔トリガーは 50 ミリ秒、受信タイムアウトは 5 です。秒。最初のメッセージは、チャネルで受け入れられてから最大 4950 ミリ秒遅れてメッセージを受信する可能性があります (ポーリング呼び出しの 1 つが返された直後にメッセージが到着した場合)。一方、2 番目の構成では、メッセージを 50 ミリ秒以上見逃すことはありません。違いは、2 番目のオプションではスレッドが待機する必要があることです。ただし、その結果、受信したメッセージに対してより迅速に応答できるようになります。「ロングポーリング」として知られるこの手法は、ポーリングされたソース上でイベント駆動の動作をエミュレートするために使用できます。

次の例に示すように、ポーリングコンシューマーは Spring TaskExecutor に委譲することもできます。

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

さらに、PollingConsumer には adviceChain と呼ばれるプロパティがあります。このプロパティを使用すると、トランザクションを含む追加の横断的な問題を処理するための AOP アドバイスの List を指定できます。これらのアドバイスは、doPoll() メソッドに適用されます。さらに詳しい情報については、AOP アドバイスチェーンおよびエンドポイント名前空間のサポートのトランザクションサポートに関するセクションを参照してください。@Poller アノテーション Javadoc およびそれぞれのメッセージングアノテーションのサポートセクションも参照してください。Java DSL は、それぞれの Pollers ファクトリで .poller() エンドポイント構成オプションも提供します。

前の例は、依存関係のルックアップを示しています。ただし、これらのコンシューマーはほとんどの場合 Spring Bean 定義として構成されていることに注意してください。実際、Spring Integration は、チャネルの型に基づいて適切なコンシューマー型を作成する ConsumerEndpointFactoryBean と呼ばれる FactoryBean も提供します。また、Spring Integration はこれらの詳細をさらに隠すために完全な XML 名前空間をサポートしています。このガイドでは、名前空間ベースの構成が各コンポーネント型の導入に合わせて取り上げられています。

MessageHandler 実装の多くは、応答メッセージを生成できます。前述したように、メッセージの送信は、メッセージの受信に比べれば簡単です。ただし、いつ、どれだけの応答メッセージが送信されるかは、ハンドラーの種類によって異なります。例: アグリゲーターは多数のメッセージが到着するのを待ち、多くの場合、処理するメッセージごとに複数の応答を生成できるスプリッターのダウンストリームコンシューマーとして構成されます。名前空間構成を使用する場合、厳密にすべての詳細を知っている必要はありません。ただし、これらのコンポーネントのいくつかは共通の基本クラスである AbstractReplyProducingMessageHandler を共有し、それが setOutputChannel(..) メソッドを提供することを知っておく価値はあります。

エンドポイント名前空間のサポート

このリファレンスマニュアル全体で、ルーター、トランスフォーマー、サービスアクティベーターなどのエンドポイント要素の特定の構成例を見つけることができます。これらのほとんどは input-channel 属性をサポートし、多くは output-channel 属性をサポートします。解析後、これらのエンドポイント要素は、参照される input-channel の型に応じて、それぞれ PollableChannel または SubscribableChannel の PollingConsumer または EventDrivenConsumer のいずれかのインスタンスを生成します。チャネルがポーリング可能な場合、ポーリングの動作はエンドポイント要素の poller サブ要素とその属性に基づいています。

以下は、poller で使用可能なすべての構成オプションのリストです。

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            initial-delay=""                         (6)
            id=""                                    (7)
            max-messages-per-poll=""                 (8)
            receive-timeout=""                       (9)
            ref=""                                   (10)
            task-executor=""                         (11)
            time-unit="MILLISECONDS"                 (12)
            trigger="">                              (13)
            <int:advice-chain />                     (14)
            <int:transactional />                    (15)
</int:poller>
1Cron 式を使用してポーラーを構成する機能を提供します。基礎となる実装は org.springframework.scheduling.support.CronTrigger を使用します。この属性が設定されている場合、次の属性のいずれも指定する必要はありません: fixed-delaytriggerfixed-rateref
2 この属性を true に設定することにより、グローバルデフォルトポーラーを 1 つだけ定義できます。アプリケーションコンテキストで複数のデフォルトポーラーが定義されている場合、例外が発生します。PollableChannel (PollingConsumer)または明示的に構成されたポーラーを持たない SourcePollingChannelAdapter に接続されたエンドポイントは、グローバルデフォルトポーラーを使用します。デフォルトは false です。オプション。
3 このポーラーの呼び出しで障害が発生した場合にエラーメッセージが送信されるチャネルを識別します。例外を完全に抑制するには、nullChannel への参照を提供できます。オプション。
4 固定遅延トリガーは内部で PeriodicTrigger を使用します。数値は time-unit 形式、または期間形式 (バージョン 6.2 以降) にすることもできます。PT10SP1D。この属性が設定されている場合は、fixed-ratetriggercronref 属性をいずれも指定する必要はありません。
5 固定レートトリガーは内部で PeriodicTrigger を使用します。数値は time-unit 形式、または期間形式 (バージョン 6.2 以降) にすることもできます。PT10SP1D。この属性が設定されている場合は、fixed-delaytriggercronref 属性をいずれも指定する必要はありません。
6PeriodicTrigger の内部での初期遅延 (バージョン 6.2 以降)。数値は time-unit 形式であるか、期間形式 (例: PT10SP1D
7 型 org.springframework.integration.scheduling.PollerMetadata の、ポーラーの基礎となる Bean 定義を参照する ID。id 属性は、デフォルトのポーラー(default="true")でない限り、最上位のポーラー要素に必要です。
8 詳細については、受信チャネルアダプターの構成を参照してください。指定しない場合、デフォルト値はコンテキストによって異なります。PollingConsumer を使用する場合、この属性はデフォルトで -1 になります。ただし、SourcePollingChannelAdapter を使用する場合、max-messages-per-poll 属性はデフォルトで 1 になります。オプション。
9 基になるクラス PollerMetadata に値が設定されます。指定しない場合、デフォルトは 1000(ミリ秒)です。オプション。
10 別のトップレベルのポーラーへの Bean 参照。ref 属性は、最上位の poller 要素に存在してはなりません。ただし、この属性が設定されている場合は、次の属性を指定する必要はありません: fixed-ratetriggercronfixed-delay
11 カスタムタスクエグゼキューターを参照する機能を提供します。詳細については、TaskExecutor サポートを参照してください。オプション。
12 この属性は、基になる org.springframework.scheduling.support.PeriodicTrigger の java.util.concurrent.TimeUnit 列挙値を指定します。この属性は fixed-delay または fixed-rate 属性と組み合わせてのみ使用できます。cron または trigger 参照属性のいずれかと組み合わせると、エラーが発生します。PeriodicTrigger でサポートされる最小の粒度はミリ秒です。使用可能なオプションはミリ秒と秒のみです。この値が提供されない場合、fixed-delay または fixed-rate 値はミリ秒として解釈されます。基本的に、この列挙型は秒ベースの間隔トリガー値に便利です。時間ごと、日ごと、月ごとの設定では、代わりに cron トリガーを使用することをお勧めします。
13org.springframework.scheduling.Trigger インターフェースを実装する Spring 構成の Bean への参照。ただし、この属性が設定されている場合は、次の属性を指定する必要はありません: fixed-delayfixed-ratecronref。オプション。
14 追加の横断的関心事を処理するための追加の AOP アドバイスを指定できます。詳細については、トランザクションを参照してください。オプション。
15 ポーラーはトランザクション対応にすることができます。詳細については、AOP アドバイスチェーンを参照してください。オプション。

サンプル

1 秒間隔の単純な間隔ベースのポーラーは、次のように構成できます。

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

fixed-rate 属性を使用する代わりに、fixed-delay 属性を使用することもできます。

Cron 式に基づくポーラーの場合、次の例に示すように、代わりに cron 属性を使用します。

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

入力チャネルが PollableChannel である場合、ポーラー構成が必要です。具体的には、前述のように、trigger は PollingConsumer クラスの必須プロパティです。ポーリングコンシューマーエンドポイントの構成の poller サブ要素を省略すると、例外がスローされる場合があります。また、ポーリング不可能なチャネルに接続されている要素でポーラーを構成しようとすると、例外がスローされる場合があります。

トップレベルのポーラーを作成することもできます。この場合、次の例に示すように、ref 属性のみが必要です。

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
ref 属性は、内部ポーラー定義でのみ許可されます。最上位のポーラーでこの属性を定義すると、アプリケーションコンテキストの初期化中に構成例外がスローされます。

グローバルデフォルトポーラー

設定をさらに簡素化するために、グローバルデフォルトポーラーを定義できます。XML DSL の単一のトップレベルポーラーコンポーネントでは、default 属性が true に設定されている場合があります。この場合、Java 構成の場合、PollerMetadata.DEFAULT_POLLER 名の PollerMetadata Bean を宣言する必要があります。その場合、入力チャネルに PollableChannel があり、同じ ApplicationContext 内で定義され、明示的に構成された poller がないエンドポイントは、そのデフォルトを使用します。次の例は、そのようなポーラーとそれを使用するトランスフォーマーを示しています。

  • Java DSL

  • Java

  • Kotlin DSL

  • XML

@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlow.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>

トランザクションサポート

Spring Integration は、ポーラーのトランザクションサポートも提供するため、各送受信操作をアトミックな作業単位として実行できます。ポーラーのトランザクションを構成するには、<transactional/> サブ要素を追加します。次の例は、使用可能な属性を示しています。

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

詳しくは、ポーラートランザクションサポートを参照してください。

AOP アドバイスチェーン

Spring トランザクションサポートは、ポーラーによって開始されたメッセージフローのトランザクション動作を処理する TransactionInterceptor (AOP アドバイス)を使用したプロキシメカニズムに依存するため、ポーラーに関連する他のクロスカット動作を処理するために追加のアドバイスを提供する必要がある場合があります。そのために、poller は、MethodInterceptor インターフェースを実装するクラスにさらにアドバイスを追加できる advice-chain 要素を定義します。次の例は、poller の advice-chain を定義する方法を示しています。

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

MethodInterceptor インターフェースの実装方法の詳細については、Spring Framework リファレンスガイドの AOP セクションを参照してください。アドバイスチェーンは、トランザクション構成を持たないポーラーにも適用でき、ポーラーによって開始されたメッセージフローの動作を強化できます。

アドバイスチェーンを使用する場合、<transactional/> 子要素は指定できません。代わりに、<tx:advice/> Bean を宣言し、<advice-chain/> に追加します。構成の詳細については、ポーラートランザクションサポートを参照してください。

TaskExecutor サポート

ポーリングスレッドは、Spring の TaskExecutor 抽象化の任意のインスタンスによって実行できます。これにより、エンドポイントまたはエンドポイントのグループの同時実行が可能になります。Spring 3.0 以降、コア Spring Framework には task 名前空間があり、その <executor/> 要素は単純なスレッドプールエグゼキューターの作成をサポートしています。その要素は、プールサイズやキュー容量などの一般的な同時実行設定の属性を受け入れます。スレッドプーリングエグゼキューターを構成すると、負荷がかかった状態でのエンドポイントのパフォーマンスに大きな違いが生じます。エンドポイントのパフォーマンスは考慮すべき主要な要素の 1 つであるため、これらの設定は各エンドポイントで使用できます(他の主要な要素は、エンドポイントがサブスクライブするチャネルの予想ボリュームです)。XML 名前空間のサポートを使用して構成されたポーリングエンドポイントの同時実行を有効にするには、<poller/> 要素で task-executor 参照を提供し、次の例に示す 1 つ以上のプロパティを提供します。

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

タスクエグゼキューターを提供しない場合、コンシューマーのハンドラーは呼び出し元のスレッドで呼び出されます。通常、呼び出し元はデフォルトの TaskScheduler です(タスクスケジューラの構成を参照)。また、task-executor 属性は、Bean 名を指定することにより、Spring の TaskExecutor インターフェースの実装への参照を提供できることに留意してください。前に示した executor 要素は、便宜上提供されています。

ポーリングコンシューマーのバックグラウンドセクションで前述したように、イベント駆動型の動作をエミュレートするようにポーリングコンシューマーを構成することもできます。受信タイムアウトが長く、トリガーの間隔が短いため、ポーリングされたメッセージソースであっても、到着したメッセージに対して非常にタイムリーな反応を保証できます。これは、タイムアウトのあるブロッキング待機呼び出しがあるソースにのみ適用されることに注意してください。例: ファイルポーラーはブロックしません。各 receive() 呼び出しはすぐに戻り、新しいファイルが含まれるかどうかが決まります。ポーラーに長い receive-timeout が含まれている場合でも、そのようなシナリオではその値が使用されることはありません。一方、Spring Integration 独自のキューベースのチャネルを使用する場合、タイムアウト値は参加する機会があります。次の例は、ポーリングコンシューマーがほぼ瞬時にメッセージを受信する方法を示しています。

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

このアプローチを使用しても、オーバーヘッドはあまり多くありません。内部的には、待機スレッドに過ぎず、スラッシング、無限 while ループなどの CPU リソース使用量をほとんど必要としません。

実行時のポーリングレートの変更

fixed-delay または fixed-rate 属性でポーラーを構成する場合、デフォルトの実装は PeriodicTrigger インスタンスを使用します。PeriodicTrigger は、コア Spring Framework の一部です。コンストラクター引数としてのみ間隔を受け入れます。実行時に変更することはできません。

ただし、org.springframework.scheduling.Trigger インターフェースの独自の実装を定義できます。PeriodicTrigger を出発点として使用することもできます。次に、間隔(期間)に setter を追加するか、トリガー自体に独自の調整ロジックを埋め込むこともできます。period プロパティは、nextExecutionTime への各呼び出しで使用され、次のポーリングをスケジュールします。ポーラー内でこのカスタムトリガーを使用するには、アプリケーションコンテキストでカスタムトリガーの Bean 定義を宣言し、カスタムトリガー Bean インスタンスを参照する trigger 属性を使用して、ポーラー構成に依存関係を注入します。これで、トリガー Bean への参照を取得し、ポーリング間のポーリング間隔を変更できます。

例については、Spring Integration サンプル [GitHub] (英語) プロジェクトを参照してください。dynamic-poller というサンプルが含まれています。これは、カスタムトリガーを使用し、実行時にポーリング間隔を変更する機能を示しています。

このサンプルは、org.springframework.scheduling.Trigger (Javadoc) インターフェースを実装するカスタムトリガーを提供します。サンプルのトリガーは、Spring の PeriodicTrigger (Javadoc) 実装に基づいています。ただし、カスタムトリガーのフィールドは最終的なものではなく、プロパティには明示的な getter と setter があり、実行時にポーリング期間を動的に変更できます。

ただし、Trigger メソッドは nextExecutionTime() であるため、動的トリガーへの変更は、既存の構成に基づいて次のポーリングまで有効にならないことに注意することが重要です。現在設定されている次の実行時間の前にトリガーを強制的に起動することはできません。

ペイロード型変換

このリファレンスマニュアル全体を通して、メッセージまたは任意の Object を入力パラメーターとして受け入れるさまざまなエンドポイントの特定の構成および実装例を確認することもできます。Object の場合、このようなパラメーターは、メッセージペイロードまたはペイロードまたはヘッダーの一部にマップされます(Spring 式言語を使用する場合)。ただし、エンドポイントメソッドの入力パラメーターの型がペイロードまたはその部分の型と一致しない場合があります。このシナリオでは、型変換を実行する必要があります。Spring Integration は、integrationConversionService という名前の変換サービス Bean の独自のインスタンス内に(Spring ConversionService を使用して)型コンバーターを登録するための便利な方法を提供します。その Bean は、Spring Integration インフラストラクチャーを使用して最初のコンバーターが定義されるとすぐに自動的に作成されます。コンバーターを登録するには、org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory を実装できます。

Converter 実装は最も単純で、単一の型から別の型に変換します。クラス階層への変換など、より高度な処理を行うには、GenericConverter および場合によっては ConditionalConverter を実装できます。これらにより、from および to 型記述子への完全なアクセスが可能になり、複雑な変換が可能になります。例: 変換のターゲットである Something という抽象クラス(パラメーター型、チャネルデータ型など)がある場合、Thing1 および Thing という 2 つの具体的な実装があり、どちらかに変換したい入力型に基づいて、GenericConverter が適しています。詳細については、これらのインターフェースの Javadoc を参照してください。

コンバーターを実装したら、次の例に示すように、便利な名前空間サポートでコンバーターを登録できます。

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

または、次の例に示すように、内部 Bean を使用できます。

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

Spring Integration 4.0 以降、次の例に示すように、アノテーションを使用して上記の構成を作成できます。

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

または、次の例に示すように、@Configuration アノテーションを使用できます。

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

アプリケーションコンテキストを設定する場合、Spring Framework では conversionService Bean を追加できます(ConversionService の構成の章を参照)。このサービスは、必要に応じて、Bean の作成および構成中に適切な変換を実行するために使用されます。

対照的に、integrationConversionService はランタイム変換に使用されます。これらの用途はまったく異なります。Bean コンストラクターの引数とプロパティを接続するときに使用するコンバーターは、データ型チャネル、ペイロード型トランスフォーマーなど内のメッセージに対する Spring Integration 式評価の実行時に使用すると、意図しない結果を生成する場合があります。

ただし、Spring conversionService を Spring Integration integrationConversionService として使用する場合は、次の例に示すように、アプリケーションコンテキストでエイリアスを設定できます。

<alias name="conversionService" alias="integrationConversionService"/>

この場合、conversionService が提供するコンバーターは Spring Integration ランタイム変換に使用できます。

コンテンツ型の変換

バージョン 5.0 以降、デフォルトでは、メソッド呼び出しメカニズムは org.springframework.messaging.handler.invocation.InvocableHandlerMethod インフラストラクチャに基づいています。HandlerMethodArgumentResolver 実装(PayloadArgumentResolver や MessageMethodArgumentResolver など)は、MessageConverter 抽象化を使用して、受信 payload をターゲットメソッドの引数型に変換できます。変換は、contentType メッセージヘッダーに基づくことができます。この目的のために、Spring Integration は ConfigurableCompositeMessageConverter を提供します。ConfigurableCompositeMessageConverter は、登録されたコンバーターのリストに委譲され、そのうちの 1 つが非 null の結果を返すまで呼び出されます。デフォルトでは、このコンバーターは以下を提供します(厳密な順序で):

それらの目的と変換に適した contentType 値の詳細については、Javadoc (前のリストにリンクされています) を参照してください。ConfigurableCompositeMessageConverter が使用されるのは、前述のデフォルトコンバーターを含め、または除外して、他の MessageConverter 実装と共に提供できるためです。次の例に示すように、アプリケーションコンテキストで適切な Bean として登録して、デフォルトのコンバーターをオーバーライドすることもできます。

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

これらの 2 つの新しいコンバーターは、デフォルトの前にコンポジットに登録されます。ConfigurableCompositeMessageConverter を使用することはできませんが、integrationArgumentResolverMessageConverter という名前で Bean を登録することにより(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME プロパティを設定することにより)独自の MessageConverter を提供することもできます。

SpZZ メソッド呼び出しを使用する場合、MessageConverter ベース(contentType ヘッダーを含む)変換は使用できません。この場合、上記のペイロード型変換での通常のクラスからクラスへの変換のみが使用可能です。

非同期ポーリング

ポーリングを非同期にしたい場合、ポーラーはオプションで、任意の TaskExecutor Bean の既存のインスタンスを指す task-executor 属性を指定できます(Spring 3.0 は、task 名前空間を通じて便利な名前空間構成を提供します)。ただし、TaskExecutor を使用してポーラーを構成する際に理解しておく必要がある特定の事項があります。

問題は、ポーラーと TaskExecutor の 2 つの構成があることです。それらは互いに調和している必要があります。そうしないと、人為的なメモリリークが発生する可能性があります。

次の構成を検討してください。

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

上記の構成は、不一致の構成を示しています。

デフォルトでは、タスクエグゼキューターには無制限のタスクキューがあります。ポーラーは、すべてのスレッドがブロックされていても新しいタスクのスケジューリングを続け、新しいメッセージの到着またはタイムアウトの期限切れを待ちます。5 秒のタイムアウトでタスクを実行する 20 のスレッドがあるとすると、それらは 1 秒あたり 4 の割合で実行されます。ただし、新しいタスクは 1 秒あたり 20 のレートでスケジュールされているため、タスクエグゼキューターの内部キューは 1 秒あたり 16 のレートで増加するため(プロセスがアイドル状態のとき)、メモリリークが発生します。

これを処理する方法の 1 つは、タスクエグゼキューターの queue-capacity 属性を設定することです。0 でも妥当な値です。また、Task Executor の rejection-policy 属性を設定して(たとえば DISCARD に)キューに入れられないメッセージの処理を指定することで管理できます。つまり、TaskExecutor を構成する際に理解する必要がある特定の詳細があります。このテーマの詳細については、Spring リファレンスマニュアルの “タスクの実行とスケジューリング” を参照してください。

エンドポイント内部 Bean

多くのエンドポイントは複合 Bean です。これには、すべてのコンシューマーと、ポーリングされたすべての受信チャネルアダプターが含まれます。コンシューマー (ポーリングまたはイベント駆動) は、MessageHandler に委譲します。ポーリングされたアダプターは、MessageSource に委譲することでメッセージを取得します。多くの場合、実行時またはテスト時に構成を変更する場合など、委譲 Bean への参照を取得すると便利です。これらの Bean は、よく知られている名前で ApplicationContext から取得できます。MessageHandler インスタンスは、someConsumer.handler に似た Bean ID でアプリケーションコンテキストに登録されます (「コンシューマー」はエンドポイントの id 属性の値です)。MessageSource インスタンスは、somePolledAdapter.source に似た Bean ID で登録されます ( "somePolledAdapter" はアダプターの ID です)。

上記は、フレームワークコンポーネント自体にのみ適用されます。次の例に示すように、代わりに内部 Bean 定義を使用できます。

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

Bean は、宣言された内部 Bean のように扱われ、アプリケーションコンテキストに登録されません。他の方法でこの Bean にアクセスする場合は、id を使用してトップレベルで宣言し、代わりに ref 属性を使用します。詳細については、Spring ドキュメントを参照してください。