メッセージの受信

これは、Spring で JMS を使用してメッセージを受信する方法を説明しています。

同期受信

通常、JMS は非同期処理に関連付けられていますが、メッセージを同期的に消費できます。オーバーロードされた receive(..) メソッドは、この機能を提供します。同期受信中、呼び出しスレッドはメッセージが利用可能になるまでブロックします。これは、呼び出しスレッドが潜在的に無期限にブロックされる可能性があるため、危険な操作になる可能性があります。receiveTimeout プロパティは、メッセージの待機をあきらめるまで受信者が待機する時間を指定します。

非同期受信: メッセージ駆動型 POJO

Spring は、@JmsListener アノテーションを使用してアノテーション付きリスナーエンドポイントもサポートし、エンドポイントをプログラムで登録するためのオープンなインフラストラクチャを提供します。これは、非同期レシーバーをセットアップする最も便利な方法です。詳細については、リスナーエンドポイントアノテーションを有効にするを参照してください。

EJB の世界のメッセージ駆動型 Bean(MDB)と同様の方法で、メッセージ駆動型 POJO(MDP)は JMS メッセージのレシーバーとして機能します。MDP の 1 つの制限(ただし MessageListenerAdapter を使用するを参照)は、jakarta.jms.MessageListener インターフェースを実装する必要があるということです。POJO が複数のスレッドでメッセージを受信する場合、実装がスレッドセーフであることを確認することが重要です。

次の例は、MDP の簡単な実装を示しています。

  • Java

  • Kotlin

public class ExampleListener implements MessageListener {

	public void onMessage(Message message) {
		if (message instanceof TextMessage textMessage) {
			try {
				System.out.println(textMessage.getText());
			}
			catch (JMSException ex) {
				throw new RuntimeException(ex);
			}
		}
		else {
			throw new IllegalArgumentException("Message must be of type TextMessage");
		}
	}
}
class ExampleListener : MessageListener {

	override fun onMessage(message: Message) {
		if (message is TextMessage) {
			try {
				println(message.text)
			} catch (ex: JMSException) {
				throw RuntimeException(ex)
			}
		} else {
			throw IllegalArgumentException("Message must be of type TextMessage")
		}
	}
}

MessageListener を実装したら、メッセージリスナーコンテナーを作成します。

次の例は、Spring(この場合は DefaultMessageListenerContainer)に同梱されているメッセージリスナーコンテナーの 1 つを定義および構成する方法を示しています。

  • Java

  • Kotlin

  • XML

@Bean
ExampleListener messageListener() {
	return new ExampleListener();
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener() = ExampleListener()

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="jmsexample.ExampleListener"/>

<!-- and this is the message listener container -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

各実装でサポートされる機能の詳細な説明については、さまざまなメッセージリスナーコンテナー(すべて MessageListenerContainer (Javadoc) を実装)の Spring javadoc を参照してください。

SessionAwareMessageListener インターフェースの使用

SessionAwareMessageListener インターフェースは、JMS MessageListener インターフェースと同様の契約を提供する Spring 固有のインターフェースですが、Message の受信元である JMS Session へのメッセージ処理メソッドアクセスも提供します。次のリストは、SessionAwareMessageListener インターフェースの定義を示しています。

package org.springframework.jms.listener;

public interface SessionAwareMessageListener {

	void onMessage(Message message, Session session) throws JMSException;
}

MDP が(onMessage(Message, Session) メソッドで提供される Session を使用して)受信したメッセージに応答できるようにする場合は、MDP にこのインターフェースを実装することを選択できます(標準 JMS MessageListener インターフェースに優先)。Spring に同梱されているすべてのメッセージリスナーコンテナーの実装は、MessageListener または SessionAwareMessageListener インターフェースのいずれかを実装する MDP をサポートしています。SessionAwareMessageListener を実装するクラスには、インターフェースを介して Spring に結び付けられるという警告が付いています。使用するかどうかの選択は、アプリケーション開発者またはアーキテクトとして完全にあなたに任されています。

SessionAwareMessageListener インターフェースの onMessage(..) メソッドは JMSException をスローすることに注意してください。標準の JMS MessageListener インターフェースとは異なり、SessionAwareMessageListener インターフェースを使用する場合、スローされた例外を処理するのはクライアントコードの責任です。

MessageListenerAdapter を使用する

MessageListenerAdapter クラスは、Spring の非同期メッセージングサポートの最後のコンポーネントです。簡単に言えば、ほぼすべてのクラスを MDP として公開できます(ただし、いくつかの制約があります)。

次のインターフェース定義を検討してください。

  • Java

  • Kotlin

public interface MessageDelegate {

	void handleMessage(String message);

	void handleMessage(Map message);

	void handleMessage(byte[] message);

	void handleMessage(Serializable message);
}
interface MessageDelegate {
	fun handleMessage(message: String)
	fun handleMessage(message: Map<*, *>)
	fun handleMessage(message: ByteArray)
	fun handleMessage(message: Serializable)
}

インターフェースは MessageListener インターフェースも SessionAwareMessageListener インターフェースも拡張しませんが、MessageListenerAdapter クラスを使用することにより、MDP として使用できることに注意してください。また、受信および処理できるさまざまな Message 型の内容に従って、さまざまなメッセージ処理メソッドがどのように強く型付けされているかに注意してください。

次に、MessageDelegate インターフェースの以下の実装を検討してください。

  • Java

  • Kotlin

public class DefaultMessageDelegate implements MessageDelegate {

	@Override
	public void handleMessage(String message) {
		// ...
	}

	@Override
	public void handleMessage(Map message) {
		// ...
	}

	@Override
	public void handleMessage(byte[] message) {
		// ...
	}

	@Override
	public void handleMessage(Serializable message) {
		// ...
	}
}
class DefaultMessageDelegate : MessageDelegate {

	override fun handleMessage(message: String) {
		// ...
	}

	override fun handleMessage(message: Map<*, *>) {
		// ...
	}

	override fun handleMessage(message: ByteArray) {
		// ...
	}

	override fun handleMessage(message: Serializable) {
		// ...
	}
}

特に、MessageDelegate インターフェースの前の実装(DefaultMessageDelegate クラス)に JMS 依存関係がまったくないことに注意してください。これは、次の構成を介して MDP を作成できる POJO です。

  • Java

  • Kotlin

  • XML

@Bean
MessageListenerAdapter messageListener(DefaultMessageDelegate messageDelegate) {
	return new MessageListenerAdapter(messageDelegate);
}

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	return jmsContainer;
}
@Bean
fun messageListener(messageDelegate: DefaultMessageDelegate): MessageListenerAdapter {
	return MessageListenerAdapter(messageDelegate)
}

@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
	}
<!-- this is the Message Driven POJO (MDP) -->
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultMessageDelegate"/>
	</constructor-arg>
</bean>

<!-- and this is the message listener container... -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
</bean>

次の例は、JMS TextMessage メッセージの受信のみを処理できる別の MDP を示しています。メッセージ処理メソッドが実際に receive と呼ばれることに注意してください(MessageListenerAdapter のメッセージ処理メソッドの名前はデフォルトで handleMessage になります)が、構成可能です(このセクションで後述します)。また、receive(..) メソッドが JMS TextMessage メッセージのみを受信して応答するように厳密に入力されていることに注意してください。次のリストは、TextMessageDelegate インターフェースの定義を示しています。

  • Java

  • Kotlin

public interface TextMessageDelegate {

	void receive(TextMessage message);
}
interface TextMessageDelegate {
	fun receive(message: TextMessage)
}

次のリストは、TextMessageDelegate インターフェースを実装するクラスを示しています。

  • Java

  • Kotlin

public class DefaultTextMessageDelegate implements TextMessageDelegate {

	@Override
	public void receive(TextMessage message) {
		// ...
	}
}
class DefaultTextMessageDelegate : TextMessageDelegate {

	override fun receive(message: TextMessage) {
		// ...
	}
}

アテンダント MessageListenerAdapter の構成は次のようになります。

  • Java

  • Kotlin

  • XML

@Bean
MessageListenerAdapter messageListener(DefaultTextMessageDelegate messageDelegate) {
	MessageListenerAdapter messageListener = new MessageListenerAdapter(messageDelegate);
	messageListener.setDefaultListenerMethod("receive");
	// We don't want automatic message context extraction
	messageListener.setMessageConverter(null);
	return messageListener;
}
@Bean
fun messageListener(messageDelegate: DefaultTextMessageDelegate) = MessageListenerAdapter(messageDelegate).apply {
	setDefaultListenerMethod("receive")
	// We don't want automatic message context extraction
	setMessageConverter(null)
}
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
	<constructor-arg>
		<bean class="jmsexample.DefaultTextMessageDelegate"/>
	</constructor-arg>
	<property name="defaultListenerMethod" value="receive"/>
	<!-- we don't want automatic message context extraction -->
	<property name="messageConverter">
		<null/>
	</property>
</bean>

messageListener が TextMessage 以外の型の JMS Message を受信した場合、IllegalStateException がスローされる(その後、飲み込まれる)ことに注意してください。MessageListenerAdapter クラスのもう 1 つの機能は、ハンドラーメソッドが非 void 値を返す場合に、自動的にレスポンス Message を送り返す機能です。次のインターフェースとクラスを検討してください。

  • Java

  • Kotlin

public interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	String receive(TextMessage message);
}
interface ResponsiveTextMessageDelegate {

	// Notice the return type...
	fun receive(message: TextMessage): String
}
  • Java

  • Kotlin

public class DefaultResponsiveTextMessageDelegate implements ResponsiveTextMessageDelegate {

	@Override
	public String receive(TextMessage message) {
		return "message";
	}
}
class DefaultResponsiveTextMessageDelegate : ResponsiveTextMessageDelegate {

	override fun receive(message: TextMessage): String {
		return "message"
	}
}

DefaultResponsiveTextMessageDelegate を MessageListenerAdapter と組み合わせて使用する場合、'receive(..)' メソッドの実行から返される null 以外の値は(デフォルト構成で) TextMessage に変換されます。結果の TextMessage は、元の Message の JMS Reply-To プロパティで定義された Destination (存在する場合)または MessageListenerAdapter に設定されたデフォルト Destination (設定されている場合)に送信されます。Destination が見つからない場合、InvalidDestinationException がスローされます(この例外は飲み込まれず、呼び出しスタックに伝播することに注意してください)。

トランザクション内のメッセージの処理

トランザクション内でメッセージリスナーを呼び出すには、リスナーコンテナーの再構成のみが必要です。

リスナーコンテナー定義の sessionTransacted フラグを使用して、ローカルリソーストランザクションをアクティブ化できます。各メッセージリスナ呼び出しは、アクティブな JMS トランザクション内で動作し、リスナの実行に失敗した場合にメッセージ受信がロールバックされます。(SessionAwareMessageListener を介した)レスポンスメッセージの送信は同じローカルトランザクションの一部ですが、他のリソース操作(データベースアクセスなど)は独立して動作します。これには通常、データベース処理はコミットされましたがメッセージ処理がコミットに失敗した場合に対応するために、リスナー実装での重複メッセージの検出が必要です。

以下の Bean 定義を考慮してください。

  • Java

  • Kotlin

  • XML

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		isSessionTransacted = true
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="sessionTransacted" value="true"/>
</bean>

外部管理トランザクションに参加するには、トランザクションマネージャーを構成し、外部管理トランザクションをサポートするリスナーコンテナー(通常は DefaultMessageListenerContainer)を使用する必要があります。

XA トランザクション参加用のメッセージリスナーコンテナーを構成するには、JtaTransactionManager (デフォルトでは、Jakarta EE サーバーのトランザクションサブシステムに委譲する)を構成します。基礎となる JMS ConnectionFactory は XA 対応であり、JTA トランザクションコーディネーターに適切に登録される必要があることに注意してください。(Jakarta EE サーバーの JNDI リソースの設定を確認してください)これにより、メッセージ受信と(たとえば)データベースアクセスを同じトランザクションの一部にすることができます(XA トランザクションログのオーバーヘッドを犠牲にして、コミットセマンティクスを統一)。

次の Bean 定義は、トランザクションマネージャーを作成します。

  • Java

  • Kotlin

  • XML

@Bean
JtaTransactionManager transactionManager()  {
	return new JtaTransactionManager();
}
@Bean
fun transactionManager() = JtaTransactionManager()
<bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/>

次に、以前のコンテナー構成に追加する必要があります。コンテナーが残りを処理します。次の例は、その方法を示しています。

  • Java

  • Kotlin

  • XML

@Bean
DefaultMessageListenerContainer jmsContainer(ConnectionFactory connectionFactory, Destination destination,
		ExampleListener messageListener) {

	DefaultMessageListenerContainer jmsContainer = new DefaultMessageListenerContainer();
	jmsContainer.setConnectionFactory(connectionFactory);
	jmsContainer.setDestination(destination);
	jmsContainer.setMessageListener(messageListener);
	jmsContainer.setSessionTransacted(true);
	return jmsContainer;
}
@Bean
fun jmsContainer(connectionFactory: ConnectionFactory, destination: Destination, messageListener: ExampleListener,
				 transactionManager: JtaTransactionManager) =
	DefaultMessageListenerContainer().apply {
		setConnectionFactory(connectionFactory)
		setDestination(destination)
		setMessageListener(messageListener)
		setTransactionManager(transactionManager)
	}
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
	<property name="connectionFactory" ref="connectionFactory"/>
	<property name="destination" ref="destination"/>
	<property name="messageListener" ref="messageListener"/>
	<property name="transactionManager" ref="transactionManager"/>
</bean>