欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SpringBoot整合MQTT

程序员文章站 2022-04-07 18:08:26
使用java代码通过MQTT完成发布订阅操作pom 导入依赖

使用java代码通过MQTT完成发布订阅操作

pom 导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hzx</groupId>
    <artifactId>mqtt_test_02</artifactId>
    <version>1.0-SNAPSHOT</version>
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.9.RELEASE</version>
</parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
            <version>5.4.1</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
            <version>5.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.12</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>5.2.8.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpcore</artifactId>
            <version>4.4.12</version>
        </dependency>
    </dependencies>

</project>

yml文件中配置

spring:
  mqtt:
    username: admin
    password: public
    host-url: tcp://127.0.0.1:1883
    client-id: test
    default-topic: test
    timeout: 100
    keepalive: 100

获取配置

package com.mqtt.common;

import lombok.Getter;
import lombok.Setter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
@ConfigurationProperties("spring.mqtt")
@Setter
@Getter
public class MqttConfig {

    @Autowired
    private MqttPushClient mqttPushClient;

    private String username;
    private String password;
    private String hostUrl;
    private String clientId;
    private String defaultTopic;
    private int timeout;
    private int keepalive;

    @Bean
    public MqttPushClient getMqttPushClient(){
        mqttPushClient.connect(hostUrl,clientId,username,password,timeout,keepalive);
        mqttPushClient.subscribe("test/#",0);
        return mqttPushClient;
    }
}

MQTT推送客户端

package com.mqtt.common;


import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MqttPushClient {
    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private PushCallback pushCallback;


    private static MqttClient client;

    private static MqttClient getClient() {
        return client;
    }

    private static void setClient(MqttClient client) {
        MqttPushClient.client = client;
    }

    /**
     * 客户端连接
     *
     * @param host      ip+端口
     * @param clientId
     * @param username
     * @param password
     * @param timeout
     * @param keepalive 保留数
     */
    public void connect(String host, String clientId, String username, String password, int timeout, int keepalive) {
        MqttClient client;
        try {
            client = new MqttClient(host, clientId, new MemoryPersistence());
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(username);
            options.setPassword(password.toCharArray());
            options.setConnectionTimeout(timeout);
            options.setKeepAliveInterval(keepalive);
            MqttPushClient.setClient(client);
            try {
                client.setCallback(pushCallback);
                client.connect(options);
            } catch (Exception e) {
                e.printStackTrace();
            }
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 发布
     *
     * @param qos         连接方式
     * @param retained    是否保留
     * @param topic       主题
     * @param pushMessage 消息体
     */
    public void publish(int qos, boolean retained, String topic, String pushMessage) {
        MqttMessage message = new MqttMessage();
        message.setQos(qos);
        message.setRetained(retained);
        message.setPayload(pushMessage.getBytes());
        MqttTopic mqttTopic = MqttPushClient.getClient().getTopic(topic);
        if (mqttTopic == null) {
            logger.error("topic not exist");
        }
        MqttDeliveryToken token;
        try {
            token = mqttTopic.publish(message);
            token.waitForCompletion();
        } catch (MqttPersistenceException e) {
            e.printStackTrace();
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }

    /**
     * 订阅某个主题
     * @param topic
     * @param qos
     */
    public void subscribe(String topic,int qos){
        logger.info("开始订阅主题"+topic);
        try {
            MqttPushClient.getClient().subscribe(topic, qos);
        } catch (MqttException e) {
            e.printStackTrace();
        }
    }
}

监听类

package com.mqtt.common;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class PushCallback implements MqttCallback {

    private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);

    @Autowired
    private MqttConfig mqttConfig;

    private static MqttClient client;

    @Override
    public void connectionLost(Throwable throwable) {
        // 连接丢失后,在此处重连
        logger.info("连接断开,可以重连");
        if (client == null || !client.isConnected()) {
            mqttConfig.getMqttPushClient();
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
        // subscribe后得到的消息会执行到这里面
        logger.info("接收消息主题:"+ topic);
        logger.info("接收消息QoS:" + mqttMessage.getQos());
        logger.info("接收消息内容:" + new String(mqttMessage.getPayload()));
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        logger.info("deliveryCompplete-------------"+iMqttDeliveryToken.isComplete());
    }
}

Controller返回值格式

package com.mqtt.common;

import org.springframework.http.HttpStatus;

import java.util.HashMap;
import java.util.Map;

public class RUtils extends HashMap<String,Object> {
    private static final long serialVersionUID = 1L;

    public RUtils(){
        put("code",200);
        put("msg","success");
    }

    public static RUtils error(HttpStatus code, String msg){
        RUtils r = new RUtils();
        r.put("code",code);
        r.put("msg",msg);
        return r;
    }

    public static RUtils error(int code,String msg,String data){
        RUtils r = new RUtils();
        r.put("code",code);
        r.put("msg",msg);
        r.put("data",data);
        return r;
    }

    public static RUtils ok(String msg){
        RUtils r = new RUtils();
        r.put("msg",msg);
        return r;
    }

    public static RUtils ok(Map<String,Object> map){
        RUtils r = new RUtils();
        r.putAll(map);
        return r;
    }

    public static RUtils error(){
        return error(HttpStatus.INTERNAL_SERVER_ERROR,"未知异常");
    }

    public static RUtils error(String msg){
        return error(HttpStatus.INTERNAL_SERVER_ERROR,msg);
    }
}

controller测试

@RestController
@RequestMapping("/")
public class TestController {
    @Autowired
    private MqttPushClient mqttPushClient;

    @GetMapping(value = "/publishTopic")
    public RUtils publishTopic(){
        mqttPushClient.publish(0,false,"test/test","测试发布消息");
        return RUtils.ok("发布成功");
    }
}

测试结果

访问http://localhost:18083/#/clients
有新的客户端连接
SpringBoot整合MQTT
SpringBoot整合MQTT

在MQTT中,在主题test/test中发布消息
SpringBoot整合MQTT
在java控制台接收到消息
SpringBoot整合MQTT
在MQTTX中,在主题test/te中发布消息
SpringBoot整合MQTT
在java控制台接收消息
SpringBoot整合MQTT

本文地址:https://blog.csdn.net/m0_51994489/article/details/110822246

相关标签: 代码