Spring Boot 基于Spring Integration 实现MQTT客户端简单订阅发布功能

in 网站建设
关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9

本文属于翻译,转载注明出处,欢迎关注微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net

[TOC]

1 简介

Spring Integration 提供入站(inbound)和出站(outbound)通道适配器,以支持MQTT消息协议。使用这两适配器,需要加入依赖:

<!-- Maven -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.2.1.RELEASE</version>
</dependency>
// Gradle
compile "org.springframework.integration:spring-integration-mqtt:5.2.1.RELEASE"

当前的MQTT Integration实现使用的是Eclipse Paho MQTT客户端库。两个适配器的配置都是使用DefaultMqttPahoClientFactory实现的。有关配置选项的更多信息,请参阅Eclipse Paho MQTT文档定义。

建议配置MqttConnectOptions对象并将其注入工厂(factory),而不是在工厂本身里设置(不推荐使用)MQTT连接选项。

2 Inbound(消息驱动)通道适配器

入站通道适配器由MqttPahoMessageDrivenChannelAdapter实现。常用的配置项有:

从Spring 4.1版开始,可以省略URL。相反,你可以在DefaultMqttPahoClientFactoryserver URIs属性中提供服务器uri。例如,这样做允许连接到高可用(HA)集群。

Spring 4.2.2开始,当适配器成功订阅到主题了,MqttSubscribedEvent事件就会被触发。当连接失败或者订阅失败,MqttConnectionFailedEvent事件会被触发。这两个事件都能够被一个Bean通过实现ApplicationListener而接收到。另外,名为recoveryInterval的新属性控制适配器在失败后尝试重新连接的时间间隔。默认为10000毫秒(10秒)。

@Component
public class MQTTSubscribedListener implements ApplicationListener<MqttSubscribedEvent> {
    private static final Logger LOGGER = LogManager.getLogger(MQTTSubscribedListener.class);

    @Override
    public void onApplicationEvent(MqttSubscribedEvent event) {
        LOGGER.debug("Subscribed Success: " + event.getMessage());
    }
}
在版本Spring 4.2.3之前,当适配器停止时,客户端总是取消订阅。这是不正确的,因为如果客户端QOS大于0,我们需要保持订阅处于活动状态,以便在下次启动时传递适配器停止时到达的消息。这还需要将客户机工厂上的cleanSession属性设置为false。默认为true。从4.2.3版开始,如果cleanSession属性为false,则适配器不会取消订阅(默认情况下),这个默认行为可以通过在工厂上设置consumerCloseAction属性来重写此行为。它可以有以下值:UNSUBSCRIBE_ALWAYSUNSUBSCRIBE_NEVERUNSUBSCRIBE_CLEAN,最后一项(默认设置)仅在cleanSession属性为true时取消订阅。若要还原到4.2.3之前的行为,请始终使用“取消订阅”设置项。

注意:从Spring 5.0开始,topic、qos和retained属性映射到.RECEIVED_…headers(MqttHeaders.RECEIVED_topic、MqttHeaders.RECEIVED_qos和MqttHeaders.RECEIVED_retained),以避免意外传播到(默认情况下)使用MqttHeaders.topic、MqttHeaders.qos和MqttHeaders.retained headers的出站消息。

public MessageHandler handler() {
    return new MessageHandler() {
        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            LOGGER.debug("===Received Msg(topic {}): {}", message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC), message.getPayload());
        }
    };
}

2.1 在运行时添加和删除主题

Spring4.1开始,你可以通过编程更改适配器订阅的主题。Spring Integration提供了addTopic()removeTopic()方法。添加主题时,可以选择指定QoS值(默认是1)。你还可以通过向具有适当有效负载的<control bus/>发送适当的消息来修改主题。示例:

myMqttAdapter.addTopic('foo', 1)

停止和启动适配器对主题列表(topics设置项)没有影响(它不会还原到配置中的原始设置)。这些更改不会保留到应用程序上下文的生命周期之外。新的应用程序上下文将还原为配置的设置。

在适配器停止(或与代理断开连接)时更改主题列表(topics)将在下次建立连接时生效。

2.2 使用Java配置配置

以下Spring Boot应用程序显示了如何使用Java配置配置入站(inbound)适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
                .web(false)
                .run(args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "testClient",
                                                 "topic1", "topic2");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }
}

2.3 使用Java DSL配置

下面的Spring Boot应用程序提供了使用Java DSL配置入站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow mqttInbound() {
        return IntegrationFlows.from(
                         new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                                        "testClient", "topic1", "topic2");)
                .handle(m -> System.out.println(m.getPayload()))
                .get();
    }

}

3 出站通道适配器

出站通道适配器由MqttPahoMessageHandler实现,MqttPahoMessageHandler包装在ConsumerEndpoint中。为了方便起见,可以使用名称空间配置它。

从Spring 4.1开始,适配器支持异步发送操作,在确认交付之前避免阻塞。如果需要,可以发出应用程序事件以使应用程序确认传递。

以下列表显示出站通道适配器可用的属性:

<int-mqtt:outbound-channel-adapter id="withConverter"
    client-id="foo"  
    url="tcp://localhost:1883"  
    converter="myConverter"  
    client-factory="clientFactory"  
    default-qos="1"  
    qos-expression="" 
    default-retained="true"  
    retained-expression="" 
    default-topic="bar"  
    topic-expression="" 
    async="false"  
    async-events="false"  
    channel="target" />
注意,同样地,从Spring 4.1开始,可以省略URL。相反,可以在DefaultMqttPahoClientFactorserver URIs属性中提供服务器uri。例如,这允许连接到高可用(HA)集群。

3.1 使用Java配置配置

下面的Spring Boot应用程序展示了如何使用Java配置配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class MqttJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                new SpringApplicationBuilder(MqttJavaApplication.class)
                        .web(false)
                        .run(args);
        MyGateway gateway = context.getBean(MyGateway.class);
        gateway.sendToMqtt("foo");
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://host1:1883", "tcp://host2:1883" });
        options.setUserName("username");
        options.setPassword("password".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                       new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("testTopic");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MyGateway {

        void sendToMqtt(String data);

    }

}

3.2 使用Java DSL配置

下面的Spring Boot应用程序提供了使用Java DSL配置出站适配器的示例:

@SpringBootApplication
public class MqttJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(MqttJavaApplication.class)
            .web(false)
            .run(args);
    }

       @Bean
       public IntegrationFlow mqttOutboundFlow() {
           return f -> f.handle(new MqttPahoMessageHandler("tcp://host1:1883", "someMqttClient"));
    }

}

4 参考资料

本文属于原创,转载注明出处,欢迎关注CSDNfreeape或微信小程序小白AI博客 微信公众号小白AI或者网站 https://xiaobaiai.net

关注公众号【好便宜】( ID:haopianyi222 ),领红包啦~
阿里云,国内最大的云服务商,注册就送数千元优惠券:https://t.cn/AiQe5A0g
腾讯云,良心云,价格优惠: https://t.cn/AieHwwKl
搬瓦工,CN2 GIA 优质线路,搭梯子、海外建站推荐: https://t.cn/AieHwfX9
扫一扫关注公众号添加购物返利助手,领红包
Comments are closed.

推荐使用阿里云服务器

超多优惠券

服务器最低一折,一年不到100!

朕已阅去看看