メッセージの受信
これは、Spring で JMS を使用してメッセージを受信する方法を説明しています。
同期受信
通常、JMS は非同期処理に関連付けられていますが、メッセージを同期的に消費できます。オーバーロードされた receive(..)
メソッドは、この機能を提供します。同期受信中、呼び出しスレッドはメッセージが利用可能になるまでブロックします。これは、呼び出しスレッドが潜在的に無期限にブロックされる可能性があるため、危険な操作になる可能性があります。receiveTimeout
プロパティは、メッセージの待機をあきらめるまで受信者が待機する時間を指定します。
非同期受信: メッセージ駆動型 POJO
Spring は、@JmsListener アノテーションを使用してアノテーション付きリスナーエンドポイントもサポートし、エンドポイントをプログラムで登録するためのオープンなインフラストラクチャを提供します。これは、非同期レシーバーをセットアップする最も便利な方法です。詳細については、リスナーエンドポイントアノテーションを有効にするを参照してください。 |
EJB の世界のメッセージ駆動型 Bean(MDB)と同様の方法で、メッセージ駆動型 POJO(MDP)は JMS メッセージのレシーバーとして機能します。MDP の 1 つの制限(ただし MessageListenerAdapter
を使用するを参照)は、jakarta.jms.MessageListener
インターフェースを実装する必要があるということです。POJO が複数のスレッドでメッセージを受信する場合、実装がスレッドセーフであることを確認することが重要です。
次の例は、MDP の簡単な実装を示しています。
Java
Kotlin
public class ExampleListener implements MessageListener {
public void onMessage(Message message) {
if (message instanceof TextMessage textMessage) {
try {
System.out.println(textMessage.getText());
}
catch (JMSException ex) {
throw new RuntimeException(ex);
}
}
else {
throw new IllegalArgumentException("Message must be of type TextMessage");
}
}
}
class ExampleListener : MessageListener {
override fun onMessage(message: Message) {
if (message is TextMessage) {
try {
println(message.text)
} catch (ex: JMSException) {
throw RuntimeException(ex)
}
} else {
throw IllegalArgumentException("Message must be of type TextMessage")
}
}
}
MessageListener
を実装したら、メッセージリスナーコンテナーを作成します。
次の例は、Spring(この場合は DefaultMessageListenerContainer
)に同梱されているメッセージリスナーコンテナーの 1 つを定義および構成する方法を示しています。
Java
Kotlin
XML
@Bean
ExampleListener messageListener() {
return new ExampleListener();
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>
<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
各実装でサポートされる機能の詳細な説明については、さまざまなメッセージリスナーコンテナー(すべて MessageListenerContainer (Javadoc) を実装)の Spring javadoc を参照してください。
SessionAwareMessageListener
インターフェースの使用
SessionAwareMessageListener
インターフェースは、JMS MessageListener
インターフェースと同様の契約を提供する Spring 固有のインターフェースですが、Message
の受信元である JMS Session
へのメッセージ処理メソッドアクセスも提供します。次のリストは、SessionAwareMessageListener
インターフェースの定義を示しています。
package org.springframework.jms.listener;
public interface SessionAwareMessageListener {
void onMessage(Message message, Session session) throws JMSException;
}
MDP が(onMessage(Message, Session)
メソッドで提供される Session
を使用して)受信したメッセージに応答できるようにする場合は、MDP にこのインターフェースを実装することを選択できます(標準 JMS MessageListener
インターフェースに優先)。Spring に同梱されているすべてのメッセージリスナーコンテナーの実装は、MessageListener
または SessionAwareMessageListener
インターフェースのいずれかを実装する MDP をサポートしています。SessionAwareMessageListener
を実装するクラスには、インターフェースを介して Spring に結び付けられるという警告が付いています。使用するかどうかの選択は、アプリケーション開発者またはアーキテクトとして完全にあなたに任されています。
SessionAwareMessageListener
インターフェースの onMessage(..)
メソッドは JMSException
をスローすることに注意してください。標準の JMS MessageListener
インターフェースとは異なり、SessionAwareMessageListener
インターフェースを使用する場合、スローされた例外を処理するのはクライアントコードの責任です。
MessageListenerAdapter
を使用する
MessageListenerAdapter
クラスは、Spring の非同期メッセージングサポートの最後のコンポーネントです。簡単に言えば、ほぼすべてのクラスを MDP として公開できます(ただし、いくつかの制約があります)。
次のインターフェース定義を検討してください。
Java
Kotlin
public interface MessageDelegate {
void handleMessage(String message);
void handleMessage(Map message);
void handleMessage(byte[] message);
void handleMessage(Serializable message);
}
interface MessageDelegate {
fun handleMessage(message: String)
fun handleMessage(message: Map<*, *>)
fun handleMessage(message: ByteArray)
fun handleMessage(message: Serializable)
}
インターフェースは MessageListener
インターフェースも SessionAwareMessageListener
インターフェースも拡張しませんが、MessageListenerAdapter
クラスを使用することにより、MDP として使用できることに注意してください。また、受信および処理できるさまざまな Message
型の内容に従って、さまざまなメッセージ処理メソッドがどのように強く型付けされているかに注意してください。
次に、MessageDelegate
インターフェースの以下の実装を検討してください。
Java
Kotlin
public class DefaultMessageDelegate implements MessageDelegate {
@Override
public void handleMessage(String message) {
// ...
}
@Override
public void handleMessage(Map message) {
// ...
}
@Override
public void handleMessage(byte[] message) {
// ...
}
@Override
public void handleMessage(Serializable message) {
// ...
}
}
class DefaultMessageDelegate : MessageDelegate {
override fun handleMessage(message: String) {
// ...
}
override fun handleMessage(message: Map<*, *>) {
// ...
}
override fun handleMessage(message: ByteArray) {
// ...
}
override fun handleMessage(message: Serializable) {
// ...
}
}
特に、MessageDelegate
インターフェースの前の実装(DefaultMessageDelegate
クラス)に JMS 依存関係がまったくないことに注意してください。これは、次の構成を介して MDP を作成できる POJO です。
Java
Kotlin
XML
@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
return new MessageListenerAdapter(messageDelegate);
}
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
return MessageListenerAdapter(messageDelegate)
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultMessageDelegate"/>
</constructor-arg>
</bean>
<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>
次の例は、JMS TextMessage
メッセージの受信のみを処理できる別の MDP を示しています。メッセージ処理メソッドが実際に receive
と呼ばれることに注意してください(MessageListenerAdapter
のメッセージ処理メソッドの名前はデフォルトで handleMessage
になります)が、構成可能です(このセクションで後述します)。また、receive(..)
メソッドが JMS TextMessage
メッセージのみを受信して応答するように厳密に入力されていることに注意してください。次のリストは、TextMessageDelegate
インターフェースの定義を示しています。
Java
Kotlin
public interface TextMessageDelegate {
void receive(TextMessage message);
}
interface TextMessageDelegate {
fun receive(message: TextMessage)
}
次のリストは、TextMessageDelegate
インターフェースを実装するクラスを示しています。
Java
Kotlin
public class DefaultTextMessageDelegate implements TextMessageDelegate {
@Override
public void receive(TextMessage message) {
// ...
}
}
class DefaultTextMessageDelegate : TextMessageDelegate {
override fun receive(message: TextMessage) {
// ...
}
}
アテンダント MessageListenerAdapter
の構成は次のようになります。
Java
Kotlin
XML
@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
messageListener.setDefaultListenerMethod("receive");
// We don't want automatic message context extraction
messageListener.setMessageConverter(null);
return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
setDefaultListenerMethod("receive")
// We don't want automatic message context extraction
setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
<constructor-arg>
<bean class="jmsexample.DefaultTextMessageDelegate"/>
</constructor-arg>
<property name="defaultListenerMethod" value="receive"/>
<!-- we don't want automatic message context extraction -->
<property name="messageConverter">
<null/>
</property>
</bean>
messageListener
が TextMessage
以外の型の JMS Message
を受信した場合、IllegalStateException
がスローされる(その後、飲み込まれる)ことに注意してください。MessageListenerAdapter
クラスのもう 1 つの機能は、ハンドラーメソッドが非 void 値を返す場合に、自動的にレスポンス Message
を送り返す機能です。次のインターフェースとクラスを検討してください。
Java
Kotlin
public interface ResponsiveTextMessageDelegate {
// Notice the return type...
String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {
// Notice the return type...
fun receive(message: TextMessage): String
}
Java
Kotlin
public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {
@Override
public String receive(TextMessage message) {
return "message";
}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {
override fun receive(message: TextMessage): String {
return "message"
}
}
DefaultResponsiveTextMessageDelegate
を MessageListenerAdapter
と組み合わせて使用する場合、'receive(..)'
メソッドの実行から返される null 以外の値は(デフォルト構成で) TextMessage
に変換されます。結果の TextMessage
は、元の Message
の JMS Reply-To
プロパティで定義された Destination
(存在する場合)または MessageListenerAdapter
に設定されたデフォルト Destination
(設定されている場合)に送信されます。Destination
が見つからない場合、InvalidDestinationException
がスローされます(この例外は飲み込まれず、呼び出しスタックに伝播することに注意してください)。
トランザクション内のメッセージの処理
トランザクション内でメッセージリスナーを呼び出すには、リスナーコンテナーの再構成のみが必要です。
リスナーコンテナー定義の sessionTransacted
フラグを使用して、ローカルリソーストランザクションをアクティブ化できます。各メッセージリスナ呼び出しは、アクティブな JMS トランザクション内で動作し、リスナの実行に失敗した場合にメッセージ受信がロールバックされます。(SessionAwareMessageListener
を介した)レスポンスメッセージの送信は同じローカルトランザクションの一部ですが、他のリソース操作(データベースアクセスなど)は独立して動作します。これには通常、データベース処理はコミットされましたがメッセージ処理がコミットに失敗した場合に対応するために、リスナー実装での重複メッセージの検出が必要です。
以下の Bean 定義を考慮してください。
Java
Kotlin
XML
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
isSessionTransacted = true
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="sessionTransacted" value="true"/>
</bean>
外部管理トランザクションに参加するには、トランザクションマネージャーを構成し、外部管理トランザクションをサポートするリスナーコンテナー(通常は DefaultMessageListenerContainer
)を使用する必要があります。
XA トランザクション参加用のメッセージリスナーコンテナーを構成するには、JtaTransactionManager
(デフォルトでは、Jakarta EE サーバーのトランザクションサブシステムに委譲する)を構成します。基礎となる JMS ConnectionFactory
は XA 対応であり、JTA トランザクションコーディネーターに適切に登録される必要があることに注意してください。(Jakarta EE サーバーの JNDI リソースの設定を確認してください)これにより、メッセージ受信と(たとえば)データベースアクセスを同じトランザクションの一部にすることができます(XA トランザクションログのオーバーヘッドを犠牲にして、コミットセマンティクスを統一)。
次の Bean 定義は、トランザクションマネージャーを作成します。
Java
Kotlin
XML
@Bean
JtaTransactionManager transactionManager() {
return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>
次に、以前のコンテナー構成に追加する必要があります。コンテナーが残りを処理します。次の例は、その方法を示しています。
Java
Kotlin
XML
@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
ExampleListener messageListener) {
DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
jmsContainer.setConnectionFactory(connectionFactory);
jmsContainer.setDestination(destination);
jmsContainer.setMessageListener(messageListener);
jmsContainer.setSessionTransacted(true);
return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
transactionManager: JtaTransactionManager) =
DefaultMessageListenerContainer().apply {
setConnectionFactory(connectionFactory)
setDestination(destination)
setMessageListener(messageListener)
setTransactionManager(transactionManager)
}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
<property name="transactionManager" ref="transactionManager"/>
</bean>