メッセージエンドポイント

この章の最初の部分では、いくつかの背景理論を取り上げ、Spring Integration のさまざまなメッセージングコンポーネントを駆動する基盤となる API についてかなり明らかにします。この情報は、バックグラウンドで何が起こっているのかを本当に理解したい場合に役立ちます。ただし、さまざまな要素の単純化された名前空間ベースの構成で起動して実行する場合は、当面はエンドポイント名前空間のサポートに進んでください。

概要で記述されていたように、メッセージエンドポイントは、さまざまなメッセージングコンポーネントをチャネルに接続するロールを果たします。次のいくつかの章では、メッセージを消費するさまざまなコンポーネントについて説明します。これらの一部は、返信メッセージを送信することもできます。メッセージの送信は非常に簡単です。メッセージチャンネルで前述したように、メッセージチャネルにメッセージを送信できます。ただし、受信はもう少し複雑です。主な理由は、ポーリングコンシューマー (英語) イベント駆動型コンシューマー (英語) の 2 種類のコンシューマーがあることです。

2 つのうち、イベント駆動型のコンシューマーははるかに単純です。別のポーラースレッドを管理およびスケジュールする必要がない場合、これらは本質的にコールバックメソッドを持つリスナーです。Spring Integration のサブスクライブ可能なメッセージチャネルの 1 つに接続する場合、このシンプルなオプションは非常に効果的です。ただし、バッファリング可能なポーリング可能なメッセージチャネルに接続する場合、一部のコンポーネントはポーリングスレッドをスケジュールおよび管理する必要があります。Spring Integration は、これら 2 つの型のコンシューマーに対応するために、2 つの異なるエンドポイント実装を提供します。コンシューマー自体はコールバックインターフェースを実装するだけで済みます。ポーリングが必要な場合、エンドポイントはコンシューマーインスタンスのコンテナーとして機能します。利点は、メッセージ駆動型 Bean をホストするためにコンテナーを使用することと似ていますが、これらのコンシューマーは ApplicationContext 内で実行される Spring 管理のオブジェクトであるため、Spring 自身の MessageListener コンテナーにより似ています。

メッセージハンドラー

Spring Integration の MessageHandler インターフェースは、フレームワーク内の多くのコンポーネントによって実装されています。つまり、これはパブリック API の一部ではなく、通常、MessageHandler を直接実装することはありません。それにもかかわらず、消費されたメッセージを実際に処理するためにメッセージコンシューマーによって使用されるため、この戦略インターフェースを認識することは、コンシューマーの全体的なロールを理解する上で役立ちます。インターフェースは次のように定義されます。

public interface MessageHandler {

    void handleMessage(Message<?> message);

}

シンプルであるにもかかわらず、このインターフェースは、次の章で説明するほとんどのコンポーネント(ルーター、トランス、スプリッター、アグリゲーター、サービスアクティベーターなど)の基盤を提供します。これらのコンポーネントはそれぞれ、処理するメッセージに対して非常に異なる機能を実行しますが、実際にメッセージを受信するための要件は同じであり、ポーリングとイベント駆動型の動作の選択も同じです。Spring Integration は、これらのコールバックベースのハンドラーをホストし、メッセージチャンネルに接続できるようにする 2 つのエンドポイント実装を提供します。

イベント駆動型のコンシューマー

2 つの方が単純なので、最初にイベント駆動型のコンシューマーエンドポイントについて説明します。SubscribableChannel インターフェースが subscribe() メソッドを提供し、そのメソッドが MessageHandler パラメーターを受け入れることを思い出してください(SubscribableChannel に示すように)。次のリストは、subscribe メソッドの定義を示しています。

subscribableChannel.subscribe(messageHandler);

チャネルにサブスクライブされたハンドラーはそのチャネルをアクティブにポーリングする必要がないため、これはイベント駆動型コンシューマーであり、Spring Integration によって提供される実装は、次の例に示すように SubscribableChannel および MessageHandler を受け入れます。

SubscribableChannel channel = context.getBean("subscribableChannel", SubscribableChannel.class);

EventDrivenConsumer consumer = new EventDrivenConsumer(channel, exampleHandler);

ポーリングコンシューマー

Spring Integration は PollingConsumer も提供し、次の例に示すように、チャネルが PollableChannel を実装する必要があることを除いて、同じ方法でインスタンス化できます。

PollableChannel channel = context.getBean("pollableChannel", PollableChannel.class);

PollingConsumer consumer = new PollingConsumer(channel, exampleHandler);
ポーリングコンシューマーの詳細については、ポーラーおよびチャンネルアダプターを参照してください。

ポーリングコンシューマーには、他にも多くの構成オプションがあります。例: トリガーは必須プロパティです。次の例は、トリガーを設定する方法を示しています。

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setTrigger(new PeriodicTrigger(30, TimeUnit.SECONDS));

PeriodicTrigger は通常、単純な間隔(ミリ秒単位)で定義されますが、initialDelay プロパティとブール fixedRate プロパティもサポートします(デフォルトは false です。つまり、固定遅延はありません)。次の例では、両方のプロパティを設定します。

PeriodicTrigger trigger = new PeriodicTrigger(1000);
trigger.setInitialDelay(5000);
trigger.setFixedRate(true);

前の例の 3 つの設定の結果は、5 秒待機してから 1 秒ごとにトリガーするトリガーです。

CronTrigger には有効な cron 式が必要です。詳細については、Javadoc を参照してください。次の例では、新しい CronTrigger を設定します。

CronTrigger trigger = new CronTrigger("*/10 * * * * MON-FRI");

前の例で定義されたトリガーの結果は、月曜日から金曜日までの 10 秒ごとにトリガーするトリガーです。

トリガーに加えて、maxMessagesPerPoll および receiveTimeout の 2 つの他のポーリング関連の構成プロパティを指定できます。次の例は、これら 2 つのプロパティを設定する方法を示しています。

PollingConsumer consumer = new PollingConsumer(channel, handler);

consumer.setMaxMessagesPerPoll(10);
consumer.setReceiveTimeout(5000);

maxMessagesPerPoll プロパティは、特定のポーリング操作内で受信するメッセージの最大数を指定します。これは、null が返されるか、最大値に達するまで、ポーラーが待機せずに receive() を呼び出し続けることを意味します。例: ポーラーに 10 秒間隔のトリガーと 25 の maxMessagesPerPoll 設定があり、キューに 100 個のメッセージがあるチャネルをポーリングしている場合、100 個のメッセージすべてを 40 秒以内に取得できます。25 を取得し、10 秒間待機し、次の 25 を取得します。maxMessagesPerPoll が負の値で構成されている場合、MessageSource.receive() は、null を返すまで、単一のポーリングサイクル内で呼び出されます。バージョン 5.5 以降、0 値には特別な意味があります。MessageSource.receive() 呼び出しを完全にスキップします。これは、後で maxMessagesPerPoll がゼロ以外の n 値に変更されるまで、このポーリングエンドポイントの一時停止と見なされる場合があります。コントロールバス経由。

receiveTimeout プロパティは、受信操作を呼び出すときに使用可能なメッセージがない場合にポーラーが待機する時間を指定します。例: 表面上は似ているが実際にはまったく異なる 2 つのオプションを検討します。最初のオプションには 5 秒の間隔トリガーと 50 ミリ秒の受信タイムアウトがあり、2 番目のオプションには 50 ミリ秒の間隔トリガーと 5 の受信タイムアウトがあります。秒。最初のメッセージは、チャネルに到着してから最大 4950 ミリ秒後にメッセージを受信する場合があります(ポーリングコールの 1 つが返された直後にメッセージが到着した場合)。一方、2 番目の構成では、メッセージが 50 ミリ秒を超えることはありません。違いは、2 番目のオプションでは待機するスレッドが必要なことです。ただし、その結果、到着したメッセージにはるかに迅速に応答できます。「ロングポーリング」と呼ばれるこの手法は、ポーリングされたソースでイベント駆動型の動作をエミュレートするために使用できます。

次の例に示すように、ポーリングコンシューマーは Spring TaskExecutor に委譲することもできます。

PollingConsumer consumer = new PollingConsumer(channel, handler);

TaskExecutor taskExecutor = context.getBean("exampleExecutor", TaskExecutor.class);
consumer.setTaskExecutor(taskExecutor);

さらに、PollingConsumer には adviceChain というプロパティがあります。このプロパティを使用すると、トランザクションを含む追加の横断的関心事を処理するための AOP アドバイスの List を指定できます。これらのアドバイスは、doPoll() メソッドに適用されます。詳細については、AOP アドバイスチェーンおよびエンドポイント名前空間のサポートのトランザクションサポートに関するセクションを参照してください。

前の例は、依存関係のルックアップを示しています。ただし、これらのコンシューマーはほとんどの場合 Spring Bean 定義として構成されていることに注意してください。実際、Spring Integration は、チャネルの型に基づいて適切なコンシューマー型を作成する ConsumerEndpointFactoryBean と呼ばれる FactoryBean も提供します。また、Spring Integration はこれらの詳細をさらに隠すために完全な XML 名前空間をサポートしています。このガイドでは、名前空間ベースの構成が各コンポーネント型の導入に合わせて取り上げられています。

MessageHandler 実装の多くは、応答メッセージを生成できます。前述のように、メッセージの送信はメッセージの受信と比較すると簡単です。それでも、送信される応答メッセージのタイミングと数は、ハンドラーの型によって異なります。例: アグリゲーターは、多数のメッセージの到着を待機し、多くの場合、スプリッターのダウンストリームコンシューマーとして構成されます。スプリッターは、処理する各メッセージに対して複数の応答を生成できます。名前空間の構成を使用する場合、すべての詳細を厳密に知る必要はありません。ただし、これらのコンポーネントのいくつかが共通の基本クラスである AbstractReplyProducingMessageHandler を共有し、setOutputChannel(..) メソッドを提供することを知っておく価値があるかもしれません。

エンドポイント名前空間のサポート

このリファレンスマニュアル全体で、ルーター、トランスフォーマー、サービスアクティベーターなどのエンドポイント要素の特定の構成例を見つけることができます。これらのほとんどは input-channel 属性をサポートし、多くは output-channel 属性をサポートします。解析後、これらのエンドポイント要素は、参照される input-channel の型に応じて、それぞれ PollableChannel または SubscribableChannel の PollingConsumer または EventDrivenConsumer のいずれかのインスタンスを生成します。チャネルがポーリング可能な場合、ポーリングの動作はエンドポイント要素の poller サブ要素とその属性に基づいています。

以下のリストは、poller で使用可能なすべての構成オプションをリストしています。

<int:poller cron=""                                  (1)
            default="false"                          (2)
            error-channel=""                         (3)
            fixed-delay=""                           (4)
            fixed-rate=""                            (5)
            id=""                                    (6)
            max-messages-per-poll=""                 (7)
            receive-timeout=""                       (8)
            ref=""                                   (9)
            task-executor=""                         (10)
            time-unit="MILLISECONDS"                 (11)
            trigger="">                              (12)
            <int:advice-chain />                     (13)
            <int:transactional />                    (14)
</int:poller>
1Cron 式を使用してポーラーを構成する機能を提供します。基礎となる実装は org.springframework.scheduling.support.CronTrigger を使用します。この属性が設定されている場合、次の属性のいずれも指定する必要はありません: fixed-delaytriggerfixed-rateref
2 この属性を true に設定することにより、グローバルデフォルトポーラーを 1 つだけ定義できます。アプリケーションコンテキストで複数のデフォルトポーラーが定義されている場合、例外が発生します。PollableChannel (PollingConsumer)または明示的に構成されたポーラーを持たない SourcePollingChannelAdapter に接続されたエンドポイントは、グローバルデフォルトポーラーを使用します。デフォルトは false です。オプション。
3 このポーラーの呼び出しで障害が発生した場合にエラーメッセージが送信されるチャネルを識別します。例外を完全に抑制するには、nullChannel への参照を提供できます。オプション。
4 固定遅延トリガーは、内部で PeriodicTrigger を使用します。time-unit 属性を使用しない場合、指定された値はミリ秒単位で表されます。この属性が設定されている場合、次の属性のいずれも指定する必要はありません: fixed-ratetriggercronref
5 固定レートトリガーは、内部で PeriodicTrigger を使用します。time-unit 属性を使用しない場合、指定された値はミリ秒単位で表されます。この属性が設定されている場合、次の属性のいずれも指定する必要はありません: fixed-delaytriggercronref
6 型 org.springframework.integration.scheduling.PollerMetadata の、ポーラーの基礎となる Bean 定義を参照する ID。id 属性は、デフォルトのポーラー(default="true")でない限り、最上位のポーラー要素に必要です。
7 詳細については、受信チャネルアダプターの構成を参照してください。指定しない場合、デフォルト値はコンテキストによって異なります。PollingConsumer を使用する場合、この属性はデフォルトで -1 になります。ただし、SourcePollingChannelAdapter を使用する場合、max-messages-per-poll 属性はデフォルトで 1 になります。オプション。
8 基になるクラス PollerMetadata に値が設定されます。指定しない場合、デフォルトは 1000(ミリ秒)です。オプション。
9 別のトップレベルのポーラーへの Bean 参照。ref 属性は、最上位の poller 要素に存在してはなりません。ただし、この属性が設定されている場合は、次の属性を指定する必要はありません: fixed-ratetriggercronfixed-delay
10 カスタムタスクエグゼキューターを参照する機能を提供します。詳細については、TaskExecutor サポートを参照してください。オプション。
11 この属性は、基になる org.springframework.scheduling.support.PeriodicTrigger の java.util.concurrent.TimeUnit 列挙値を指定します。この属性は fixed-delay または fixed-rate 属性と組み合わせてのみ使用できます。cron または trigger 参照属性のいずれかと組み合わせると、エラーが発生します。PeriodicTrigger でサポートされる最小の粒度はミリ秒です。使用可能なオプションはミリ秒と秒のみです。この値が提供されない場合、fixed-delay または fixed-rate 値はミリ秒として解釈されます。基本的に、この列挙型は秒ベースの間隔トリガー値に便利です。時間ごと、日ごと、月ごとの設定では、代わりに cron トリガーを使用することをお勧めします。
12org.springframework.scheduling.Trigger インターフェースを実装する Spring 構成の Bean への参照。ただし、この属性が設定されている場合は、次の属性を指定する必要はありません: fixed-delayfixed-ratecronref。オプション。
13 追加の横断的関心事を処理するための追加の AOP アドバイスを指定できます。詳細については、トランザクションサポートを参照してください。オプション。
14 ポーラーはトランザクション対応にすることができます。詳細については、AOP アドバイスチェーンを参照してください。オプション。
サンプル

1 秒間隔の単純な間隔ベースのポーラーは、次のように構成できます。

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller fixed-rate="1000"/>
</int:transformer>

fixed-rate 属性を使用する代わりに、fixed-delay 属性を使用することもできます。

Cron 式に基づくポーラーの場合、次の例に示すように、代わりに cron 属性を使用します。

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller cron="*/10 * * * * MON-FRI"/>
</int:transformer>

入力チャネルが PollableChannel である場合、ポーラー構成が必要です。具体的には、前述のように、trigger は PollingConsumer クラスの必須プロパティです。ポーリングコンシューマーエンドポイントの構成の poller サブ要素を省略すると、例外がスローされる場合があります。また、ポーリング不可能なチャネルに接続されている要素でポーラーを構成しようとすると、例外がスローされる場合があります。

トップレベルのポーラーを作成することもできます。この場合、次の例に示すように、ref 属性のみが必要です。

<int:poller id="weekdayPoller" cron="*/10 * * * * MON-FRI"/>

<int:transformer input-channel="pollable"
    ref="transformer"
    output-channel="output">
    <int:poller ref="weekdayPoller"/>
</int:transformer>
ref 属性は、内部ポーラー定義でのみ許可されます。最上位のポーラーでこの属性を定義すると、アプリケーションコンテキストの初期化中に構成例外がスローされます。
グローバルデフォルトポーラー

設定をさらに簡素化するために、グローバルデフォルトポーラーを定義できます。XML DSL の単一のトップレベルポーラーコンポーネントでは、default 属性が true に設定されている場合があります。この場合、Java 構成の場合、PollerMetadata.DEFAULT_POLLER 名の PollerMetadata Bean を宣言する必要があります。その場合、入力チャネルに PollableChannel があり、同じ ApplicationContext 内で定義され、明示的に構成された poller がないエンドポイントは、そのデフォルトを使用します。次の例は、そのようなポーラーとそれを使用するトランスフォーマーを示しています。

Java DSL
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

// No 'poller' attribute because there is a default global poller
@Bean
public IntegrationFlow transformFlow(MyTransformer transformer) {
    return IntegrationFlows.from(MessageChannels.queue("pollable"))
                           .transform(transformer) // No 'poller' attribute because there is a default global poller
                           .channel("output")
                           .get();
}
Java
@Bean(PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(5);
    pollerMetadata.setTrigger(new PeriodicTrigger(3000));
    return pollerMetadata;
}

@Bean
public QueueChannel pollable() {
   return new QueueChannel();
}
// No 'poller' attribute because there is a default global poller
@Transformer(inputChannel = "pollable", outputChannel = "output")
public Object transform(Object payload) {
    ...
}
Kotlin DSL
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller() =
    PollerMetadata()
        .also {
            it.maxMessagesPerPoll = 5
            it.trigger = PeriodicTrigger(3000)
        }

@Bean
fun convertFlow() =
    integrationFlow(MessageChannels.queue("pollable")) {
    	transform(transformer) // No 'poller' attribute because there is a default global poller
    	channel("output")
    }
XML
<int:poller id="defaultPoller" default="true" max-messages-per-poll="5" fixed-delay="3000"/>

<!-- No <poller/> sub-element is necessary, because there is a default -->
<int:transformer input-channel="pollable"
                 ref="transformer"
                 output-channel="output"/>
トランザクションサポート

Spring Integration は、ポーラーのトランザクションサポートも提供するため、各送受信操作をアトミックな作業単位として実行できます。ポーラーのトランザクションを構成するには、<transactional/> サブ要素を追加します。次の例は、使用可能な属性を示しています。

<int:poller fixed-delay="1000">
    <int:transactional transaction-manager="txManager"
                       propagation="REQUIRED"
                       isolation="REPEATABLE_READ"
                       timeout="10000"
                       read-only="false"/>
</int:poller>

詳しくは、ポーラートランザクションサポートを参照してください。

AOP アドバイスチェーン

Spring トランザクションサポートは、ポーラーによって開始されたメッセージフローのトランザクション動作を処理する TransactionInterceptor (AOP アドバイス)を使用したプロキシメカニズムに依存するため、ポーラーに関連する他のクロスカット動作を処理するために追加のアドバイスを提供する必要がある場合があります。そのために、poller は、MethodInterceptor インターフェースを実装するクラスにさらにアドバイスを追加できる advice-chain 要素を定義します。次の例は、poller の advice-chain を定義する方法を示しています。

<int:service-activator id="advicedSa" input-channel="goodInputWithAdvice" ref="testBean"
		method="good" output-channel="output">
	<int:poller max-messages-per-poll="1" fixed-rate="10000">
		 <int:advice-chain>
			<ref bean="adviceA" />
			<beans:bean class="org.something.SampleAdvice" />
			<ref bean="txAdvice" />
		</int:advice-chain>
	</int:poller>
</int:service-activator>

MethodInterceptor インターフェースの実装方法の詳細については、Spring Framework リファレンスガイドの AOP セクションを参照してください。アドバイスチェーンは、トランザクション構成を持たないポーラーにも適用でき、ポーラーによって開始されたメッセージフローの動作を強化できます。

アドバイスチェーンを使用する場合、<transactional/> 子要素は指定できません。代わりに、<tx:advice/> Bean を宣言し、<advice-chain/> に追加します。構成の詳細については、ポーラートランザクションサポートを参照してください。
TaskExecutor サポート

ポーリングスレッドは、Spring の TaskExecutor 抽象化の任意のインスタンスによって実行できます。これにより、エンドポイントまたはエンドポイントのグループの同時実行が可能になります。Spring 3.0 以降、コア Spring Framework には task 名前空間があり、その <executor/> 要素は単純なスレッドプールエグゼキューターの作成をサポートしています。その要素は、プールサイズやキュー容量などの一般的な同時実行設定の属性を受け入れます。スレッドプーリングエグゼキューターを構成すると、負荷がかかった状態でのエンドポイントのパフォーマンスに大きな違いが生じます。エンドポイントのパフォーマンスは考慮すべき主要な要素の 1 つであるため、これらの設定は各エンドポイントで使用できます(他の主要な要素は、エンドポイントがサブスクライブするチャネルの予想ボリュームです)。XML 名前空間のサポートを使用して構成されたポーリングエンドポイントの同時実行を有効にするには、<poller/> 要素で task-executor 参照を提供し、次の例に示す 1 つ以上のプロパティを提供します。

<int:poller task-executor="pool" fixed-rate="1000"/>

<task:executor id="pool"
               pool-size="5-25"
               queue-capacity="20"
               keep-alive="120"/>

タスクエグゼキューターを提供しない場合、コンシューマーのハンドラーは呼び出し元のスレッドで呼び出されます。通常、呼び出し元はデフォルトの TaskScheduler です(タスクスケジューラの構成を参照)。また、task-executor 属性は、Bean 名を指定することにより、Spring の TaskExecutor インターフェースの実装への参照を提供できることに留意してください。前に示した executor 要素は、便宜上提供されています。

ポーリングコンシューマーのバックグラウンドセクションで前述したように、イベント駆動型の動作をエミュレートするようにポーリングコンシューマーを構成することもできます。受信タイムアウトが長く、トリガーの間隔が短いため、ポーリングされたメッセージソースであっても、到着したメッセージに対して非常にタイムリーな反応を保証できます。これは、タイムアウトのあるブロッキング待機呼び出しがあるソースにのみ適用されることに注意してください。例: ファイルポーラーはブロックしません。各 receive() 呼び出しはすぐに戻り、新しいファイルが含まれるかどうかが決まります。ポーラーに長い receive-timeout が含まれている場合でも、そのようなシナリオではその値が使用されることはありません。一方、Spring Integration 独自のキューベースのチャネルを使用する場合、タイムアウト値は参加する機会があります。次の例は、ポーリングコンシューマーがほぼ瞬時にメッセージを受信する方法を示しています。

<int:service-activator input-channel="someQueueChannel"
    output-channel="output">
    <int:poller receive-timeout="30000" fixed-rate="10"/>

</int:service-activator>

このアプローチを使用しても、オーバーヘッドはあまり多くありません。内部的には、待機スレッドに過ぎず、スラッシング、無限 while ループなどの CPU リソース使用量をほとんど必要としません。

実行時のポーリングレートの変更

fixed-delay または fixed-rate 属性でポーラーを構成する場合、デフォルトの実装は PeriodicTrigger インスタンスを使用します。PeriodicTrigger は、コア Spring Framework の一部です。コンストラクター引数としてのみ間隔を受け入れます。実行時に変更することはできません。

ただし、org.springframework.scheduling.Trigger インターフェースの独自の実装を定義できます。PeriodicTrigger を出発点として使用することもできます。次に、間隔(期間)に setter を追加するか、トリガー自体に独自の調整ロジックを埋め込むこともできます。period プロパティは、nextExecutionTime への各呼び出しで使用され、次のポーリングをスケジュールします。ポーラー内でこのカスタムトリガーを使用するには、アプリケーションコンテキストでカスタムトリガーの Bean 定義を宣言し、カスタムトリガー Bean インスタンスを参照する trigger 属性を使用して、ポーラー構成に依存関係を注入します。これで、トリガー Bean への参照を取得し、ポーリング間のポーリング間隔を変更できます。

例については、Spring Integration サンプル [GitHub] (英語) プロジェクトを参照してください。dynamic-poller というサンプルが含まれています。これは、カスタムトリガーを使用し、実行時にポーリング間隔を変更する機能を示しています。

このサンプルは、org.springframework.scheduling.Trigger (Javadoc) インターフェースを実装するカスタムトリガーを提供します。サンプルのトリガーは、Spring の PeriodicTrigger (Javadoc) 実装に基づいています。ただし、カスタムトリガーのフィールドは最終的なものではなく、プロパティには明示的な getter と setter があり、実行時にポーリング期間を動的に変更できます。

ただし、Trigger メソッドは nextExecutionTime() であるため、動的トリガーへの変更は、既存の構成に基づいて次のポーリングまで有効にならないことに注意することが重要です。現在設定されている次の実行時間の前にトリガーを強制的に起動することはできません。

ペイロード型変換

このリファレンスマニュアル全体を通して、メッセージまたは任意の Object を入力パラメーターとして受け入れるさまざまなエンドポイントの特定の構成および実装例を確認することもできます。Object の場合、このようなパラメーターは、メッセージペイロードまたはペイロードまたはヘッダーの一部にマップされます(Spring 式言語を使用する場合)。ただし、エンドポイントメソッドの入力パラメーターの型がペイロードまたはその部分の型と一致しない場合があります。このシナリオでは、型変換を実行する必要があります。Spring Integration は、integrationConversionService という名前の変換サービス Bean の独自のインスタンス内に(Spring ConversionService を使用して)型コンバーターを登録するための便利な方法を提供します。その Bean は、Spring Integration インフラストラクチャーを使用して最初のコンバーターが定義されるとすぐに自動的に作成されます。コンバーターを登録するには、org.springframework.core.convert.converter.Converterorg.springframework.core.convert.converter.GenericConverterorg.springframework.core.convert.converter.ConverterFactory を実装できます。

Converter 実装は最も単純で、単一の型から別の型に変換します。クラス階層への変換など、より高度な処理を行うには、GenericConverter および場合によっては ConditionalConverter を実装できます。これらにより、from および to 型記述子への完全なアクセスが可能になり、複雑な変換が可能になります。例: 変換のターゲットである Something という抽象クラス(パラメーター型、チャネルデータ型など)がある場合、Thing1 および Thing という 2 つの具体的な実装があり、どちらかに変換したい入力型に基づいて、GenericConverter が適しています。詳細については、これらのインターフェースの Javadoc を参照してください。

コンバーターを実装したら、次の例に示すように、便利な名前空間サポートでコンバーターを登録できます。

<int:converter ref="sampleConverter"/>

<bean id="sampleConverter" class="foo.bar.TestConverter"/>

または、次の例に示すように、内部 Bean を使用できます。

<int:converter>
    <bean class="o.s.i.config.xml.ConverterParserTests$TestConverter3"/>
</int:converter>

Spring Integration 4.0 以降、次の例に示すように、アノテーションを使用して上記の構成を作成できます。

@Component
@IntegrationConverter
public class TestConverter implements Converter<Boolean, Number> {

	public Number convert(Boolean source) {
		return source ? 1 : 0;
	}

}

または、次の例に示すように、@Configuration アノテーションを使用できます。

@Configuration
@EnableIntegration
public class ContextConfiguration {

	@Bean
	@IntegrationConverter
	public SerializingConverter serializingConverter() {
		return new SerializingConverter();
	}

}

アプリケーションコンテキストを設定する場合、Spring Framework では conversionService Bean を追加できます(ConversionService の構成の章を参照)。このサービスは、必要に応じて、Bean の作成および構成中に適切な変換を実行するために使用されます。

対照的に、integrationConversionService はランタイム変換に使用されます。これらの用途はまったく異なります。Bean コンストラクターの引数とプロパティを接続するときに使用するコンバーターは、データ型チャネル、ペイロード型トランスフォーマーなど内のメッセージに対する Spring Integration 式評価の実行時に使用すると、意図しない結果を生成する場合があります。

ただし、Spring conversionService を Spring Integration integrationConversionService として使用する場合は、次の例に示すように、アプリケーションコンテキストでエイリアスを設定できます。

<alias name="conversionService" alias="integrationConversionService"/>

この場合、conversionService が提供するコンバーターは Spring Integration ランタイム変換に使用できます。

コンテンツ型の変換

バージョン 5.0 以降、デフォルトでは、メソッド呼び出しメカニズムは org.springframework.messaging.handler.invocation.InvocableHandlerMethod インフラストラクチャに基づいています。HandlerMethodArgumentResolver 実装(PayloadArgumentResolver や MessageMethodArgumentResolver など)は、MessageConverter 抽象化を使用して、受信 payload をターゲットメソッドの引数型に変換できます。変換は、contentType メッセージヘッダーに基づくことができます。この目的のために、Spring Integration は ConfigurableCompositeMessageConverter を提供します。ConfigurableCompositeMessageConverter は、登録されたコンバーターのリストに委譲され、そのうちの 1 つが非 null の結果を返すまで呼び出されます。デフォルトでは、このコンバーターは以下を提供します(厳密な順序で):

変換の目的と適切な contentType 値の詳細については、Javadoc(前のリストにリンクされています)を参照してください。ConfigurableCompositeMessageConverter は、他の MessageConverter 実装(前述のデフォルトコンバーターを含むか除外する)で提供できるため、使用されます。また、次の例に示すように、アプリケーションコンテキストで適切な Bean として登録し、デフォルトコンバーターをオーバーライドすることもできます。

@Bean(name = IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME)
public ConfigurableCompositeMessageConverter compositeMessageConverter() {
    List<MessageConverter> converters =
        Arrays.asList(new MarshallingMessageConverter(jaxb2Marshaller()),
                 new JavaSerializationMessageConverter());
    return new ConfigurableCompositeMessageConverter(converters);
}

これらの 2 つの新しいコンバーターは、デフォルトの前にコンポジットに登録されます。ConfigurableCompositeMessageConverter を使用することはできませんが、integrationArgumentResolverMessageConverter という名前で Bean を登録することにより(IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME プロパティを設定することにより)独自の MessageConverter を提供することもできます。

SpZZ メソッド呼び出しを使用する場合、MessageConverter ベース(contentType ヘッダーを含む)変換は使用できません。この場合、上記のペイロード型変換での通常のクラスからクラスへの変換のみが使用可能です。

非同期ポーリング

ポーリングを非同期にしたい場合、ポーラーはオプションで、任意の TaskExecutor Bean の既存のインスタンスを指す task-executor 属性を指定できます(Spring 3.0 は、task 名前空間を通じて便利な名前空間構成を提供します)。ただし、TaskExecutor を使用してポーラーを構成する際に理解しておく必要がある特定の事項があります。

問題は、ポーラーと TaskExecutor の 2 つの構成があることです。それらは互いに調和している必要があります。そうしないと、人為的なメモリリークが発生する可能性があります。

次の構成を検討してください。

<int:channel id="publishChannel">
    <int:queue />
</int:channel>

<int:service-activator input-channel="publishChannel" ref="myService">
	<int:poller receive-timeout="5000" task-executor="taskExecutor" fixed-rate="50" />
</int:service-activator>

<task:executor id="taskExecutor" pool-size="20" />

上記の構成は、不一致の構成を示しています。

デフォルトでは、タスクエグゼキューターには無制限のタスクキューがあります。ポーラーは、すべてのスレッドがブロックされていても新しいタスクのスケジューリングを続け、新しいメッセージの到着またはタイムアウトの期限切れを待ちます。5 秒のタイムアウトでタスクを実行する 20 のスレッドがあるとすると、それらは 1 秒あたり 4 の割合で実行されます。ただし、新しいタスクは 1 秒あたり 20 のレートでスケジュールされているため、タスクエグゼキューターの内部キューは 1 秒あたり 16 のレートで増加するため(プロセスがアイドル状態のとき)、メモリリークが発生します。

これを処理する方法の 1 つは、タスクエグゼキューターの queue-capacity 属性を設定することです。0 でも妥当な値です。また、Task Executor の rejection-policy 属性を設定して(たとえば DISCARD に)キューに入れられないメッセージの処理を指定することで管理できます。つまり、TaskExecutor を構成する際に理解する必要がある特定の詳細があります。このテーマの詳細については、Spring リファレンスマニュアルの “タスクの実行とスケジューリング” を参照してください。

エンドポイント内部 Bean

多くのエンドポイントは複合 Bean です。これには、すべてのコンシューマーと、ポーリングされたすべての受信チャネルアダプターが含まれます。コンシューマー (ポーリングまたはイベント駆動) は、MessageHandler に委譲します。ポーリングされたアダプターは、MessageSource に委譲することでメッセージを取得します。多くの場合、実行時またはテスト時に構成を変更する場合など、委譲 Bean への参照を取得すると便利です。これらの Bean は、よく知られている名前で ApplicationContext から取得できます。MessageHandler インスタンスは、someConsumer.handler に似た Bean ID でアプリケーションコンテキストに登録されます (「コンシューマー」はエンドポイントの id 属性の値です)。MessageSource インスタンスは、somePolledAdapter.source に似た Bean ID で登録されます ( "somePolledAdapter" はアダプターの ID です)。

上記は、フレームワークコンポーネント自体にのみ適用されます。次の例に示すように、代わりに内部 Bean 定義を使用できます。

<int:service-activator id="exampleServiceActivator" input-channel="inChannel"
            output-channel = "outChannel" method="foo">
    <beans:bean class="org.foo.ExampleServiceActivator"/>
</int:service-activator>

Bean は、宣言された内部 Bean のように扱われ、アプリケーションコンテキストに登録されません。他の方法でこの Bean にアクセスする場合は、id を使用してトップレベルで宣言し、代わりに ref 属性を使用します。詳細については、Spring ドキュメントを参照してください。

エンドポイントのロール

バージョン 4.2 から、エンドポイントをロールに割り当てることができます。ロールを使用すると、エンドポイントをグループとして開始および停止できます。これは、リーダーシップの選択を使用する場合に特に役立ちます。リーダーシップの選択では、リーダーシップが許可または取り消されたときにエンドポイントのセットをそれぞれ開始または停止できます。このために、フレームワークは SmartLifecycleRoleController Bean をアプリケーションコンテキストに IntegrationContextUtils.INTEGRATION_LIFECYCLE_ROLE_CONTROLLER という名前で登録します。ライフサイクルを制御する必要があるときはいつでも、この Bean を挿入するか @Autowired できます:

<bean class="com.some.project.SomeLifecycleControl">
    <property name="roleController" ref="integrationLifecycleRoleController"/>
</bean>

XML、Java 構成、プログラムを使用して、エンドポイントをロールに割り当てることができます。次の例は、XML を使用してエンドポイントロールを設定する方法を示しています。

<int:inbound-channel-adapter id="ica" channel="someChannel" expression="'foo'" role="cluster"
        auto-startup="false">
    <int:poller fixed-rate="60000" />
</int:inbound-channel-adapter>

次の例は、Java で作成された Bean のエンドポイントロールを構成する方法を示しています。

@Bean
@ServiceActivator(inputChannel = "sendAsyncChannel", autoStartup="false")
@Role("cluster")
public MessageHandler sendAsyncHandler() {
    return // some MessageHandler
}

次の例は、Java のメソッドでエンドポイントロールを構成する方法を示しています。

@Payload("#args[0].toLowerCase()")
@Role("cluster")
public String handle(String payload) {
    return payload.toUpperCase();
}

次の例は、Java で SmartLifecycleRoleController を使用してエンドポイントロールを設定する方法を示しています。

@Autowired
private SmartLifecycleRoleController roleController;
...
    this.roleController.addSmartLifeCycleToRole("cluster", someEndpoint);
...

次の例は、Java で IntegrationFlow を使用してエンドポイントロールを設定する方法を示しています。

IntegrationFlow flow -> flow
        .handle(..., e -> e.role("cluster"));

これらはそれぞれ、エンドポイントを cluster ロールに追加します。

roleController.startLifecyclesInRole("cluster") および対応する stop…​ メソッドを呼び出すと、エンドポイントが開始および停止します。

SmartLifecycle を実装するオブジェクトは、エンドポイントだけでなく、プログラムで追加できます。

SmartLifecycleRoleController は ApplicationListener<AbstractLeaderEvent> を実装し、リーダーシップが許可または取り消された場合(一部の Bean がそれぞれ OnGrantedEvent または OnRevokedEvent を発行した場合)、構成された SmartLifecycle オブジェクトを自動的に開始および停止します。

リーダーシップの選択を使用してコンポーネントを起動および停止する場合、auto-startup XML 属性(autoStartup Bean プロパティ)を false に設定して、コンテキストの初期化中にアプリケーションコンテキストがコンポーネントを起動しないようにすることが重要です。

バージョン 4.3.8 以降、SmartLifecycleRoleController はいくつかのステータスメソッドを提供します。

public Collection<String> getRoles() (1)

public boolean allEndpointsRunning(String role) (2)

public boolean noEndpointsRunning(String role) (3)

public Map<String, Boolean> getEndpointsRunningStatus(String role) (4)
1 管理されているロールのリストを返します。
2 ロール内のすべてのエンドポイントが実行されている場合、true を返します。
3 ロール内のエンドポイントがどれも実行されていない場合、true を返します。
4component name : running status のマップを返します。コンポーネント名は通常、Bean 名です。

リーダーシップイベントの取り扱い

エンドポイントのグループは、それぞれリーダーシップの付与または取り消しに基づいて開始および停止できます。これは、共有リソースを単一のインスタンスのみで使用する必要があるクラスター化されたシナリオで役立ちます。この例は、共有ディレクトリをポーリングしているファイル受信チャネルアダプターです。(ファイルを読むを参照)。

リーダーの選出に参加し、リーダーが選ばれたとき、リーダーシップが取り消されたとき、リーダーになるためのリソースを獲得できなかったときに通知を受けるために、アプリケーションは「リーダーイニシエーター」と呼ばれるアプリケーションコンテキストにコンポーネントを作成します。通常、リーダーイニシエーターは SmartLifecycle であるため、コンテキストの開始時に(オプションで)開始し、リーダーシップが変化すると通知を発行します。障害が発生した場合に特定のアクションを実行したい場合は、publishFailedEvents を true (バージョン 5.0 以降)に設定することにより、障害通知を受け取ることもできます。慣例により、コールバックを受け取る Candidate を提供する必要があります。また、フレームワークによって提供される Context オブジェクトを介してリーダーシップを取り消すこともできます。コードは、o.s.i.leader.event.AbstractLeaderEvent インスタンス(OnGrantedEvent および OnRevokedEvent のスーパークラス)をリッスンし、それに応じて応答することもできます(たとえば、SmartLifecycleRoleController を使用して)。イベントには、Context オブジェクトへの参照が含まれています。次のリストは、Context インターフェースの定義を示しています。

public interface Context {

	boolean isLeader();

	void yield();

	String getRole();

}

バージョン 5.0.6 以降、コンテキストは候補者のロールへの参照を提供します。

Spring Integration は、LockRegistry 抽象化に基づいたリーダーイニシエーターの基本的な実装を提供します。これを使用するには、次の例に示すように、Bean としてインスタンスを作成する必要があります。

@Bean
public LockRegistryLeaderInitiator leaderInitiator(LockRegistry locks) {
    return new LockRegistryLeaderInitiator(locks);
}

ロックレジストリが正しく実装されている場合、リーダーは 1 人だけです。ロックレジストリが、有効期限が切れたり壊れたときに例外(理想的には InterruptedException)をスローするロックも提供する場合、リーダーレス期間の期間は、ロック実装の固有の待機時間で許される限り短くすることができます。デフォルトでは、busyWaitMillis プロパティは、ロックが不完全であり、ロックを再度取得しようとすると期限切れになることがわかっている(より一般的な)場合に CPU の枯渇を防ぐために、レイテンシを追加します。

Zookeeper を使用するリーダーの選出とイベントの詳細については、"Zookeeper リーダーシップイベントの取り扱い" を参照してください。