JAVA实现MQTT客户端订阅消息并消费
程序员文章站
2022-04-02 11:17:32
MQTT客户端订阅消息mqtt发送消息首先要建立与服务器连接,接下来订阅主题topic就可以接收到这个消息客户端: package com.gw.device.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import...
MQTT客户端订阅消息
mqtt发送消息首先要建立与服务器连接,接下来订阅主题topic就可以接收到这个消息
客户端:
package com.gw.device.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
/**
* MQTT客户端
*/
@Slf4j
public class MyMqttClient {
private static String HOST = "ws://xx.xxx.com:8083";
private static String CLIENTID = "03232983-20ea-42b8-84c5-1c6c80bdabef";
private static String userName = "username";
private static String passWord = "password";
private MqttClient client;
private static volatile MyMqttClient mqttClient = null;
public static MyMqttClient getInstance() {
if (mqttClient == null) {
synchronized (MyMqttClient.class) {
if (mqttClient == null) {
mqttClient = new MyMqttClient();
}
}
}
return mqttClient;
}
private MyMqttClient() {
log.info("Connect MQTT: " + this);
connect();
}
/**
* 创建连接
*/
private void connect() {
try {
//初始化连接设置对象
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//true可以安全地使用内存持久性作为客户端断开连接时清除的所有状态
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setConnectionTimeout(30);//设置连接超时
mqttConnectOptions.setUserName(userName); // 设置连接的用户名
mqttConnectOptions.setPassword(passWord.toCharArray());// 设置连接的密码
client = new MqttClient(HOST,CLIENTID, new MemoryPersistence());
client.setCallback(new MqttRecieveCallback());//执行回调
client.connect(mqttConnectOptions);//创建连接
} catch (MqttException e) {
log.info(e.getMessage());
e.printStackTrace();
}
}
/**
* 订阅主题
*
* @param topic qos默认为1
*/
public void subTopic(String topic) {
try {
client.subscribe(topic, 1);
} catch (MqttException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
/**
* 订阅某个主题
*
* @param topic
* @param gos
*/
public void subTopic(String topic, int gos) {
try {
client.subscribe(topic, gos);
} catch (MqttException e) {
log.error(e.getMessage());
e.printStackTrace();
}
}
/**
* 清空主题
* @param topic
*/
public void cleanTopic(String topic) {
try {
client.unsubscribe(topic);
} catch (MqttException e) {
log.error(e.getMessage());
e.printStackTrace();
}
}
}
回调函数类:
package com.gw.device.mqtt;
import com.alibaba.fastjson.JSONObject;
import com.gw.common.HttpClientUtil;
import com.gw.device.pushData.AlarmBJZJInData;
import com.gw.device.pushData.LoraDeviceStatusData;
import com.gw.device.receiveData.AlarmData;
import com.gw.device.receiveData.DeviceStatusData;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* 接收消息回调函数
*/
@Slf4j
public class MqttRecieveCallback implements MqttCallback {
/**
* 断连后重新连接并订阅相关主题
* @param cause
*/
@Override
public void connectionLost(Throwable cause) {
log.error(cause.getMessage());
MyMqttClient client = MyMqttClient.getInstance();
client.subTopic("topic主题");
}
/**
* 消费消息
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
String msg = new String(message.getPayload());
log.info("主题名:" + topic);
log.info("订阅消息:" + msg);
if (topic.indexOf("topic主题") > -1 ) {
}
if (topic.indexOf("topic主题")>-1) {
}
}
/**
* 接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
}
总结
个人整理并已实践,有问题欢迎大家交流指正!
本文地址:https://blog.csdn.net/Zeng_1997/article/details/108990100
上一篇: Centos 下 mysql 安装过程