MQTT サポート
Spring Integration は、メッセージキューテレメトリートランスポート(MQTT)プロトコルをサポートする受信および送信チャネルアダプターを提供します。
この依存関係をプロジェクトに含める必要があります。
現在の実装では、Eclipse Paho MQTT クライアント (英語) ライブラリを使用しています。
XML 構成とこの章の大部分は、MQTT v3.1 プロトコルのサポートとそれぞれの Paho クライアントに関するものです。それぞれのプロトコルのサポートについては、MQTTv5 サポートの段落を参照してください。 |
両方のアダプターの構成は、DefaultMqttPahoClientFactory
を使用して実現されます。構成オプションの詳細については、Paho のドキュメントを参照してください。
ファクトリ自体に(非推奨)オプションを設定する代わりに、MqttConnectOptions オブジェクトを構成してファクトリに注入することをお勧めします。 |
受信(メッセージ駆動型)チャネルアダプター
受信チャネルアダプターは MqttPahoMessageDrivenChannelAdapter
によって実装されます。便宜上、名前空間を使用して構成できます。最小構成は次のとおりです。
<bean id="clientFactory"
class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
<property name="connectionOptions">
<bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
<property name="userName" value="${mqtt.username}"/>
<property name="password" value="${mqtt.password}"/>
</bean>
</property>
</bean>
<int-mqtt:message-driven-channel-adapter id="mqttInbound"
client-id="${mqtt.default.client.id}.src"
url="${mqtt.url}"
topics="sometopic"
client-factory="clientFactory"
channel="output"/>
次のリストは、使用可能な属性を示しています。
<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
topics="bar,baz" (3)
qos="1,2" (4)
converter="myConverter" (5)
client-factory="clientFactory" (6)
send-timeout="123" (7)
error-channel="errors" (8)
recovery-interval="10000" (9)
manual-acks="false" (10)
channel="out" />
1 | クライアント ID。 |
2 | ブローカー URL。 |
3 | このアダプターがメッセージを受信するトピックのコンマ区切りリスト。 |
4 | QoS 値のコンマ区切りリスト。すべてのトピックに適用される単一の値または各トピックの値にすることができます(この場合、リストは同じ長さでなければなりません)。 |
5 | MqttMessageConverter (オプション)。デフォルトでは、デフォルトの DefaultPahoMessageConverter は、次のヘッダーを持つ String ペイロードを持つメッセージを生成します。
|
6 | クライアントファクトリ。 |
7 | send() タイムアウト。これは、チャネルがブロックされる可能性がある場合にのみ適用されます (現在満杯の制限付き QueueChannel など)。 |
8 | エラーチャネル。ダウンストリーム例外は、ErrorMessage でこのチャネルに送信されます(指定されている場合)。ペイロードは、失敗したメッセージと原因を含む MessagingException です。 |
9 | 回復間隔。これは、アダプターが障害後に再接続を試行する間隔を制御します。デフォルトは 10000ms (10 秒)です。 |
10 | 確認応答モード。手動で確認する場合は true に設定します。 |
バージョン 4.1 以降、URL を省略できます。代わりに、DefaultMqttPahoClientFactory の serverURIs プロパティでサーバー URI を提供できます。これにより、たとえば、高可用性(HA)クラスターへの接続が可能になります。 |
バージョン 4.2.2 以降、アダプターがトピックを正常にサブスクライブすると、MqttSubscribedEvent
が公開されます。接続またはサブスクリプションが失敗すると、MqttConnectionFailedEvent
イベントが発行されます。ApplicationListener
を実装する Bean がこれらのイベントを受信できます。
また、recoveryInterval
という新しいプロパティは、アダプターが障害後に再接続を試みる間隔を制御します。デフォルトは 10000ms
(10 秒)です。
バージョン 4.2.3 より前は、クライアントはアダプターが停止したときに常にサブスクライブ解除されました。クライアントの QOS が 0 より大きい場合、アダプターが停止している間に到着したメッセージが次回の開始時に配信されるように、サブスクリプションをアクティブに保つ必要があるため、これは間違っていました。これには、クライアントファクトリの バージョン 4.2.3 以降、 この動作は、ファクトリで 4.2.3 より前の動作に戻すには、 |
バージョン 5.0 以降、 |
実行時のトピックの追加と削除
バージョン 4.1 以降、アダプターがサブスクライブするトピックをプログラムで変更できます。Spring Integration は、addTopic()
および removeTopic()
メソッドを提供します。トピックを追加するときに、オプションで QoS
を指定できます(デフォルト: 1)。適切なメッセージを適切なペイロードで <control-bus/>
に送信して、トピックを変更することもできます(例: "myMqttAdapter.addTopic('foo', 1)"
)。
アダプターを停止して開始しても、トピックリストには影響しません(構成の元の設定に戻りません)。変更は、アプリケーションコンテキストのライフサイクルを超えて保持されません。新しいアプリケーションコンテキストは、構成された設定に戻ります。
アダプターが停止している(またはブローカーから切断されている)間にトピックを変更すると、次に接続が確立されたときに有効になります。
手動 ACK
バージョン 5.3 以降、manualAcks
プロパティを true に設定できます。多くの場合、配信を非同期的に確認するために使用されます。true
に設定すると、ヘッダー(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
)がメッセージに追加され、値は SimpleAcknowledgment
になります。配信を完了するには、acknowledge()
メソッドを呼び出す必要があります。詳細については、IMqttClient
setManualAcks()
、messageArrivedComplete()
の Javadoc を参照してください。便宜上、ヘッダーアクセサーが提供されています。
StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();
バージョン 5.2.11
以降、メッセージコンバーターが例外をスローするか、MqttMessage
変換から null
を返すと、MqttPahoMessageDrivenChannelAdapter
は ErrorMessage
を errorChannel
に送信します(提供されている場合)。それ以外の場合は、この変換エラーを MQTT クライアントコールバックに再スローします。
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
"topic1", "topic2");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL を使用して受信アダプターを構成する例を示しています。
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttInbound() {
return IntegrationFlow.from(
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
"testClient", "topic1", "topic2"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
送信チャネルアダプター
発信チャネルアダプターは MqttPahoMessageHandler
によって実装され、ConsumerEndpoint
にラップされています。便宜上、名前空間を使用して構成できます。
バージョン 4.1 から、アダプターは非同期送信操作をサポートし、配信が確認されるまでブロッキングを回避します。必要に応じて、アプリケーションが配信を確認できるように、アプリケーションイベントを発行できます。
次のリストは、送信チャネルアダプターで使用可能な属性を示しています。
<int-mqtt:outbound-channel-adapter id="withConverter"
client-id="foo" (1)
url="tcp://localhost:1883" (2)
converter="myConverter" (3)
client-factory="clientFactory" (4)
default-qos="1" (5)
qos-expression="" (6)
default-retained="true" (7)
retained-expression="" (8)
default-topic="bar" (9)
topic-expression="" (10)
async="false" (11)
async-events="false" (12)
channel="target" />
1 | クライアント ID。 |
2 | ブローカー URL。 |
3 | MqttMessageConverter (オプション)。デフォルトの DefaultPahoMessageConverter は、次のヘッダーを認識します。
|
4 | クライアントファクトリ。 |
5 | デフォルトのサービス品質。mqtt_qos ヘッダーが見つからない場合、または qos-expression が null を返す場合に使用されます。カスタム converter を提供する場合は使用されません。 |
6 | QoS を決定するために評価する式。デフォルトは headers[mqtt_qos] です。 |
7 | 保持フラグのデフォルト値。mqtt_retained ヘッダーが見つからない場合に使用されます。カスタム converter が提供されている場合は使用されません。 |
8 | 保持されたブール値を決定するために評価する式。デフォルトは headers[mqtt_retained] です。 |
9 | メッセージの送信先のデフォルトトピック(mqtt_topic ヘッダーが見つからない場合に使用)。 |
10 | 宛先トピックを決定するために評価する式。デフォルトは headers['mqtt_topic'] です。 |
11 | true の場合、呼び出し元はブロックしません。むしろ、メッセージが送信されると配信確認を待ちます。デフォルトは false です(配信が確認されるまで送信はブロックされます)。 |
12 | async と async-events が両方とも true である場合、MqttMessageSentEvent が発行されます ( イベントを参照)。これには、メッセージ、トピック、クライアントライブラリによって生成された messageId 、clientId 、clientInstance (クライアントが接続されるたびに増加します) が含まれます。クライアントライブラリによって配信が確認されると、MqttMessageDeliveredEvent が発行されます。これには messageId 、clientId 、clientInstance が含まれており、配信を send() と関連付けることができます。ApplicationListener またはイベント受信チャネルアダプターはこれらのイベントを受信できます。MqttMessageDeliveredEvent が MqttMessageSentEvent よりも前に受信される可能性があることに注意してください。デフォルトは false です。 |
バージョン 4.1 以降、URL は省略できます。代わりに、DefaultMqttPahoClientFactory の serverURIs プロパティでサーバー URI を提供できます。これにより、たとえば、高可用性(HA)クラスターへの接続が可能になります。 |
Java 構成を使用した構成
次の Spring Boot アプリケーションは、Java 構成で送信アダプターを構成する方法の例を示しています。
@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToMqtt("foo");
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
options.setUserName("username");
options.setPassword("password".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("testClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("testTopic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MyGateway {
void sendToMqtt(String data);
}
}
Java DSL を使用した構成
次の Spring Boot アプリケーションは、Java DSL を使用して送信アダプターを構成する例を示しています。
@SpringBootApplication
public class MqttJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(MqttJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow mqttOutboundFlow() {
return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
}
}
イベント
特定のアプリケーションイベントは、アダプターによって発行されます。
MqttConnectionFailedEvent
- 接続に失敗した場合、またはその後接続が失われた場合、両方のアダプターによって発行されます。MQTT v5 Paho クライアントの場合、このイベントは、サーバーが通常の切断を実行したときにも発行されます。この場合、失われた接続のcause
はnull
です。MqttMessageSentEvent
- 非同期モードで実行されている場合、メッセージが送信されたときに送信アダプターによって公開されます。MqttMessageDeliveredEvent
- 非同期モードで実行している場合に、クライアントがメッセージが配信されたことを示したときに送信アダプターによって公開されます。MqttMessageNotDeliveredEvent
- 非同期モードで実行されている場合、クライアントがメッセージが配信されていないことを示すと、送信アダプターによって公開されます。MqttSubscribedEvent
- トピックをサブスクライブした後、受信アダプターによって公開されます。
これらのイベントは、ApplicationListener<MqttIntegrationEvent>
または @EventListener
メソッドで受信できます。
イベントのソースを判別するには、以下を使用します。Bean 名や接続オプション(サーバー URI などにアクセスするため)を確認できます。
MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();
MQTTv5 サポート
バージョン 5.5.5 以降、spring-integration-mqtt
モジュールは MQTTv5 プロトコル用のチャネルアダプター実装を提供します。org.eclipse.paho:org.eclipse.paho.mqttv5.client
は optional
の依存関係であるため、ターゲットプロジェクトに明示的に含める必要があります。
MQTT v5 プロトコルは MQTT メッセージで追加の任意のプロパティをサポートするため、MqttHeaderMapper
実装が導入されて、パブリッシュおよび受信操作でヘッダーとの間でマッピングされます。デフォルトでは、(*
パターンを介して) 受信したすべての PUBLISH
フレームプロパティ (ユーザープロパティを含む) をマップします。送信側では、PUBLISH
フレームのヘッダーのサブセット contentType
、mqtt_messageExpiryInterval
、mqtt_responseTopic
、mqtt_correlationData
をマップします。
MQTT v5 プロトコルの送信チャネルアダプターは、Mqttv5PahoMessageHandler
として存在します。clientId
および MQTT ブローカーの URL または MqttConnectionOptions
参照が必要です。MqttClientPersistence
オプションをサポートし、async
にすることができ、その場合は MqttIntegrationEvent
オブジェクトを発行できます(asyncEvents
オプションを参照)。リクエストメッセージのペイロードが org.eclipse.paho.mqttv5.common.MqttMessage
の場合、内部 IMqttAsyncClient
を介してそのまま公開されます。ペイロードが byte[]
の場合、ターゲット MqttMessage
ペイロードが公開するためにそのまま使用されます。ペイロードが String
の場合、公開するために byte[]
に変換されます。残りのユースケースは、アプリケーションコンテキストから IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAME
ConfigurableCompositeMessageConverter
Bean である提供された MessageConverter
に委譲されます。注: リクエストされたメッセージペイロードがすでに MqttMessage
である場合、提供された HeaderMapper<MqttProperties>
は使用されません。次の JavaDSL 構成サンプルは、統合フローでこのチャネルアダプターを使用する方法を示しています。
@Bean
public IntegrationFlow mqttOutFlow() {
Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
messageHandler.setHeaderMapper(mqttHeaderMapper);
messageHandler.setAsync(true);
messageHandler.setAsyncEvents(true);
messageHandler.setConverter(mqttStringToBytesConverter());
return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter は、その契約が MQTT v3 プロトコルのみを対象としているため、Mqttv5PahoMessageHandler と一緒に使用することはできません。 |
起動時または実行時に接続が失敗した場合、Mqttv5PahoMessageHandler
は、このハンドラーに対して生成された次のメッセージで再接続を試みます。この手動再接続が失敗した場合、接続は例外であり、呼び出し元にスローバックされます。この場合、リクエストハンドラーのアドバイスを含む標準の Spring Integration エラー処理手順が適用されます。再試行またはサーキットブレーカー。
詳細については、Mqttv5PahoMessageHandler
javadoc とそのスーパークラスを参照してください。
MQTT v5 プロトコルの受信チャネルアダプターは Mqttv5PahoMessageDrivenChannelAdapter
として存在します。clientId
および MQTT ブローカー URL または MqttConnectionOptions
参照に加えて、サブスクライブして使用するトピックが必要です。MqttClientPersistence
オプションをサポートしており、デフォルトでメモリ内にあります。予期される payloadType
(デフォルトでは byte[]
) を構成でき、受信した MqttMessage
の byte[]
からの変換のために提供された SmartMessageConverter
に伝播されます。manualAck
オプションが設定されている場合、IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK
ヘッダーがメッセージに追加され、SimpleAcknowledgment
のインスタンスとして生成されます。HeaderMapper<MqttProperties>
は、PUBLISH
フレームプロパティ (ユーザープロパティを含む) をターゲットメッセージヘッダーにマップするために使用されます。qos
、id
、dup
、retained
などの標準 MqttMessage
プロパティと受信したトピックは、常にヘッダーにマップされます。詳細については、MqttHeaders
を参照してください。
バージョン 6.3 以降、Mqttv5PahoMessageDrivenChannelAdapter
は、単純なトピック名の代わりに、きめ細かい設定を行うための MqttSubscription
に基づくコンストラクターを提供します。これらのサブスクリプションが提供されている場合、チャネルアダプターの qos
オプションは使用できません。これは、そのような qos
モードが MqttSubscription
API の一部であるためです。
次の JavaDSL 構成サンプルは、統合フローでこのチャネルアダプターを使用する方法を示しています。
@Bean
public IntegrationFlow mqttInFlow() {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
messageProducer.setPayloadType(String.class);
messageProducer.setMessageConverter(mqttStringToBytesConverter());
messageProducer.setManualAcks(true);
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter は、その契約が MQTT v3 プロトコルのみを対象としているため、Mqttv5PahoMessageDrivenChannelAdapter と一緒に使用することはできません。 |
詳細については、Mqttv5PahoMessageDrivenChannelAdapter
javadoc とそのスーパークラスを参照してください。
内部 IMqttAsyncClient インスタンスが再接続を処理できるように、MqttConnectionOptions#setAutomaticReconnect(boolean) を true に設定することをお勧めします。それ以外の場合、Mqttv5PahoMessageDrivenChannelAdapter の手動再起動のみが、切断時の MqttConnectionFailedEvent 処理による再接続を処理できます。 |
共有 MQTT クライアントのサポート
複数の統合に 1 つの MQTT ClientID が必要な場合、複数の MQTT クライアントインスタンスを使用することはできません。これは、MQTT ブローカーが ClientID ごとの接続数に制限を設けている可能性があるためです (通常、1 つの接続が許可されます)。単一のクライアントを異なるチャネルアダプターに再利用するには、org.springframework.integration.mqtt.core.ClientManager
コンポーネントを使用して、必要なチャネルアダプターに渡すことができます。MQTT 接続のライフサイクルを管理し、必要に応じて自動再接続を行います。また、カスタム接続オプションと MqttClientPersistence
をクライアントマネージャーに提供することもできます。これは、現在チャネルアダプターコンポーネントに対して提供されているのと同じです。
MQTT v5 と v3 の両方のチャネルアダプターがサポートされていることに注意してください。
次の Java DSL 構成サンプルは、統合フローでこのクライアントマネージャーを使用する方法を示しています。
@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
connectionOptions.setConnectionTimeout(30000);
connectionOptions.setMaxReconnectDelay(1000);
connectionOptions.setAutomaticReconnect(true);
Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
clientManager.setPersistence(new MqttDefaultFilePersistence());
return clientManager;
}
@Bean
public IntegrationFlow mqttInFlowTopic1(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttInFlowTopic2(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
return IntegrationFlow.from(messageProducer)
.channel(c -> c.queue("fromMqttChannel"))
.get();
}
@Bean
public IntegrationFlow mqttOutFlow(
ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {
return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}
バージョン 6.4 以降では、対応する ClientManager から IntegrationFlowContext を使用して、実行時に MqttPahoMessageDrivenChannelAdapter と Mqttv5PahoMessageDrivenChannelAdapter の複数のインスタンスを追加できるようになりました。 |
private void addAddRuntimeAdapter(IntegrationFlowContext flowContext, Mqttv5ClientManager clientManager,
String topic, MessageChannel channel) {
flowContext
.registration(
IntegrationFlow
.from(new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, topic))
.channel(channel)
.get())
.register();
}