MQTT使用三之Spring Boot集成MQTT简单使用

MQTT使用三之Spring Boot集成MQTT简单使用

Laughing
2021-05-17 / 2 评论 / 1,345 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2024年03月18日,已超过305天没有更新,若内容或图片失效,请留言反馈。

Spring Boot集成MQTT支持动态创建topic

添加依赖

 <!-- MQTT -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

添加配置文件

server:
  port: 9999
#mqtt的配置
mqtt:
  server:
    url: tcp://ip:1883
    port: 1883
    username: 用户名
    password: 密码
  client:
    consumerId: consumerCo
    publishId: publishCo
  default:
    topic: topic
    completionTimeout: 3000

MQTT配置文件

package net.xiangcaowuyu.mqtt.config;

import net.xiangcaowuyu.mqtt.utils.MqttReceiveHandle;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;

import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;

/**
 * Description:消息订阅配置
 *
 * @author : laughing
 * DateTime: 2021-05-18 13:31
 */
@Configuration
public class MqttConfig {

    public final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final byte[] WILL_DATA;

    static {
        WILL_DATA = "offline".getBytes();
    }
    @Resource
    private MqttReceiveHandle mqttReceiveHandle;

    @Value("${mqtt.server.url}")
    private final String url = "tcp://139.198.172.114:1883";
    @Value("${mqtt.server.port}")
    private final String port = "1883";
    @Value("${mqtt.server.username}")
    private final String username = "admin";
    @Value("${mqtt.server.password}")
    private final String password = "public";
    @Value("${mqtt.client.consumerId}")
    private final String consumerId = "consumerClient";
    @Value("${mqtt.client.publishId}")
    private final String publishId = "publishClient";
    @Value("${mqtt.default.topic}")
    private final String topic = "topic";
    @Value("${mqtt.default.completionTimeout}")
    private final Integer completionTimeout = 3000;

    //消息驱动
    private MqttPahoMessageDrivenChannelAdapter adapter;

    //订阅的主题列表
    private String listenTopics = "";

//    //mqtt消息接收接口
//    private MqttReceiveService mqttReceiveService;
//
//    public void setMqttReceiveService(MqttReceiveService mqttReceiveService){
//        this.mqttReceiveService = mqttReceiveService;
//    }

    /**
     *  MQTT连接器选项
     * **/
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions(){
        MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
        mqttConnectOptions.setCleanSession(true);
        // 设置超时时间 单位为秒
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{url});
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        mqttConnectOptions.setKeepAliveInterval(10);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        mqttConnectOptions.setWill("willTopic", WILL_DATA, 2, false);
        return mqttConnectOptions;
    }

    /**
     * MQTT工厂
     * **/
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT信息通道(生产者)
     * **/
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息处理器(生产者)
     * **/
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(publishId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(topic);
        return messageHandler;
    }

    /**
     * 配置client,监听的topic
     * MQTT消息订阅绑定(消费者)
     * **/
    @Bean
    public MessageProducer inbound() {
        if(adapter == null){
            adapter = new MqttPahoMessageDrivenChannelAdapter(consumerId, mqttClientFactory(),
                    topic);
        }
        String [] topics = listenTopics.split(",");
        for(String topic: topics){
            if(!StringUtils.isEmpty(topic)){
                adapter.addTopic(topic,1);
            }
        }
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /**
     * 增加监听的topic
     * @param topicArr 消息列表
     * @return 结果
     */
    public List<String> addListenTopic(String [] topicArr){
        if(adapter == null){
            adapter = new MqttPahoMessageDrivenChannelAdapter(consumerId, mqttClientFactory(),
                    topic);
        }
        List<String> listTopic = Arrays.asList(adapter.getTopic());
        for(String topic: topicArr){
            if(!StringUtils.isEmpty(topic)){
                if(!listTopic.contains(topic)){
                    adapter.addTopic(topic,1);
                }
            }
        }
        return Arrays.asList(adapter.getTopic());
    }

    /**
     * 移除一个监听的topic
     * @param topic
     * @return
     */
    public List<String> removeListenTopic(String topic){
        if(adapter == null){
            adapter = new MqttPahoMessageDrivenChannelAdapter(consumerId, mqttClientFactory(),
                    topic);
        }
        List<String> listTopic = Arrays.asList(adapter.getTopic());
        if(listTopic.contains(topic)){
            adapter.removeTopic(topic);
        }
        return Arrays.asList(adapter.getTopic());
    }

    /**
     * MQTT信息通道(消费者)
     * **/
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息处理器(消费者)
     * **/
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //处理接收消息
                mqttReceiveHandle.handle(message);
                //String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                //String msg   = ((String) message.getPayload()).toString();
                //mqttReceiveService.handlerMqttMessage(topic,msg);
            }
        };
    }
}

消息处理

package net.xiangcaowuyu.mqtt.utils;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;

@Component
public class MqttReceiveHandle implements MqttCallback {

    private final Logger logger = LoggerFactory.getLogger(MqttReceiveHandle.class);

    public void handle(Message<?> message) {
        try {
            logger.info("{},客户端号:{},主题:{},QOS:{},消息接收到的数据:{}",
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
                    message.getHeaders().get(MqttHeaders.ID),
                    message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
                    message.getHeaders().get(MqttHeaders.RECEIVED_QOS),
                    message.getPayload());
            //处理mqtt数据
            this.handle(message.getPayload().toString());
        } catch (Exception e) {
            e.printStackTrace();
            logger.error("处理错误" + e.getMessage());
        }

    }

    private void handle(String str) throws Exception {
        logger.info(str);
    }

    @Override
    public void connectionLost(Throwable throwable) {
        logger.warn("连接丢失");
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("消息到达:" + topic + "\n" + "消息内容:" + new String(mqttMessage.getPayload()) + "\nclientId:" + mqttMessage.getId());

    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("clientId:" + iMqttDeliveryToken.getClient().getClientId());
    }
}

消息发送

package net.xiangcaowuyu.mqtt.utils;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

/**
 * Description:
 *
 * @author : laughing
 * DateTime: 2021-05-18 13:44
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 发送信息到MQTT服务器
     *
     * @param data 发送的文本
     */
    void sendToMqtt(String data);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    String payload);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param qos 对消息处理的几种机制。
     * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);
}

测试

package net.xiangcaowuyu.mqtt.controller;

import net.xiangcaowuyu.mqtt.config.MqttConfig;
import net.xiangcaowuyu.mqtt.utils.MqttGateway;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.List;

@RestController
public class MqttController {

    @Resource
    private MqttGateway mqttGateway;

    @Resource
    private MqttConfig mqttConfig;

    @GetMapping("/add/{topic}")
    public String addTopic(@PathVariable("topic") String topic) {
        String[] topics = {topic};
        List<String> list = mqttConfig.addListenTopic(topics);
        return list.toString();
    }

    @GetMapping("/pub")
    public String pubTopic() {
        String topic = "temperature1";
        String msg = "client msg at: " + String.valueOf(System.currentTimeMillis());
        mqttGateway.sendToMqtt(topic, 2, msg);
        return "OK";

    }

    @GetMapping("/del/{topic}")
    public String delTopic(@PathVariable("topic") String topic) {
        List<String> list = mqttConfig.removeListenTopic(topic);
        return list.toString();
    }

}
0

评论 (2)

取消
  1. 头像
    Laughing 作者
    Windows 10 · Google Chrome
    @

    是的,今天一早起来确实不能用了,我们正在寻找最新的

    回复
  2. 头像
    Laughing 作者
    Windows 10 · Google Chrome

    表情

    回复