SpringBoot整合MQTT
程序员文章站
2022-04-07 18:08:26
使用 Java 代码通过 MQTT 完成发布订阅操作。pom 导入依赖
使用java代码通过MQTT完成发布订阅操作
pom 导入依赖
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the MQTT publish/subscribe demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hzx</groupId>
    <artifactId>mqtt_test_02</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.9.RELEASE</version>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <!-- NOTE(review): the three Spring artifacts below pin explicit versions instead of
             inheriting them from the parent; confirm they are compatible with the
             2.1.9.RELEASE parent before upgrading either side. -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>5.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!-- Declared once only: a second identical httpcore entry was removed because
             dependency declarations must be unique within a POM. -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.2.8.RELEASE</version>
        </dependency>
    </dependencies>
</project>
yml文件中配置
# MQTT client settings. The keys must be nested under spring.mqtt so that they
# match the "spring.mqtt" prefix used by the MqttConfig class.
spring:
  mqtt:
    username: admin
    password: public
    host-url: tcp://127.0.0.1:1883
    client-id: test
    default-topic: test
    timeout: 100
    keepalive: 100
获取配置
package com.mqtt.common;
import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
/**
 * Holds the values bound from the {@code spring.mqtt} configuration prefix and
 * exposes the connected {@link MqttPushClient} as a bean.
 *
 * <p>Getters and setters for every field are generated by Lombok.
 */
@Component
@ConfigurationProperties("spring.mqtt")
@Setter
@Getter
public class MqttConfig {

    /** Broker user name. */
    private String username;

    /** Broker password. */
    private String password;

    /** Broker URL handed to the client as the host argument. */
    private String hostUrl;

    /** Client identifier used when connecting. */
    private String clientId;

    /** Bound from configuration; not read anywhere inside this class. */
    private String defaultTopic;

    /** Connection timeout forwarded to {@link MqttPushClient#connect}. */
    private int timeout;

    /** Keep-alive value forwarded to {@link MqttPushClient#connect}. */
    private int keepalive;

    /** Client component that performs the actual connect / subscribe calls. */
    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * Connects the injected client with the bound settings, subscribes it to the
     * hard-coded filter {@code test/#} at QoS 0 and returns it.
     *
     * @return the injected client after the connect and subscribe calls were issued
     */
    @Bean
    public MqttPushClient getMqttPushClient() {
        final MqttPushClient pushClient = this.mqttPushClient;
        pushClient.connect(
                this.hostUrl,
                this.clientId,
                this.username,
                this.password,
                this.timeout,
                this.keepalive);
        pushClient.subscribe("test/#", 0);
        return pushClient;
    }
}
MQTT推送客户端
package com.mqtt.common;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Wrapper around a Paho {@link MqttClient} offering connect, publish and subscribe.
 *
 * <p>The underlying client is stored in a static field, so it is shared by every
 * instance of this class. Failures are logged rather than thrown.
 */
@Component
public class MqttPushClient {

    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;

    /** Most recently created client; {@code null} until {@link #connect} has created one. */
    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * Creates a new client with in-memory persistence and connects it to the broker.
     *
     * <p>The new client replaces the stored one as soon as it is constructed, even if
     * the subsequent connect attempt fails. Connection failures are logged.
     *
     * @param host      broker URI (protocol, IP and port)
     * @param clientId  client identifier
     * @param username  user name for the connection
     * @param password  password for the connection; skipped when {@code null}
     * @param timeout   value passed to {@code MqttConnectOptions#setConnectionTimeout}
     * @param keepalive value passed to {@code MqttConnectOptions#setKeepAliveInterval}
     */
    public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
        final MqttClient newClient;
        try {
            newClient = new MqttClient(host, clientId, new MemoryPersistence());
        } catch (MqttException e) {
            logger.error("Failed to create MQTT client, host={}, clientId={}", host, clientId, e);
            return;
        }

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(username);
        // A missing password must not abort the connect attempt with a NullPointerException.
        if (password != null) {
            options.setPassword(password.toCharArray());
        }
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);

        MqttPushClient.setClient(newClient);
        try {
            newClient.setCallback(pushCallback);
            newClient.connect(options);
        } catch (Exception e) {
            logger.error("Failed to connect MQTT client, host={}, clientId={}", host, clientId, e);
        }
    }

    /**
     * Publishes a text message and waits until delivery has completed.
     *
     * <p>The text is encoded as UTF-8. Nothing is sent when no client has been created
     * yet or the topic cannot be resolved; publish failures are logged.
     *
     * @param qos         quality-of-service level of the message
     * @param retained    whether the message is marked as retained
     * @param topic       topic to publish to
     * @param pushMessage message body
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        // Explicit charset so non-ASCII payloads do not depend on the platform default.
        message.setPayload(pushMessage.getBytes(java.nio.charset.StandardCharsets.UTF_8));

        MqttClient current = MqttPushClient.getClient();
        if (current == null) {
            logger.error("MQTT client is not initialized, cannot publish to topic {}", topic);
            return;
        }
        MqttTopic mqttTopic = current.getTopic(topic);
        if (mqttTopic == null) {
            logger.error("topic not exist");
            // Stop here: continuing would dereference the null topic below.
            return;
        }
        try {
            MqttDeliveryToken token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttException e) {
            // MqttPersistenceException is a subclass and is handled here as well.
            logger.error("Failed to publish MQTT message, topic={}", topic, e);
        }
    }

    /**
     * Subscribes to a topic.
     *
     * <p>Does nothing except log an error when no client has been created yet;
     * subscription failures are logged.
     *
     * @param topic topic filter to subscribe to
     * @param qos   quality-of-service level requested for the subscription
     */
    public void subscribe(String topic, int qos) {
        logger.info("开始订阅主题" + topic);
        MqttClient current = MqttPushClient.getClient();
        if (current == null) {
            logger.error("MQTT client is not initialized, cannot subscribe to topic {}", topic);
            return;
        }
        try {
            current.subscribe(topic, qos);
        } catch (MqttException e) {
            logger.error("Failed to subscribe to MQTT topic {}", topic, e);
        }
    }
}
监听类
package com.mqtt.common;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * MQTT callback that logs incoming messages and delivery results and triggers a
 * reconnect when the connection is lost.
 */
@Component
public class PushCallback implements MqttCallback {

    // Logger is bound to this class so log lines are attributed to the callback.
    private static final Logger logger = LoggerFactory.getLogger(PushCallback.class);

    @Autowired
    private MqttConfig mqttConfig;

    /**
     * Logs the loss of connection together with its cause and reconnects by invoking
     * {@link MqttConfig#getMqttPushClient()}.
     *
     * <p>The reconnect is unconditional: the previous guard tested a private field
     * that was never assigned, so it always evaluated to true.
     *
     * @param throwable reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable throwable) {
        logger.info("连接断开,可以重连", throwable);
        mqttConfig.getMqttPushClient();
    }

    /**
     * Logs topic, QoS and payload of a message received for a subscription.
     *
     * @param topic       topic the message arrived on
     * @param mqttMessage received message; its payload is decoded as UTF-8
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        logger.info("接收消息主题:{}", topic);
        logger.info("接收消息QoS:{}", mqttMessage.getQos());
        // Explicit charset so the decoded text does not depend on the platform default.
        logger.info("接收消息内容:{}",
                new String(mqttMessage.getPayload(), java.nio.charset.StandardCharsets.UTF_8));
    }

    /**
     * Logs whether the delivery represented by the token has completed.
     *
     * @param iMqttDeliveryToken token of the delivered message
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("deliveryCompplete-------------{}", iMqttDeliveryToken.isComplete());
    }
}
Controller返回值格式
package com.mqtt.common;
import org.springframework.http.HttpStatus;
import java.util.HashMap;
import java.util.Map;
public class RUtils extends HashMap<String,Object> {
private static final long serialVersionUID = 1L;
public RUtils(){
put("code",200);
put("msg","success");
}
public static RUtils error(HttpStatus code, String msg){
RUtils r = new RUtils();
r.put("code",code);
r.put("msg",msg);
return r;
}
public static RUtils error(int code,String msg,String data){
RUtils r = new RUtils();
r.put("code",code);
r.put("msg",msg);
r.put("data",data);
return r;
}
public static RUtils ok(String msg){
RUtils r = new RUtils();
r.put("msg",msg);
return r;
}
public static RUtils ok(Map<String,Object> map){
RUtils r = new RUtils();
r.putAll(map);
return r;
}
public static RUtils error(){
return error(HttpStatus.INTERNAL_SERVER_ERROR,"未知异常");
}
public static RUtils error(String msg){
return error(HttpStatus.INTERNAL_SERVER_ERROR,msg);
}
}
controller测试
/**
 * Demo controller exposing one endpoint that publishes a fixed test message.
 */
@RestController
@RequestMapping("/")
public class TestController {

    @Autowired
    private MqttPushClient mqttPushClient;

    /**
     * Publishes a fixed message to the topic {@code test/test} with QoS 0 and the
     * retained flag off, then returns a success response.
     *
     * @return success response carrying a fixed confirmation message
     */
    @GetMapping(value = "/publishTopic")
    public RUtils publishTopic() {
        final int qos = 0;
        final boolean retained = false;
        this.mqttPushClient.publish(qos, retained, "test/test", "测试发布消息");
        final RUtils response = RUtils.ok("发布成功");
        return response;
    }
}
测试结果
访问http://localhost:18083/#/clients
有新的客户端连接
在MQTT中,在主题test/test中发布消息
在java控制台接收到消息
在MQTTX中,在主题test/te中发布消息
在java控制台接收消息
本文地址:https://blog.csdn.net/m0_51994489/article/details/110822246
上一篇: Linux 系统如何快速入门?分享民工哥总结的经验
下一篇: 100个numpy问题49-100