MQTT简单介绍与实现
1. mqtt 介绍
它是一种 机器之间通讯 machine-to-machine (m2m)、物联网 internet of things (iot)常用的一种轻量级消息传输协议
适用于网络带宽较低的场合
包含发布、订阅模式,通过一个代理服务器(broker),任何一个客户端(client)都可以订阅或者发布某个主题的消息,然后订阅了该主题的客户端则会收到该消息
1.1 消息主题
发布消息或者订阅消息都要选定一个消息主题,消息主题可以任意定制,类似文件系统,用 “/” 进行分隔,例如主题为 /a/b/c/d 的消息
客户端可以使用完全字符匹配消息,也可以使用通配符进行消息匹配
通配符 + :替换任意单个层级。比如订阅 /a/b/c/d、/a/+/c/d 、+/+/+/+ 主题的消息即可收到主题为 /a/b/c/d 的消息,而 b/+/c/d 、 +/+/+ 不会匹配
通配符 # :匹配任意层级,只能用于末尾, #、a/# 可以匹配上面的主题消息
长度为 0 的主题层级也是允许的。比如发布主题为 a//topic 的消息,客户端可以用 a/+/topic 进行匹配。/a/topic 的主题用 +/a/topic、#、/# 可以匹配。
1.2 服务质量(quality of service,qos)
mqtt 定义了三种客户端与代理服务器之间消息到达的难度
0:broker/client 之间消息传一次,并不确认传到没有,消息可能丢失
1:broker/client 之间消息至少一次,带确认消息的传输,可能重复收到
2:broker/client 之间消息仅有一次,利用四次握手进行确认,网络延迟可能会增加
当客户端订阅的消息质量与代理服务器发布主题的质量不同时,客户端会选择难度最小的 qos 接收消息
发布等级为 2 ,客户端订阅等级为 0, 那么客户端接收到的 qos = 0
发布等级为 0 ,订阅等级为 2,那么客户端接收到的 qos = 0
1.3 消息保留
即当 broker 正在发送消息给 client 时,消息会保存,如果此时有新的 client 订阅了该主题的消息,那么它也会收到消息。这种做法的好处就是当消息主题经常变换的时候,如果有新的 client 订阅该消息,那么它不用等待太长的时间就可以收到消息
1.4 会话清除
client 可以设置 clean session 标志位,当 clean session = false 时,client 失去连接时, broker 会一直保留消息直到 client 重新连接。而 clean session = true 时,broker 会清除所有的消息当这个 client 失去连接。
1.5 消息意愿
当 client 连接上 broker 时,client 会提示 broker 它有一个意愿消息,这个意愿消息将会在 client 失去连接时,broker 发送出去。消息意愿和普通消息一样都包含主题和内容。
2. 实例
js实现过程
function rndnum(n){
var rnd="";
for(var i=0;i<n;i++)
rnd+=math.floor(math.random()*10);
return rnd;
}
var hostname = '服务器',
port = “端口号”,
clientid = rndnum(5),
keepalive = 100,
cleansession = false,
username = '',
password = '',
topic = '订阅消息';
client = new paho.mqtt.client(hostname, port, clientid);
//建立客户端实例
var options = {
invocationcontext: {
host: hostname,
port: port,
path: client.path,
clientid: clientid
},
keepaliveinterval: keepalive,
cleansession: cleansession,
username: username,
password: password,
onsuccess: onconnect,
};
client.connect(options);
//连接服务器并注册连接成功处理事件
function onconnect() {
console.log("onconnected");
client.subscribe(topic);
}
client.onconnectionlost = onconnectionlost;
//注册连接断开处理事件
client.onmessagearrived = onmessagearrived;
//注册消息接收处理事件
function onconnectionlost(responseobject) {
console.log(responseobject);
if (responseobject.errorcode !== 0) {
console.log("onconnectionlost:" + responseobject.errormessage);
console.log("连接已断开");
}
}
function onmessagearrived(message) {
let msg = message.payloadstring;
// 消息处理
}
var count = 0;
function start() {
window.tester = window.setinterval(function () {
if(count!=0){
if (client.isconnected) {
var s = "content:" + (count++) ;
message = new paho.mqtt.message(s);
message.destinationname = topic;
client.send(message);
}
}else
{
window.clearinterval(window.tester);
}
});
}
java spring boot实现过程
1.添加依赖
<dependency>
<groupid>org.eclipse.paho</groupid>
<artifactid>org.eclipse.paho.client.mqttv3</artifactid>
<version>1.1.1</version>
</dependency>
2.创建类
public class mqttutil {
private string serviceuri = "服务器端口号";
private string clientid = "客户端id";
private mqttclientpersistence persistence = new memorypersistence();
private string username = "";
private string password = "";
private string topic = "订阅";
private string messagestr = "";
private int qos = 0;
/**
* 消息订阅
**/
public void subscribe() {
try {
mqttclient client = new mqttclient(serviceuri, clientid, persistence);
client.setcallback(new mqttcallback() {
public void connectionlost(throwable cause) {
system.out.println("订阅者连接丢失...");
system.out.println(cause.getmessage());
}
public void messagearrived(string topic, mqttmessage message) {
messagestr = message.tostring();
// system.out.println("订阅者接收到消息:"+messagestr);
}
public void deliverycomplete(imqttdeliverytoken token) {
}
});
mqttconnectoptions connectoptions = new mqttconnectoptions();
connectoptions.setusername(username);
connectoptions.setpassword(password.tochararray());
connectoptions.setcleansession(false);
//订阅者连接订阅主题
client.connect(connectoptions);
client.subscribe(topic, qos);
system.out.println("订阅者连接状态: " + client.isconnected());
} catch (mqttexception e) {
e.printstacktrace();
}
}
下一篇: 死的很难看