メッセージ

Spring Integration Message は、データの汎用コンテナーです。任意のオブジェクトをペイロードとして提供でき、各 Message インスタンスには、キーと値のペアとしてユーザー拡張可能なプロパティを含むヘッダーが含まれます。

Message インターフェース

次のリストは、Message インターフェースの定義を示しています。

public interface Message<T> {

    T getPayload();

    MessageHeaders getHeaders();

}

Message インターフェースは、API の中核部分です。データを汎用ラッパーにカプセル化することにより、メッセージングシステムはデータの種類を知らなくてもデータを渡すことができます。アプリケーションが新しい型をサポートするように進化するとき、または型自体が変更または拡張されるときに、メッセージングシステムは影響を受けません。一方、メッセージングシステムの一部のコンポーネントが Message に関する情報へのアクセスを必要とする場合、そのようなメタデータは通常、メッセージヘッダーのメタデータに格納され、そこから取得できます。

メッセージヘッダー

Spring Integration が Object を Message のペイロードとして使用できるように、ヘッダー値として Object 型もサポートします。実際、次のクラス定義が示すように、MessageHeaders クラスは java.util.Map_ interface を実装します。

public final class MessageHeaders implements Map<String, Object>, Serializable {
  ...
}
MessageHeaders クラスは Map を実装していますが、事実上読み取り専用の実装です。マップ内の値を put しようとすると、UnsupportedOperationException になります。同じことが remove と clear にも当てはまります。メッセージは複数のコンシューマーに渡される可能性があるため、Map の構造は変更できません。同様に、最初の作成後、メッセージのペイロード Object を set にすることはできません。ただし、ヘッダー値自体(またはペイロードオブジェクト)の可変性は、フレームワークユーザーの意思決定として意図的に残されています。

Map の実装として、ヘッダーの名前で get(..) を呼び出すことでヘッダーを取得できます。あるいは、予想される Class を追加パラメーターとして提供できます。さらに良いことに、定義済みの値の 1 つを取得する場合、便利な getter を使用できます。次の例は、これら 3 つのオプションのそれぞれを示しています。

Object someValue = message.getHeaders().get("someKey");

CustomerId customerId = message.getHeaders().get("customerId", CustomerId.class);

Long timestamp = message.getHeaders().getTimestamp();

次の表で、事前定義されたメッセージヘッダーについて説明します。

表 1: 定義済みのメッセージヘッダー
ヘッダー名 ヘッダー型 使用方法
 MessageHeaders.ID
 java.util.UUID

このメッセージインスタンスの識別子。メッセージが変更されるたびに変更されます。

 MessageHeaders.
TIMESTAMP
 java.lang.Long

メッセージが作成された時刻。メッセージが変更されるたびに変更されます。

 MessageHeaders.
REPLY_CHANNEL
 java.lang.Object
(String or
MessageChannel)

明示的な出力チャネルが構成されておらず、ROUTING_SLIP がない場合、ROUTING_SLIP が使い果たされた場合に、応答(ある場合)が送信されるチャネル。値が String の場合、Bean 名を表すか、ChannelRegistry. によって生成されている必要があります

 MessageHeaders.
ERROR_CHANNEL
 java.lang.Object
(String or
MessageChannel)

エラーの送信先のチャネル。値が String の場合、Bean 名を表すか、ChannelRegistry. によって生成されている必要があります

多くの受信および送信アダプターの実装も特定のヘッダーを提供または期待しており、追加のユーザー定義ヘッダーを構成できます。これらのヘッダーの定数は、そのようなヘッダーが存在するモジュールで見つけることができます。たとえば。AmqpHeadersJmsHeaders など。

MessageHeaderAccessor API

Spring Framework 4.0 と Spring Integration 4.0 から、コアメッセージングの抽象化は spring-messaging モジュールに移動され、メッセージング実装の抽象化を追加するために MessageHeaderAccessor API が導入されました。すべての(コア)Spring 統合固有のメッセージヘッダー定数が IntegrationMessageHeaderAccessor クラスで宣言されるようになりました。次の表で、事前定義されたメッセージヘッダーについて説明します。

表 2: 定義済みのメッセージヘッダー
ヘッダー名 ヘッダー型 使用方法
 IntegrationMessageHeaderAccessor.
CORRELATION_ID
 java.lang.Object

2 つ以上のメッセージを相互に関連付けるために使用されます。

 IntegrationMessageHeaderAccessor.
SEQUENCE_NUMBER
 java.lang.Integer

通常、SEQUENCE_SIZE のメッセージのグループを含むシーケンス番号ですが、<resequencer/> で使用して、メッセージの無制限のグループを再シーケンスすることもできます。

 IntegrationMessageHeaderAccessor.
SEQUENCE_SIZE
 java.lang.Integer

相関メッセージのグループ内のメッセージの数。

 IntegrationMessageHeaderAccessor.
EXPIRATION_DATE
 java.lang.Long

メッセージの有効期限が切れたことを示します。フレームワークでは直接使用されませんが、ヘッダーエンリッチャーで設定し、UnexpiredMessageSelector で構成された <filter/> で使用できます。

 IntegrationMessageHeaderAccessor.
PRIORITY
 java.lang.Integer

メッセージの優先度 — たとえば、PriorityChannel 内。

 IntegrationMessageHeaderAccessor.
DUPLICATE_MESSAGE
 java.lang.Boolean

メッセージが dem 等受信インターセプターによって重複として検出された場合は真。べき等レシーバーエンタープライズ統合パターンを参照してください。

 IntegrationMessageHeaderAccessor.
CLOSEABLE_RESOURCE
 java.io.Closeable

このヘッダーは、メッセージ処理が完了したときに閉じる必要がある Closeable にメッセージが関連付けられている場合に存在します。例として、FTP、SFTP などを使用したストリーミングファイル転送に関連付けられた Session があります。

 IntegrationMessageHeaderAccessor.
DELIVERY_ATTEMPT
 java.lang.
AtomicInteger

メッセージ駆動型チャネルアダプターが RetryTemplate の構成をサポートしている場合、このヘッダーには現在の配信試行が含まれます。

 IntegrationMessageHeaderAccessor.
ACKNOWLEDGMENT_CALLBACK
 o.s.i.support.
Acknowledgment
Callback

受信エンドポイントがサポートしている場合は、コールバックしてメッセージを受け入れる、拒否する、キューに再登録します。遅延確認応答可能なメッセージソースおよび MQTT 手動確認を参照してください。

次の例に示すように、これらのヘッダーの一部に便利な型付き getter が IntegrationMessageHeaderAccessor クラスで提供されます。

IntegrationMessageHeaderAccessor accessor = new IntegrationMessageHeaderAccessor(message);
int sequenceNumber = accessor.getSequenceNumber();
Object correlationId = accessor.getCorrelationId();
...

次の表では、IntegrationMessageHeaderAccessor にも表示されますが、一般にユーザーコードでは使用されないヘッダーについて説明します(つまり、一般に Spring Integration の内部部分で使用されます。ここに含まれているのは完全を期すためです)。

表 3: 定義済みのメッセージヘッダー
ヘッダー名 ヘッダー型 使用方法
 IntegrationMessageHeaderAccessor.
SEQUENCE_DETAILS
 java.util.
List<List<Object>>

ネストされた相関が必要な場合に使用される相関データのスタック(splitter→…​→splitter→…​→aggregator→…​→aggregator など)。

 IntegrationMessageHeaderAccessor.
ROUTING_SLIP
 java.util.
Map<List<Object>, Integer>

ルーティングスリップを参照してください。

メッセージ ID 生成

アプリケーションを介してメッセージが遷移すると、メッセージが(たとえば、トランスフォーマーによって)変更されるたびに、新しいメッセージ ID が割り当てられます。メッセージ ID は UUID です。Spring Integration 3.0 以降、IS 生成に使用されるデフォルトの戦略は、以前の java.util.UUID.randomUUID() 実装よりも効率的です。毎回安全な乱数を作成する代わりに、安全な乱数シードに基づく単純な乱数を使用します。

アプリケーションコンテキストで org.springframework.util.IdGenerator を実装する Bean を宣言することにより、異なる UUID 生成戦略を選択できます。

クラスローダーで使用できる UUID 生成戦略は 1 つだけです。つまり、2 つ以上のアプリケーションコンテキストが同じクラスローダーで実行される場合、同じ戦略を共有します。コンテキストの 1 つが戦略を変更すると、すべてのコンテキストで使用されます。同じクラスローダー内の 2 つ以上のコンテキストが org.springframework.util.IdGenerator 型の Bean を宣言する場合、すべて同じクラスのインスタンスでなければなりません。そうでない場合、カスタム戦略を置換しようとするコンテキストは初期化に失敗します。戦略が同じであるが、パラメーター化されている場合、初期化される最初のコンテキストの戦略が使用されます。

デフォルトの戦略に加えて、2 つの追加の IdGenerators が提供されています。org.springframework.util.JdkIdGenerator は、以前の UUID.randomUUID() メカニズムを使用します。UUID が実際に必要でなく、単純な増分値で十分な場合は、o.s.i.support.IdGenerators.SimpleIncrementingIdGenerator を使用できます。

読み取り専用ヘッダー

MessageHeaders.ID および MessageHeaders.TIMESTAMP は読み取り専用ヘッダーであり、オーバーライドできません。

バージョン 4.3.2 以降、MessageBuilder は readOnlyHeaders(String…​ readOnlyHeaders) API を提供して、アップストリーム Message からコピーすべきではないヘッダーのリストをカスタマイズします。デフォルトでは、MessageHeaders.ID および MessageHeaders.TIMESTAMP のみが読み取り専用です。グローバル spring.integration.readOnly.headers プロパティ(グローバルプロパティを参照)は、フレームワークコンポーネント用に DefaultMessageBuilderFactory をカスタマイズするために提供されています。これは、ObjectToJsonTransformer による contentType などの一部のすぐに使用可能なヘッダーを取り込まないようにする場合に役立ちます(JSONTransformers を参照)。

MessageBuilder を使用して新しいメッセージを作成しようとすると、この種類のヘッダーは無視され、特定の INFO メッセージがログに出力されます。

バージョン 5.0 以降、メッセージングゲートウェイヘッダーエンリッチャーコンテンツエンリッチャーヘッダーフィルターでは、DefaultMessageBuilderFactory が使用されている場合に MessageHeaders.ID および MessageHeaders.TIMESTAMP ヘッダー名を構成できず、それらは BeanInitializationException をスローします。

ヘッダーの伝播

メッセージ生成エンドポイント(サービスアクティベーターなど)によってメッセージが処理(および変更)されると、一般に、受信ヘッダーが送信メッセージに伝播されます。これの 1 つの例外は、完全なメッセージがフレームワークに返されるときのトランスフォーマーです。その場合、ユーザーコードは送信メッセージ全体を担当します。トランスフォーマーがペイロードを返すだけで、受信ヘッダーが伝播されます。また、ヘッダーは送信メッセージにまだ存在しない場合にのみ伝播され、必要に応じてヘッダー値を変更できます。

バージョン 4.3.10 から、メッセージハンドラー(メッセージを変更して出力を生成する)を構成して、特定のヘッダーの伝搬を抑制することができます。コピーしたくないヘッダーを構成するには、MessageProducingMessageHandler 抽象クラスで setNotPropagatedHeaders() または addNotPropagatedHeaders() メソッドを呼び出します。

また、META-INF/spring.integration.properties の readOnlyHeaders プロパティをヘッダーのコンマ区切りリストに設定することにより、特定のメッセージヘッダーの伝播をグローバルに抑制することができます。

バージョン 5.0 以降、AbstractMessageProducingHandler での setNotPropagatedHeaders() 実装は、単純なパターン(xxx*xxx*xxx、または xxx*yyy)を適用して、共通の接尾部または接頭部を持つヘッダーをフィルタリングできるようにします。詳細については、PatternMatchUtils Javadoc を参照してください。パターンの 1 つが * (アスタリスク)の場合、ヘッダーは伝搬されません。他のすべてのパターンは無視されます。その場合、サービスアクティベーターはトランスフォーマーと同じように動作し、必要なヘッダーはサービスメソッドから返される Message で提供する必要があります。notPropagatedHeaders() オプションは、Java DSL の ConsumerEndpointSpec で使用可能です。not-propagated-headers 属性として <service-activator> コンポーネントの XML 構成にも使用可能です。

ヘッダー伝播抑制は、ブリッジルーターなど、メッセージを変更しないエンドポイントには適用されません。

メッセージの実装

Message インターフェースの基本実装は GenericMessage<T> であり、次のリストに示す 2 つのコンストラクターを提供します。

new GenericMessage<T>(T payload);

new GenericMessage<T>(T payload, Map<String, Object> headers)

Message が作成されると、ランダムな一意の ID が生成されます。Map のヘッダーを受け入れるコンストラクターは、提供されたヘッダーを新しく作成された Message にコピーします。

エラー状態を伝えるために設計された Message の便利な実装もあります。次の例に示すように、この実装は Throwable オブジェクトをペイロードとして受け取ります。

ErrorMessage message = new ErrorMessage(someThrowable);

Throwable t = message.getPayload();

この実装は、GenericMessage 基本クラスがパラメーター化されているという事実を利用していることに注意してください。両方の例に示すように、Message ペイロード Object を取得するときにキャストは必要ありません。

MessageBuilder ヘルパークラス

Message インターフェースはペイロードとヘッダーの取得方法を定義しているが、setter を提供していないことに気付くかもしれません。この理由は、Message は最初の作成後に変更できないためです。Message インスタンスが複数のコンシューマーに(たとえば、パブリッシュ / サブスクライブチャネルを介して)送信される場合、それらのコンシューマーの 1 つが異なるペイロード型で応答を送信する必要がある場合、新しい Message を作成する必要があります。その結果、他のコンシューマーはこれらの変更の影響を受けません。複数のコンシューマーが同じペイロードインスタンスまたはヘッダー値にアクセスする可能性があり、そのようなインスタンス自体が不変であるかどうかはユーザーに委ねられていることに注意してください。言い換えれば、Message インスタンスの契約は変更不可能な Collection の契約に似ており、MessageHeaders マップはそれをさらに例示しています。MessageHeaders クラスは java.util.Map を実装しますが、MessageHeaders インスタンスで put 操作(または「削除」または「クリア」)を呼び出そうとすると、UnsupportedOperationException になります。

GenericMessage コンストラクターに渡すために Map の作成と移植を必要とするのではなく、Spring Integration は、Messages: MessageBuilder を構築するためのはるかに便利な方法を提供します。MessageBuilder は、既存の Message から、またはペイロード Object で Message インスタンスを作成するための 2 つのファクトリメソッドを提供します。既存の Message から構築する場合、次の例に示すように、その Message のヘッダーとペイロードが新しい Message にコピーされます。

Message<String> message1 = MessageBuilder.withPayload("test")
        .setHeader("foo", "bar")
        .build();

Message<String> message2 = MessageBuilder.fromMessage(message1).build();

assertEquals("test", message2.getPayload());
assertEquals("bar", message2.getHeaders().get("foo"));

新しいペイロードを使用して Message を作成する必要があるが、既存の Message からヘッダーをコピーしたい場合は、次の例に示すように、'copy' メソッドのいずれかを使用できます。

Message<String> message3 = MessageBuilder.withPayload("test3")
        .copyHeaders(message1.getHeaders())
        .build();

Message<String> message4 = MessageBuilder.withPayload("test4")
        .setHeader("foo", 123)
        .copyHeadersIfAbsent(message1.getHeaders())
        .build();

assertEquals("bar", message3.getHeaders().get("foo"));
assertEquals(123, message4.getHeaders().get("foo"));

copyHeadersIfAbsent メソッドは既存の値を上書きしないことに注意してください。また、前の例では、setHeader を使用してユーザー定義ヘッダーを設定する方法を確認できます。最後に、事前定義されたヘッダーに使用可能な set メソッドと、ヘッダーを設定する非破棄的なメソッドがあります(MessageHeaders は、事前定義されたヘッダー名の定数も定義します)。

次の例に示すように、MessageBuilder を使用してメッセージの優先度を設定することもできます。

Message<Integer> importantMessage = MessageBuilder.withPayload(99)
        .setPriority(5)
        .build();

assertEquals(5, importantMessage.getHeaders().getPriority());

Message<Integer> lessImportantMessage = MessageBuilder.fromMessage(importantMessage)
        .setHeaderIfAbsent(IntegrationMessageHeaderAccessor.PRIORITY, 2)
        .build();

assertEquals(2, lessImportantMessage.getHeaders().getPriority());

priority ヘッダーは、PriorityChannel を使用する場合にのみ考慮されます(次の章で説明します)。java.lang.Integer として定義されています。