0、MQTT服务器选型
比较流行的开源 MQTT 服务器有几个:
1、Eclipse Mosquitto
使用 C 语言实现的 MQTT 服务器。Eclipse 组织还还包含了大量的 MQTT 客户端项目:https://www.eclipse.org/paho/#
2、EMQ X
使用 Erlang 语言开发的 MQTT 服务器,内置强大的规则引擎,支持许多其他 IoT 协议比如 MQTT-SN、 CoAP、LwM2M 等。
3、Mosca
使用 Node.JS 开发的 MQTT 服务器,简单易用。
4、VerneMQ
同样使用 Erlang 开发的 MQTT 服务器.
本文选取Mosquitto为实战对象。
1、window下安装Mosquitto
下载地址:https://mosquitto.org/download/
安装步骤
2、window下启动Mosquitto
在命令行窗口输入services.msc打开服务窗口,如图:
找到MQTT的broker服务,如图:
右击该服务,弹框可见启动和关闭服务的方式。
3、基于springboot的MQTT实战
POM文件配置如下:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
在application.properties文件中配置内容如下:
spring.mqtt.username=roger
spring.mqtt.password=$6$clQ4Ocu312S0qWgl$Cv2wUxgEN73c6C6jlBkswqR4AkHsvDLWvtEXZZ8NpsBLgP1WAo/qA+WXcmEN/mjDNgdUwcxRAveqNMs2xUVQYA==
spring.mqtt.url= tcp://127.0.0.1:1883
spring.mqtt.client.id='mqttId1'
spring.mqtt.default.topic='topic'
spring.mqtt.topics=topic,hello,hit/us
spring.mqtt.qosValues=2,3
配置代码如下:
package com.tal.demo.config;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import java.util.Arrays;
import java.util.List;
@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
@Value("${spring.mqtt.username}")
private String username;
@Value("${spring.mqtt.password}")
private String password;
@Value("${spring.mqtt.url}")
private String hostUrl;
@Value("${spring.mqtt.client.id}")
private String clientId;
@Value("${spring.mqtt.default.topic}")
private String defaultTopic;
@Value("#{'${spring.mqtt.topics}'.split(',')}")
private List<String> topics;
@Value("#{'${spring.mqtt.qosValues}'.split(',')}")
private List<Integer> qosValues;
@Bean
public MqttConnectOptions getMqttConnectOptions() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
// mqttConnectOptions.setUserName(username);
// mqttConnectOptions.setPassword(password.toCharArray());
mqttConnectOptions.setServerURIs(new String[] {hostUrl});
mqttConnectOptions.setKeepAliveInterval(2);
// 设置超时时间 单位为秒
mqttConnectOptions.setConnectionTimeout(10);
mqttConnectOptions.setMaxInflight(100000000);
return mqttConnectOptions;
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getMqttConnectOptions());
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(clientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
// 接收通道
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
// 配置client,监听的topic
@Bean
public MessageProducer inbound() {
String[] strings = new String[topics.size()];
//String[] strings1 = {"topic", "hello"};
Integer[] ints = new Integer[qosValues.size()];
topics.toArray(strings);
qosValues.toArray(ints);
System.out.println("strings==" + strings);
int[] its = Arrays.stream(ints).mapToInt(Integer::valueOf).toArray();
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
clientId + "_inbound", mqttClientFactory(), strings);
adapter.setCompletionTimeout(3000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
// 通过通道获取数据
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
Object hello = message.getHeaders().get("mqtt_receivedTopic");
String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
Object payload = message.getPayload();
System.out.println("msg==" + payload);
System.out.println("topic is " + topic);
// String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
// String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
// System.out.println(topic+"|"+message.getPayload().toString());
}
};
}
}
服务层代码如下:
package com.tal.demo.service;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic);
}
接口层测试代码如下:
package com.tal.demo.controller;
import javax.annotation.Resource;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.tal.demo.service.MqttGateway;
@RestController
@RequestMapping("/mqtt")
public class MQTTController {
@Resource
private MqttGateway mqttGateway;
//http://localhost:8080/mqtt/sendMqtt
@RequestMapping("/sendMqtt")
public String sendMqtt(){
String sendData = "12356";
System.out.println("消息订阅"+sendData);
mqttGateway.sendToMqtt(sendData,"hello");
return "OK";
}
@RequestMapping("/test")
public String test(String sendData){
return "testOK";
}
}
4、工程打包运行
通过mvn package 打包后,可以在target目录进行jar执行。
浏览器运行请求:http://localhost:8080/mqtt/sendMqtt
https://aoyouzi.iteye.com/admin/blogs/2515800
https://aoyouzi.iteye.com/admin/blogs/2515887