Spring Boot集成MQTT,支持动态创建topic。
添加依赖
<!-- MQTT support through Spring Integration -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
添加配置文件
# Embedded web server
server:
  port: 9999
# MQTT settings; the key paths below are the ones read through @Value placeholders
# (mqtt.server.*, mqtt.client.*, mqtt.default.*).
mqtt:
  server:
    url: tcp://ip:1883
    port: 1883
    username: 用户名
    password: 密码
  client:
    consumerId: consumerCo
    publishId: publishCo
  default:
    topic: topic
    completionTimeout: 3000
MQTT配置类
package net.xiangcaowuyu.mqtt.config;
import net.xiangcaowuyu.mqtt.utils.MqttReceiveHandle;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
import javax.annotation.Resource;
import java.util.Arrays;
import java.util.List;
/**
 * Spring Integration wiring for MQTT: connection options, client factory,
 * the outbound (publishing) flow and the inbound (subscribing) flow.
 *
 * <p>Besides the beans, this class exposes {@link #addListenTopic(String[])} and
 * {@link #removeListenTopic(String)} so topics can be subscribed and
 * unsubscribed on the inbound adapter at runtime.
 *
 * @author : laughing
 * DateTime: 2021-05-18 13:31
 */
@Configuration
public class MqttConfig {
    public final Logger logger = LoggerFactory.getLogger(this.getClass());

    /** QoS used for topics added on top of the default topic. */
    private static final int EXTRA_TOPIC_QOS = 1;

    /** Payload of the last-will message; encoded explicitly so it does not depend on the platform charset. */
    private static final byte[] WILL_DATA = "offline".getBytes(java.nio.charset.StandardCharsets.UTF_8);

    @Resource
    private MqttReceiveHandle mqttReceiveHandle;

    // The injected fields are deliberately non-final and have no initializer.
    // A final String field initialized with a literal is a compile-time constant:
    // javac inlines the literal at every use, so a value injected through @Value
    // would never be read and the hard-coded literal would win.
    @Value("${mqtt.server.url}")
    private String url;
    @Value("${mqtt.server.port}")
    private String port;
    @Value("${mqtt.server.username}")
    private String username;
    @Value("${mqtt.server.password}")
    private String password;
    @Value("${mqtt.client.consumerId}")
    private String consumerId;
    @Value("${mqtt.client.publishId}")
    private String publishId;
    @Value("${mqtt.default.topic}")
    private String topic;
    @Value("${mqtt.default.completionTimeout}")
    private Integer completionTimeout;

    /** Message-driven inbound adapter; created lazily and shared by the bean method and the topic helpers. */
    private MqttPahoMessageDrivenChannelAdapter adapter;

    /** Comma-separated list of additional topics subscribed when the inbound bean is built. */
    private String listenTopics = "";

    /**
     * Builds the MQTT connection options from the injected server settings.
     *
     * @return the options used by the client factory
     */
    @Bean(value = "getMqttConnectOptions")
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // true: every connection starts a fresh session; false would let the
        // broker keep the client's session state between connections.
        options.setCleanSession(true);
        // Connection timeout, in seconds.
        options.setConnectionTimeout(10);
        options.setAutomaticReconnect(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setServerURIs(new String[]{url});
        // Keep-alive interval, in seconds.
        options.setKeepAliveInterval(10);
        // Last-will message: published by the broker on topic "willTopic" (QoS 2,
        // not retained) if the connection to this client is lost unexpectedly.
        options.setWill("willTopic", WILL_DATA, 2, false);
        return options;
    }

    /**
     * Creates the Paho client factory configured with {@link #getMqttConnectOptions()}.
     *
     * @return the client factory shared by the inbound and outbound sides
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * Channel that carries messages to be published (producer side).
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * Handler that publishes messages arriving on {@code mqttOutboundChannel}.
     * Publishing is asynchronous and falls back to the configured default topic.
     *
     * @return the outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(publishId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(topic);
        return messageHandler;
    }

    /**
     * Configures the inbound adapter (consumer side): subscribes the default
     * topic plus every entry of {@code listenTopics}, and routes received
     * messages to {@link #mqttInputChannel()}.
     *
     * @return the inbound adapter
     */
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter inboundAdapter = currentAdapter();
        for (String extraTopic : listenTopics.split(",")) {
            subscribeIfAbsent(inboundAdapter, extraTopic);
        }
        inboundAdapter.setCompletionTimeout(completionTimeout);
        inboundAdapter.setConverter(new DefaultPahoMessageConverter());
        // Applied after the topics above were added, exactly as before.
        inboundAdapter.setQos(2);
        inboundAdapter.setOutputChannel(mqttInputChannel());
        return inboundAdapter;
    }

    /**
     * Subscribes the inbound adapter to additional topics.
     * Empty entries, topics that are already subscribed and duplicates within
     * {@code topicArr} are skipped.
     *
     * @param topicArr topics to subscribe; {@code null} is treated as empty
     * @return the topics registered on the adapter after the call
     */
    public List<String> addListenTopic(String[] topicArr) {
        MqttPahoMessageDrivenChannelAdapter inboundAdapter = currentAdapter();
        if (topicArr != null) {
            for (String candidate : topicArr) {
                subscribeIfAbsent(inboundAdapter, candidate);
            }
        }
        return Arrays.asList(inboundAdapter.getTopic());
    }

    /**
     * Unsubscribes the inbound adapter from one topic, if it is subscribed.
     *
     * @param topic topic to remove
     * @return the topics registered on the adapter after the call
     */
    public List<String> removeListenTopic(String topic) {
        MqttPahoMessageDrivenChannelAdapter inboundAdapter = currentAdapter();
        if (Arrays.asList(inboundAdapter.getTopic()).contains(topic)) {
            inboundAdapter.removeTopic(topic);
        }
        return Arrays.asList(inboundAdapter.getTopic());
    }

    /**
     * Channel that carries received messages (consumer side).
     *
     * @return a new direct channel
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Handler for messages arriving on {@code mqttInputChannel}; delegates to
     * the injected {@link MqttReceiveHandle}.
     *
     * @return the inbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                mqttReceiveHandle.handle(message);
            }
        };
    }

    /** Returns the shared inbound adapter, creating it on the default topic on first use. */
    private MqttPahoMessageDrivenChannelAdapter currentAdapter() {
        if (adapter == null) {
            // this.topic: always the configured default topic, never a method parameter.
            adapter = new MqttPahoMessageDrivenChannelAdapter(consumerId, mqttClientFactory(), this.topic);
        }
        return adapter;
    }

    /** Adds {@code candidate} to the adapter unless it is empty or already registered. */
    private void subscribeIfAbsent(MqttPahoMessageDrivenChannelAdapter inboundAdapter, String candidate) {
        if (!StringUtils.hasLength(candidate)) {
            return;
        }
        // Re-read the adapter's topics on every call so a topic added a moment
        // ago (e.g. a duplicate in the same request) is seen as subscribed.
        if (!Arrays.asList(inboundAdapter.getTopic()).contains(candidate)) {
            inboundAdapter.addTopic(candidate, EXTRA_TOPIC_QOS);
        }
    }
}
消息处理
package net.xiangcaowuyu.mqtt.utils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * Logs inbound MQTT traffic.
 *
 * <p>{@link #handle(Message)} is the entry point for Spring Integration
 * messages; the {@link MqttCallback} methods log connection loss, raw message
 * arrival and delivery completion.
 */
@Component
public class MqttReceiveHandle implements MqttCallback {
    private static final Logger logger = LoggerFactory.getLogger(MqttReceiveHandle.class);

    /**
     * Logs the message headers and payload, then processes the payload as text.
     * Any exception raised while doing so is logged and not rethrown.
     *
     * @param message the received message
     */
    public void handle(Message<?> message) {
        try {
            logger.info("{},客户端号:{},主题:{},QOS:{},消息接收到的数据:{}",
                    new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()),
                    message.getHeaders().get(MqttHeaders.ID),
                    message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
                    message.getHeaders().get(MqttHeaders.RECEIVED_QOS),
                    message.getPayload());
            // Process the MQTT payload.
            this.handle(message.getPayload().toString());
        } catch (Exception e) {
            // Trailing throwable argument: the logger records the stack trace,
            // replacing the former printStackTrace() call.
            logger.error("处理错误{}", e.getMessage(), e);
        }
    }

    /**
     * Processes the textual payload; currently it is only logged.
     *
     * @param str payload text
     * @throws Exception reserved for processing failures
     */
    private void handle(String str) throws Exception {
        logger.info(str);
    }

    /** Logs a warning when the connection is lost. */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.warn("连接丢失");
    }

    /**
     * Logs the topic, the payload decoded as UTF-8 and the message id.
     *
     * @param topic topic the message arrived on
     * @param mqttMessage the raw message
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset.
        String body = new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        logger.info("消息到达:{}\n消息内容:{}\nclientId:{}", topic, body, mqttMessage.getId());
    }

    /**
     * Logs the id of the client whose delivery completed.
     *
     * @param iMqttDeliveryToken token of the completed delivery
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("clientId:{}", iMqttDeliveryToken.getClient().getClientId());
    }
}
消息发送
package net.xiangcaowuyu.mqtt.utils;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
/**
 * Messaging gateway for publishing to MQTT. Every call is sent to the
 * {@code mqttOutboundChannel} request channel.
 *
 * @author : laughing
 * DateTime: 2021-05-18 13:44
 */
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {

    /**
     * Publishes a text payload without setting a topic header.
     *
     * @param data text to send
     */
    void sendToMqtt(String data);

    /**
     * Publishes a payload to the given topic.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * Publishes a payload to the given topic with an explicit quality of service.
     *
     * @param topic   target topic, passed as the {@link MqttHeaders#TOPIC} header
     * @param qos     delivery guarantee, passed as the {@link MqttHeaders#QOS} header:
     *                0 - no redelivery, the message may be lost;
     *                1 - retried until received, so subscribers may see duplicates;
     *                2 - adds de-duplication, so subscribers receive the message once
     * @param payload message body
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
测试
package net.xiangcaowuyu.mqtt.controller;
import net.xiangcaowuyu.mqtt.config.MqttConfig;
import net.xiangcaowuyu.mqtt.utils.MqttGateway;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.List;
/**
 * Demo endpoints for subscribing to a topic, publishing a test message and
 * unsubscribing from a topic.
 */
@RestController
public class MqttController {
    @Resource
    private MqttGateway mqttGateway;
    @Resource
    private MqttConfig mqttConfig;

    /**
     * Subscribes to the topic given in the path.
     *
     * @param topic topic to subscribe to
     * @return string form of the topic list returned by the configuration
     */
    @GetMapping("/add/{topic}")
    public String addTopic(@PathVariable("topic") String topic) {
        return mqttConfig.addListenTopic(new String[]{topic}).toString();
    }

    /**
     * Publishes a timestamped test message to topic {@code temperature1} with QoS 2.
     *
     * @return the literal {@code "OK"}
     */
    @GetMapping("/pub")
    public String pubTopic() {
        final long sentAt = System.currentTimeMillis();
        mqttGateway.sendToMqtt("temperature1", 2, "client msg at: " + sentAt);
        return "OK";
    }

    /**
     * Unsubscribes from the topic given in the path.
     *
     * @param topic topic to unsubscribe from
     * @return string form of the topic list returned by the configuration
     */
    @GetMapping("/del/{topic}")
    public String delTopic(@PathVariable("topic") String topic) {
        return mqttConfig.removeListenTopic(topic).toString();
    }
}
是的,今天一早起来确实不能用了,我们正在寻找最新的