maven项目通过java加载mqtt存储到mysql数据库,实现发布和接收
程序员文章站
2021-12-12 11:39:30
物联网mqtt协议是可以发布和订阅通过java就可以实现 话不多说,mqtt,加载pom.xml文件格式 1
物联网mqtt协议是可以发布和订阅通过java就可以实现
话不多说,mqtt,加载pom.xml文件格式
1 <project xmlns="http://maven.apache.org/pom/4.0.0" xmlns:xsi="http://www.w3.org/2001/xmlschema-instance" xsi:schemalocation="http://maven.apache.org/pom/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 2 <modelversion>4.0.0</modelversion> 3 <groupid>com.mqtt</groupid> 4 <artifactid>mqttmosquitto</artifactid> 5 <version>0.0.1-snapshot</version> 6 <dependencies> 7 <!-- mqtt依赖 --> 8 <dependency> 9 <groupid>org.eclipse.paho</groupid> 10 <artifactid>org.eclipse.paho.client.mqttv3</artifactid> 11 <version>1.0.2</version> 12 </dependency> 13 <!-- jdbc --> 14 <dependency> 15 <groupid>mysql</groupid> 16 <artifactid>mysql-connector-java</artifactid> 17 <version>5.1.39</version> 18 </dependency> 19 20 <!-- 阿里巴巴json --> 21 <dependency> 22 <groupid>com.alibaba</groupid> 23 <artifactid>fastjson</artifactid> 24 <version>1.2.28</version> 25 </dependency> 26 27 28 </dependencies> 29 30 <build> 31 <plugins> 32 <plugin> 33 <artifactid>maven-compiler-plugin</artifactid> 34 <version>2.3.2</version> 35 <configuration> 36 <source>1.8</source> 37 <target>1.8</target> 38 </configuration> 39 </plugin> 40 <plugin> 41 <artifactid> maven-assembly-plugin </artifactid> 42 <configuration> 43 <descriptorrefs> 44 <descriptorref>mqtt</descriptorref> 45 </descriptorrefs> 46 <archive> 47 <manifest> 48 <mainclass>com.mqtt.servermqtt</mainclass> 49 </manifest> 50 </archive> 51 </configuration> 52 <executions> 53 <execution> 54 <id>make-assembly</id> 55 <phase>package</phase> 56 <goals> 57 <goal>single</goal> 58 </goals> 59 </execution> 60 </executions> 61 </plugin> 62 </plugins> 63 </build> 64 65 </project>
这块是发布信息代码
1 package com.mqtt; 2 3 import org.eclipse.paho.client.mqttv3.imqttdeliverytoken; 4 import org.eclipse.paho.client.mqttv3.mqttcallback; 5 import org.eclipse.paho.client.mqttv3.mqttclient; 6 import org.eclipse.paho.client.mqttv3.mqttconnectoptions; 7 import org.eclipse.paho.client.mqttv3.mqttdeliverytoken; 8 import org.eclipse.paho.client.mqttv3.mqttexception; 9 import org.eclipse.paho.client.mqttv3.mqttmessage; 10 import org.eclipse.paho.client.mqttv3.mqttpersistenceexception; 11 import org.eclipse.paho.client.mqttv3.mqtttopic; 12 import org.eclipse.paho.client.mqttv3.persist.memorypersistence; 13 14 public class servermqtt { 15 16 public static void main(string[] args) throws mqttexception { 17 servermqtt server = new servermqtt(); 18 19 server.message = new mqttmessage(); 20 server.message.setqos(2); 21 server.message.setretained(true); 22 //编辑消息 23 server.message.setpayload("你的肉".getbytes()); 24 server.publish(server.topic , server.message); 25 system.out.println(server.message.isretained() + "------ratained状态"); 26 } 27 28 //mqtt安装的服务器地址和端口号 29 public static final string host = "tcp://localhost:1883"; 30 //定义一个主题 31 public static final string topic = "test"; 32 //定义mqtt的id,可以在mqtt服务配置中指定 33 private static final string clientid = "client-1"; 34 35 private mqttclient client; 36 private mqtttopic topic;
//用户和密码 37 private string username = "zhny"; 38 private string password = "zhny2020"; 39 private mqttmessage message; 40 41 /** 42 * g构造函数 43 */ 44 public servermqtt() throws mqttexception { 45 // memorypersistence设置clientid的保存形式,默认为以内存保存 46 client = new mqttclient(host, clientid, new memorypersistence()); 47 connect(); 48 } 49 50 /** 51 * l连接服务器 52 */ 53 private void connect() { 54 mqttconnectoptions options = new mqttconnectoptions(); 55 options.setcleansession(false); 56 options.setusername(username); 57 options.setpassword(password.tochararray()); 58 // 设置超时时间 59 options.setconnectiontimeout(10); 60 // 设置会话心跳时间 61 options.setkeepaliveinterval(20); 62 try { 63 client.setcallback(new mqttcallback() { 64 public void connectionlost(throwable cause) { 65 // 连接丢失后,一般在这里面进行重连 66 system.out.println("连接断开……(可以做重连)"); 67 } 68 69 public void deliverycomplete(imqttdeliverytoken token) { 70 system.out.println("deliverycomplete---------" + token.iscomplete()); 71 } 72 73 public void messagearrived(string topic, mqttmessage message) throws exception { 74 // subscribe后得到的消息会执行到这里面 75 system.out.println("接收消息主题:" + topic + " 接收消息qos:" + message.getqos() + "接收消息内容:" + new string(message.getpayload())); 76 } 77 }); 78 client.connect(options); 79 80 topic = client.gettopic(topic); 81 } catch (exception e) { 82 e.printstacktrace(); 83 } 84 } 85 86 /** 87 * t推送消息 88 */ 89 public void publish(mqtttopic topic , mqttmessage message) throws mqttpersistenceexception, mqttexception { 90 mqttdeliverytoken token = topic.publish(message); 91 token.waitforcompletion(); 92 system.out.println("测试成功为true失败为false " + token.iscomplete()); 93 } 94 95 96 97 }
订阅接收代码
1 package com.mqtt; 2 3 import org.eclipse.paho.client.mqttv3.mqttclient; 4 import org.eclipse.paho.client.mqttv3.mqttconnectoptions; 5 import org.eclipse.paho.client.mqttv3.mqttexception; 6 import org.eclipse.paho.client.mqttv3.mqtttopic; 7 import org.eclipse.paho.client.mqttv3.persist.memorypersistence; 8 9 /** 10 * mqtt工具类操作 11 * 12 */ 13 public class mqttconnect { 14 15 16 public static void main(string[] args) throws mqttexception { 17 mqttconnect client = new mqttconnect(); 18 client.start(); 19 } 20 21 22 23 24 25 //mqtt安装的服务器地址和端口号(本机的ip) 26 public static final string host = "tcp://localhost:1883"; 27 //定义一个主题 28 public static final string topic = "test"; 29 //定义mqtt的id,可以在mqtt服务配置中指定 30 private static final string clientid = "client-2"; 31 private mqttclient client; 32 private mqttconnectoptions options;
//用户和密码 33 private string username = "zhny"; 34 private string password = "zhny2020"; 35 36 // private scheduledexecutorservice scheduler; 37 38 public void start() { 39 try { 40 // host为主机名,clientid即连接mqtt的客户端id,一般以唯一标识符表示,memorypersistence设置clientid的保存形式,默认为以内存保存 41 client = new mqttclient(host, clientid, new memorypersistence()); 42 // mqtt的连接设置 43 options = new mqttconnectoptions(); 44 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 45 options.setcleansession(true); 46 // 设置连接的用户名 47 options.setusername(username); 48 // 设置连接的密码 49 options.setpassword(password.tochararray()); 50 // 设置超时时间 单位为秒 51 options.setconnectiontimeout(10); 52 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 53 options.setkeepaliveinterval(20); 54 55 // 设置回调,client.setcallback就可以调用pushcallback类中的messagearrived()方法 56 client.setcallback(new pushcallback()); 57 mqtttopic topic = client.gettopic(topic); 58 59 int qos = 2; 60 61 //setwill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 62 //options.setwill(topic, "this is yizhu...".getbytes(), qos, true); 63 client.connect(options); 64 //订阅消息 65 int[] qos = {qos}; 66 string[] topic1 = {topic}; 67 client.subscribe(topic1, qos); 68 69 } catch (exception e) { 70 e.printstacktrace(); 71 } 72 } 73 74 } 75 76 77 78 79 80 81 82
1 package com.mqtt; 2 3 4 5 6 7 import java.text.simpledateformat; 8 9 import org.eclipse.paho.client.mqttv3.imqttdeliverytoken; 10 import org.eclipse.paho.client.mqttv3.mqttcallback; 11 import org.eclipse.paho.client.mqttv3.mqttmessage; 12 13 import com.alibaba.fastjson.jsonobject; 14 import com.jdbc.mysqldemo; 15 16 17 18 /** 19 * 必须实现mqttcallback的接口并实现对应的相关接口方法callback 类将实现 mqttcallback。 20 * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 21 * 在回调中,将它用来标识已经启动了该回调的哪个实例。 22 * 必须在回调类中实现三个方法: 23 * public void messagearrived(mqtttopic topic, mqttmessage message)接收已经预订的发布。 24 * public void connectionlost(throwable cause)在断开连接时调用。 25 * public void deliverycomplete(mqttdeliverytoken token)) 26 * 接收到已经发布的 qos 1 或 qos 2 消息的传递令牌时调用。 27 * 由 mqttclient.connect 激活此回调。 28 */ 29 public class pushcallback implements mqttcallback{ 30 31 32 public void connectionlost(throwable cause) { 33 // 连接丢失后,一般在这里面进行重连 34 system.out.println("连接断开……(可以做重连)"); 35 } 36 37 38 39 public void deliverycomplete(imqttdeliverytoken token) { 40 system.out.println("deliverycomplete---------" + token.iscomplete()); 41 } 42 43 44 public void messagearrived(string topic, mqttmessage message) throws exception { 45 // subscribe后得到的消息会执行到这里面 46 system.out.println("接收消息主题:" + topic + " 接收消息qos:" + message.getqos() + "接收消息内容:" + new string(message.getpayload())); 47
//特别注意一下,messagearrived这个里面不允许调用代码和写方法,如果调用就会断开连接这个是个坑,然后用我这个try{}catch{}就没有问题了,就不会再去掉线断开连接
try { 48 jsonobject jsonobject = jsonobject.parseobject(message.tostring()); 49 string equipment=jsonobject.getstring("equipment"); 50 double temperature=double.parsedouble(jsonobject.getstring("temperature")); 51 double humidity=double.parsedouble(jsonobject.getstring("humidity")); 52 string datetime=jsonobject.getstring("datetime"); 53 simpledateformat simpledateformat = new simpledateformat("yyyy-mm-dd hh-mm-ss"); 54 system.out.println(equipment); 55 56 mysqldemo mysqldemo=new mysqldemo(); 57 mysqldemo.add(equipment, temperature, humidity, datetime); 58 } catch (exception e) { 59 e.printstacktrace(); 60 } 61 62 } 63 64 private void mqttpublisherfetch() { 65 // 66 // jsonobject jsonobject = jsonobject.parseobject(messtr); 67 // 68 // jsonobject.put("*", "1111"); 69 // 70 // jsonobject.put("李12", "222"); 71 // 72 // system.out.println(jsonobject.tostring()); 73 74 // system.out.println(equipment+"-------"+temperature+"---------------"+humidity+"-----------"+datetime+"-----------猪蹄---"); 75 } 76 77 78 } 79 80
特别注意一下,messagearrived这个里面不允许调用代码和写方法,如果调用就会断开连接这个是个坑,然后用我这个try{}catch{}就没有问题了,就不会再去掉线断开连接
简单的一个小maven项目集成mqtt还有个jdbc简单的也给大家粘上去
1 package com.jdbc; 2 3 import java.sql.*; 4 5 import com.alibaba.fastjson.jsonobject; 6 7 public class mysqldemo { 8 9 // mysql 8.0 以下版本 - jdbc 驱动名及数据库 url 10 static final string jdbc_driver = "com.mysql.jdbc.driver"; 11 static final string db_url = "jdbc:mysql://localhost:3306/zhny?useunicode=true&characterencoding=utf-8&allowmultiqueries=true"; 12 13 // mysql 8.0 以上版本 - jdbc 驱动名及数据库 url 14 //static final string jdbc_driver = "com.mysql.cj.jdbc.driver"; 15 //static final string db_url = "jdbc:mysql://localhost:3306/runoob?usessl=false&servertimezone=utc"; 16 17 18 // 数据库的用户名与密码,需要根据自己的设置 19 static final string user = "root"; 20 static final string pass = "root"; 21 22 23 public void select() { 24 connection conn = null; 25 statement stmt = null; 26 try{ 27 // 注册 jdbc 驱动 28 class.forname(jdbc_driver); 29 30 // 打开链接 31 system.out.println("连接数据库..."); 32 conn = drivermanager.getconnection(db_url,user,pass); 33 34 35 // 执行查询 36 system.out.println(" 实例化statement对象..."); 37 stmt = conn.createstatement(); 38 string sql; 39 sql = "select * from crop"; 40 resultset rs = stmt.executequery(sql); 41 // 展开结果集数据库 42 while(rs.next()){ 43 // 通过字段检索 44 int id = rs.getint("id"); 45 string name = rs.getstring("crop_name"); 46 47 // 输出数据 48 system.out.print("id: " + id); 49 system.out.print(", 站点名称: " + name); 50 system.out.print("\n"); 51 } 52 // 完成后关闭 53 rs.close(); 54 stmt.close(); 55 conn.close(); 56 }catch(sqlexception se){ 57 // 处理 jdbc 错误 58 se.printstacktrace(); 59 }catch(exception e){ 60 // 处理 class.forname 错误 61 e.printstacktrace(); 62 }finally{ 63 // 关闭资源 64 try{ 65 if(stmt!=null) stmt.close(); 66 }catch(sqlexception se2){ 67 }// 什么都不做 68 try{ 69 if(conn!=null) conn.close(); 70 }catch(sqlexception se){ 71 se.printstacktrace(); 72 } 73 } 74 system.out.println("goodbye!"); 75 } 76 77 public void add(string equipment,double temperature,double humidity,string datetime) { 78 79 80 connection conn = null; 81 statement stmt = null; 82 try{ 83 // 注册 jdbc 驱动 84 class.forname(jdbc_driver); 85 86 // 打开链接 87 system.out.println("连接数据库..."); 88 conn = drivermanager.getconnection(db_url,user,pass); 89 90 91 // 执行查询 92 system.out.println(" 实例化statement对象..."); 93 stmt = conn.createstatement(); 94 string sql = "insert into sensor (model,device_id,temperature,humidity,date_time,section_id)\r\n" + 95 "values('"+equipment+"','"+temperature+"','"+temperature+"','"+humidity+"','"+datetime+"',12)";//传感器 96 int rs = stmt.executeupdate(sql); 97 if (rs>0) { 98 system.out.println("true"+rs); 99 }else { 100 system.out.println("flase"+rs); 101 } 102 103 104 105 stmt.close(); 106 conn.close(); 107 }catch(sqlexception se){ 108 // 处理 jdbc 错误 109 se.printstacktrace(); 110 }catch(exception e){ 111 // 处理 class.forname 错误 112 e.printstacktrace(); 113 }finally{ 114 // 关闭资源 115 try{ 116 if(stmt!=null) stmt.close(); 117 }catch(sqlexception se2){ 118 }// 什么都不做 119 try{ 120 if(conn!=null) conn.close(); 121 }catch(sqlexception se){ 122 se.printstacktrace(); 123 } 124 } 125 system.out.println("goodbye!"); 126 } 127 }
最后接收结果