BC26 NB-IoT模组通过MQTT实现数据上云
1.在云服务上部署MQTT
提供了Ubuntu
和CentOS
的方法,根据系统二选一
1.1安装所需的组件
Ubuntu
sudo apt update && sudo apt install -y \
apt-transport-https \
ca-certificates \
curl \
gnupg-agent \
software-properties-common \
mosquitto-clients
CentOS
sudo yum install -y yum-utils device-mapper-persistent-data lvm2 mosquitto-clients
1.2设置稳定存储库
Ubuntu
curl -fsSL https://repos.emqx.io/gpg.pub | sudo apt-key add -
sudo apt-key fingerprint 3E640D53
sudo add-apt-repository \
"deb [arch=amd64] https://repos.emqx.io/emqx-ce/deb/ubuntu/ \
./$(lsb_release -cs) \
stable"
CentOS
sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo
1.3安装EMQ X Broker
Ubuntu
sudo apt update
sudo apt install emqx
CentOS
sudo yum install emqx
1.4启动 EMQ X Broker
sudo emqx start
2.设置服务器端口允许访问
在云服务器安装组中允许18083
和1883
访问
查看防火墙中允许访问的端口
sudo firewall-cmd --list-all
显示只开启了3306
端口
使其开启18083
端口
sudo firewall-cmd --zone=public --add-port=18083/tcp --permanent
sudo firewall-cmd --zone=public --add-port=1883/tcp --permanent
重启防火墙
sudo firewall-cmd --reload
再次查看防火墙中允许访问的端口列表
sudo firewall-cmd --list-all
1883
和13083
端口已开启
3.登陆MQTT后台
打开浏览器,输入服务器ip:18083
进入MQTT后台管理系统,初始账户为admin
,密码为public
。
可以看到一些关于MQTT的参数
4.将NB-IoT模块连接至MQTT服务器
注意事项:如果使用USB-TTL模块上电之后一直循环收到上电信息则是因为电压不稳,采用稳压电源给模块供电方可解决,USB-TTL需和直流电源共地,否则无法收到返回信息!
模块插入已激活的4G物联网卡,方向及正反要正确,接上4G天线。
模块上的RX
、TX
分别与USB-TTL的TX
、RX
连接,模块3V3
与稳压电源正极
相连;模块的G
和USB-TTL的GND
与稳压电源的负极
相连;
模块上电60秒后,可以开始发送指令。打开串口助手,选择串口,波特率115200
,取消HEX发送
,发送指令要加上空行
AT
查看网络是否附着,1表示已附着
AT+CGATT?
打开MQTT客户端网络,参数是服务器IP
加上MQTT端口1883
AT+QMTOPEN=0,"106.52.150.xxx",1883
+QMTOPEN:0,0
表示MQTT 客户端网络成功打开,第一个0为连接id,第二个0表示打开网络成功
在返回+QMTOPEN:0,0
后10秒内需要将NB-IoT模块连接至MQTT服务器,过时将断开,需要重新打开MQTT客户端网络。
将BC26连接至MQTT服务器
AT+QMTCONN=0,"BC26"
+QMTCONN: 0,0,0
显示成功连接,第一个0为连接id,第二个0表示数据包发送成功且从服务器接收到 ACK,第三个0表示接受连接。
到MQTT后台查看,BC26已经连接至MQTT服务器中
5.数据传输测试
在服务器命令行界面中输入指令,订阅/mqtt/pub
主题的消息
mosquitto_sub -t /mqtt/pub
NB-IoT模块向/mqtt/pub
发送数据
AT+QMTPUB=0,1,1,0,"/mqtt/pub","can you hear me?"
数据传输测试已经通过,那么需要应用到实际项目中
6.项目应用测试
代码参考自java连接MQTT服务器
在pom.xml
中引入maven依赖,里面的ip地址
更改为MQTT部署的服务器地址
<!-- MQTT依赖 -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
MQTT工具类代码
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* MQTT工具类操作
*/
@Slf4j
@Component
public class MQTTConnect {
private String HOST = "tcp://106.52.150.xxx:1883"; //mqtt服务器的地址和端口号
private final String clientId = "DC" + (int) (Math.random() * 100000000);
private MqttClient mqttClient;
/**
* 客户端connect连接mqtt服务器
*
* @param userName 用户名
* @param passWord 密码
* @param mqttCallback 回调函数
**/
public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
MqttConnectOptions options = mqttConnectOptions(userName, passWord);
if (mqttCallback == null) {
mqttClient.setCallback(new Callback());
} else {
mqttClient.setCallback(mqttCallback);
}
mqttClient.connect(options);
}
/**
* MQTT连接参数设置
*/
private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
options.setConnectionTimeout(10);///默认:30
options.setAutomaticReconnect(true);//默认:false
options.setCleanSession(false);//默认:true
//options.setKeepAliveInterval(20);//默认:60
return options;
}
/**
* 关闭MQTT连接
*/
public void close() throws MqttException {
mqttClient.close();
mqttClient.disconnect();
}
/**
* 向某个主题发布消息 默认qos:1
*
* @param topic:发布的主题
* @param msg:发布的消息
*/
public void pub(String topic, String msg) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
//mqttMessage.setQos(2);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 向某个主题发布消息
*
* @param topic: 发布的主题
* @param msg: 发布的消息
* @param qos: 消息质量 Qos:0、1、2
*/
public void pub(String topic, String msg, int qos) throws MqttException {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setQos(qos);
mqttMessage.setPayload(msg.getBytes());
MqttTopic mqttTopic = mqttClient.getTopic(topic);
MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
token.waitForCompletion();
}
/**
* 订阅某一个主题 ,此方法默认的的Qos等级为:1
*
* @param topic 主题
*/
public void sub(String topic) throws MqttException {
mqttClient.subscribe(topic);
}
/**
* 订阅某一个主题,可携带Qos
*
* @param topic 所要订阅的主题
* @param qos 消息质量:0、1、2
*/
public void sub(String topic, int qos) throws MqttException {
mqttClient.subscribe(topic, qos);
}
/**
* main函数自己测试用
*/
public static void main(String[] args) throws MqttException {
MQTTConnect mqttConnect = new MQTTConnect();
mqttConnect.setMqttClient("", "", new Callback());
mqttConnect.sub("/mqtt/pub");
//mqttConnect.pub("/mqtt/sub", "can you hear me?" + (int) (Math.random() * 100000000));
}
}
MQTT的回调函数代码
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* MQTT回调函数
*/
@Slf4j
public class Callback implements MqttCallback {
/**
* MQTT 断开连接会执行此方法
*/
@Override
public void connectionLost(Throwable throwable) {
log.info("断开了MQTT连接 :{}", throwable.getMessage());
log.error(throwable.getMessage(), throwable);
}
/**
* publish发布成功后会执行到这里
*/
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
log.info("发布消息成功");
}
/**
* subscribe订阅后得到的消息会执行到这里
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// TODO 此处可以将订阅得到的消息进行业务处理、数据存储
log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
}
}
MQTT监听器代码
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
/**
* 项目启动 监听主题
*/
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {
private final MQTTConnect server;
@Autowired
public MQTTListener(MQTTConnect server) {
this.server = server;
}
@Override
public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
try {
server.setMqttClient("", "", new Callback());
server.sub("/mqtt/sub");
} catch (MqttException e) {
log.error(e.getMessage(), e);
}
}
}
运行MQTT工具类代码
中的main函数
已经成功收到数据了,那么编写相对应的业务逻辑即可将数据存储进数据库中,前端可以看到有NB-IoT发来的数据。
欢迎小伙伴的讨论,若有问题请在评论区评论或私信,谢谢你。
本文地址:https://blog.csdn.net/bean_business/article/details/109009279
上一篇: 汉服不适合去风大的地方炫