Springboot之整合Socket连接案例
socket连接与硬件通信
一、如何让socket随着springboot项目一起启动
springboot中commandlinerunner的作用:平常开发中有可能需要实现在项目启动后执行的功能,springboot提供的一种简单的实现方案就是添加一个model并实现commandlinerunner接口,实现功能的代码放在实现的run方法中
具体实现
import org.springframework.beans.factory.annotation.autowired; import org.springframework.boot.commandlinerunner; import org.springframework.stereotype.component; import java.net.serversocket; import java.net.socket; import java.util.concurrent.arrayblockingqueue; import java.util.concurrent.threadpoolexecutor; import java.util.concurrent.timeunit; /** * @author 易水●墨龙吟 * @description * @create 2019-04-14 23:40 */ @component public class testrunner implements commandlinerunner { @autowired private socketproperties properties; @override public void run(string... args) throws exception { serversocket server = null; socket socket = null; server = new serversocket(properties.getport()); system.out.println("设备服务器已经开启, 监听端口:" + properties.getport()); threadpoolexecutor pool = new threadpoolexecutor( properties.getpoolcore(), properties.getpoolmax(), properties.getpoolkeep(), timeunit.seconds, new arrayblockingqueue<runnable>(properties.getpoolqueueinit()), new threadpoolexecutor.discardoldestpolicy() ); while (true) { socket = server.accept(); pool.execute(new serverconfig(socket)); } } }
此处使用了自定义的线程池,提高对于socket的客户端处理能力。
二、自定义配置并使用
此处将socket的端口和线程池的一些配置放到 application.yml中使用,方便使用和修改
# socket配置 socket: # 监听端口 2323 port: 2323 # 线程池 - 保持线程数 20 pool-keep: 20 # 线程池 - 核心线程数 10 pool-core: 10 # 线程池 - 最大线程数 20 pool-max: 30 # 线程队列容量 10 pool-queue-init: 10
import lombok.getter; import lombok.setter; import lombok.tostring; import org.springframework.boot.context.properties.configurationproperties; import org.springframework.context.annotation.configuration; import org.springframework.context.annotation.propertysource; import org.springframework.stereotype.component; /** * @author 易水●墨龙吟 * @description * @create 2019-04-18 22:35 */ @setter @getter @tostring @component @configuration @propertysource("classpath:application.yml") @configurationproperties(prefix = "socket") public class socketproperties { private integer port; private integer poolkeep; private integer poolcore; private integer poolmax; private integer poolqueueinit; }
三、socket对于客户端发来的信息的处理和重发机制
当客户端端连接之后发送信息,如果超时未发送,将会关闭,发送数据有异常将会返回给客户端一个error,让客户端在发送一次数据。
import com.farm.config.socket.resolve.messagechain; import com.farm.service.environmentservice; import com.farm.service.impl.environmentserviceimpl; import java.io.*; import java.net.socket; import java.net.socketexception; import java.net.sockettimeoutexception; import java.util.map; /** * @author 易水●墨龙吟 * @description * @create 2019-04-14 23:21 */ public class serverconfig extends thread { private socket socket; public serverconfig(socket socket) { this.socket = socket; } // 获取spring容器管理的类,可以获取到sevrice的类 private environmentservice service = springutil.getbean(environmentserviceimpl.class); private string handle(inputstream inputstream) throws ioexception, dataformexception { byte[] bytes = new byte[1024]; int len = inputstream.read(bytes); if (len != -1) { stringbuffer request = new stringbuffer(); request.append(new string(bytes, 0, len, "utf-8")); system.out.println("接受的数据: " + request); system.out.println("from client ... " + request + "当前线程" + thread.currentthread().getname()); map<string, string> map = messagechain.out(request.tostring()); system.out.println("处理的数据" + map); integer res = service.addenvironment(map); if (res == 1) { return "ok"; } else { throw new dataformexception("数据处理异常"); } } else { throw new dataformexception("数据处理异常"); } } @override public void run() { bufferedwriter writer = null; try { // 设置连接超时9秒 socket.setsotimeout(9000); system.out.println("客户 - " + socket.getremotesocketaddress() + " -> 机连接成功"); inputstream inputstream = socket.getinputstream(); writer = new bufferedwriter(new outputstreamwriter(socket.getoutputstream())); string result = null; try { result = handle(inputstream); writer.write(result); writer.newline(); writer.flush(); } catch (ioexception | dataformexception | illegalargumentexception e) { writer.write("error"); writer.newline(); writer.flush(); system.out.println("发生异常"); try { system.out.println("再次接受!"); result = handle(inputstream); writer.write(result); writer.newline(); writer.flush(); } catch (dataformexception | sockettimeoutexception ex) { system.out.println("再次接受, 发生异常,连接关闭"); } } } catch (socketexception socketexception) { socketexception.printstacktrace(); try { writer.close(); } catch (ioexception ioexception) { ioexception.printstacktrace(); } } catch (ioexception e) { e.printstacktrace(); } finally { try { writer.close(); } catch (ioexception e) { e.printstacktrace(); } } } }
在此处有一个坑,如果客户端是用c/c++编写的,必须使用如下方法:
byte[] bytes = new byte[1024]; int len = inputstream.read(bytes);
如果使用readline或者 datainputstream datainputstream =new datainputstream(socket.getinputstream())这样会出现使用tcp连接助手,客户端发送数据收不到。
四、如何在普通类中使用spring注入类
这里需要使用一个工具类。
import org.springframework.beans.beansexception; import org.springframework.context.applicationcontext; import org.springframework.context.applicationcontextaware; import org.springframework.stereotype.component; /** * @author 易水●墨龙吟 * @description * @create 2019-04-15 0:01 */ @component public class springutil implements applicationcontextaware { private static applicationcontext applicationcontext; @override public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception { if (springutil.applicationcontext == null) { springutil.applicationcontext = applicationcontext; } } /** * 获取applicationcontext * @return */ public static applicationcontext getapplicationcontext() { return applicationcontext; } /** * 通过name获取 bean. * @param name * @return */ public static object getbean(string name){ return getapplicationcontext().getbean(name); } /** * 通过class获取bean. * @param clazz * @param <t> * @return */ public static <t> t getbean(class<t> clazz){ return getapplicationcontext().getbean(clazz); } /** * 通过name,以及clazz返回指定的bean * @param name * @param clazz * @param <t> * @return */ public static <t> t getbean(string name,class<t> clazz){ return getapplicationcontext().getbean(name, clazz); } }
补充:springboot下websocket前台后端数据长连接
首先导入依赖
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-websocket</artifactid> </dependency> <dependency> <groupid>org.springframework.security</groupid> <artifactid>spring-security-messaging</artifactid> </dependency>
spring-security-messaging 是后面继承 abstractsecuritywebsocketmessagebrokerconfigurer需要用到的依赖
websocketconfig
@configuration @enablewebsocketmessagebroker //此注解表示使用stomp协议来传输基于消息代理的消息,此时可以在@controller类中使用@messagemapping public class websocketconfig implements websocketmessagebrokerconfigurer { @override public void registerstompendpoints(stompendpointregistry registry) { /** * 注册 stomp的端点 * addendpoint:添加stomp协议的端点。这个http url是供websocket或sockjs客户端访问的地址 * withsockjs:指定端点使用sockjs协议 */ registry.addendpoint("/websocket/tracker") //物流消息通道, .setallowedorigins("*") //允许跨域,里面路径可以设定 .withsockjs() //指定协议 .setinterceptors(httpsessionhandshakeinterceptor()) ; //设置拦截器() } @override public void configuremessagebroker(messagebrokerregistry registry) { /** * 配置消息代理 * 启动简单broker,消息的发送的地址符合配置的前缀来的消息才发送到这个broker */ registry.enablesimplebroker("/topic","/user"); } //拦截器 @bean public handshakeinterceptor httpsessionhandshakeinterceptor() { return new handshakeinterceptor() { @override public boolean beforehandshake(serverhttprequest request, serverhttpresponse response, websockethandler wshandler, map<string, object> attributes) throws exception { //可以在这里先判断登录是否合法 return true; } @override public void afterhandshake(serverhttprequest request, serverhttpresponse response, websockethandler wshandler, exception exception) { //握手成功后, } }; } }
websocketsecurityconfiguration
@configuration public class websocketsecurityconfiguration extends abstractsecuritywebsocketmessagebrokerconfigurer { @override protected void configureinbound(messagesecuritymetadatasourceregistry messages) { messages .nulldestmatcher().authenticated() .simpdestmatchers("/topic/**").authenticated() .simpdestmatchers("/user/**").authenticated() .simptypematchers(simpmessagetype.message, simpmessagetype.subscribe).denyall() // catch all .anymessage().denyall(); } /** * disables csrf for websockets. */ @override protected boolean sameorigindisabled() { return true; } }
websocketresource
package com.gleam.shopmall.web.rest; import org.slf4j.logger; import org.slf4j.loggerfactory; import org.springframework.beans.factory.annotation.autowired; import org.springframework.context.applicationlistener; import org.springframework.messaging.handler.annotation.messagemapping; import org.springframework.messaging.handler.annotation.sendto; import org.springframework.messaging.simp.simpmessageheaderaccessor; import org.springframework.messaging.simp.simpmessagemappinginfo; import org.springframework.messaging.simp.simpmessagesendingoperations; import org.springframework.stereotype.controller; import org.springframework.web.socket.messaging.sessiondisconnectevent; @controller public class websocketresource { private static final logger log = loggerfactory.getlogger(websocketresource.class); @autowired simpmessagesendingoperations messagingtemplate; //此方法适用于网页聊天室,从前端接收数据,返回订阅者(前端) @messagemapping("/welcome") //指定要接收消息的地址,类似@requestmapping @sendto("/topic/getresponse") //默认消息将被发送到与传入消息相同的目的地,但是目的地前面附加前缀(默认情况下为“/topic”} public string say(string message) throws exception { return message; } //发送指定用户(直接从后端发送数据到前端) public void sendtouser(string login,string channel, string info) { log.debug("[touser]websocket发送消息, username={}, info={}", login, info); this.messagingtemplate.convertandsendtouser(login, channel, info); log.debug("[touser]websocket发送消息:完成"); } //发送所有订阅的(直接从后端发送数据到前端) public void send(string channel, string info) { log.debug("[toall]websocket发送消息, info={}", info); // this.messagingtemplate.convertandsend(channel, info); this.messagingtemplate.convertandsend("/topic/getresponse", "接收到了吗?"); log.debug("[toall]websocket发送消息:完成"); } }
前端html
<!doctype html> <html xmlns:th="http://www.thymeleaf.org"> <head> <meta charset="utf-8" /> <script src="http://cdn.jsdelivr.net/sockjs/0.3.4/sockjs.min.js"></script> <script src="https://cdn.bootcss.com/stomp.js/2.3.3/stomp.js"></script> <script src="http://code.jquery.com/jquery-1.7.2.min.js"></script> <script src="http://pv.sohu.com/cityjson?ie=utf-8"></script> <title>spring boot+websocket+广播式</title> <script type="text/javascript"> var stompclient = null; function setconnected(connected) { document.getelementbyid('connect').disabled = connected; document.getelementbyid('disconnect').disabled = !connected; document.getelementbyid('conversationdiv').style.visibility = connected ? 'visible' : 'hidden'; $('#response').html(); } function connect() { // websocket的连接地址,此值等于websocketconfig中registry.addendpoint("/websocket/tracker").withsockjs()配置的地址, //这里如果是微服务或者远端,需要全路径 var socket = new sockjs('/websocket/tracker'); //1 stompclient = stomp.over(socket);//2 stompclient.connect({}, function(frame) {//3 setconnected(true); console.log('开始进行连接connected: ' + frame); // 客户端订阅消息的目的地址:此值等于websocketresource中@sendto("/topic/getresponse")注解的里配置的值 stompclient.subscribe('/topic/getresponse', function(respnose){ //4 showresponse(respnose.body); }); }); } function disconnect() { if (stompclient != null) { stompclient.disconnect(); } setconnected(false); console.log("disconnected"); } function sendname() { var name = $('#name').val(); stompclient.send("/welcome", {}, returncitysn['cip'] +":"+name);// json.stringify(name) } function showresponse(message) { var response = $("#response"); response.html(message+"<br>" + response.html()); } </script> </head> <body onload="disconnect()"> <noscript><h2 style="color: red">貌似你的浏览器不支持websocket</h2></noscript> <div> <div> <button id="connect" onclick="connect();" style="color: red">连接</button> <button id="disconnect" disabled="disabled" onclick="disconnect();">断开连接</button> </div> <div id="conversationdiv"> <label>输入内容</label><input type="text" id="name" /> <button id="sendname" onclick="sendname();">发送</button> <p id="response"></p> </div> </div> </body> </html>```
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。
下一篇: 微信如何设置在线状态?
推荐阅读
-
详解SpringBoot开发案例之整合定时任务(Scheduled)
-
SpringBoot 2.0 开发案例之百倍级减肥瘦身之旅
-
socket C/C++编程(5)服务器端允许用户连接之listen()函数
-
SpringBoot2 整合Nacos组件,环境搭建和入门案例详解
-
SpringBoot 2.x 开发案例之 Shiro 整合 Redis
-
常用类之TCP连接类-socket编程
-
玩转 SpringBoot 2 之整合 JWT 上篇
-
玩转 SpringBoot 2 之整合 JWT 下篇
-
springboot activiti 整合项目框架源码 druid 数据库连接池 shiro 安全框架
-
前端笔记之微信小程序(四)WebSocket&Socket.io&摇一摇案例&地图|地理位置