【Spring Boot】集成Netty Socket.IO通讯框架
程序员文章站
2022-04-05 09:37:36
服务端 Android客户端 注:仅供学习参考 ......
服务端
1 @configuration 2 public class nettysocketconfig { 3 4 private static final logger logger = loggerfactory.getlogger(nettysocketconfig.class); 5 6 @bean 7 public socketioserver socketioserver() { 8 //创建socket,并设置监听端口 9 com.corundumstudio.socketio.configuration config = new com.corundumstudio.socketio.configuration(); 10 // 设置主机名,默认是0.0.0.0 11 config.sethostname("192.168.8.107"); 12 // 设置监听端口 13 config.setport(9096); 14 // 协议升级超时时间(毫秒),默认10000。http握手升级为ws协议超时时间 15 config.setupgradetimeout(10000); 16 // ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔 17 config.setpinginterval(60000); 18 // ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件 19 config.setpingtimeout(180000); 20 // 这个版本0.9.0不能处理好namespace和query参数的问题。所以为了做认证必须使用全局默认命名空间 21 config.setauthorizationlistener(new authorizationlistener() { 22 @override 23 public boolean isauthorized(handshakedata data) { 24 // 可以使用如下代码获取用户密码信息 25 //string username = data.getsingleurlparam("username"); 26 //string password = data.getsingleurlparam("password"); 27 //logger.info("连接参数:username=" + username + ",password=" + password); 28 //managerinfo managerinfo = managerinfoservice.findbyusername(username); 29 // 30 //string salt = managerinfo.getsalt(); 31 //string encodedpassword = shirokit.md5(password, username + salt); 32 //// 如果认证不通过会返回一个socket.event_connect_error事件 33 //return encodedpassword.equals(managerinfo.getpassword()); 34 35 return true; 36 } 37 }); 38 39 final socketioserver server = new socketioserver(config); 40 system.out.println("注入socketioserver"); 41 return server; 42 } 43 44 @bean 45 public springannotationscanner springannotationscanner(socketioserver socketserver) { 46 return new springannotationscanner(socketserver); 47 } 48 }
1 @component 2 public class messageeventhandler { 3 4 private static final logger logger = loggerfactory.getlogger(messageeventhandler.class); 5 6 /** 7 * 服务器socket对象 8 */ 9 public static socketioserver socketioserver; 10 11 /** 12 * 客户端集合 13 */ 14 static arraylist<uuid> listclient = new arraylist<>(); 15 16 /** 17 * 超时时间 18 */ 19 static final int limitseconds = 60; 20 21 @autowired 22 public loginservice loginservice; 23 24 /** 25 * 初始化消息事件处理器 26 * 27 * @param server 服务器socket对象 28 */ 29 @autowired 30 public messageeventhandler(socketioserver server) { 31 logger.info("初始化socket消息事件处理器"); 32 this.socketioserver = server; 33 } 34 35 /** 36 * 客户端发起连接时触发 37 * 38 * @param client 客户端socket对象信息 39 */ 40 @onconnect 41 public void onconnect(socketioclient client) { 42 logger.info("客户端{}已连接", client.getsessionid()); 43 listclient.add(client.getsessionid()); 44 } 45 46 /** 47 * 客户端断开连接时触发 48 * 49 * @param client 客户端socket对象信息 50 */ 51 @ondisconnect 52 public void ondisconnect(socketioclient client) { 53 logger.info("客户端{}断开连接", client.getsessionid()); 54 if (listclient.contains(client.getsessionid())) { 55 listclient.remove(client.getsessionid()); 56 } 57 } 58 59 60 /** 61 * 客户端发送消息时触发 62 * 63 * @param client 客户端socket对象信息 64 * @param request ackrequest 回调对象 65 * @param data 消息信息实体 66 */ 67 @onevent(value = socketconstants.socketevent.message) 68 public void onevent(socketioclient client, ackrequest request, messageinfo data) { 69 system.out.println("发来消息:" + data.getmsgcontent()); 70 socketioserver.getclient(client.getsessionid()).sendevent("messageevent", "back data"); 71 } 72 73 /** 74 * 效验连接事件并存储客户端信息 75 * 76 * @param client 客户端socket对象信息 77 * @param data 客户端数据 78 * @param request ackrequest 回调对象 79 */ 80 @onevent(value = socketconstants.socketevent.health_check) 81 public void oneventbyhealthcheck(socketioclient client, string data, ackrequest request) { 82 //logger.info("客户端{}效验连接请求", client.getsessionid()); 83 ////解析请求数据 84 //healthcheckrequest healthcheckrequest = json.parseobject(data, healthcheckrequest.class); 85 //if (healthcheckrequest != null) { 86 // //存储客户端信息 87 // socketinstance instance = socketinstance.getsocketinstance(); 88 // system.out.println(data); 89 // instance.insertsocketclient(healthcheckrequest.getencode(), client); 90 // logger.info("客户端{}效验连接响应:{}", client.getsessionid(), "ok"); 91 // //响应客户端 92 // request.sendackdata("ok"); 93 //} 94 } 95 96 /** 97 * 登录事件 98 * 99 * @param client 客户端socket对象信息 100 * @param data 客户端数据 101 * @param request ackrequest 回调对象 102 */ 103 @onevent(value = socketconstants.socketevent.login) 104 public void oneventbylogin(socketioclient client, string data, ackrequest request) { 105 logger.info("客户端{}登录请求:{}", client.getsessionid(), data); 106 appresponsebase appresponsebase = new appresponsebase(0, "通讯成功"); 107 //业务响应对象 108 loginresponse loginresponse = null; 109 try { 110 //解析请求数据 111 loginrequest loginrequest = json.parseobject(data, loginrequest.class); 112 if (loginrequest == null) { 113 throw new appexception(appresultcode.loginanalysis_fail); 114 } 115 //调用登陆接口 116 loginresponse = loginservice.applogin(loginrequest); 117 if (loginresponse == null) { 118 throw new appexception(appresultcode.logincloud_fail); 119 } 120 if (enumresult.success.equals(loginresponse.getresultcode())) { 121 //保存客户端socket信息 122 socketinstance instance = socketinstance.getsocketinstance(); 123 instance.insertsocketclient(loginrequest.getencode(), client); 124 } 125 } catch (appexception ex) { 126 loginresponse = new loginresponse(ex.getappresultcode().getcode(), ex.getappresultcode().getmsg()); 127 } catch (exception ex) { 128 loginresponse = new loginresponse(appresultcode.exceptions.getcode(), appresultcode.exceptions.getmsg()); 129 ex.printstacktrace(); 130 } 131 appresponsebase.setrespdata(loginresponse); 132 string result = json.tojsonstring(appresponsebase); 133 logger.info("客户端{}登录响应:{}", client.getsessionid(), result); 134 //响应客户端 135 request.sendackdata(result); 136 } 137 138 /** 139 * 交易下单事件 140 * @param callpayrequest 下单请求信息实体 141 * @return 142 */ 143 public static string sendbypayevent(callpayrequest callpayrequest) { 144 string result = ""; 145 //获取客户端信息 146 socketinstance instance = socketinstance.getsocketinstance(); 147 socketioclient client = instance.getclientsocket(callpayrequest.getencode()); 148 if (client != null) { 149 //请求报文 150 string requestparam = json.tojsonstring(callpayrequest); 151 //请求下单 152 client.sendevent(socketconstants.socketevent.pay, new ackcallback<string>(string.class) { 153 @override 154 public void onsuccess(string s) { 155 //响应信息 156 system.out.println("ack from client: " + client.getsessionid() + " data: " + s.tostring()); 157 } 158 }, requestparam); 159 160 } else { 161 //客户端已断开连接 162 163 } 164 return result; 165 } 166 }
1 @component 2 @order(value = 1) 3 public class mycommandlinerunner implements commandlinerunner { 4 5 private final socketioserver server; 6 7 @autowired 8 public mycommandlinerunner(socketioserver server) { 9 system.out.println("初始化mycommandlinerunner"); 10 this.server = server; 11 } 12 13 @override 14 public void run(string... args) { 15 try { 16 server.start(); 17 system.out.println("socket.io启动成功!"); 18 } catch (exception ex) { 19 ex.printstacktrace(); 20 } 21 } 22 }
1 public class socketconstants { 2 3 /** 4 * socket事件类 5 */ 6 public class socketevent { 7 8 /** 9 * 效验连接状况 10 */ 11 public static final string health_check = "health_check"; 12 13 /** 14 * 消息接收事件名称 15 */ 16 public static final string message = "message"; 17 18 /** 19 * 登录事件名称 20 */ 21 public static final string login = "login"; 22 23 /** 24 * 获取交易要素事件名称 25 */ 26 public static final string query_pay_fields = "query_pay_fields"; 27 28 /** 29 * 创建订单事件名称 30 */ 31 public static final string create_order = "create_order"; 32 33 /** 34 * 监控订单状态事件名称 35 */ 36 public static final string check_order_status = "check_order_status"; 37 38 /** 39 * 获取订单事件名称 40 */ 41 public static final string query_order = "query_order"; 42 43 /** 44 * 支付事件名称 45 */ 46 public static final string pay = "pay"; 47 } 48 }
1 public class socketinstance { 2 3 /** 4 * 客户端socket连接对象容器 5 */ 6 private static map<string, socketioclient> socketclients = null; 7 8 /** 9 * 私有构造 10 */ 11 private socketinstance() { 12 //从缓存中获取socketclients 13 socketclients = new hashmap<>(); 14 } 15 16 /** 17 * 定义一个私有的内部类,在第一次用这个嵌套类时,会创建一个实例。而类型为socketinstanceholder的类,只有在socketinstance.getsocketinstance()中调用, 18 * 由于私有的属性,他人无法使用socketinstanceholder,不调用socketinstance.getsocketinstance()就不会创建实例。 19 * 优点:达到了lazy loading的效果,即按需创建实例。 20 * 无法适用于分布式集群部署 21 */ 22 private static class socketinstanceholder { 23 /** 24 * 创建全局唯一实例 25 */ 26 private final static socketinstance instance = new socketinstance(); 27 } 28 29 /** 30 * 获取全局唯一实例 31 * 32 * @return socketinstance对象 33 */ 34 public static socketinstance getsocketinstance() { 35 return socketinstanceholder.instance; 36 } 37 38 /** 39 * 新增客户端连接到容器 40 * 41 * @param encode 设备en号 42 * @param socketioclient 客户端socket对象 43 */ 44 public void insertsocketclient(string encode, socketioclient socketioclient) { 45 socketioclient oldsocketioclient = socketclients.get(encode); 46 if (oldsocketioclient != null) { 47 try { 48 //关闭客户端连接 49 oldsocketioclient.disconnect(); 50 } catch (exception ex) { 51 ex.printstacktrace(); 52 } 53 } 54 socketclients.put(encode, socketioclient); 55 } 56 57 /** 58 * 获取客户端socket对象 59 * 60 * @param encode 设备encode 61 * @return 客户端socket对象 62 */ 63 public socketioclient getclientsocket(string encode) { 64 return socketclients.get(encode); 65 } 66 }
android客户端
1 public class socketclient { 2 3 /** 4 * 最大重连次数 5 */ 6 private int maxreconnectioncount = 5; 7 8 /** 9 * 重连次数 10 */ 11 private int reconnectioncount = 0; 12 13 /** 14 * 等待框对象 15 */ 16 private static progressdialog progressdialog; 17 18 /** 19 * 提示框 20 */ 21 private static alertdialog.builder dialogexitbuilder; 22 23 /** 24 * toast提示对象 25 */ 26 private static toast toast; 27 28 /** 29 * socket客户端对象信息 30 */ 31 public static socket socket; 32 33 /** 34 * 主页面对象,每个页面oncreate时必须设置,可在每个页面监控socket连接状况 35 */ 36 public static context nowcontext; 37 38 /** 39 * socket连接提示handler(等待框) 40 */ 41 handler dialogmessagehandler = new handler() { 42 @override 43 public void handlemessage(message msg) { 44 super.handlemessage(msg); 45 bundle bundle = msg.getdata(); 46 string message = bundle.getstring(messageutil.message); 47 setdialogmessage(message); 48 } 49 }; 50 51 /** 52 * socket连接失败退出提示handler(提示框) 53 */ 54 handler dialogexithandler = new handler() { 55 @override 56 public void handlemessage(message msg) { 57 super.handlemessage(msg); 58 bundle bundle = msg.getdata(); 59 string message = bundle.getstring(messageutil.message); 60 dialogexit(message); 61 } 62 }; 63 64 /** 65 * socket连接提示handler(toast) 66 */ 67 handler toastmessagehandler = new handler() { 68 @override 69 public void handlemessage(message msg) { 70 super.handlemessage(msg); 71 bundle bundle = msg.getdata(); 72 string message = bundle.getstring(messageutil.message); 73 showtoast(message, toast.length_short); 74 } 75 }; 76 77 /** 78 * 等待框 79 * 80 * @param message 提示文字 81 */ 82 private static void setdialogmessage(string message) { 83 if (progressdialog == null) { 84 progressdialog = new progressdialog(nowcontext); 85 } 86 progressdialog.settitle("学通宝收银"); 87 progressdialog.setmessage(message); 88 progressdialog.setcancelable(false); 89 progressdialog.show(); 90 } 91 92 /** 93 * 退出提示框 94 * 95 * @param message 提示文字 96 */ 97 private void dialogexit(string message) { 98 //初始化退出builder 99 if (dialogexitbuilder == null) { 100 dialogexitbuilder = new alertdialog.builder(nowcontext); 101 } 102 dialogexitbuilder.setmessage(message); 103 dialogexitbuilder.settitle("提示"); 104 dialogexitbuilder.seticon(r.mipmap.warning); 105 dialogexitbuilder.setpositivebutton("确认", new dialoginterface.onclicklistener() { 106 public void onclick(dialoginterface dialog, int which) { 107 dialog.dismiss(); 108 //参数用作状态码;根据惯例,非 0 的状态码表示异常终止。 109 system.exit(0); 110 } 111 }); 112 dialogexitbuilder.create().show(); 113 } 114 115 /** 116 * toast消息提醒 117 * 118 * @param text 标题 119 * @param duration 时长 120 */ 121 public void showtoast(string text, int duration) { 122 //只创建一次 123 if (toast == null) { 124 toast = toast.maketext(nowcontext, text, duration); 125 } else { 126 toast.settext(text); 127 toast.setduration(duration); 128 } 129 toast.show(); 130 } 131 132 public void startsocket() throws urisyntaxexception { 133 //初始化socket配置 134 io.options options = new io.options(); 135 options.transports = new string[]{"websocket"}; 136 options.reconnectionattempts = maxreconnectioncount; // 设置一个重连的最大尝试次数,超过这个值后socket.io会使用所有允许的其他连接方式尝试重连,直到最终失败。 137 options.reconnectiondelay = 500; //为socket.io的重连设置一个时间间隔,内部会在多次重连尝试时采用该值的指数值间隔,用来避免性能损耗(500 > 1000 > 2000 > 4000 > 8000) 138 options.reconnection = true; //当连接终止后,是否允许socket.io自动进行重连 139 options.timeout = 9000; //连接超时时间(ms) 140 options.forcenew = true; 141 options.query = "appid=cn.xuetongbao.xtbpay"; 142 socket = io.socket("http://192.168.8.107:9096/", options); 143 //连接成功 144 socket.on(socket.event_connect, new emitter.listener() { 145 @override 146 public void call(object... args) { 147 //重连机制 148 if (reconnectioncount > 0) { 149 //连接存储客户端信息 150 deviceinfoinstance instance = deviceinfoinstance.getsocketinstance(); 151 healthcheckrequest healthcheckrequest = new healthcheckrequest(); 152 healthcheckrequest.setencode(instance.getdeviceinfo().getencode()); 153 socket.emit(socketconstants.socketevent.health_check, requestutil.createobject(healthcheckrequest), (ack) args1 -> { 154 system.out.println("args1:" + args1.tostring()); 155 }); 156 } 157 system.out.println("连接成功..."); 158 toastmessagehandler.sendmessage(messageutil.createmessage("服务器连接成功")); 159 //关闭等待框 160 if (progressdialog != null) { 161 progressdialog.dismiss(); 162 } 163 } 164 }); 165 166 //连接失败事件 167 socket.on(socket.event_connect_error, new emitter.listener() { 168 @override 169 public void call(object... args) { 170 system.out.println("socket.event_connect_error"); 171 system.out.println("reconnectioncount:" + reconnectioncount); 172 if (reconnectioncount >= maxreconnectioncount) { 173 dialogexithandler.sendmessage(messageutil.createmessage("服务器连接失败,请稍后再试")); 174 } else { 175 dialogmessagehandler.sendmessage(messageutil.createmessage("服务器连接失败,正在重新连接...")); 176 } 177 } 178 }); 179 180 //连接中事件 181 socket.on(socket.event_reconnecting, new emitter.listener() { 182 @override 183 public void call(object... args) { 184 reconnectioncount++; 185 system.out.println("socket.event_reconnecting"); 186 dialogmessagehandler.sendmessage(messageutil.createmessage("正在连接服务器...")); 187 } 188 }); 189 190 //连接超时事件 191 socket.on(socket.event_connect_timeout, new emitter.listener() { 192 @override 193 public void call(object... args) { 194 system.out.println("socket.event_connect_timeout"); 195 if (nowcontext != null) { 196 dialogmessagehandler.sendmessage(messageutil.createmessage("与服务器连接超时,正在重新建立连接...")); 197 socket.connect(); 198 } 199 } 200 }); 201 202 //心跳包 203 socket.on(socket.event_ping, new emitter.listener() { 204 @override 205 public void call(object... args) { 206 system.out.println("socket.event_ping"); 207 } 208 }); 209 //心跳包 210 socket.on(socket.event_pong, new emitter.listener() { 211 @override 212 public void call(object... args) { 213 system.out.println("socket.event_pong"); 214 } 215 }); 216 217 //消息接收事件 218 socket.on(socket.event_message, new emitter.listener() { 219 @override 220 public void call(object... args) { 221 system.out.println("-----------接受到消息啦--------" + arrays.tostring(args)); 222 } 223 }); 224 225 //连接断开事件 226 socket.on(socket.event_disconnect, new emitter.listener() { 227 @override 228 public void call(object... args) { 229 reconnectioncount = 0; 230 system.out.println("客户端断开连接啦。。。"); 231 if (nowcontext != null) { 232 dialogmessagehandler.sendmessage(messageutil.createmessage("似乎与服务器断开连接,正在重新建立连接...")); 233 socket.connect(); 234 } 235 } 236 }); 237 238 //交易事件 239 socket.on(socketconstants.socketevent.pay, new emitter.listener() { 240 @override 241 public void call(object... args) { 242 object data = args[0]; 243 object ackcallback = args[1]; 244 system.out.println("接收到服务端交易下单消息" + data); 245 callpayrequest callpayrequest = json.parseobject(data.tostring(), callpayrequest.class); 246 if (callpayrequest != null) { 247 248 } 249 //data 250 callpayresponse callpayresponse = new callpayresponse(); 251 callpayresponse.setresultcode(appresultcode.success.getcode()); 252 callpayresponse.setresultmsg(appresultcode.success.getmsg()); 253 254 //响应服务端 255 ((ack) ackcallback).call(json.tojsonstring(callpayresponse)); 256 } 257 }); 258 system.out.println("准备连接服务器..."); 259 socket.connect(); 260 } 261 }
注:仅供学习参考
推荐阅读
-
基于netty实现rpc框架-spring boot客户端
-
Spring Boot 集成Swagger框架
-
Spring-Boot快速集成netty-socketio(socket服务实现,支持认证)
-
【Spring Boot】集成Netty Socket.IO通讯框架
-
基于netty实现rpc框架-spring boot服务端
-
Spring-boot集成Netty做websocket服务端
-
Spring-boot集成Netty做websocket服务端
-
基于netty实现rpc框架-spring boot客户端
-
微服务框架(六)Spring Boot集成Mybatis及Druid
-
(十六) Nepxion-Thunder分布式RPC集成框架 - Spring Boot + Docker部署