JMS サポート
Spring Integration は、JMS メッセージを送受信するためのチャネルアダプターを提供します。
この依存関係をプロジェクトに含める必要があります。
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
<version>5.5.16</version>
</dependency>
compile "org.springframework.integration:spring-integration-jms:5.5.16"
javax.jms:javax.jms-api
は、Apache ActiveMQ などの JMS ベンダー固有の実装を介して明示的に追加する必要があります。
実際には、2 つの JMS ベースの受信チャネルアダプターがあります。最初は、Spring の JmsTemplate
を使用して、ポーリング期間に基づいて受信します。2 つ目は「メッセージ駆動型」で、Spring MessageListener
コンテナーに依存しています。発信チャネルアダプターは、JmsTemplate
を使用して、オンデマンドで JMS メッセージを変換および送信します。
JmsTemplate
と MessageListener
コンテナーを使用することにより、Spring Integration は Spring の JMS サポートに依存しています。これらのアダプターで公開される属性のほとんどは、基礎となる JmsTemplate
および MessageListener
コンテナーを構成するため、これは理解することが重要です。JmsTemplate
および MessageListener
コンテナーの詳細については、Spring JMS ドキュメントを参照してください。
JMS チャネルアダプターは単方向メッセージング(送信専用または受信専用)を対象としていますが、Spring Integration はリクエストおよび応答操作用の受信および送信 JMS ゲートウェイも提供します。受信ゲートウェイは、メッセージ駆動型受信のために Spring の MessageListener
コンテナー実装の 1 つに依存しています。また、受信したメッセージによって提供されるように、戻り値を reply-to
宛先に送信することもできます。送信ゲートウェイは、JMS メッセージを request-destination
(または request-destination-name
または request-destination-expression
)に送信し、応答メッセージを受信します。reply-destination
参照(または reply-destination-name
または reply-destination-expression
)を明示的に構成できます。それ以外の場合、送信ゲートウェイは JMS TemporaryQueue [Oracle] (英語) を使用します。
Spring Integration 2.2 より前は、必要に応じて、リクエストまたは応答ごとに TemporaryQueue
が作成(および削除)されていました。Spring Integration 2.2 以降では、新しい(またはキャッシュされた) Consumer
を直接使用して各リクエストの応答を受信する代わりに、MessageListener
コンテナーを使用して応答を受信するように送信ゲートウェイを構成できます。そのように構成されていて、明示的な応答の宛先が提供されていない場合、各ゲートウェイでは、リクエストごとにではなく、単一の TemporaryQueue
が使用されます。
受信チャネルアダプター
受信チャネルアダプターには、単一の JmsTemplate
インスタンスまたは ConnectionFactory
と Destination
の両方への参照が必要です( "destination" 参照の代わりに "destinationName" を指定できます)。次の例では、Destination
参照を使用して受信チャネルアダプターを定義しています。
@Bean
public IntegrationFlow jmsInbound(ConnectionFactory connectionFactory) {
return IntegrationFlows.from(
Jms.inboundAdapter(connectionFactory)
.destination("inQueue"),
e -> e.poller(poller -> poller.fixedRate(30000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
fun jmsInbound(connectionFactory: ConnectionFactory) =
integrationFlow(
Jms.inboundAdapter(connectionFactory).destination("inQueue"),
{ poller { Pollers.fixedRate(30000) } })
{
handle { m -> println(m.payload) }
}
@Bean
@InboundChannelAdapter(value = "exampleChannel", poller = @Poller(fixedRate = "30000"))
public MessageSource<Object> jmsIn(ConnectionFactory connectionFactory) {
JmsDestinationPollingSource source = new JmsDestinationPollingSource(new JmsTemplate(connectionFactory));
source.setDestinationName("inQueue");
return source;
}
<int-jms:inbound-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel">
<int:poller fixed-rate="30000"/>
</int-jms:inbound-channel-adapter>
上記の構成から、inbound-channel-adapter はポーリングコンシューマーであることに注意してください。つまり、トリガーされると receive() を呼び出します。これは、ポーリングが比較的頻繁に行われず、適時性が重要でない状況でのみ使用してください。他のすべての状況(JMS ベースのユースケースの大部分)では、message-driven-channel-adapter (後述)がより良いオプションです。 |
デフォルトでは、ConnectionFactory への参照を必要とするすべての JMS アダプターは、jmsConnectionFactory という名前の Bean を自動的に検索します。そのため、多くの例で connection-factory 属性が表示されません。ただし、JMS ConnectionFactory の Bean 名が異なる場合、その属性を提供する必要があります。 |
extract-payload
が true
(デフォルト)に設定されている場合、受信した JMS メッセージは MessageConverter
を通過します。デフォルトの SimpleMessageConverter
に依存する場合、これは、結果の Spring Integration メッセージのペイロードとして JMS メッセージの本文が含まれることを意味します。JMS TextMessage
は文字列ベースのペイロードを生成し、JMS BytesMessage
はバイト配列ペイロードを生成し、JMS ObjectMessage
の直列化可能なインスタンスが Spring Integration メッセージのペイロードになります。生の JMS メッセージを Spring Integration メッセージのペイロードとして使用する場合は、extractPayload
オプションを false
に設定します。
バージョン 5.0.8 以降、receive-timeout
のデフォルト値は org.springframework.jms.connection.CachingConnectionFactory
および cacheConsumers
の -1
(待機なし)です。それ以外の場合は 1 秒です。JMS 受信チャネルアダプターは、提供された ConnectionFactory
とオプションに基づいて DynamicJmsTemplate
を作成します。外部 JmsTemplate
が必要な場合(Spring Boot 環境など)、ConnectionFactory
がキャッシュされていない場合、cacheConsumers
がない場合、ノンブロッキング消費が予想される場合は jmsTemplate.receiveTimeout(-1)
を設定することをお勧めします。
Jms.inboundAdapter(connectionFactory)
.destination(queueName)
.configureJmsTemplate(template -> template.receiveTimeout(-1))
トランザクション
バージョン 4.0 以降、受信チャネルアダプターは session-transacted
属性をサポートします。以前のバージョンでは、sessionTransacted
を true
に設定して JmsTemplate
を挿入する必要がありました。(アダプターにより、acknowledge
属性を transacted
に設定できましたが、これは正しくなく、機能しませんでした)。
ただし、session-transacted
を true
に設定しても、トランザクションは receive()
操作の直後、メッセージが channel
に送信される前にコミットされるため、ほとんど価値がないことに注意してください。
フロー全体をトランザクションにしたい場合(たとえば、ダウンストリーム送信チャネルアダプターがある場合)、transactional
ポーラーと JmsTransactionManager
を使用する必要があります。または、acknowledge
を transacted
(デフォルト)に設定して jms-message-driven-channel-adapter
を使用することを検討してください。
メッセージ駆動型チャネルアダプター
message-driven-channel-adapter
には、Spring MessageListener
コンテナーのインスタンス(AbstractMessageListenerContainer
のサブクラス)または ConnectionFactory
と Destination
の両方( "destinationName" を "destination" 参照の代わりに提供できます)のいずれかの参照が必要です。次の例では、Destination
参照を使用してメッセージ駆動型チャネルアダプターを定義しています。
@Bean
public IntegrationFlow jmsMessageDrivenRedeliveryFlow() {
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue"))
.channel("exampleChannel")
.get();
}
@Bean
fun jmsMessageDrivenFlowWithContainer() =
integrationFlow(
Jms.messageDrivenChannelAdapter(jmsConnectionFactory())
.destination("inQueue")) {
channel("exampleChannel")
}
@Bean
public JmsMessageDrivenEndpoint jmsIn() {
JmsMessageDrivenEndpoint endpoint = new JmsMessageDrivenEndpoint(container(), listener());
return endpoint;
}
@Bean
public AbstractMessageListenerContainer container() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(cf());
container.setDestinationName("inQueue");
return container;
}
@Bean
public ChannelPublishingJmsMessageListener listener() {
ChannelPublishingJmsMessageListener listener = new ChannelPublishingJmsMessageListener();
listener.setRequestChannelName("exampleChannel");
return listener;
}
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue" channel="exampleChannel"/>
メッセージ駆動型アダプターは、 カスタムリスナーコンテナーの実装(通常は |
Spring JMS 名前空間要素
|
バージョン 4.2 以降、デフォルトの acknowledge モードは transacted です。ただし、外部コンテナーを提供する場合を除きます。その場合、必要に応じてコンテナーを構成する必要があります。メッセージの損失を避けるため、transacted を DefaultMessageListenerContainer と併用することをお勧めします。 |
"extract-payload" プロパティには同じ効果があり、デフォルト値は "true" です。poller
要素は、アクティブに呼び出されるため、メッセージ駆動型チャネルアダプターには適用できません。ほとんどのシナリオでは、メッセージは基盤となる JMS コンシューマーから受信されるとすぐに MessageChannel
に渡されるため、メッセージ駆動型のアプローチの方が適しています。
最後に、<message-driven-channel-adapter>
要素は 'error-channel' 属性も受け入れます。これにより、GatewayProxyFactoryBean
を入力してくださいで説明されているのと同じ基本機能が提供されます。次の例は、メッセージ駆動型チャネルアダプターでエラーチャネルを設定する方法を示しています。
<int-jms:message-driven-channel-adapter id="jmsIn" destination="inQueue"
channel="exampleChannel"
error-channel="exampleErrorChannel"/>
前述の例を、後ほど説明する汎用ゲートウェイ構成または JMS「受信ゲートウェイ」と比較すると、重要な違いは、これがゲートウェイではなく「チャネルアダプター」であるため、一方向のフローになっていることです。「エラーチャネル」からのダウンストリームフローも一方向である必要があります。例: ロギングハンドラーに送信したり、別の JMS <outbound-channel-adapter>
要素に接続したりできます。
トピックから使用する場合は、pub-sub-domain
属性を true に設定します。永続サブスクリプションの場合は subscription-durable
を true
に、共有サブスクリプションの場合は subscription-shared
を設定します(これには JMS 2.0 ブローカーが必要であり、バージョン 4.2 以降で使用可能です)。subscription-name
を使用して、サブスクリプションに名前を付けます。
バージョン 5.1 以降、アプリケーションの実行中にエンドポイントが停止すると、基礎となるリスナーコンテナーがシャットダウンされ、共有接続とコンシューマーが閉じられます。以前は、接続とコンシューマーは開いたままでした。前の動作に戻すには、JmsMessageDrivenEndpoint
の shutdownContainerOnStop
を false
に設定します。
受信変換エラー
バージョン 4.2 以降、変換エラーにも「エラーチャネル」が使用されます。以前は、JMS <message-driven-channel-adapter/>
または <inbound-gateway/>
が変換エラーのためにメッセージを配信できなかった場合、例外がコンテナーにスローされていました。コンテナーがトランザクションを使用するように設定されている場合、メッセージはロールバックされ、繰り返し再配信されます。変換プロセスはメッセージ構築の前後に発生するため、このようなエラーは「エラーチャネル」に送信されません。現在、このような変換例外により、payload
を例外として ErrorMessage
が「エラーチャネル」に送信されます。トランザクションをロールバックし、「エラーチャネル」を定義している場合、「エラーチャネル」の統合フローは例外(または別の例外)を再スローする必要があります。エラーフローが例外をスローしない場合、トランザクションはコミットされ、メッセージは削除されます。「エラーチャネル」が定義されていない場合、以前と同様に例外がコンテナーにスローバックされます。
送信チャネルアダプター
JmsSendingMessageHandler
は MessageHandler
インターフェースを実装し、Spring Integration Messages
を JMS メッセージに変換してから、JMS 宛先に送信できます。jmsTemplate
参照、または jmsConnectionFactory
と destination
参照の両方が必要です(destination
の代わりに destinationName
が提供される場合があります)。受信チャネルアダプターと同様に、このアダプターを構成する最も簡単な方法は、名前空間のサポートです。次の構成では、exampleChannel
から Spring Integration メッセージを受信し、JMS メッセージに変換して、Bean 名が outQueue
である JMS 宛先参照に送信するアダプターを作成します。
@Bean
public IntegrationFlow jmsOutboundFlow() {
return f -> f
.handle(Jms.outboundAdapter(cachingConnectionFactory())
.destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
.configureJmsTemplate(t -> t.id("jmsOutboundFlowTemplate")));
}
@Bean
fun jmsOutboundFlow() =
integrationFlow {
handle(Jms.outboundAdapter(jmsConnectionFactory())
.apply {
destinationExpression("headers." + SimpMessageHeaderAccessor.DESTINATION_HEADER)
deliveryModeFunction<Any> { DeliveryMode.NON_PERSISTENT }
timeToLiveExpression("10000")
configureJmsTemplate { it.explicitQosEnabled(true) }
}
)
}
@Bean
@ServiceActivator(inputChannel = "exampleChannel")
public MessageHandler jmsOut() {
JmsSendingMessageHandler handler = new JmsSendingMessageHandler(new JmsTemplate(connectionFactory));
handler.setDestinationName("outQueue");
return handler;
}
<int-jms:outbound-channel-adapter id="jmsOut" destination="outQueue" channel="exampleChannel"/>
受信チャネルアダプターと同様に、"extract-payload" プロパティがあります。ただし、送信アダプターの場合は意味が逆になります。ブールプロパティは、JMS メッセージに適用するのではなく、Spring Integration メッセージペイロードに適用します。つまり、Spring Integration メッセージ自体を JMS メッセージ本文として渡すか、Spring Integration メッセージペイロードを JMS メッセージ本文として渡すかを決定します。デフォルト値は "true" です。ペイロードが String
である Spring Integration メッセージを渡すと、JMS TextMessage
が作成されます。一方、実際の Spring Integration メッセージを JMS を介して別のシステムに送信する場合は、"false" に設定します。
デフォルトのコンバーターに依存するか、MessageConverter の別のインスタンスへの参照を提供する限り、ペイロード抽出のブール値に関係なく、Spring Integration MessageHeaders は JMS プロパティにマップします。(「受信」アダプターについても同様です。ただし、これらの場合、JMS プロパティは Spring Integration MessageHeaders にマップされます)。 |
バージョン 5.1 から、<int-jms:outbound-channel-adapter>
(JmsSendingMessageHandler
)を deliveryModeExpression
および timeToLiveExpression
プロパティで構成して、リクエスト Spring Message
に対して実行時に送信する JMS メッセージの適切な QoS 値を評価できます。DefaultJmsHeaderMapper
の新しい setMapInboundDeliveryMode(true)
および setMapInboundExpiration(true)
オプションは、メッセージヘッダーからの動的 deliveryMode
および timeToLive
の情報のソースとして容易になります。
<int-jms:outbound-channel-adapter delivery-mode-expression="headers.jms_deliveryMode"
time-to-live-expression="headers.jms_expiration - T(System).currentTimeMillis()"/>
受信ゲートウェイ
Spring Integration のメッセージ駆動型 JMS 受信ゲートウェイは、MessageListener
コンテナーにデリゲートし、同時コンシューマーの動的な調整をサポートし、返信も処理できます。受信ゲートウェイには、ConnectionFactory
およびリクエスト Destination
(または "requestDestinationName" )への参照が必要です。次の例では、Bean id inQueue
によって参照される JMS キューから受信し、exampleChannel
という名前の Spring Integration チャネルに送信する JMS inbound-gateway
を定義しています。
<int-jms:inbound-gateway id="jmsInGateway"
request-destination="inQueue"
request-channel="exampleChannel"/>
ゲートウェイは、単方向の送信または受信動作の代わりにリクエスト応答動作を提供するため、「ペイロード抽出」の 2 つの異なるプロパティもあります(チャネルアダプターの "extract-payload" 設定について前述したように)。受信ゲートウェイの場合、"extract-request-payload" プロパティは、受信した JMS メッセージ本文を抽出するかどうかを決定します。"false" の場合、JMS メッセージ自体が Spring Integration メッセージペイロードになります。デフォルトは "true" です。
同様に、受信ゲートウェイの場合、"extract-reply-payload" プロパティは、応答 JMS メッセージに変換される Spring Integration メッセージに適用されます。Spring Integration メッセージ全体を(JMS ObjectMessage の本体として)渡したい場合は、これを "false" に設定します。デフォルトでは、Spring Integration メッセージペイロードが JMS メッセージに変換されることも "true" です(たとえば、String
ペイロードは JMS TextMessage になります)。
他のものと同様に、ゲートウェイの呼び出しはエラーになる可能性があります。デフォルトでは、プロデューサーには、コンシューマー側で発生した可能性のあるエラーが通知されず、応答を待機してタイムアウトします。ただし、エラー状態をコンシューマーに通知する必要がある場合があります(言い換えると、例外をメッセージにマッピングすることにより、例外を有効な応答として扱いたい場合があります)。これを実現するために、JMS 受信ゲートウェイは、処理のためにエラーを送信できるメッセージチャネルのサポートを提供します。これにより、呼び出し元が「エラー」応答として期待するものを定義する契約に準拠する応答メッセージペイロードが潜在的に生成されます。次の例に示すように、error-channel 属性を使用して、このようなチャネルを構成できます。
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="jmsInputChannel"
error-channel="errorTransformationChannel"/>
<int:transformer input-channel="exceptionTransformationChannel"
ref="exceptionTransformer" method="createErrorResponse"/>
この例は、GatewayProxyFactoryBean
を入力してくださいに含まれている例と非常によく似ていることに気付くかもしれません。ここでも同じ考えが当てはまります。exceptionTransformer
は、エラーレスポンスオブジェクトを作成する POJO である可能性があります。nullChannel
を参照してエラーを抑制したり、「エラーチャネル」を省略して例外を伝播したりできます。
受信変換エラーを参照してください。
トピックから使用する場合は、pub-sub-domain
属性を true に設定します。subscription-durable
を永続サブスクリプションの場合は true
に設定し、共有サブスクリプションの場合は subscription-shared
を設定します(JMS 2.0 ブローカーが必要であり、バージョン 4.2 以降で使用可能です)。subscription-name
を使用して、サブスクリプションに名前を付けます。
バージョン 4.2 以降、デフォルトの acknowledge モードは transacted ですが、外部コンテナーが提供されている場合を除きます。その場合、必要に応じてコンテナーを構成する必要があります。メッセージの損失を防ぐため、transacted と DefaultMessageListenerContainer を併用することをお勧めします。 |
バージョン 5.1 以降、アプリケーションの実行中にエンドポイントが停止すると、基礎となるリスナーコンテナーがシャットダウンされ、共有接続とコンシューマーが閉じられます。以前は、接続とコンシューマーは開いたままでした。前の動作に戻すには、JmsInboundGateway
の shutdownContainerOnStop
を false
に設定します。
送信ゲートウェイ
送信ゲートウェイは、Spring Integration メッセージから JMS メッセージを作成し、"request-destination" に送信します。次に、セレクターを使用して構成した "reply-destination" から受信するか、"reply-destination" が指定されていない場合は JMS TemporaryQueue
インスタンスを作成することにより、JMS 応答メッセージを処理します。
cacheZZ を 返信先を指定する場合は、キャッシュされたコンシューマーを使用しないことをお勧めします。または、以下で説明するように |
次の例は、送信ゲートウェイを設定する方法を示しています。
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies"/>
「送信ゲートウェイ」ペイロード抽出プロパティは、「受信ゲートウェイ」のプロパティに反比例します(前述の説明を参照)。つまり、"extract-request-payload" プロパティ値は、リクエストとして送信される JMS メッセージに変換される Spring Integration メッセージに適用されます。"extract-reply-payload" プロパティ値は、応答として受信した JMS メッセージに適用され、その後、前述の構成例に示すように "reply-channel" に送信される Spring Integration メッセージに変換されます。
<reply-listener/>
を使用する
Spring Integration 2.2 は、返信を処理するための代替手法を導入しました。応答ごとにコンシューマーを作成するのではなく、<reply-listener/>
子要素をゲートウェイに追加する場合、MessageListener
コンテナーを使用して応答を受信し、リクエスト側スレッドに渡します。これにより、多くのパフォーマンス上の利点が提供されるだけでなく、前の注意で説明したキャッシュされたコンシューマーメモリ使用率の問題が軽減されます。
reply-destination
のない送信ゲートウェイで <reply-listener/>
を使用する場合、リクエストごとに TemporaryQueue
を作成する代わりに、単一の TemporaryQueue
が使用されます。(ブローカーへの接続が失われ、回復された場合、ゲートウェイは必要に応じて追加の TemporaryQueue
を作成します)。
correlation-key
を使用する場合、リスナーコンテナーは各ゲートウェイに固有のセレクターを使用するため、複数のゲートウェイが同じ応答宛先を共有できます。
応答リスナーを指定し、応答宛先(または応答宛先名)を指定しても相関キーを提供しない場合、ゲートウェイは警告をログに記録し、バージョン 2.2 より前の動作にフォールバックします。これは、この場合、セレクターを構成する方法がないためです。同じ応答宛先で構成されている可能性のある別のゲートウェイへの応答を回避する方法はありません。 この状況では、リクエストごとに新しいコンシューマーが使用され、上記の注意事項で説明したように、コンシューマーはメモリに蓄積できることに注意してください。この場合、キャッシュされたコンシューマーは使用しないでください。 |
次の例は、デフォルト属性を持つ応答リスナーを示しています。
<int-jms:outbound-gateway id="jmsOutGateway"
request-destination="outQueue"
request-channel="outboundJmsRequests"
reply-channel="jmsReplies">
<int-jms:reply-listener />
</int-jms-outbound-gateway>
リスナーは非常に軽量であり、ほとんどの場合、必要なコンシューマーは 1 つだけであると予想されます。ただし、concurrent-consumers
、max-concurrent-consumers
などの属性を追加することはできます。サポートされている属性の完全なリストについてはスキーマを参照し、その意味については Spring JMS ドキュメントを参照してください。
アイドル応答リスナー
バージョン 4.2 以降、ゲートウェイのライフサイクルの期間中に実行する代わりに、必要に応じて応答リスナーを開始(およびアイドル時間後に停止)できます。これは、ほとんどがアイドル状態であるアプリケーションコンテキストに多くのゲートウェイがある場合に役立ちます。そのような状況の 1 つは、パーティション配布に Spring Integration と JMS を使用する多くの(非アクティブな)パーティション化された Spring Batch ジョブのコンテキストです。すべての応答リスナーがアクティブな場合、JMS ブローカーにはゲートウェイごとにアクティブなコンシューマーがあります。アイドルタイムアウトを有効にすることにより、各コンシューマーは、対応するバッチジョブの実行中(および完了後の短時間)にのみ存在します。
属性参照の idle-reply-listener-timeout
を参照してください。
ゲートウェイ応答相関
このセクションでは、ゲートウェイの構成方法に応じて、応答相関(発信元ゲートウェイがリクエストに対する応答のみを受信するようにする)に使用されるメカニズムについて説明します。ここで説明する属性の詳細については、属性参照を参照してください。
次のリストは、さまざまなシナリオを説明しています(番号は識別のためのものです。順序は関係ありません)。
reply-destination*
プロパティおよび<reply-listener>
なしTemporaryQueue
は各リクエストに対して作成され、リクエストが完了すると(正常にまたはその他の方法で)削除されます。correlation-key
は無関係です。reply-destination*
プロパティが提供され、<reply-listener/>
もcorrelation-key
も提供されません発信メッセージ IS と等しい
JMSCorrelationID
は、コンシューマーのメッセージセレクターとして使用されます。messageSelector = "JMSCorrelationID = '" + messageId + "'"
応答システムは、応答
JMSCorrelationID
で受信JMSMessageID
を返すことが期待されています。これは一般的なパターンであり、Spring Integration 受信ゲートウェイと、メッセージ駆動型 POJO 用の Spring のMessageListenerAdapter
によって実装されます。この構成を使用する場合、返信にトピックを使用しないでください。返信が失われる可能性があります。 reply-destination*
プロパティが提供され、<reply-listener/>
は提供されず、correlation-key="JMSCorrelationID"
ゲートウェイは一意の相関 IS を生成し、
JMSCorrelationID
ヘッダーに挿入します。メッセージセレクタは次のとおりです。messageSelector = "JMSCorrelationID = '" + uniqueId + "'"
応答システムは、応答
JMSCorrelationID
で受信JMSCorrelationID
を返すことが期待されています。これは一般的なパターンであり、Spring Integration 受信ゲートウェイと、メッセージ駆動型 POJO 用の Spring のMessageListenerAdapter
によって実装されます。reply-destination*
プロパティが提供され、<reply-listener/>
は提供されず、correlation-key="myCorrelationHeader"
ゲートウェイは一意の相関 ID を生成し、それを
myCorrelationHeader
メッセージプロパティに挿入します。correlation-key
には、任意のユーザー定義値を指定できます。メッセージセレクタは次のとおりです。messageSelector = "myCorrelationHeader = '" + uniqueId + "'"
応答システムは、応答
myCorrelationHeader
で受信myCorrelationHeader
を返すことが期待されています。reply-destination*
プロパティが提供され、<reply-listener/>
は提供されず、correlation-key="JMSCorrelationID*"
(相関キーの*
に注意してください。)ゲートウェイは、リクエストメッセージの
jms_correlationId
ヘッダー(存在する場合)の値を使用し、それをJMSCorrelationID
ヘッダーに挿入します。メッセージセレクタは次のとおりです。messageSelector = "JMSCorrelationID = '" + headers['jms_correlationId'] + "'"
ユーザーは、この値が一意であることを確認する必要があります。
ヘッダーが存在しない場合、ゲートウェイは
3
のように動作します。応答システムは、応答
JMSCorrelationID
で受信JMSCorrelationID
を返すことが期待されています。これは一般的なパターンであり、Spring Integration 受信ゲートウェイと、メッセージ駆動型 POJO 用の Spring のMessageListenerAdapter
によって実装されます。reply-destination*
プロパティは提供されず、<reply-listener>
は提供されます一時キューが作成され、このゲートウェイインスタンスからのすべての応答に使用されます。メッセージには相関データは必要ありませんが、発信
JMSMessageID
はゲートウェイで内部的に使用され、応答を正しいリクエストスレッドに送信します。reply-destination*
プロパティが提供され、<reply-listener>
が提供され、correlation-key
は提供されません禁じられています。
<reply-listener/>
構成は無視され、ゲートウェイは2
のように動作します。この状況を示す警告ログメッセージが書き込まれます。reply-destination*
プロパティが提供され、<reply-listener>
が提供され、correlation-key="JMSCorrelationID"
ゲートウェイには一意の相関 ID があり、
JMSCorrelationID
ヘッダーの増分値(gatewayId + "_" + ++seq
)とともにそれを挿入します。メッセージセレクタは次のとおりです。messageSelector = "JMSCorrelationID LIKE '" + gatewayId%'"
応答システムは、応答
JMSCorrelationID
で受信JMSCorrelationID
を返すことが期待されています。これは一般的なパターンであり、Spring Integration 受信ゲートウェイと、メッセージ駆動型 POJO 用の Spring のMessageListenerAdapter
によって実装されます。各ゲートウェイには一意の ID があるため、各インスタンスは独自の応答のみを取得します。完全な相関データを使用して、応答を正しいリクエストスレッドにルーティングします。reply-destination*
プロパティが提供され、<reply-listener/>
が提供され、correlation-key="myCorrelationHeader"
ゲートウェイには一意の相関 ID があり、
myCorrelationHeader
プロパティの増分値(gatewayId + "_" + ++seq
)とともにそれを挿入します。correlation-key
には、任意のユーザー定義値を指定できます。メッセージセレクタは次のとおりです。messageSelector = "myCorrelationHeader LIKE '" + gatewayId%'"
応答システムは、応答
myCorrelationHeader
で受信myCorrelationHeader
を返すことが期待されています。各ゲートウェイには一意の ID があるため、各インスタンスは独自の応答のみを取得します。完全な相関データを使用して、応答を正しいリクエストスレッドにルーティングします。reply-destination*
プロパティが提供され、<reply-listener/>
が提供され、correlation-key="JMSCorrelationID*"
(相関キーの
*
に注意してください)禁じられています。
ユーザー指定の相関 ID は、応答リスナーでは許可されていません。ゲートウェイは、この構成では初期化されません。
非同期ゲートウェイ
バージョン 4.3 から、送信ゲートウェイの構成時に async="true"
(または Java の setAsync(true)
)を指定できるようになりました。
デフォルトでは、リクエストがゲートウェイに送信されると、リクエストスレッドは応答が受信されるまで中断されます。その後、フローはそのスレッドで続行されます。async
が true
の場合、リクエスト側のスレッドは送信が完了するとすぐに解放され、リスナーコンテナースレッドで応答が返されます(そしてフローが続行されます)。これは、ゲートウェイがポーラースレッドで呼び出されるときに便利です。スレッドがリリースされ、フレームワーク内の他のタスクで使用できます。
async
には <reply-listener/>
(または Java 構成を使用する場合は setUseReplyContainer(true)
)が必要です。また、correlationKey
(通常は JMSCorrelationID
)を指定する必要があります。これらの条件のいずれかが満たされない場合、async
は無視されます。
属性参照
次のリストは、outbound-gateway
で使用可能なすべての属性を示しています。
<int-jms:outbound-gateway
connection-factory="connectionFactory" (1)
correlation-key="" (2)
delivery-persistent="" (3)
destination-resolver="" (4)
explicit-qos-enabled="" (5)
extract-reply-payload="true" (6)
extract-request-payload="true" (7)
header-mapper="" (8)
message-converter="" (9)
priority="" (10)
receive-timeout="" (11)
reply-channel="" (12)
reply-destination="" (13)
reply-destination-expression="" (14)
reply-destination-name="" (15)
reply-pub-sub-domain="" (16)
reply-timeout="" (17)
request-channel="" (18)
request-destination="" (19)
request-destination-expression="" (20)
request-destination-name="" (21)
request-pub-sub-domain="" (22)
time-to-live="" (23)
requires-reply="" (24)
idle-reply-listener-timeout="" (25)
async=""> (26)
<int-jms:reply-listener /> (27)
</int-jms:outbound-gateway>
1 | javax.jms.ConnectionFactory への参照。デフォルトの jmsConnectionFactory 。 | ||
2 | レスポンスとレスポンスを関連付けるための相関データを含むプロパティの名前。省略した場合、ゲートウェイは、レスポンスシステムが JMSCorrelationID ヘッダーの発信 JMSMessageID ヘッダーの値を返すことを期待します。指定した場合、ゲートウェイは相関 ID を生成し、指定されたプロパティにそれを設定します。レスポンスシステムは、同じプロパティでその値をエコーバックする必要があります。JMSCorrelationID に設定できます。この場合、相関データを保持するために String プロパティの代わりに標準ヘッダーが使用されます。<reply-container/> を使用する場合、明示的な reply-destination を指定する場合は、correlation-key を指定する必要があります。バージョン 4.0.1 以降、この属性は値 JMSCorrelationID* もサポートします。つまり、発信メッセージにすでに JMSCorrelationID (jms_correlationId からマッピングされた)ヘッダーがある場合、新しいヘッダーを生成する代わりに使用されます。<reply-container/> を使用する場合、JMSCorrelationID* キーは許可されないことに注意してください。これは、初期化中にコンテナーがメッセージセレクターを設定する必要があるためです。
| ||
3 | 配信モードが DeliveryMode.PERSISTENT (true )か DeliveryMode.NON_PERSISTENT (false )かを示すブール値。この設定は、explicit-qos-enabled が true の場合にのみ有効です。 | ||
4 | DestinationResolver デフォルトは DynamicDestinationResolver で、宛先名をその名前のキューまたはトピックにマップします。 | ||
5 | true に設定すると、サービス品質属性 priority 、delivery-mode 、time-to-live の使用が有効になります。 | ||
6 | true (デフォルト)に設定すると、Spring Integration 応答メッセージのペイロードが JMS Reply メッセージの本文から(MessageConverter を使用して)作成されます。false に設定すると、JMS メッセージ全体が Spring Integration メッセージのペイロードになります。 | ||
7 | true (デフォルト)に設定すると、Spring Integration メッセージのペイロードは JMSMessage に変換されます(MessageConverter を使用)。false に設定すると、Spring Integration メッセージ全体が JMSMessage に変換されます。どちらの場合でも、Spring Integration メッセージヘッダーは、HeaderMapper を使用して JMS ヘッダーとプロパティにマップされます。 | ||
8 | Spring Integration メッセージヘッダーと JMS メッセージヘッダーおよびプロパティとのマッピングに使用される HeaderMapper 。 | ||
9 | JMS メッセージと Spring Integration メッセージペイロード(または extract-request-payload が false の場合はメッセージ)の間の変換のための MessageConverter への参照。デフォルトは SimpleMessageConverter です。 | ||
10 | リクエストメッセージのデフォルトの優先度。存在する場合、メッセージ優先度ヘッダーによってオーバーライドされます。その範囲は 0 から 9 です。この設定は、explicit-qos-enabled が true の場合にのみ有効です。 | ||
11 | 応答を待つ時間(ミリ秒)。デフォルトは 5000 (5 秒)です。 | ||
12 | 応答メッセージが送信されるチャネル。 | ||
13 | JMSReplyTo ヘッダーとして設定されている Destination への参照。最大で、reply-destination 、reply-destination-expression 、reply-destination-name のいずれか 1 つのみが許可されます。何も指定されていない場合、TemporaryQueue がこのゲートウェイへの応答に使用されます。 | ||
14 | JMSReplyTo ヘッダーとして設定される Destination に評価される SpEL 式。式は、Destination オブジェクトまたは String になる可能性があります。これは、実際の Destination を解決するために DestinationResolver によって使用されます。最大で、reply-destination 、reply-destination-expression 、reply-destination-name のいずれか 1 つのみが許可されます。何も指定されていない場合、TemporaryQueue がこのゲートウェイへの応答に使用されます。 | ||
15 | JMSReplyTo ヘッダーとして設定されている宛先の名前。これは、実際の Destination を解決するために DestinationResolver によって使用されます。最大で、reply-destination 、reply-destination-expression 、reply-destination-name のいずれか 1 つのみが許可されます。何も指定されていない場合、TemporaryQueue がこのゲートウェイへの応答に使用されます。 | ||
16 | true に設定すると、DestinationResolver によって解決された応答 Destination が Queue ではなく Topic になります。 | ||
17 | reply-channel に応答メッセージを送信するときにゲートウェイが待機する時間。これは、reply-channel がブロックできる場合にのみ効果があります。たとえば、容量制限が現在満杯の QueueChannel などです。デフォルトは無限です。 | ||
18 | このゲートウェイがリクエストメッセージを受信するチャネル。 | ||
19 | リクエストメッセージの送信先となる Destination への参照。reply-destination 、reply-destination-expression 、reply-destination-name のいずれかが必要です。これらの 3 つの属性のうち 1 つのみを使用できます。 | ||
20 | リクエストメッセージの送信先である Destination に評価される SpEL 式。式は、Destination オブジェクトまたは String になる可能性があります。これは、実際の Destination を解決するために DestinationResolver によって使用されます。reply-destination 、reply-destination-expression 、reply-destination-name のいずれかが必要です。これらの 3 つの属性のうち 1 つのみを使用できます。 | ||
21 | リクエストメッセージの送信先の名前。これは、実際の Destination を解決するために DestinationResolver によって使用されます。reply-destination 、reply-destination-expression 、reply-destination-name のいずれかが必要です。これらの 3 つの属性のうち 1 つのみを使用できます。 | ||
22 | true に設定すると、DestinationResolver によって解決されたリクエスト Destination が Queue ではなく Topic になります。 | ||
23 | メッセージの存続時間を指定します。この設定は、explicit-qos-enabled が true の場合にのみ有効です。 | ||
24 | この送信ゲートウェイが null 以外の値を返す必要があるかどうかを指定します。デフォルトでは、この値は true であり、基になるサービスが receive-timeout の後に値を返さない場合、MessageTimeoutException がスローされます。サービスが応答を返さないことが予想される場合は、requires-reply="false" で <int-jms:outbound-gateway/> の代わりに <int-jms:outbound-channel-adapter/> を使用することをお勧めします。後者の場合、送信スレッドはブロックされ、receive-timeout 期間の応答を待機します。 | ||
25 | <reply-listener /> を使用すると、そのライフサイクル(開始および停止)はデフォルトでゲートウェイのライフサイクルと一致します。この値が 0 よりも大きい場合、コンテナーはオンデマンドで開始されます(リクエストが送信されるとき)。コンテナーは、リクエストが受信されずに少なくともこの時間が経過するまで(および未解決の応答がなくなるまで)実行を続けます。コンテナーは次のリクエストで再び開始されます。停止時間は最小であり、実際にはこの値まで 1.5x になる場合があります。 | ||
26 | 非同期ゲートウェイを参照してください。 | ||
27 | この要素を含めると、応答ごとにコンシューマーを作成するのではなく、非同期 MessageListenerContainer が応答を受信します。多くの場合、これはより効率的です。 |
JMS メッセージとの間のメッセージヘッダーのマッピング
JMS メッセージには、JMS API ヘッダーや単純なプロパティなどのメタ情報を含めることができます。JmsHeaderMapper
を使用して、Spring Integration メッセージヘッダーとの間でマッピングできます。JMS API ヘッダーは適切な setter メソッド(setJMSReplyTo
など)に渡されますが、他のヘッダーは JMS メッセージの一般的なプロパティにコピーされます。JMS 送信ゲートウェイは、標準の JMS API ヘッダーとプリミティブまたは String
メッセージヘッダーをマップする JmsHeaderMapper
のデフォルト実装でブートストラップされます。受信および送信ゲートウェイの header-mapper
属性を使用して、カスタムヘッダーマッパーを提供することもできます。
多くの JMS ベンダー固有のクライアントでは、作成済みの JMS メッセージに deliveryMode 、priority 、timeToLive プロパティを直接設定することはできません。これらは QoS プロパティと見なされるため、ターゲット MessageProducer.send(message, deliveryMode, priority, timeToLive) API に伝播する必要があります。このため、DefaultJmsHeaderMapper は、適切な Spring Integration ヘッダー(または式の結果)を前述の JMS メッセージプロパティにマップしません。代わりに、DynamicJmsTemplate は JmsSendingMessageHandler によって使用され、ヘッダー値をリクエストメッセージから MessageProducer.send() API に伝搬します。この機能を有効にするには、explicitQosEnabled プロパティを true に設定した DynamicJmsTemplate を使用して送信エンドポイントを構成する必要があります。Spring Integration Java DSL はデフォルトで DynamicJmsTemplate を構成しますが、それでも explicitQosEnabled プロパティを設定する必要があります。 |
バージョン 4.0 以降、JMSPriority ヘッダーは受信メッセージの標準 priority ヘッダーにマップされます。以前は、priority ヘッダーは送信メッセージにのみ使用されていました。以前の動作に戻す(つまり、受信優先順位をマップしない)には、DefaultJmsHeaderMapper の mapInboundPriority プロパティを false に設定します。 |
バージョン 4.3 以降、DefaultJmsHeaderMapper は toString() メソッドを呼び出すことにより、標準の correlationId ヘッダーをメッセージプロパティとしてマッピングします(correlationId は、JMS でサポートされていない UUID であることがよくあります)。受信側では、String としてマップされます。これは、JMSCorrelationID ヘッダーとの間でマッピングされる jms_correlationId ヘッダーとは無関係です。JMSCorrelationID は通常、リクエストと応答を相関させるために使用されますが、correlationId は関連するメッセージを(アグリゲーターやリシーケンサーなどを使用して)グループにまとめるためによく使用されます。 |
バージョン 5.1 以降、DefaultJmsHeaderMapper
は、受信 JMSDeliveryMode
および JMSExpiration
プロパティをマッピングするために構成できます。
@Bean
public DefaultJmsHeaderMapper jmsHeaderMapper() {
DefaultJmsHeaderMapper mapper = new DefaultJmsHeaderMapper();
mapper.setMapInboundDeliveryMode(true)
mapper.setMapInboundExpiration(true)
return mapper;
}
これらの JMS プロパティは、それぞれ JmsHeaders.DELIVERY_MODE
および JmsHeaders.EXPIRATION
Spring メッセージヘッダーにマップされます。
メッセージの変換、マーシャリング、アンマーシャリング
メッセージを変換する必要がある場合、すべての JMS アダプターおよびゲートウェイで、message-converter
属性を設定することにより、MessageConverter
を提供できます。そのためには、同じ ApplicationContext 内で使用可能な MessageConverter
のインスタンスの Bean 名を提供します。また、マーシャラーおよびアンマーシャラーインターフェースとの一貫性を提供するために、Spring には MarshallingMessageConverter
が用意されており、独自のカスタムマーシャラーおよびアンマーシャラーで構成できます。次の例は、その方法を示しています
<int-jms:inbound-gateway request-destination="requestQueue"
request-channel="inbound-gateway-channel"
message-converter="marshallingMessageConverter"/>
<bean id="marshallingMessageConverter"
class="org.springframework.jms.support.converter.MarshallingMessageConverter">
<constructor-arg>
<bean class="org.bar.SampleMarshaller"/>
</constructor-arg>
<constructor-arg>
<bean class="org.bar.SampleUnmarshaller"/>
</constructor-arg>
</bean>
独自の MessageConverter インスタンスを提供する場合、まだ HeaderMappingMessageConverter 内にラップされています。これは、"extract-request-payload" および "extract-reply-payload" プロパティが、コンバーターに渡される実際のオブジェクトに影響を与える可能性があることを意味します。HeaderMappingMessageConverter 自体がターゲット MessageConverter に委譲すると同時に、Spring Integration MessageHeaders を JMS メッセージプロパティにマッピングし、再びマッピングします。 |
JMS でサポートされるメッセージチャネル
前述のチャネルアダプターとゲートウェイはすべて、他の外部システムと統合するアプリケーションを対象としています。受信オプションは、他のシステムが JMS 宛先に JMS メッセージを送信していることを前提とし、発信オプションは、他のシステムが宛先から受信していることを前提としています。他のシステムは、Spring Integration アプリケーションである場合とそうでない場合があります。もちろん、Spring Integration メッセージインスタンスを JMS メッセージ自体の本体として( "extract-payload" 値を false
に設定して)送信する場合、他のシステムは Spring Integration に基づいていると想定されます。ただし、決して要件ではありません。この柔軟性は、「チャネル」(または JMS の場合は宛先)を抽象化してメッセージベースの統合オプションを使用する利点の 1 つです。
特定の JMS 宛先のプロデューサーとコンシューマーの両方が、同じプロセス内で実行される同じアプリケーションの一部であることが意図されている場合があります。これは、受信と送信のチャネルアダプターのペアを使用して実現できます。このアプローチの問題は、概念的には 1 つのメッセージチャネルを持つことがゴールですが、2 つのアダプターが必要なことです。Spring Integration バージョン 2.0 では、より良いオプションがサポートされています。次の例に示すように、JMS ネームスペースを使用するときに単一の「チャネル」を定義できるようになりました。
<int-jms:channel id="jmsChannel" queue="exampleQueue"/>
上記の例のチャネルは、メイン Spring Integration 名前空間の通常の <channel/>
要素とほとんど同じように動作します。任意のエンドポイントの input-channel
属性と output-channel
属性の両方で参照できます。違いは、このチャネルが exampleQueue
という名前の JMS キューインスタンスによってサポートされていることです。これは、生成エンドポイントと消費エンドポイント間で非同期メッセージングが可能であることを意味します。ただし、非 JMS <channel/>
要素内に <queue/>
要素を追加することにより作成される単純な非同期メッセージチャネルとは異なり、メッセージはメモリ内キューに格納されません。代わりに、これらのメッセージは JMS メッセージ本文内で渡され、基礎となる JMS プロバイダーのフルパワーがそのチャネルで利用可能になります。おそらく、この代替手段を使用する最も一般的な理由は、JMS メッセージングのストアアンドフォワードアプローチによって提供される永続性を活用することです。
適切に構成されている場合、JMS でサポートされるメッセージチャネルはトランザクションもサポートします。つまり、プロデューサーは、送信操作がロールバックするトランザクションの一部である場合、実際にはトランザクション対応の JMS バックチャネルに書き込みません。同様に、そのメッセージの受信がロールバックするトランザクションの一部である場合、コンシューマーは JMS メッセージをチャネルから物理的に削除しません。このようなシナリオでは、プロデューサーとコンシューマーのトランザクションは別々であることに注意してください。これは、<queue/>
子要素を持たない単純な同期 <channel/>
要素にわたるトランザクションコンテキストの伝播とは大きく異なります。
上記の例は JMS キューインスタンスを参照しているため、ポイントツーポイントチャネルとして機能します。一方、パブリッシュ / サブスクライブの動作が必要な場合は、代わりに別の要素を使用して JMS トピックを参照できます。次の例は、その方法を示しています。
<int-jms:publish-subscribe-channel id="jmsChannel" topic="exampleTopic"/>
次の例に示すように、どちらの型の JMS バックアップチャネルでも、参照の代わりに宛先の名前を指定できます。
<int-jms:channel id="jmsQueueChannel" queue-name="exampleQueueName"/>
<jms:publish-subscribe-channel id="jmsTopicChannel" topic-name="exampleTopicName"/>
上記の例では、宛先名は Spring のデフォルトの DynamicDestinationResolver
実装によって解決されていますが、DestinationResolver
インターフェースの実装を提供できます。また、JMS ConnectionFactory
はチャネルの必須プロパティですが、デフォルトでは、期待される Bean 名は jmsConnectionFactory
になります。次の例は、JMS 宛先名の解決のためのカスタムインスタンスと ConnectionFactory
の別の名前の両方を提供します。
<int-jms:channel id="jmsChannel" queue-name="exampleQueueName"
destination-resolver="customDestinationResolver"
connection-factory="customConnectionFactory"/>
<publish-subscribe-channel />
の場合、durable
属性を永続サブスクリプションの場合は true
に、共有サブスクリプションの場合は subscription-shared
に設定します(JMS 2.0 ブローカーが必要であり、バージョン 4.2 以降で使用可能です)。subscription
を使用して、サブスクリプションに名前を付けます。
JMS メッセージセレクターの使用
JMS メッセージセレクタを使用すると、JMS ヘッダーと JMS プロパティに基づいて JMS メッセージ [Oracle] (英語) をフィルタリングできます。例: カスタム JMS ヘッダープロパティ myHeaderProperty
が something
と等しいメッセージをリッスンする場合、次の式を指定できます。
myHeaderProperty = 'something'
メッセージセレクター式は、SQL-92 [Wikipedia] (英語) 条件式構文のサブセットであり、Java メッセージサービス [Oracle] (英語) 仕様の一部として定義されています。次の Spring Integration JMS コンポーネントの XML 名前空間構成を使用して、JMS メッセージ selector
属性を指定できます。
JMS チャネル
JMS パブリッシュサブスクライブチャネル
JMS 受信チャネルアダプター
JMS 受信ゲートウェイ
JMS メッセージ駆動型チャネルアダプター
JMS メッセージセレクターを使用してメッセージ本文の値を参照することはできません。 |
JMS サンプル
これらの JMS アダプターを試すには、https://github.com/spring-projects/spring-integration-samples/tree/master/basic/jms (英語) の Spring Integration Samples Git リポジトリで使用可能な JMS サンプルを確認してください。
そのリポジトリには 2 つのサンプルが含まれています。1 つは受信および送信チャネルアダプターを提供し、もう 1 つは受信および送信ゲートウェイを提供します。組み込み ActiveMQ [Apache] (英語) プロセスで実行するように構成されていますが、各サンプルの common.xml [GitHub] (英語) Spring アプリケーションコンテキストファイルを変更して、異なる JMS プロバイダーまたはスタンドアロン ActiveMQ プロセスをサポートできます。
つまり、構成を分割して、受信アダプターと送信アダプターが別々の JVM で実行されるようにすることができます。ActiveMQ がインストールされている場合は、common.xml
ファイル内の brokerURL
プロパティを変更して、(vm://localhost
の代わりに) tcp://localhost:61616
を使用します。両方のサンプルは、stdin からの入力を受け入れ、stdout にエコーバックします。構成を調べて、これらのメッセージが JMS を介してどのようにルーティングされるかを確認してください。