欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

maven项目通过java加载mqtt存储到mysql数据库,实现发布和接收

程序员文章站 2021-12-12 11:39:30
物联网mqtt协议是可以发布和订阅通过java就可以实现 话不多说,mqtt,加载pom.xml文件格式 1

物联网mqtt协议是可以发布和订阅通过java就可以实现

话不多说,mqtt,加载pom.xml文件格式

 1 <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 2   <modelversion>4.0.0</modelversion>
 3   <groupid>com.mqtt</groupid>
 4   <artifactid>mqttmosquitto</artifactid>
 5   <version>0.0.1-snapshot</version>
 6   <dependencies>
 7     <!-- mqtt依赖 -->
 8         <dependency>
 9             <groupid>org.eclipse.paho</groupid>
10             <artifactid>org.eclipse.paho.client.mqttv3</artifactid>
11             <version>1.0.2</version>
12         </dependency>
13     <!-- jdbc -->
14         <dependency>
15             <groupid>mysql</groupid>
16             <artifactid>mysql-connector-java</artifactid>
17             <version>5.1.39</version>
18         </dependency>
19         
20         <!-- 阿里巴巴json -->
21          <dependency>
22             <groupid>com.alibaba</groupid>
23             <artifactid>fastjson</artifactid>
24             <version>1.2.28</version>
25         </dependency> 
26         
27         
28   </dependencies>
29   
30   <build>
31         <plugins>
32             <plugin>
33                 <artifactid>maven-compiler-plugin</artifactid>
34                 <version>2.3.2</version>
35                 <configuration>
36                     <source>1.8</source>
37                     <target>1.8</target>
38                 </configuration>
39             </plugin>
40             <plugin>
41                    <artifactid> maven-assembly-plugin </artifactid>
42                    <configuration>
43                         <descriptorrefs>
44                         <descriptorref>mqtt</descriptorref>
45                         </descriptorrefs>
46                         <archive>
47                              <manifest>
48                                   <mainclass>com.mqtt.servermqtt</mainclass>
49                              </manifest>
50                         </archive>
51                    </configuration>
52                    <executions>
53                         <execution>
54                              <id>make-assembly</id>
55                              <phase>package</phase>
56                              <goals>
57                                   <goal>single</goal>
58                              </goals>
59                         </execution>
60                    </executions>
61               </plugin>
62         </plugins>
63     </build>
64     
65 </project>

 

这块是发布信息代码

 1 package com.mqtt;
 2 
 3 import org.eclipse.paho.client.mqttv3.imqttdeliverytoken;
 4 import org.eclipse.paho.client.mqttv3.mqttcallback;
 5 import org.eclipse.paho.client.mqttv3.mqttclient;
 6 import org.eclipse.paho.client.mqttv3.mqttconnectoptions;
 7 import org.eclipse.paho.client.mqttv3.mqttdeliverytoken;
 8 import org.eclipse.paho.client.mqttv3.mqttexception;
 9 import org.eclipse.paho.client.mqttv3.mqttmessage;
10 import org.eclipse.paho.client.mqttv3.mqttpersistenceexception;
11 import org.eclipse.paho.client.mqttv3.mqtttopic;
12 import org.eclipse.paho.client.mqttv3.persist.memorypersistence;
13 
14 public class servermqtt {
15 
16     public static void main(string[] args) throws mqttexception {
17         servermqtt server = new servermqtt();
18         
19         server.message = new mqttmessage();
20         server.message.setqos(2);
21         server.message.setretained(true);
22         //编辑消息
23         server.message.setpayload("你的肉".getbytes());
24         server.publish(server.topic , server.message);
25         system.out.println(server.message.isretained() + "------ratained状态");
26     }
27         
28         //mqtt安装的服务器地址和端口号
29         public static final string host = "tcp://localhost:1883";
30         //定义一个主题
31         public static final string topic = "test";
32         //定义mqtt的id,可以在mqtt服务配置中指定
33         private static final string clientid = "client-1";
34         
35         private mqttclient client;
36         private mqtttopic topic;
       //用户和密码 37 private string username = "zhny"; 38 private string password = "zhny2020"; 39 private mqttmessage message; 40 41 /** 42 * g构造函数 43 */ 44 public servermqtt() throws mqttexception { 45 // memorypersistence设置clientid的保存形式,默认为以内存保存 46 client = new mqttclient(host, clientid, new memorypersistence()); 47 connect(); 48 } 49 50 /** 51 * l连接服务器 52 */ 53 private void connect() { 54 mqttconnectoptions options = new mqttconnectoptions(); 55 options.setcleansession(false); 56 options.setusername(username); 57 options.setpassword(password.tochararray()); 58 // 设置超时时间 59 options.setconnectiontimeout(10); 60 // 设置会话心跳时间 61 options.setkeepaliveinterval(20); 62 try { 63 client.setcallback(new mqttcallback() { 64 public void connectionlost(throwable cause) { 65 // 连接丢失后,一般在这里面进行重连 66 system.out.println("连接断开……(可以做重连)"); 67 } 68 69 public void deliverycomplete(imqttdeliverytoken token) { 70 system.out.println("deliverycomplete---------" + token.iscomplete()); 71 } 72 73 public void messagearrived(string topic, mqttmessage message) throws exception { 74 // subscribe后得到的消息会执行到这里面 75 system.out.println("接收消息主题:" + topic + " 接收消息qos:" + message.getqos() + "接收消息内容:" + new string(message.getpayload())); 76 } 77 }); 78 client.connect(options); 79 80 topic = client.gettopic(topic); 81 } catch (exception e) { 82 e.printstacktrace(); 83 } 84 } 85 86 /** 87 * t推送消息 88 */ 89 public void publish(mqtttopic topic , mqttmessage message) throws mqttpersistenceexception, mqttexception { 90 mqttdeliverytoken token = topic.publish(message); 91 token.waitforcompletion(); 92 system.out.println("测试成功为true失败为false " + token.iscomplete()); 93 } 94 95 96 97 }

订阅接收代码

 1 package com.mqtt;
 2  
 3 import org.eclipse.paho.client.mqttv3.mqttclient;
 4 import org.eclipse.paho.client.mqttv3.mqttconnectoptions;
 5 import org.eclipse.paho.client.mqttv3.mqttexception;
 6 import org.eclipse.paho.client.mqttv3.mqtttopic;
 7 import org.eclipse.paho.client.mqttv3.persist.memorypersistence;
 8  
 9 /**
10  * mqtt工具类操作
11  *
12  */
13 public class mqttconnect {
14  
15         
16         public static void main(string[] args) throws mqttexception {
17             mqttconnect client = new mqttconnect();
18             client.start();
19         }
20         
21         
22         
23         
24         
25         //mqtt安装的服务器地址和端口号(本机的ip)
26         public static final string host = "tcp://localhost:1883";
27         //定义一个主题
28         public static final string topic = "test";
29         //定义mqtt的id,可以在mqtt服务配置中指定
30         private static final string clientid = "client-2";
31         private mqttclient client;
32         private mqttconnectoptions options;
       //用户和密码 33 private string username = "zhny"; 34 private string password = "zhny2020"; 35 36 // private scheduledexecutorservice scheduler; 37 38 public void start() { 39 try { 40 // host为主机名,clientid即连接mqtt的客户端id,一般以唯一标识符表示,memorypersistence设置clientid的保存形式,默认为以内存保存 41 client = new mqttclient(host, clientid, new memorypersistence()); 42 // mqtt的连接设置 43 options = new mqttconnectoptions(); 44 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 45 options.setcleansession(true); 46 // 设置连接的用户名 47 options.setusername(username); 48 // 设置连接的密码 49 options.setpassword(password.tochararray()); 50 // 设置超时时间 单位为秒 51 options.setconnectiontimeout(10); 52 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 53 options.setkeepaliveinterval(20); 54 55 // 设置回调,client.setcallback就可以调用pushcallback类中的messagearrived()方法 56 client.setcallback(new pushcallback()); 57 mqtttopic topic = client.gettopic(topic); 58 59 int qos = 2; 60 61 //setwill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 62 //options.setwill(topic, "this is yizhu...".getbytes(), qos, true); 63 client.connect(options); 64 //订阅消息 65 int[] qos = {qos}; 66 string[] topic1 = {topic}; 67 client.subscribe(topic1, qos); 68 69 } catch (exception e) { 70 e.printstacktrace(); 71 } 72 } 73 74 } 75 76 77 78 79 80 81 82
 1 package com.mqtt;
 2 
 3 
 4 
 5 
 6 
 7 import java.text.simpledateformat;
 8 
 9 import org.eclipse.paho.client.mqttv3.imqttdeliverytoken;
10 import org.eclipse.paho.client.mqttv3.mqttcallback;
11 import org.eclipse.paho.client.mqttv3.mqttmessage;
12 
13 import com.alibaba.fastjson.jsonobject;
14 import com.jdbc.mysqldemo;
15 
16 
17 
18 /**
19  * 必须实现mqttcallback的接口并实现对应的相关接口方法callback 类将实现 mqttcallback。 
20  * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 
21  * 在回调中,将它用来标识已经启动了该回调的哪个实例。 
22  * 必须在回调类中实现三个方法: 
23  *  public void messagearrived(mqtttopic topic, mqttmessage message)接收已经预订的发布。 
24  *  public void connectionlost(throwable cause)在断开连接时调用。 
25  *  public void deliverycomplete(mqttdeliverytoken token)) 
26  *  接收到已经发布的 qos 1 或 qos 2 消息的传递令牌时调用。 
27  *  由 mqttclient.connect 激活此回调。 
28  */
29 public class pushcallback implements mqttcallback{
30     
31     
32    public void connectionlost(throwable cause) {
33        // 连接丢失后,一般在这里面进行重连
34        system.out.println("连接断开……(可以做重连)");
35    }
36    
37    
38 
39    public void deliverycomplete(imqttdeliverytoken token) {
40        system.out.println("deliverycomplete---------" + token.iscomplete());
41    }
42    
43    
44         public void messagearrived(string topic, mqttmessage message) throws exception {
45             // subscribe后得到的消息会执行到这里面
46             system.out.println("接收消息主题:" + topic + "  接收消息qos:" + message.getqos() + "接收消息内容:" + new string(message.getpayload()));
47             
          //特别注意一下,messagearrived这个里面不允许调用代码和写方法,如果调用就会断开连接这个是个坑,然后用我这个try{}catch{}就没有问题了,就不会再去掉线断开连接
          try { 48 jsonobject jsonobject = jsonobject.parseobject(message.tostring()); 49 string equipment=jsonobject.getstring("equipment"); 50 double temperature=double.parsedouble(jsonobject.getstring("temperature")); 51 double humidity=double.parsedouble(jsonobject.getstring("humidity")); 52 string datetime=jsonobject.getstring("datetime"); 53 simpledateformat simpledateformat = new simpledateformat("yyyy-mm-dd hh-mm-ss"); 54 system.out.println(equipment); 55 56 mysqldemo mysqldemo=new mysqldemo(); 57 mysqldemo.add(equipment, temperature, humidity, datetime); 58 } catch (exception e) { 59 e.printstacktrace(); 60 } 61 62 } 63 64 private void mqttpublisherfetch() { 65 // 66 // jsonobject jsonobject = jsonobject.parseobject(messtr); 67 // 68 // jsonobject.put("*", "1111"); 69 // 70 // jsonobject.put("李12", "222"); 71 // 72 // system.out.println(jsonobject.tostring()); 73 74 // system.out.println(equipment+"-------"+temperature+"---------------"+humidity+"-----------"+datetime+"-----------猪蹄---"); 75 } 76 77 78 } 79 80
特别注意一下,messagearrived这个里面不允许调用代码和写方法,如果调用就会断开连接这个是个坑,然后用我这个try{}catch{}就没有问题了,就不会再去掉线断开连接

简单的一个小maven项目集成mqtt还有个jdbc简单的也给大家粘上去

  1 package com.jdbc;
  2  
  3 import java.sql.*;
  4 
  5 import com.alibaba.fastjson.jsonobject;
  6  
  7 public class mysqldemo {
  8  
  9     // mysql 8.0 以下版本 - jdbc 驱动名及数据库 url
 10     static final string jdbc_driver = "com.mysql.jdbc.driver";  
 11     static final string db_url = "jdbc:mysql://localhost:3306/zhny?useunicode=true&characterencoding=utf-8&allowmultiqueries=true";
 12  
 13     // mysql 8.0 以上版本 - jdbc 驱动名及数据库 url
 14     //static final string jdbc_driver = "com.mysql.cj.jdbc.driver";  
 15     //static final string db_url = "jdbc:mysql://localhost:3306/runoob?usessl=false&servertimezone=utc";
 16  
 17  
 18     // 数据库的用户名与密码,需要根据自己的设置
 19     static final string user = "root";
 20     static final string pass = "root";
 21  
 22     
 23     public void select() {
 24           connection conn = null;
 25           statement stmt = null;
 26           try{
 27               // 注册 jdbc 驱动
 28               class.forname(jdbc_driver);
 29           
 30               // 打开链接
 31               system.out.println("连接数据库...");
 32               conn = drivermanager.getconnection(db_url,user,pass);
 33               
 34               
 35               // 执行查询
 36               system.out.println(" 实例化statement对象...");
 37               stmt = conn.createstatement();
 38               string sql;
 39               sql = "select * from crop";
 40               resultset rs = stmt.executequery(sql);
 41               // 展开结果集数据库
 42               while(rs.next()){
 43                   // 通过字段检索
 44                   int id  = rs.getint("id");
 45                   string name = rs.getstring("crop_name");
 46       
 47                   // 输出数据
 48                   system.out.print("id: " + id);
 49                   system.out.print(", 站点名称: " + name);
 50                   system.out.print("\n");
 51               }
 52               // 完成后关闭
 53               rs.close();
 54               stmt.close();
 55               conn.close();
 56           }catch(sqlexception se){
 57               // 处理 jdbc 错误
 58               se.printstacktrace();
 59           }catch(exception e){
 60               // 处理 class.forname 错误
 61               e.printstacktrace();
 62           }finally{
 63               // 关闭资源
 64               try{
 65                   if(stmt!=null) stmt.close();
 66               }catch(sqlexception se2){
 67               }// 什么都不做
 68               try{
 69                   if(conn!=null) conn.close();
 70               }catch(sqlexception se){
 71                   se.printstacktrace();
 72               }
 73           }
 74           system.out.println("goodbye!");
 75     }
 76     
 77     public void add(string equipment,double temperature,double humidity,string datetime) {
 78         
 79         
 80         connection conn = null;
 81         statement stmt = null;
 82         try{
 83             // 注册 jdbc 驱动
 84             class.forname(jdbc_driver);
 85         
 86             // 打开链接
 87             system.out.println("连接数据库...");
 88             conn = drivermanager.getconnection(db_url,user,pass);
 89             
 90             
 91             // 执行查询
 92             system.out.println(" 实例化statement对象...");
 93             stmt = conn.createstatement();
 94             string sql = "insert into sensor (model,device_id,temperature,humidity,date_time,section_id)\r\n" + 
 95                     "values('"+equipment+"','"+temperature+"','"+temperature+"','"+humidity+"','"+datetime+"',12)";//传感器
 96             int rs = stmt.executeupdate(sql);
 97             if (rs>0) {
 98                  system.out.println("true"+rs);
 99             }else {
100                  system.out.println("flase"+rs);
101             }
102            
103             
104             
105             stmt.close();
106             conn.close();
107         }catch(sqlexception se){
108             // 处理 jdbc 错误
109             se.printstacktrace();
110         }catch(exception e){
111             // 处理 class.forname 错误
112             e.printstacktrace();
113         }finally{
114             // 关闭资源
115             try{
116                 if(stmt!=null) stmt.close();
117             }catch(sqlexception se2){
118             }// 什么都不做
119             try{
120                 if(conn!=null) conn.close();
121             }catch(sqlexception se){
122                 se.printstacktrace();
123             }
124         }
125         system.out.println("goodbye!");
126     }
127 }

 最后接收结果

maven项目通过java加载mqtt存储到mysql数据库,实现发布和接收