MQTT サポート

Spring Integration は、メッセージキューテレメトリートランスポート(MQTT)プロトコルをサポートする受信および送信チャネルアダプターを提供します。

この依存関係をプロジェクトに含める必要があります。

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>6.3.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-mqtt:6.3.1"

現在の実装では、Eclipse Paho MQTT クライアント (英語) ライブラリを使用しています。

XML 構成とこの章の大部分は、MQTT v3.1 プロトコルのサポートとそれぞれの Paho クライアントに関するものです。それぞれのプロトコルのサポートについては、MQTTv5 サポートの段落を参照してください。

両方のアダプターの構成は、DefaultMqttPahoClientFactory を使用して実現されます。構成オプションの詳細については、Paho のドキュメントを参照してください。

ファクトリ自体に(非推奨)オプションを設定する代わりに、MqttConnectOptions オブジェクトを構成してファクトリに注入することをお勧めします。

受信(メッセージ駆動型)チャネルアダプター

受信チャネルアダプターは MqttPahoMessageDrivenChannelAdapter によって実装されます。便宜上、名前空間を使用して構成できます。最小構成は次のとおりです。

<bean id="clientFactory"
        class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
    <property name="connectionOptions">
        <bean class="org.eclipse.paho.client.mqttv3.MqttConnectOptions">
            <property name="userName" value="${mqtt.username}"/>
            <property name="password" value="${mqtt.password}"/>
        </bean>
    </property>
</bean>

<int-mqtt:message-driven-channel-adapter id="mqttInbound"
    client-id="${mqtt.default.client.id}.src"
    url="${mqtt.url}"
    topics="sometopic"
    client-factory="clientFactory"
    channel="output"/>

次のリストは、使用可能な属性を示しています。

<int-mqtt:message-driven-channel-adapter id="oneTopicAdapter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    topics="bar,baz"  (3)
    qos="1,2"  (4)
    converter="myConverter"  (5)
    client-factory="clientFactory"  (6)
    send-timeout="123"  (7)
    error-channel="errors"  (8)
    recovery-interval="10000"  (9)
    manual-acks="false" (10)
    channel="out" />
1 クライアント ID。
2 ブローカー URL。
3 このアダプターがメッセージを受信するトピックのコンマ区切りリスト。
4QoS 値のコンマ区切りリスト。すべてのトピックに適用される単一の値または各トピックの値にすることができます(この場合、リストは同じ長さでなければなりません)。
5MqttMessageConverter (オプション)。デフォルトでは、デフォルトの DefaultPahoMessageConverter は、次のヘッダーを持つ String ペイロードを持つメッセージを生成します。
  • mqtt_topic: メッセージの受信元のトピック

  • mqtt_duplicate: メッセージが重複している場合は true 

  • mqtt_qos: サービス品質 <bean/> として宣言し、payloadAsBytes プロパティを true に設定することにより、ペイロードで byte[] をそのまま返すように DefaultPahoMessageConverter を構成できます。

6 クライアントファクトリ。
7send() タイムアウト。これは、チャネルがブロックされる可能性がある場合にのみ適用されます (現在満杯の制限付き QueueChannel など)。
8 エラーチャネル。ダウンストリーム例外は、ErrorMessage でこのチャネルに送信されます(指定されている場合)。ペイロードは、失敗したメッセージと原因を含む MessagingException です。
9 回復間隔。これは、アダプターが障害後に再接続を試行する間隔を制御します。デフォルトは 10000ms (10 秒)です。
10 確認応答モード。手動で確認する場合は true に設定します。
バージョン 4.1 以降、URL を省略できます。代わりに、DefaultMqttPahoClientFactory の serverURIs プロパティでサーバー URI を提供できます。これにより、たとえば、高可用性(HA)クラスターへの接続が可能になります。

バージョン 4.2.2 以降、アダプターがトピックを正常にサブスクライブすると、MqttSubscribedEvent が公開されます。接続またはサブスクリプションが失敗すると、MqttConnectionFailedEvent イベントが発行されます。ApplicationListener を実装する Bean がこれらのイベントを受信できます。

また、recoveryInterval という新しいプロパティは、アダプターが障害後に再接続を試みる間隔を制御します。デフォルトは 10000ms (10 秒)です。

バージョン 4.2.3 より前は、クライアントはアダプターが停止したときに常にサブスクライブ解除されました。クライアントの QOS が 0 より大きい場合、アダプターが停止している間に到着したメッセージが次回の開始時に配信されるように、サブスクリプションをアクティブに保つ必要があるため、これは間違っていました。これには、クライアントファクトリの cleanSession プロパティを false に設定する必要もあります。デフォルトは true です。

バージョン 4.2.3 以降、cleanSession プロパティが false の場合、アダプターはサブスクライブを解除しません(デフォルト)。

この動作は、ファクトリで consumerCloseAction プロパティを設定することで上書きできます。値は UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN にすることができます。後者(デフォルト)は、cleanSession プロパティが true の場合にのみサブスクライブを解除します。

4.2.3 より前の動作に戻すには、UNSUBSCRIBE_ALWAYS を使用します。

バージョン 5.0 以降、topicqosretained プロパティは .RECEIVED_…​ ヘッダー(MqttHeaders.RECEIVED_TOPICMqttHeaders.RECEIVED_QOSMqttHeaders.RECEIVED_RETAINED)にマップされ、(デフォルトで) MqttHeaders.TOPICMqttHeaders.QOSMqttHeaders.RETAINED ヘッダーを使用する送信メッセージへの不注意な伝播を回避します。

実行時のトピックの追加と削除

バージョン 4.1 以降、アダプターがサブスクライブするトピックをプログラムで変更できます。Spring Integration は、addTopic() および removeTopic() メソッドを提供します。トピックを追加するときに、オプションで QoS を指定できます(デフォルト: 1)。適切なメッセージを適切なペイロードで <control-bus/> に送信して、トピックを変更することもできます(例: "myMqttAdapter.addTopic('foo', 1)")。

アダプターを停止して開始しても、トピックリストには影響しません(構成の元の設定に戻りません)。変更は、アプリケーションコンテキストのライフサイクルを超えて保持されません。新しいアプリケーションコンテキストは、構成された設定に戻ります。

アダプターが停止している(またはブローカーから切断されている)間にトピックを変更すると、次に接続が確立されたときに有効になります。

手動 ACK

バージョン 5.3 以降、manualAcks プロパティを true に設定できます。多くの場合、配信を非同期的に確認するために使用されます。true に設定すると、ヘッダー(IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK)がメッセージに追加され、値は SimpleAcknowledgment になります。配信を完了するには、acknowledge() メソッドを呼び出す必要があります。詳細については、IMqttClientsetManualAcks()messageArrivedComplete() の Javadoc を参照してください。便宜上、ヘッダーアクセサーが提供されています。

StaticMessageHeaderAccessor.acknowledgment(someMessage).acknowledge();

バージョン 5.2.11 以降、メッセージコンバーターが例外をスローするか、MqttMessage 変換から null を返すと、MqttPahoMessageDrivenChannelAdapter は ErrorMessage を errorChannel に送信します(提供されている場合)。それ以外の場合は、この変換エラーを MQTT クライアントコールバックに再スローします。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java 構成で受信アダプターを構成する方法の例を示しています。

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }

}

Java DSL を使用した構成

次の Spring Boot アプリケーションは、Java DSL を使用して受信アダプターを構成する例を示しています。

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlow.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2"))
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

送信チャネルアダプター

発信チャネルアダプターは MqttPahoMessageHandler によって実装され、ConsumerEndpoint にラップされています。便宜上、名前空間を使用して構成できます。

バージョン 4.1 から、アダプターは非同期送信操作をサポートし、配信が確認されるまでブロッキングを回避します。必要に応じて、アプリケーションが配信を確認できるように、アプリケーションイベントを発行できます。

次のリストは、送信チャネルアダプターで使用可能な属性を示しています。

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  (1)
    url="tcp://localhost:1883"  (2)
    converter="myConverter"  (3)
    client-factory="clientFactory"  (4)
    default-qos="1"  (5)
    qos-expression="" (6)
    default-retained="true"  (7)
    retained-expression="" (8)
    default-topic="bar"  (9)
    topic-expression="" (10)
    async="false"  (11)
    async-events="false"  (12)
    channel="target" />
1 クライアント ID。
2 ブローカー URL。
3MqttMessageConverter (オプション)。デフォルトの DefaultPahoMessageConverter は、次のヘッダーを認識します。
  • mqtt_topic: メッセージの送信先のトピック

  • mqtt_retained: メッセージを保持する場合は true 

  • mqtt_qos: サービスの質

4 クライアントファクトリ。
5 デフォルトのサービス品質。mqtt_qos ヘッダーが見つからない場合、または qos-expression が null を返す場合に使用されます。カスタム converter を提供する場合は使用されません。
6QoS を決定するために評価する式。デフォルトは headers[mqtt_qos] です。
7 保持フラグのデフォルト値。mqtt_retained ヘッダーが見つからない場合に使用されます。カスタム converter が提供されている場合は使用されません。
8 保持されたブール値を決定するために評価する式。デフォルトは headers[mqtt_retained] です。
9 メッセージの送信先のデフォルトトピック(mqtt_topic ヘッダーが見つからない場合に使用)。
10 宛先トピックを決定するために評価する式。デフォルトは headers['mqtt_topic'] です。
11true の場合、呼び出し元はブロックしません。むしろ、メッセージが送信されると配信確認を待ちます。デフォルトは false です(配信が確認されるまで送信はブロックされます)。
12async と async-events が両方とも true である場合、MqttMessageSentEvent が発行されます ( イベントを参照)。これには、メッセージ、トピック、クライアントライブラリによって生成された messageIdclientIdclientInstance (クライアントが接続されるたびに増加します) が含まれます。クライアントライブラリによって配信が確認されると、MqttMessageDeliveredEvent が発行されます。これには messageIdclientIdclientInstance が含まれており、配信を send() と関連付けることができます。ApplicationListener またはイベント受信チャネルアダプターはこれらのイベントを受信できます。MqttMessageDeliveredEvent が MqttMessageSentEvent よりも前に受信される可能性があることに注意してください。デフォルトは false です。
バージョン 4.1 以降、URL は省略できます。代わりに、DefaultMqttPahoClientFactory の serverURIs プロパティでサーバー URI を提供できます。これにより、たとえば、高可用性(HA)クラスターへの接続が可能になります。

Java 構成を使用した構成

次の Spring Boot アプリケーションは、Java 構成で送信アダプターを構成する方法の例を示しています。

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

Java DSL を使用した構成

次の Spring Boot アプリケーションは、Java DSL を使用して送信アダプターを構成する例を示しています。

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

   	@Bean
   	public IntegrationFlow mqttOutboundFlow() {
   	    return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

イベント

特定のアプリケーションイベントは、アダプターによって発行されます。

  • MqttConnectionFailedEvent - 接続に失敗した場合、またはその後接続が失われた場合、両方のアダプターによって発行されます。MQTT v5 Paho クライアントの場合、このイベントは、サーバーが通常の切断を実行したときにも発行されます。この場合、失われた接続の cause は null です。

  • MqttMessageSentEvent - 非同期モードで実行されている場合、メッセージが送信されたときに送信アダプターによって公開されます。

  • MqttMessageDeliveredEvent - 非同期モードで実行している場合に、クライアントがメッセージが配信されたことを示したときに送信アダプターによって公開されます。

  • MqttSubscribedEvent - トピックをサブスクライブした後、受信アダプターによって公開されます。

これらのイベントは、ApplicationListener<MqttIntegrationEvent> または @EventListener メソッドで受信できます。

イベントのソースを判別するには、以下を使用します。Bean 名や接続オプション(サーバー URI などにアクセスするため)を確認できます。

MqttPahoComponent source = event.getSourceAsType();
String beanName = source.getBeanName();
MqttConnectOptions options = source.getConnectionInfo();

MQTTv5 サポート

バージョン 5.5.5 以降、spring-integration-mqtt モジュールは MQTTv5 プロトコル用のチャネルアダプター実装を提供します。org.eclipse.paho:org.eclipse.paho.mqttv5.client は optional の依存関係であるため、ターゲットプロジェクトに明示的に含める必要があります。

MQTT v5 プロトコルは MQTT メッセージで追加の任意のプロパティをサポートするため、MqttHeaderMapper 実装が導入されて、パブリッシュおよび受信操作でヘッダーとの間でマッピングされます。デフォルトでは、(* パターンを介して) 受信したすべての PUBLISH フレームプロパティ (ユーザープロパティを含む) をマップします。送信側では、PUBLISH フレームのヘッダーのサブセット contentTypemqtt_messageExpiryIntervalmqtt_responseTopicmqtt_correlationData をマップします。

MQTT v5 プロトコルの送信チャネルアダプターは、Mqttv5PahoMessageHandler として存在します。clientId および MQTT ブローカーの URL または MqttConnectionOptions 参照が必要です。MqttClientPersistence オプションをサポートし、async にすることができ、その場合は MqttIntegrationEvent オブジェクトを発行できます(asyncEvents オプションを参照)。リクエストメッセージのペイロードが org.eclipse.paho.mqttv5.common.MqttMessage の場合、内部 IMqttAsyncClient を介してそのまま公開されます。ペイロードが byte[] の場合、ターゲット MqttMessage ペイロードが公開するためにそのまま使用されます。ペイロードが String の場合、公開するために byte[] に変換されます。残りのユースケースは、アプリケーションコンテキストから IntegrationContextUtils.ARGUMENT_RESOLVER_MESSAGE_CONVERTER_BEAN_NAMEConfigurableCompositeMessageConverter Bean である提供された MessageConverter に委譲されます。注: リクエストされたメッセージペイロードがすでに MqttMessage である場合、提供された HeaderMapper<MqttProperties> は使用されません。次の JavaDSL 構成サンプルは、統合フローでこのチャネルアダプターを使用する方法を示しています。

@Bean
public IntegrationFlow mqttOutFlow() {
    Mqttv5PahoMessageHandler messageHandler = new Mqttv5PahoMessageHandler(MQTT_URL, "mqttv5SIout");
    MqttHeaderMapper mqttHeaderMapper = new MqttHeaderMapper();
    mqttHeaderMapper.setOutboundHeaderNames("some_user_header", MessageHeaders.CONTENT_TYPE);
    messageHandler.setHeaderMapper(mqttHeaderMapper);
    messageHandler.setAsync(true);
    messageHandler.setAsyncEvents(true);
    messageHandler.setConverter(mqttStringToBytesConverter());

    return f -> f.handle(messageHandler);
}
org.springframework.integration.mqtt.support.MqttMessageConverter は、その契約が MQTT v3 プロトコルのみを対象としているため、Mqttv5PahoMessageHandler と一緒に使用することはできません。

起動時または実行時に接続が失敗した場合、Mqttv5PahoMessageHandler は、このハンドラーに対して生成された次のメッセージで再接続を試みます。この手動再接続が失敗した場合、接続は例外であり、呼び出し元にスローバックされます。この場合、リクエストハンドラーのアドバイスを含む標準の Spring Integration エラー処理手順が適用されます。再試行またはサーキットブレーカー。

詳細については、Mqttv5PahoMessageHandler javadoc とそのスーパークラスを参照してください。

MQTT v5 プロトコルの受信チャネルアダプターは Mqttv5PahoMessageDrivenChannelAdapter として存在します。clientId および MQTT ブローカー URL または MqttConnectionOptions 参照に加えて、サブスクライブして使用するトピックが必要です。MqttClientPersistence オプションをサポートしており、デフォルトでメモリ内にあります。予期される payloadType (デフォルトでは byte[] ) を構成でき、受信した MqttMessage の byte[] からの変換のために提供された SmartMessageConverter に伝播されます。manualAck オプションが設定されている場合、IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK ヘッダーがメッセージに追加され、SimpleAcknowledgment のインスタンスとして生成されます。HeaderMapper<MqttProperties> は、PUBLISH フレームプロパティ (ユーザープロパティを含む) をターゲットメッセージヘッダーにマップするために使用されます。qosiddupretained などの標準 MqttMessage プロパティと受信したトピックは、常にヘッダーにマップされます。詳細については、MqttHeaders を参照してください。

バージョン 6.3 以降、Mqttv5PahoMessageDrivenChannelAdapter は、単純なトピック名の代わりに、きめ細かい設定を行うための MqttSubscription に基づくコンストラクターを提供します。これらのサブスクリプションが提供されている場合、チャネルアダプターの qos オプションは使用できません。これは、そのような qos モードが MqttSubscription API の一部であるためです。

次の JavaDSL 構成サンプルは、統合フローでこのチャネルアダプターを使用する方法を示しています。

@Bean
public IntegrationFlow mqttInFlow() {
    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(MQTT_URL, "mqttv5SIin", "siTest");
    messageProducer.setPayloadType(String.class);
    messageProducer.setMessageConverter(mqttStringToBytesConverter());
    messageProducer.setManualAcks(true);

    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}
org.springframework.integration.mqtt.support.MqttMessageConverter は、その契約が MQTT v3 プロトコルのみを対象としているため、Mqttv5PahoMessageDrivenChannelAdapter と一緒に使用することはできません。

詳細については、Mqttv5PahoMessageDrivenChannelAdapter javadoc とそのスーパークラスを参照してください。

内部 IMqttAsyncClient インスタンスが再接続を処理できるように、MqttConnectionOptions#setAutomaticReconnect(boolean) を true に設定することをお勧めします。それ以外の場合、Mqttv5PahoMessageDrivenChannelAdapter の手動再起動のみが、切断時の MqttConnectionFailedEvent 処理による再接続を処理できます。

共有 MQTT クライアントのサポート

複数の統合に 1 つの MQTT ClientID が必要な場合、複数の MQTT クライアントインスタンスを使用することはできません。これは、MQTT ブローカーが ClientID ごとの接続数に制限を設けている可能性があるためです (通常、1 つの接続が許可されます)。単一のクライアントを異なるチャネルアダプターに再利用するには、org.springframework.integration.mqtt.core.ClientManager コンポーネントを使用して、必要なチャネルアダプターに渡すことができます。MQTT 接続のライフサイクルを管理し、必要に応じて自動再接続を行います。また、カスタム接続オプションと MqttClientPersistence をクライアントマネージャーに提供することもできます。これは、現在チャネルアダプターコンポーネントに対して提供されているのと同じです。

MQTT v5 と v3 の両方のチャネルアダプターがサポートされていることに注意してください。

次の Java DSL 構成サンプルは、統合フローでこのクライアントマネージャーを使用する方法を示しています。

@Bean
public ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager() {
    MqttConnectionOptions connectionOptions = new MqttConnectionOptions();
    connectionOptions.setServerURIs(new String[]{ "tcp://localhost:1883" });
    connectionOptions.setConnectionTimeout(30000);
    connectionOptions.setMaxReconnectDelay(1000);
    connectionOptions.setAutomaticReconnect(true);
    Mqttv5ClientManager clientManager = new Mqttv5ClientManager(connectionOptions, "client-manager-client-id-v5");
    clientManager.setPersistence(new MqttDefaultFilePersistence());
    return clientManager;
}

@Bean
public IntegrationFlow mqttInFlowTopic1(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic1");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttInFlowTopic2(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    Mqttv5PahoMessageDrivenChannelAdapter messageProducer =
        new Mqttv5PahoMessageDrivenChannelAdapter(clientManager, "topic2");
    return IntegrationFlow.from(messageProducer)
            .channel(c -> c.queue("fromMqttChannel"))
            .get();
}

@Bean
public IntegrationFlow mqttOutFlow(
        ClientManager<IMqttAsyncClient, MqttConnectionOptions> clientManager) {

    return f -> f.handle(new Mqttv5PahoMessageHandler(clientManager));
}