欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

【Spring Boot】集成Netty Socket.IO通讯框架

程序员文章站 2022-04-05 09:37:36
服务端 Android客户端 注:仅供学习参考 ......

服务端

 1 @configuration
 2 public class nettysocketconfig {
 3 
 4     private static final logger logger = loggerfactory.getlogger(nettysocketconfig.class);
 5 
 6     @bean
 7     public socketioserver socketioserver() {
 8         //创建socket,并设置监听端口
 9         com.corundumstudio.socketio.configuration config = new com.corundumstudio.socketio.configuration();
10         // 设置主机名,默认是0.0.0.0
11         config.sethostname("192.168.8.107");
12         // 设置监听端口
13         config.setport(9096);
14         // 协议升级超时时间(毫秒),默认10000。http握手升级为ws协议超时时间
15         config.setupgradetimeout(10000);
16         // ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔
17         config.setpinginterval(60000);
18         // ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件
19         config.setpingtimeout(180000);
20         // 这个版本0.9.0不能处理好namespace和query参数的问题。所以为了做认证必须使用全局默认命名空间
21         config.setauthorizationlistener(new authorizationlistener() {
22             @override
23             public boolean isauthorized(handshakedata data) {
24                 // 可以使用如下代码获取用户密码信息
25                 //string username = data.getsingleurlparam("username");
26                 //string password = data.getsingleurlparam("password");
27                 //logger.info("连接参数:username=" + username + ",password=" + password);
28                 //managerinfo managerinfo = managerinfoservice.findbyusername(username);
29                 //
30                 //string salt = managerinfo.getsalt();
31                 //string encodedpassword = shirokit.md5(password, username + salt);
32                 //// 如果认证不通过会返回一个socket.event_connect_error事件
33                 //return encodedpassword.equals(managerinfo.getpassword());
34 
35                 return true;
36             }
37         });
38 
39         final socketioserver server = new socketioserver(config);
40         system.out.println("注入socketioserver");
41         return server;
42     }
43 
44     @bean
45     public springannotationscanner springannotationscanner(socketioserver socketserver) {
46         return new springannotationscanner(socketserver);
47     }
48 }

 

  1 @component
  2 public class messageeventhandler {
  3 
  4     private static final logger logger = loggerfactory.getlogger(messageeventhandler.class);
  5 
  6     /**
  7      * 服务器socket对象
  8      */
  9     public static socketioserver socketioserver;
 10 
 11     /**
 12      * 客户端集合
 13      */
 14     static arraylist<uuid> listclient = new arraylist<>();
 15 
 16     /**
 17      * 超时时间
 18      */
 19     static final int limitseconds = 60;
 20 
 21     @autowired
 22     public loginservice loginservice;
 23 
 24     /**
 25      * 初始化消息事件处理器
 26      *
 27      * @param server 服务器socket对象
 28      */
 29     @autowired
 30     public messageeventhandler(socketioserver server) {
 31         logger.info("初始化socket消息事件处理器");
 32         this.socketioserver = server;
 33     }
 34 
 35     /**
 36      * 客户端发起连接时触发
 37      *
 38      * @param client 客户端socket对象信息
 39      */
 40     @onconnect
 41     public void onconnect(socketioclient client) {
 42         logger.info("客户端{}已连接", client.getsessionid());
 43         listclient.add(client.getsessionid());
 44     }
 45 
 46     /**
 47      * 客户端断开连接时触发
 48      *
 49      * @param client 客户端socket对象信息
 50      */
 51     @ondisconnect
 52     public void ondisconnect(socketioclient client) {
 53         logger.info("客户端{}断开连接", client.getsessionid());
 54         if (listclient.contains(client.getsessionid())) {
 55             listclient.remove(client.getsessionid());
 56         }
 57     }
 58 
 59 
 60     /**
 61      * 客户端发送消息时触发
 62      *
 63      * @param client  客户端socket对象信息
 64      * @param request ackrequest 回调对象
 65      * @param data    消息信息实体
 66      */
 67     @onevent(value = socketconstants.socketevent.message)
 68     public void onevent(socketioclient client, ackrequest request, messageinfo data) {
 69         system.out.println("发来消息:" + data.getmsgcontent());
 70         socketioserver.getclient(client.getsessionid()).sendevent("messageevent", "back data");
 71     }
 72 
 73     /**
 74      * 效验连接事件并存储客户端信息
 75      *
 76      * @param client  客户端socket对象信息
 77      * @param data    客户端数据
 78      * @param request ackrequest 回调对象
 79      */
 80     @onevent(value = socketconstants.socketevent.health_check)
 81     public void oneventbyhealthcheck(socketioclient client, string data, ackrequest request) {
 82         //logger.info("客户端{}效验连接请求", client.getsessionid());
 83         ////解析请求数据
 84         //healthcheckrequest healthcheckrequest = json.parseobject(data, healthcheckrequest.class);
 85         //if (healthcheckrequest != null) {
 86         //    //存储客户端信息
 87         //    socketinstance instance = socketinstance.getsocketinstance();
 88         //    system.out.println(data);
 89         //    instance.insertsocketclient(healthcheckrequest.getencode(), client);
 90         //    logger.info("客户端{}效验连接响应:{}", client.getsessionid(), "ok");
 91         //    //响应客户端
 92         //    request.sendackdata("ok");
 93         //}
 94     }
 95 
 96     /**
 97      * 登录事件
 98      *
 99      * @param client  客户端socket对象信息
100      * @param data    客户端数据
101      * @param request ackrequest 回调对象
102      */
103     @onevent(value = socketconstants.socketevent.login)
104     public void oneventbylogin(socketioclient client, string data, ackrequest request) {
105         logger.info("客户端{}登录请求:{}", client.getsessionid(), data);
106         appresponsebase appresponsebase = new appresponsebase(0, "通讯成功");
107         //业务响应对象
108         loginresponse loginresponse = null;
109         try {
110             //解析请求数据
111             loginrequest loginrequest = json.parseobject(data, loginrequest.class);
112             if (loginrequest == null) {
113                 throw new appexception(appresultcode.loginanalysis_fail);
114             }
115             //调用登陆接口
116             loginresponse = loginservice.applogin(loginrequest);
117             if (loginresponse == null) {
118                 throw new appexception(appresultcode.logincloud_fail);
119             }
120             if (enumresult.success.equals(loginresponse.getresultcode())) {
121                 //保存客户端socket信息
122                 socketinstance instance = socketinstance.getsocketinstance();
123                 instance.insertsocketclient(loginrequest.getencode(), client);
124             }
125         } catch (appexception ex) {
126             loginresponse = new loginresponse(ex.getappresultcode().getcode(), ex.getappresultcode().getmsg());
127         } catch (exception ex) {
128             loginresponse = new loginresponse(appresultcode.exceptions.getcode(), appresultcode.exceptions.getmsg());
129             ex.printstacktrace();
130         }
131         appresponsebase.setrespdata(loginresponse);
132         string result = json.tojsonstring(appresponsebase);
133         logger.info("客户端{}登录响应:{}", client.getsessionid(), result);
134         //响应客户端
135         request.sendackdata(result);
136     }
137 
138     /**
139      * 交易下单事件
140      * @param callpayrequest 下单请求信息实体
141      * @return
142      */
143     public static string sendbypayevent(callpayrequest callpayrequest) {
144         string result = "";
145         //获取客户端信息
146         socketinstance instance = socketinstance.getsocketinstance();
147         socketioclient client = instance.getclientsocket(callpayrequest.getencode());
148         if (client != null) {
149             //请求报文
150             string requestparam = json.tojsonstring(callpayrequest);
151             //请求下单
152             client.sendevent(socketconstants.socketevent.pay, new ackcallback<string>(string.class) {
153                 @override
154                 public void onsuccess(string s) {
155                     //响应信息
156                     system.out.println("ack from client: " + client.getsessionid() + " data: " + s.tostring());
157                 }
158             }, requestparam);
159 
160         } else {
161             //客户端已断开连接
162 
163         }
164         return result;
165     }
166 }

 

 1 @component
 2 @order(value = 1)
 3 public class mycommandlinerunner implements commandlinerunner {
 4 
 5     private final socketioserver server;
 6 
 7     @autowired
 8     public mycommandlinerunner(socketioserver server) {
 9         system.out.println("初始化mycommandlinerunner");
10         this.server = server;
11     }
12 
13     @override
14     public void run(string... args) {
15         try {
16             server.start();
17             system.out.println("socket.io启动成功!");
18         } catch (exception ex) {
19             ex.printstacktrace();
20         }
21     }
22 }

 

 1 public class socketconstants {
 2 
 3     /**
 4      * socket事件类
 5      */
 6     public class socketevent {
 7 
 8         /**
 9          * 效验连接状况
10          */
11         public static final string health_check = "health_check";
12 
13         /**
14          * 消息接收事件名称
15          */
16         public static final string message = "message";
17 
18         /**
19          * 登录事件名称
20          */
21         public static final string login = "login";
22 
23         /**
24          * 获取交易要素事件名称
25          */
26         public static final string query_pay_fields = "query_pay_fields";
27 
28         /**
29          * 创建订单事件名称
30          */
31         public static final string create_order = "create_order";
32 
33         /**
34          * 监控订单状态事件名称
35          */
36         public static final string check_order_status = "check_order_status";
37 
38         /**
39          * 获取订单事件名称
40          */
41         public static final string query_order = "query_order";
42 
43         /**
44          * 支付事件名称
45          */
46         public static final string pay = "pay";
47     }
48 }

 

 1 public class socketinstance {
 2 
 3     /**
 4      * 客户端socket连接对象容器
 5      */
 6     private static map<string, socketioclient> socketclients = null;
 7 
 8     /**
 9      * 私有构造
10      */
11     private socketinstance() {
12         //从缓存中获取socketclients
13         socketclients = new hashmap<>();
14     }
15 
16     /**
17      * 定义一个私有的内部类,在第一次用这个嵌套类时,会创建一个实例。而类型为socketinstanceholder的类,只有在socketinstance.getsocketinstance()中调用,
18      * 由于私有的属性,他人无法使用socketinstanceholder,不调用socketinstance.getsocketinstance()就不会创建实例。
19      * 优点:达到了lazy loading的效果,即按需创建实例。
20      * 无法适用于分布式集群部署
21      */
22     private static class socketinstanceholder {
23         /**
24          * 创建全局唯一实例
25          */
26         private final static socketinstance instance = new socketinstance();
27     }
28 
29     /**
30      * 获取全局唯一实例
31      *
32      * @return socketinstance对象
33      */
34     public static socketinstance getsocketinstance() {
35         return socketinstanceholder.instance;
36     }
37 
38     /**
39      * 新增客户端连接到容器
40      *
41      * @param encode         设备en号
42      * @param socketioclient 客户端socket对象
43      */
44     public void insertsocketclient(string encode, socketioclient socketioclient) {
45         socketioclient oldsocketioclient = socketclients.get(encode);
46         if (oldsocketioclient != null) {
47             try {
48                 //关闭客户端连接
49                 oldsocketioclient.disconnect();
50             } catch (exception ex) {
51                 ex.printstacktrace();
52             }
53         }
54         socketclients.put(encode, socketioclient);
55     }
56 
57     /**
58      * 获取客户端socket对象
59      *
60      * @param encode 设备encode
61      * @return 客户端socket对象
62      */
63     public socketioclient getclientsocket(string encode) {
64         return socketclients.get(encode);
65     }
66 }

 

 android客户端

  1 public class socketclient {
  2 
  3     /**
  4      * 最大重连次数
  5      */
  6     private int maxreconnectioncount = 5;
  7 
  8     /**
  9      * 重连次数
 10      */
 11     private int reconnectioncount = 0;
 12 
 13     /**
 14      * 等待框对象
 15      */
 16     private static progressdialog progressdialog;
 17 
 18     /**
 19      * 提示框
 20      */
 21     private static alertdialog.builder dialogexitbuilder;
 22 
 23     /**
 24      * toast提示对象
 25      */
 26     private static toast toast;
 27 
 28     /**
 29      * socket客户端对象信息
 30      */
 31     public static socket socket;
 32 
 33     /**
 34      * 主页面对象,每个页面oncreate时必须设置,可在每个页面监控socket连接状况
 35      */
 36     public static context nowcontext;
 37 
 38     /**
 39      * socket连接提示handler(等待框)
 40      */
 41     handler dialogmessagehandler = new handler() {
 42         @override
 43         public void handlemessage(message msg) {
 44             super.handlemessage(msg);
 45             bundle bundle = msg.getdata();
 46             string message = bundle.getstring(messageutil.message);
 47             setdialogmessage(message);
 48         }
 49     };
 50 
 51     /**
 52      * socket连接失败退出提示handler(提示框)
 53      */
 54     handler dialogexithandler = new handler() {
 55         @override
 56         public void handlemessage(message msg) {
 57             super.handlemessage(msg);
 58             bundle bundle = msg.getdata();
 59             string message = bundle.getstring(messageutil.message);
 60             dialogexit(message);
 61         }
 62     };
 63 
 64     /**
 65      * socket连接提示handler(toast)
 66      */
 67     handler toastmessagehandler = new handler() {
 68         @override
 69         public void handlemessage(message msg) {
 70             super.handlemessage(msg);
 71             bundle bundle = msg.getdata();
 72             string message = bundle.getstring(messageutil.message);
 73             showtoast(message, toast.length_short);
 74         }
 75     };
 76 
 77     /**
 78      * 等待框
 79      *
 80      * @param message 提示文字
 81      */
 82     private static void setdialogmessage(string message) {
 83         if (progressdialog == null) {
 84             progressdialog = new progressdialog(nowcontext);
 85         }
 86         progressdialog.settitle("学通宝收银");
 87         progressdialog.setmessage(message);
 88         progressdialog.setcancelable(false);
 89         progressdialog.show();
 90     }
 91 
 92     /**
 93      * 退出提示框
 94      *
 95      * @param message 提示文字
 96      */
 97     private void dialogexit(string message) {
 98         //初始化退出builder
 99         if (dialogexitbuilder == null) {
100             dialogexitbuilder = new alertdialog.builder(nowcontext);
101         }
102         dialogexitbuilder.setmessage(message);
103         dialogexitbuilder.settitle("提示");
104         dialogexitbuilder.seticon(r.mipmap.warning);
105         dialogexitbuilder.setpositivebutton("确认", new dialoginterface.onclicklistener() {
106             public void onclick(dialoginterface dialog, int which) {
107                 dialog.dismiss();
108                 //参数用作状态码;根据惯例,非 0 的状态码表示异常终止。
109                 system.exit(0);
110             }
111         });
112         dialogexitbuilder.create().show();
113     }
114 
115     /**
116      * toast消息提醒
117      *
118      * @param text     标题
119      * @param duration 时长
120      */
121     public void showtoast(string text, int duration) {
122         //只创建一次
123         if (toast == null) {
124             toast = toast.maketext(nowcontext, text, duration);
125         } else {
126             toast.settext(text);
127             toast.setduration(duration);
128         }
129         toast.show();
130     }
131 
132     public void startsocket() throws urisyntaxexception {
133         //初始化socket配置
134         io.options options = new io.options();
135         options.transports = new string[]{"websocket"};
136         options.reconnectionattempts = maxreconnectioncount; // 设置一个重连的最大尝试次数,超过这个值后socket.io会使用所有允许的其他连接方式尝试重连,直到最终失败。
137         options.reconnectiondelay = 500;  //为socket.io的重连设置一个时间间隔,内部会在多次重连尝试时采用该值的指数值间隔,用来避免性能损耗(500 > 1000 > 2000 > 4000 > 8000)
138         options.reconnection = true;      //当连接终止后,是否允许socket.io自动进行重连
139         options.timeout = 9000;           //连接超时时间(ms)
140         options.forcenew = true;
141         options.query = "appid=cn.xuetongbao.xtbpay";
142         socket = io.socket("http://192.168.8.107:9096/", options);
143         //连接成功
144         socket.on(socket.event_connect, new emitter.listener() {
145             @override
146             public void call(object... args) {
147                 //重连机制
148                 if (reconnectioncount > 0) {
149                     //连接存储客户端信息
150                     deviceinfoinstance instance = deviceinfoinstance.getsocketinstance();
151                     healthcheckrequest healthcheckrequest = new healthcheckrequest();
152                     healthcheckrequest.setencode(instance.getdeviceinfo().getencode());
153                     socket.emit(socketconstants.socketevent.health_check, requestutil.createobject(healthcheckrequest), (ack) args1 -> {
154                         system.out.println("args1:" + args1.tostring());
155                     });
156                 }
157                 system.out.println("连接成功...");
158                 toastmessagehandler.sendmessage(messageutil.createmessage("服务器连接成功"));
159                 //关闭等待框
160                 if (progressdialog != null) {
161                     progressdialog.dismiss();
162                 }
163             }
164         });
165 
166         //连接失败事件
167         socket.on(socket.event_connect_error, new emitter.listener() {
168             @override
169             public void call(object... args) {
170                 system.out.println("socket.event_connect_error");
171                 system.out.println("reconnectioncount:" + reconnectioncount);
172                 if (reconnectioncount >= maxreconnectioncount) {
173                     dialogexithandler.sendmessage(messageutil.createmessage("服务器连接失败,请稍后再试"));
174                 } else {
175                     dialogmessagehandler.sendmessage(messageutil.createmessage("服务器连接失败,正在重新连接..."));
176                 }
177             }
178         });
179 
180         //连接中事件
181         socket.on(socket.event_reconnecting, new emitter.listener() {
182             @override
183             public void call(object... args) {
184                 reconnectioncount++;
185                 system.out.println("socket.event_reconnecting");
186                 dialogmessagehandler.sendmessage(messageutil.createmessage("正在连接服务器..."));
187             }
188         });
189 
190         //连接超时事件
191         socket.on(socket.event_connect_timeout, new emitter.listener() {
192             @override
193             public void call(object... args) {
194                 system.out.println("socket.event_connect_timeout");
195                 if (nowcontext != null) {
196                     dialogmessagehandler.sendmessage(messageutil.createmessage("与服务器连接超时,正在重新建立连接..."));
197                     socket.connect();
198                 }
199             }
200         });
201 
202         //心跳包
203         socket.on(socket.event_ping, new emitter.listener() {
204             @override
205             public void call(object... args) {
206                 system.out.println("socket.event_ping");
207             }
208         });
209         //心跳包
210         socket.on(socket.event_pong, new emitter.listener() {
211             @override
212             public void call(object... args) {
213                 system.out.println("socket.event_pong");
214             }
215         });
216 
217         //消息接收事件
218         socket.on(socket.event_message, new emitter.listener() {
219             @override
220             public void call(object... args) {
221                 system.out.println("-----------接受到消息啦--------" + arrays.tostring(args));
222             }
223         });
224 
225         //连接断开事件
226         socket.on(socket.event_disconnect, new emitter.listener() {
227             @override
228             public void call(object... args) {
229                 reconnectioncount = 0;
230                 system.out.println("客户端断开连接啦。。。");
231                 if (nowcontext != null) {
232                     dialogmessagehandler.sendmessage(messageutil.createmessage("似乎与服务器断开连接,正在重新建立连接..."));
233                     socket.connect();
234                 }
235             }
236         });
237 
238         //交易事件
239         socket.on(socketconstants.socketevent.pay, new emitter.listener() {
240             @override
241             public void call(object... args) {
242                 object data = args[0];
243                 object ackcallback = args[1];
244                 system.out.println("接收到服务端交易下单消息" + data);
245                 callpayrequest callpayrequest = json.parseobject(data.tostring(), callpayrequest.class);
246                 if (callpayrequest != null) {
247 
248                 }
249                 //data
250                 callpayresponse callpayresponse = new callpayresponse();
251                 callpayresponse.setresultcode(appresultcode.success.getcode());
252                 callpayresponse.setresultmsg(appresultcode.success.getmsg());
253 
254                 //响应服务端
255                 ((ack) ackcallback).call(json.tojsonstring(callpayresponse));
256             }
257         });
258         system.out.println("准备连接服务器...");
259         socket.connect();
260     }
261 }

 

 注:仅供学习参考