メッセージの公開

(アスペクト指向プログラミング)AOP メッセージ発行機能を使用すると、メソッド呼び出しの副産物としてメッセージを作成および送信できます。例: コンポーネントがあり、このコンポーネントの状態が変わるたびに、メッセージで通知されることを想定しています。このような通知を送信する最も簡単な方法は、専用チャネルにメッセージを送信することですが、オブジェクトの状態を変更するメソッド呼び出しをメッセージ送信プロセスにどのように接続し、通知メッセージをどのように構成する必要がありますか? AOP メッセージ発行機能は、これらの責任を構成主導のアプローチで処理します。

メッセージ公開構成

Spring Integration は、XML 構成とアノテーション駆動(Java)構成の 2 つのアプローチを提供します。

@Publisher アノテーションを使用したアノテーション駆動型の構成

アノテーション駆動型のアプローチでは、@Publisher アノテーションを使用して任意のメソッドにアノテーションを付けて、'channel' 属性を指定できます。バージョン 5.1 以降、この機能を有効にするには、@Configuration クラスで @EnablePublisher アノテーションを使用する必要があります。詳細については、構成と @EnableIntegration を参照してください。メッセージは、メソッド呼び出しの戻り値から作成され、'channel' 属性によって指定されたチャネルに送信されます。メッセージ構造をさらに管理するために、@Payload アノテーションと @Header アノテーションの両方を組み合わせて使用することもできます。

内部的には、Spring Integration のこのメッセージパブリッシング機能は、PublisherAnnotationAdvisor と Spring Expression Language(SpEL)を定義することで Spring AOP の両方を使用し、パブリッシュする Message の構造をかなり柔軟に制御できます。

PublisherAnnotationAdvisor は、次の変数を定義およびバインドします。

  • #return: 戻り値にバインドして、その値またはその属性を参照できるようにします (たとえば、#return.something、"something" は #return にバインドされたオブジェクトの属性)

  • #exception: メソッド呼び出しによってスローされた場合、例外にバインドします

  • #args: メソッドの引数にバインドし、名前で個々の引数を抽出できるようにします (たとえば、#args.fname)

次の例を考えてみましょう。

@Publisher
public String defaultPayload(String fname, String lname) {
  return fname + " " + lname;
}

前述の例では、メッセージは次の構造で構成されています。

  • メッセージペイロードは、メソッドの戻り値の型と値です。これがデフォルトです。

  • 新しく作成されたメッセージは、アノテーションポストプロセッサーで構成された既定の発行者チャネルに送信されます(このセクションで後述します)。

次の例は、デフォルトの公開チャネルを使用しないことを除いて、前述の例と同じです。

@Publisher(channel="testChannel")
public String defaultPayload(String fname, @Header("last") String lname) {
  return fname + " " + lname;
}

デフォルトの公開チャネルを使用する代わりに、@Publisher アノテーションの 'channel' 属性を設定して公開チャネルを指定します。また、@Header アノテーションを追加します。これにより、"last" という名前のメッセージヘッダーが "lname" メソッドパラメーターと同じ値になります。そのヘッダーは、新しく作成されたメッセージに追加されます。

次の例は、前の例とほとんど同じです。

@Publisher(channel="testChannel")
@Payload
public String defaultPayloadButExplicitAnnotation(String fname, @Header String lname) {
  return fname + " " + lname;
}

唯一の違いは、メソッドの @Payload アノテーションを使用して、メソッドの戻り値をメッセージのペイロードとして使用することを明示的に指定することです。

次の例は、@Payload アノテーションで Spring Expression Language を使用して、フレームワークにメッセージの作成方法をさらに指示することにより、以前の構成を拡張します。

@Publisher(channel="testChannel")
@Payload("#return + #args.lname")
public String setName(String fname, String lname, @Header("x") int num) {
  return fname + " " + lname;
}

上記の例では、メッセージはメソッド呼び出しの戻り値と 'lname' 入力引数の連結です。"x" という名前のメッセージヘッダーの値は、"num" 入力引数によって決定されます。そのヘッダーは、新しく作成されたメッセージに追加されます。

@Publisher(channel="testChannel")
public String argumentAsPayload(@Payload String fname, @Header String lname) {
  return fname + " " + lname;
}

前の例では、@Payload アノテーションの別の使用箇所がわかります。ここでは、新しく構築されたメッセージのペイロードになるメソッド引数にアノテーションを付けます。

Spring の他のほとんどのアノテーション駆動型機能と同様に、ポストプロセッサー(PublisherAnnotationBeanPostProcessor)を登録する必要があります。次の例は、その方法を示しています。

<bean class="org.springframework.integration.aop.PublisherAnnotationBeanPostProcessor"/>

より簡潔な構成の場合、次の例に示すように、代わりに名前空間サポートを使用できます。

<int:annotation-config>
    <int:enable-publisher default-publisher-channel="defaultChannel"/>
</int:annotation-config>

Java 構成の場合、次の例に示すように、@EnablePublisher アノテーションを使用する必要があります。

@Configuration
@EnableIntegration
@EnablePublisher("defaultChannel")
public class IntegrationConfiguration {
    ...
}

バージョン 5.1.3 から、<int:enable-publisher> コンポーネントと @EnablePublisher アノテーションには、ProxyFactory 構成を調整するための proxy-target-class および order 属性があります。

他の Spring アノテーション(@Component@Scheduled など)と同様に、@Publisher をメタアノテーションとして使用することもできます。これは、@Publisher 自体と同じように扱われる独自のアノテーションを定義できることを意味します。次の例は、その方法を示しています。

@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Publisher(channel="auditChannel")
public @interface Audit {
...
}

前の例では、@Audit アノテーションを定義します。これには、@Publisher アノテーションが付けられています。また、メタアノテーションで channel 属性を定義して、このアノテーション内でメッセージが送信される場所をカプセル化できることに注意してください。次の例に示すように、@Audit アノテーションを使用してメソッドにアノテーションを付けることができます。

@Audit
public String test() {
    return "Hello";
}

上記の例では、test() メソッドを呼び出すたびに、戻り値から作成されたペイロードを持つメッセージが生成されます。各メッセージは、auditChannel という名前のチャネルに送信されます。この手法の利点の 1 つは、複数のアノテーションにわたって同じチャネル名が重複することを回避できることです。また、独自の潜在的にドメイン固有のアノテーションとフレームワークによって提供されるアノテーションとの間に間接性のレベルを提供できます。

クラスにアノテーションを付けることもできます。これにより、次の例に示すように、このアノテーションのプロパティをそのクラスのすべての public メソッドに適用できます。

@Audit
static class BankingOperationsImpl implements BankingOperations {

  public String debit(String amount) {
     . . .

  }

  public String credit(String amount) {
     . . .
  }

}

<publishing-interceptor> 要素を使用した XML ベースのアプローチ

XML ベースのアプローチにより、MessagePublishingInterceptor のネームスペースベースの構成と同じ AOP ベースのメッセージ公開機能を構成できます。AOP ポイントカット式を使用して、複数のメソッドを一度にインターセプトしたり、ソースコードを持っていないメソッドをインターセプトして公開したりできるため、アノテーション駆動型のアプローチよりも確かにいくつかの利点があります。

XML を使用してメッセージパブリッシングを構成するには、次の 2 つのことだけを行う必要があります。

  • <publishing-interceptor> XML エレメントを使用して、MessagePublishingInterceptor の構成を提供します。

  • MessagePublishingInterceptor を管理対象オブジェクトに適用する AOP 構成を提供します。

次の例は、publishing-interceptor 要素を構成する方法を示しています。

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(testBean)" />
</aop:config>
<publishing-interceptor id="interceptor" default-channel="defaultChannel">
  <method pattern="echo" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" value="something"/>
  </method>
  <method pattern="repl*" payload="'Echoing: ' + #return" channel="echoChannel">
    <header name="things" expression="'something'.toUpperCase()"/>
  </method>
  <method pattern="echoDef*" payload="#return"/>
</publishing-interceptor>

<publishing-interceptor> 構成は、アノテーションベースのアプローチにかなり似ており、Spring Expression Language のパワーも使用します。

上記の例では、testBean の echo メソッドを実行すると、Message が次の構造でレンダリングされます。

  • Message ペイロードの型は String で、内容は Echoing: [value] です。value は、実行されたメソッドによって返される値です。

  • Message には、名前が things で値が something のヘッダーがあります。

  • Message は echoChannel に送信されます。

2 番目の方法は最初の方法と非常に似ています。ここで、"repl" で始まるすべてのメソッドは、次の構造を持つ Message をレンダリングします。

  • Message ペイロードは、前のサンプルと同じです。

  • Message には things という名前のヘッダーがあり、その値は SpEL 式 'something'.toUpperCase() の結果です。

  • Message は echoChannel に送信されます。

echoDef で始まるメソッドの実行をマッピングする 2 番目のメソッドは、次の構造を持つ Message を生成します。

  • Message ペイロードは、実行されたメソッドによって返される値です。

  • channel 属性が提供されていないため、Message は publisher によって定義された defaultChannel に送信されます。

単純なマッピングルールの場合、次の例が示すように、publisher のデフォルトに依存できます。

<publishing-interceptor id="anotherInterceptor"/>

上記の例では、ポイントカット式に一致するすべてのメソッドの戻り値をペイロードにマッピングし、default-channel に送信します。defaultChannel を指定しない場合(前の例では指定されていません)、メッセージはグローバル nullChannel (/dev/null に相当)に送信されます。

非同期公開

パブリッシュは、コンポーネントの実行と同じスレッドで行われます。デフォルトでは同期的です。つまり、メッセージフロー全体は、パブリッシャーのフローが完了するまで待機する必要があります。ただし、開発者は、このメッセージパブリッシュ機能を使用して非同期フローを開始するという、まったく逆のことを望むことがよくあります。たとえば、リモートリクエストを受信するサービス (HTTP、WS など) をホストするとします。このリクエストを、しばらく時間がかかる可能性があるプロセスに内部的に送信する必要がある場合があります。ただし、ユーザーにすぐに返信する必要がある場合もあります。そのため、処理のために受信リクエストを出力チャネルに送信する (従来の方法) 代わりに、'output-channel' または 'replyChannel' ヘッダーを使用して、メッセージパブリッシャー機能を使用して複雑なフローを開始しながら、呼び出し元に簡単な確認応答のような返信を返すことができます。

次の例のサービスは、複雑なペイロード(処理のためにさらに送信する必要があります)を受信しますが、単純な確認応答で呼び出し元に応答する必要もあります。

public String echo(Object complexPayload) {
     return "ACK";
}

複雑なフローを出力チャネルに接続する代わりに、メッセージパブリッシング機能を使用します。サービスメソッドの入力引数 (前の例を参照) を使用して新しいメッセージを作成し、それを 'localProcessChannel' に送信するように構成します。このフローが非同期であることを確認するには、任意の型の非同期チャネル (次の例では ExecutorChannel ) に送信するだけです。次の例は、非同期 publishing-interceptor を実行する方法を示しています。

<int:service-activator  input-channel="inputChannel" output-channel="outputChannel" ref="sampleservice"/>

<bean id="sampleService" class="test.SampleService"/>

<aop:config>
  <aop:advisor advice-ref="interceptor" pointcut="bean(sampleService)" />
</aop:config>

<int:publishing-interceptor id="interceptor" >
  <int:method pattern="echo" payload="#args[0]" channel="localProcessChannel">
    <int:header name="sample_header" expression="'some sample value'"/>
  </int:method>
</int:publishing-interceptor>

<int:channel id="localProcessChannel">
  <int:dispatcher task-executor="executor"/>
</int:channel>

<task:executor id="executor" pool-size="5"/>

この型のシナリオを処理する別の方法は、ワイヤータップを使用することです。ワイヤータップを参照してください。

スケジュールされたトリガーに基づいたメッセージの生成と公開

これまでのセクションでは、メッセージの発行機能について説明しました。この機能は、メソッド呼び出しの副産物としてメッセージを作成して発行します。ただし、これらの場合でも、メソッドを呼び出す必要があります。Spring Integration 2.0 は、'inbound-channel-adapter' 要素の新しい expression 属性を使用して、スケジュールされたメッセージプロデューサーとパブリッシャーのサポートを追加しました。複数のトリガーに基づいてスケジュールを設定でき、そのいずれかを 'poller' 要素で構成できます。現在、cronfixed-ratefixed-delay と、ユーザーによって実装され、'trigger' 属性値によって参照されるカスタムトリガーをサポートしています。

前述のように、スケジュールされたプロデューサーおよびパブリッシャーのサポートは、<inbound-channel-adapter> XML エレメントを介して提供されます。次の例を考えてみましょう。

<int:inbound-channel-adapter id="fixedDelayProducer"
       expression="'fixedDelayTest'"
       channel="fixedDelayChannel">
    <int:poller fixed-delay="1000"/>
</int:inbound-channel-adapter>

上記の例では、Message を構築する受信チャネルアダプターを作成します。そのペイロードは、expression 属性で定義された式の結果です。このようなメッセージは、fixed-delay 属性で指定された遅延が発生するたびに作成および送信されます。

次の例は、fixed-rate 属性を使用することを除いて、前述の例に似ています。

<int:inbound-channel-adapter id="fixedRateProducer"
       expression="'fixedRateTest'"
       channel="fixedRateChannel">
    <int:poller fixed-rate="1000"/>
</int:inbound-channel-adapter>

fixed-rate 属性を使用すると、固定レートでメッセージを送信できます(各タスクの開始時間から測定)。

次の例は、cron 属性で指定された値で Cron トリガーを適用する方法を示しています。

<int:inbound-channel-adapter id="cronProducer"
       expression="'cronTest'"
       channel="cronChannel">
    <int:poller cron="7 6 5 4 3 ?"/>
</int:inbound-channel-adapter>

次の例は、追加のヘッダーをメッセージに挿入する方法を示しています。

<int:inbound-channel-adapter id="headerExpressionsProducer"
       expression="'headerExpressionsTest'"
       channel="headerExpressionsChannel"
       auto-startup="false">
    <int:poller fixed-delay="5000"/>
    <int:header name="foo" expression="6 * 7"/>
    <int:header name="bar" value="x"/>
</int:inbound-channel-adapter>

追加のメッセージヘッダーは、スカラー値または Spring 式の評価結果を取ることができます。

独自のカスタムトリガーを実装する必要がある場合は、trigger 属性を使用して、org.springframework.scheduling.Trigger インターフェースを実装する Spring 構成の Bean への参照を提供できます。次の例は、その方法を示しています。

<int:inbound-channel-adapter id="triggerRefProducer"
       expression="'triggerRefTest'" channel="triggerRefChannel">
    <int:poller trigger="customTrigger"/>
</int:inbound-channel-adapter>

<beans:bean id="customTrigger" class="o.s.scheduling.support.PeriodicTrigger">
    <beans:constructor-arg value="9999"/>
</beans:bean>