メッセージの受信
これは、Spring で JMS を使用してメッセージを受信する方法を説明しています。
同期受信
JMS は通常、非同期処理に関連付けられますが、メッセージを同期的に受信することも可能です。JmsTemplate および JmsClient の receive(..) メソッドはこの機能を提供します。同期受信中、呼び出しスレッドはメッセージが利用可能になるまでブロックされます。呼び出しスレッドが無期限にブロックされる可能性があるため、これは危険な操作となる可能性があります。receiveTimeout プロパティは、受信側がメッセージの受信を諦めるまでの待機時間を指定します。
非同期受信: メッセージ駆動型 POJO
Spring は、@JmsListener アノテーションの使用を通じてアノテーション付きリスナーエンドポイントもサポートし、エンドポイントをプログラムで登録するためのオープンインフラストラクチャを提供します。これは、非同期レシーバーを設定するための最も便利な方法です。詳細については、リスナーエンドポイントアノテーションを有効にするを参照してください。 |
EJB の世界のメッセージ駆動型 Bean(MDB)と同様の方法で、メッセージ駆動型 POJO(MDP)は JMS メッセージのレシーバーとして機能します。MDP の 1 つの制限(ただし MessageListenerAdapter を使用するを参照)は、jakarta.jms.MessageListener インターフェースを実装する必要があるということです。POJO が複数のスレッドでメッセージを受信する場合、実装がスレッドセーフであることを確認することが重要です。
次の例は、MDP の簡単な実装を示しています。
Java
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}MessageListener を実装したら、メッセージリスナーコンテナーを作成します。
次の例は、Spring(この場合は DefaultMessageListenerContainer)に同梱されているメッセージリスナーコンテナーの 1 つを定義および構成する方法を示しています。
Java
Kotlin
XML
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>各実装でサポートされる機能の詳細な説明については、さまざまなメッセージリスナーコンテナー(すべて MessageListenerContainer (Javadoc) を実装)の Spring javadoc を参照してください。
SessionAwareMessageListener インターフェースの使用
SessionAwareMessageListener インターフェースは、JMS MessageListener インターフェースと同様の契約を提供する Spring 固有のインターフェースですが、Message の受信元である JMS Session へのメッセージ処理メソッドアクセスも提供します。次のリストは、SessionAwareMessageListener インターフェースの定義を示しています。
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}MDP が(onMessage(Message, Session) メソッドで提供される Session を使用して)受信したメッセージに応答できるようにする場合は、MDP にこのインターフェースを実装することを選択できます(標準 JMS MessageListener インターフェースに優先)。Spring に同梱されているすべてのメッセージリスナーコンテナーの実装は、MessageListener または SessionAwareMessageListener インターフェースのいずれかを実装する MDP をサポートしています。SessionAwareMessageListener を実装するクラスには、インターフェースを介して Spring に結び付けられるという警告が付いています。使用するかどうかの選択は、アプリケーション開発者またはアーキテクトとして完全にあなたに任されています。
SessionAwareMessageListener インターフェースの onMessage(..) メソッドは JMSException をスローすることに注意してください。標準の JMS MessageListener インターフェースとは異なり、SessionAwareMessageListener インターフェースを使用する場合、スローされた例外を処理するのはクライアントコードの責任です。
MessageListenerAdapter を使用する
MessageListenerAdapter クラスは、Spring の非同期メッセージングサポートの最後のコンポーネントです。簡単に言えば、ほぼすべてのクラスを MDP として公開できます(ただし、いくつかの制約があります)。
次のインターフェース定義を検討してください。
Java
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
} インターフェースは MessageListener インターフェースも SessionAwareMessageListener インターフェースも拡張しませんが、MessageListenerAdapter クラスを使用することにより、MDP として使用できることに注意してください。また、受信および処理できるさまざまな Message 型の内容に従って、さまざまなメッセージ処理メソッドがどのように強く型付けされているかに注意してください。
次に、MessageDelegate インターフェースの以下の実装を検討してください。
Java
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
} 特に、MessageDelegate インターフェースの前の実装(DefaultMessageDelegate クラス)に JMS 依存関係がまったくないことに注意してください。これは、次の構成を介して MDP を作成できる POJO です。
Java
Kotlin
XML
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean> 次の例は、JMS TextMessage メッセージの受信のみを処理できる別の MDP を示しています。メッセージ処理メソッドが実際に receive と呼ばれることに注意してください(MessageListenerAdapter のメッセージ処理メソッドの名前はデフォルトで handleMessage になります)が、構成可能です(このセクションで後述します)。また、receive(..) メソッドが JMS TextMessage メッセージのみを受信して応答するように厳密に入力されていることに注意してください。次のリストは、TextMessageDelegate インターフェースの定義を示しています。
Java
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}interface TextMessageDelegate {
fun receive(message: TextMessage)
} 次のリストは、TextMessageDelegate インターフェースを実装するクラスを示しています。
Java
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
} アテンダント MessageListenerAdapter の構成は次のようになります。
Java
Kotlin
XML
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>messageListener が TextMessage 以外の型の JMS Message を受信した場合、IllegalStateException がスローされる(その後、飲み込まれる)ことに注意してください。MessageListenerAdapter クラスのもう 1 つの機能は、ハンドラーメソッドが非 void 値を返す場合に、自動的にレスポンス Message を送り返す機能です。次のインターフェースとクラスを検討してください。
Java
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}Java
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}DefaultResponsiveTextMessageDelegate を MessageListenerAdapter と組み合わせて使用する場合、'receive(..)' メソッドの実行から返される null 以外の値は(デフォルト構成で) TextMessage に変換されます。結果の TextMessage は、元の Message の JMS Reply-To プロパティで定義された Destination (存在する場合)または MessageListenerAdapter に設定されたデフォルト Destination (設定されている場合)に送信されます。Destination が見つからない場合、InvalidDestinationException がスローされます(この例外は飲み込まれず、呼び出しスタックに伝播することに注意してください)。
トランザクション内のメッセージの処理
トランザクション内でメッセージリスナーを呼び出すには、リスナーコンテナーの再構成のみが必要です。
リスナーコンテナー定義の sessionTransacted フラグを使用して、ローカルリソーストランザクションをアクティブ化できます。各メッセージリスナーの呼び出しはアクティブな JMS トランザクション内で動作し、リスナーの実行が失敗した場合はメッセージの受信がロールバックされます。レスポンスメッセージの送信 (SessionAwareMessageListener 経由) は同じローカルトランザクションの一部ですが、その他のリソース操作 (データベースアクセスなど) は独立して動作します。通常、データベース処理はコミットされましたがメッセージ処理がコミットに失敗したケースに対応するために、リスナー実装で重複メッセージの検出が必要になります。
以下の Bean 定義を考慮してください。
Java
Kotlin
XML
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean> 外部管理トランザクションに参加するには、トランザクションマネージャーを構成し、外部管理トランザクションをサポートするリスナーコンテナー(通常は DefaultMessageListenerContainer)を使用する必要があります。
XA トランザクション参加用のメッセージリスナーコンテナーを構成するには、JtaTransactionManager (デフォルトでは、Jakarta EE サーバーのトランザクションサブシステムに委譲) を構成する必要があります。基盤となる JMS ConnectionFactory は XA 対応であり、JTA トランザクションコーディネータに適切に登録されている必要があることに注意してください (Jakarta EE サーバーの JNDI リソースの構成を確認してください)。これにより、メッセージの受信と (たとえば) データベースアクセスが同じトランザクションの一部になります (統一されたコミットセマンティクスにより、XA トランザクションログのオーバーヘッドが発生します)。
次の Bean 定義は、トランザクションマネージャーを作成します。
Java
Kotlin
XML
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}@Bean
fun transactionManager() = JtaTransactionManager()<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>次に、以前のコンテナー構成に追加する必要があります。コンテナーが残りを処理します。次の例は、その方法を示しています。
Java
Kotlin
XML
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>