欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

windows10 安装mosquitto服务

程序员文章站 2022-07-10 19:06:20
官网下载地址 http://www.eclipse.org/paho/components/tool/得到压缩包,直接解压即可服务(本地) :启动服务在根目录.\mosquitto.exe -c .\mosquitto.conf.\mosquitto_sub.exe -t 'dissun/topic' -v.\mosquitto_pub.exe -t 'dissun/topic' -m '爱你'到此无账号密码安装操完成java操作–mosquitto无账号密码

官网下载地址 http://www.eclipse.org/paho/components/tool/
windows10 安装mosquitto服务
得到压缩包,直接解压即可
windows10 安装mosquitto服务
服务(本地) :启动服务
windows10 安装mosquitto服务
在根目录

.\mosquitto.exe -c .\mosquitto.conf
.\mosquitto_sub.exe -t 'dissun/topic' -v
.\mosquitto_pub.exe -t 'dissun/topic' -m '爱你'

windows10 安装mosquitto服务

到此无账号密码安装操完成

java操作–mosquitto无账号密码

<dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.client.mqttv3 -->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.0</version>
        </dependency>

windows10 安装mosquitto服务

##################
#  MQTT 配置
##################
# 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
mqtt.url=tcp://127.0.0.1:1883,tcp://127.0.0.1:1883
#mqtt.username=admin
#mqtt.password=password
mqtt.producer.clientId=cs
#topic可在传参中动态设置
mqtt.producer.topic=dissun/topic
mqtt.timeout=10000

windows10 安装mosquitto服务
创建MqttConfig.java

package com.example.mqttdemo.config;

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessagingException;
import org.springframework.util.StringUtils;
import org.springframework.messaging.MessageHandler;

/**
 * MQTT配置,生产者
 *
 * @author BBF
 */
@Configuration
public class MqttConfig {

    private static final Logger LOGGER = LoggerFactory.getLogger(com.example.mqttdemo.config.MqttConfig.class);

    private static final byte[] WILL_DATA;

    static {
        WILL_DATA = "offline".getBytes();
    }

    /**
     * 订阅的bean名称
     */
    /**
     * 发布的bean名称
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

//    @Value("${mqtt.username}")
//    private String username;
//
//    @Value("${mqtt.password}")
//    private String password;
//
    @Value("${mqtt.url}")
    private String url;

    @Value("${mqtt.producer.clientId}")
    private String producerClientId;

//    @Value("${mqtt.producer.defaultTopic}")
    private String producerDefaultTopic = "down_command/{province_code}/{city_code}/{district_code}/{community_code}/{unit_code}/{device_code}";

//    public void setProducerDefaultTopic(String s){
//        if(null!=s&&"".equals(s)){
//            producerDefaultTopic = s;
//        }
//    }


    /**
     * MQTT连接器选项
     *
     *
     */
    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions options = new MqttConnectOptions();
        // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
        // 这里设置为true表示每次连接到服务器都以新的身份连接
        options.setCleanSession(true);
//        // 设置连接的用户名
//        options.setUserName(username);
//        // 设置连接的密码
//        options.setPassword(password.toCharArray());
        System.out.println(StringUtils.split(url,","));
        options.setServerURIs(StringUtils.split(url,","));
        // 设置超时时间 单位为秒
        options.setConnectionTimeout(10);
        // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
        options.setKeepAliveInterval(20);
        // 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
        options.setWill("willTopic", WILL_DATA, 2, false);
        return options;
    }


    /**
     * MQTT客户端
     *
     * @return {@link org.springframework.integration.mqtt.core.MqttPahoClientFactory}
     */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    /**
     * MQTT信息通道(生产者)
     *
     * @return {@link org.springframework.messaging.MessageChannel}
     */
    @Bean(name = CHANNEL_NAME_OUT)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * MQTT消息处理器(生产者)
     *
     * @return {@link org.springframework.messaging.MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                producerClientId,
                mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(producerDefaultTopic);
        return messageHandler;
    }




//    @Value("${mqtt.producer.clientId}")
//    private String clientId;

    @Value("${mqtt.producer.topic}")
    private String defaultTopic;

    @Value("${mqtt.timeout}")
    private int completionTimeout;   //连接超时

    //接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    //配置client,监听的topic
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(producerClientId + "_inbound", mqttClientFactory(),
                        defaultTopic);
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    //通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println("主题:"+ message.getHeaders().get("mqtt_receivedTopic")+ "消息接收到的数据:"+message.getPayload());
            }
        };
    }

}

创建MqttSenderService.java

package com.example.mqttdemo.service;



import com.example.mqttdemo.config.MqttConfig;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * MQTT生产者消息发送接口
 * <p>MessagingGateway要指定生产者的通道名称</p>
 *
 * @author BBF
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface MqttSenderService {

    /**
     * 发送信息到MQTT服务器
     *
     * @param data 发送的文本
     */
    void sendToMqtt(String data);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic   主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    String payload);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic   主题
     * @param qos     对消息处理的几种机制。<br> 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
     *                1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
     *                2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);
}

创建TestController.java

package com.example.mqttdemo.controller;

import com.example.mqttdemo.service.MqttSenderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@RestController
public class TestController {

    @Autowired
    MqttSenderService mqttSenderService;

    @RequestMapping("mqtt")
    public String sen(@RequestBody Map map){
        try {
            mqttSenderService.sendToMqtt(map.get("topic").toString(),map.get("payload").toString());
            return "操作成功";
        }catch (Exception e){
            e.printStackTrace();
            return "操作失败";
        }

    }
}

完成以上操作,直接启动项目即可

windows10 安装mosquitto服务

本文地址:https://blog.csdn.net/weixin_40887953/article/details/107682560

相关标签: java