欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

物联网MQTT实战

程序员文章站 2022-06-13 11:30:19
...

0、MQTT服务器选型

比较流行的开源 MQTT 服务器有几个:

1、Eclipse Mosquitto
使用 C 语言实现的 MQTT 服务器。Eclipse 组织还还包含了大量的 MQTT 客户端项目:https://www.eclipse.org/paho/#
2、EMQ X
使用 Erlang 语言开发的 MQTT 服务器,内置强大的规则引擎,支持许多其他 IoT 协议比如 MQTT-SN、 CoAP、LwM2M 等。
3、Mosca
使用 Node.JS 开发的 MQTT 服务器,简单易用。
4、VerneMQ
同样使用 Erlang 开发的 MQTT 服务器.

本文选取Mosquitto为实战对象。

1、window下安装Mosquitto

下载地址:https://mosquitto.org/download/
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 

安装步骤
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 

2、window下启动Mosquitto

在命令行窗口输入services.msc打开服务窗口,如图:
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 
找到MQTT的broker服务,如图:
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 
右击该服务,弹框可见启动和关闭服务的方式。

3、基于springboot的MQTT实战

POM文件配置如下:

<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.integration</groupId>
			<artifactId>spring-integration-mqtt</artifactId>
		</dependency>

在application.properties文件中配置内容如下:

spring.mqtt.username=roger
spring.mqtt.password=$6$clQ4Ocu312S0qWgl$Cv2wUxgEN73c6C6jlBkswqR4AkHsvDLWvtEXZZ8NpsBLgP1WAo/qA+WXcmEN/mjDNgdUwcxRAveqNMs2xUVQYA==
spring.mqtt.url= tcp://127.0.0.1:1883
spring.mqtt.client.id='mqttId1'
spring.mqtt.default.topic='topic'
spring.mqtt.topics=topic,hello,hit/us
spring.mqtt.qosValues=2,3

配置代码如下:

package com.tal.demo.config;


import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
 
import java.util.Arrays;
import java.util.List;


@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {
  @Value("${spring.mqtt.username}")
  private String username;

  @Value("${spring.mqtt.password}")
  private String password;

  @Value("${spring.mqtt.url}")
  private String hostUrl;

  @Value("${spring.mqtt.client.id}")
  private String clientId;

  @Value("${spring.mqtt.default.topic}")
  private String defaultTopic;

  @Value("#{'${spring.mqtt.topics}'.split(',')}")
  private List<String> topics;

  @Value("#{'${spring.mqtt.qosValues}'.split(',')}")
  private List<Integer> qosValues;

  @Bean
  public MqttConnectOptions getMqttConnectOptions() {
    MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
    // mqttConnectOptions.setUserName(username);
    // mqttConnectOptions.setPassword(password.toCharArray());
    mqttConnectOptions.setServerURIs(new String[] {hostUrl});
    mqttConnectOptions.setKeepAliveInterval(2);
    // 设置超时时间 单位为秒
    mqttConnectOptions.setConnectionTimeout(10);
    mqttConnectOptions.setMaxInflight(100000000);
    return mqttConnectOptions;
  }

  @Bean
  public MqttPahoClientFactory mqttClientFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOptions());
    return factory;
  }

  @Bean
  @ServiceActivator(inputChannel = "mqttOutboundChannel")
  public MessageHandler mqttOutbound() {
    MqttPahoMessageHandler messageHandler =
        new MqttPahoMessageHandler(clientId, mqttClientFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(defaultTopic);
    return messageHandler;
  }

  @Bean
  public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
  }


  // 接收通道
  @Bean
  public MessageChannel mqttInputChannel() {
    return new DirectChannel();
  }


  // 配置client,监听的topic
  @Bean
  public MessageProducer inbound() {
    String[] strings = new String[topics.size()];
    //String[] strings1 = {"topic", "hello"};
    Integer[] ints = new Integer[qosValues.size()];
    topics.toArray(strings);
    qosValues.toArray(ints);
    System.out.println("strings==" + strings);
    int[] its = Arrays.stream(ints).mapToInt(Integer::valueOf).toArray();
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
        clientId + "_inbound", mqttClientFactory(), strings);

    adapter.setCompletionTimeout(3000);
    adapter.setConverter(new DefaultPahoMessageConverter());
    adapter.setOutputChannel(mqttInputChannel());
    return adapter;
  }

  // 通过通道获取数据
  @Bean
  @ServiceActivator(inputChannel = "mqttInputChannel")
  public MessageHandler handler() {
    return new MessageHandler() {
      @Override
      public void handleMessage(Message<?> message) throws MessagingException {
        Object hello = message.getHeaders().get("mqtt_receivedTopic");
        String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
        Object payload = message.getPayload();
        System.out.println("msg==" + payload);
        System.out.println("topic is " + topic);
        // String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
        // String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
        // System.out.println(topic+"|"+message.getPayload().toString());
      }
    };
  }

}

服务层代码如下:

package com.tal.demo.service;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
  void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic);
}

接口层测试代码如下:

package com.tal.demo.controller;

import javax.annotation.Resource;

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.tal.demo.service.MqttGateway;

@RestController
@RequestMapping("/mqtt")
public class MQTTController {
  @Resource
  private MqttGateway mqttGateway;

  //http://localhost:8080/mqtt/sendMqtt
  @RequestMapping("/sendMqtt")
  public String sendMqtt(){
      String  sendData = "12356";
      System.out.println("消息订阅"+sendData);
      mqttGateway.sendToMqtt(sendData,"hello");
      return "OK";
  }

  @RequestMapping("/test")
  public String test(String  sendData){
      return "testOK";
  }

}

4、工程打包运行

通过mvn package 打包后,可以在target目录进行jar执行。
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 
浏览器运行请求:http://localhost:8080/mqtt/sendMqtt
物联网MQTT实战
            
    
    博客分类: 技术总结WEb服务器 物联网MQTT实战 

 

 

https://aoyouzi.iteye.com/admin/blogs/2515800

https://aoyouzi.iteye.com/admin/blogs/2515887

 

文章中涉及企业内部敏感信息,他人不得对文章内容进行复制和转载

本文