websocket+rabbitmq实战
程序员文章站
2022-10-06 16:35:07
1. websocket+rabbitmq实战 1.1. 前言 接到的需求是后台定向给指定web登录用户推送消息,且可能同一账号会登录多个客户端都要接收到消息 1.2. 遇坑 1. 基于springboot环境搭建的websocket+rabbitmq,搭建完成后发现webs ......
1. websocket+rabbitmq实战
1.1. 前言
接到的需求是后台定向给指定web登录用户推送消息,且可能同一账号会登录多个客户端都要接收到消息
1.2. 遇坑
- 基于springboot环境搭建的websocket+rabbitmq,搭建完成后发现websocket每隔一段时间会断开,看网上有人因为nginx的连接超时机制断开,而我这似乎是因为长连接空闲时间太长而断开
- 经过测试,如果一直保持每隔段时间发送消息,那么连接不会断开,所以我采用了断开重连机制,分三种情况
- 服务器正常,客户端正常且空闲时间不超过1分钟,则情况正常,超过一分钟会断线,前端发起请求重连
- 服务器正常,客户端关闭或注销,服务器正常收到通知,去除对应客户端session
- 服务器异常,客户端正常,客户端发现连不上服务器会尝试重连3次,3次都连不上放弃重连
- rabbitmq定向推送,按需求需要一台机器对应一批用户,所以定制化需要服务启动的时候定向订阅该ip对应的队列名,简单说就是动态队列名的设定,所以又复杂了点,不能直接在注解写死。同时因为使用的apollo配置中心,同一集群应该相同的配置,所以也不能通过提取配置的方式设定值,为了这个点设置apollo的集群方式有点小题大做,所以采用动态读取数据库对应的ip取出对应的队列名。
- 部署线上tomcat的话,不需要加上一块代码
/** * 使用tomcat启动无需配置 */ //@configuration //@conditionalonproperty(name="websocket.enabled",havingvalue = "true") public class websocketconfig { @bean public serverendpointexporter serverendpointexporter() { return new serverendpointexporter(); } }
1.3. 正式代码
1.3.1. rabbimq部分
- application.properties配置
spring.rabbitmq.addresses = i.tzxylao.com:5672 spring.rabbitmq.username = admin spring.rabbitmq.password = 123456 spring.rabbitmq.virtual-host = / spring.rabbitmq.connection-timeout = 15000
- 交换机和队列配置
/** * @author laoliangliang * @date 2019/3/29 11:41 */ @configuration @conditionalonproperty(name="websocket.enabled",havingvalue = "true") public class rabbitmqconfig { final public static string exchangename = "websocketexchange"; /** * 创建交换器 */ @bean fanoutexchange exchange() { return new fanoutexchange(exchangename); } @bean public queue queue(){ return new queue(orderqueuename()); } @bean binding bindingexchangemessage(queue queue,fanoutexchange exchange) { return bindingbuilder.bind(queue).to(exchange); } @bean public simplemessagelistenercontainer messagelistenercontainer(orderreceiver orderreceiver, @qualifier("rabbitconnectionfactory") cachingconnectionfactory cachingconnectionfactory){ simplemessagelistenercontainer container = new simplemessagelistenercontainer(cachingconnectionfactory); // 监听队列的名称 container.setqueuenames(orderqueuename()); container.setexposelistenerchannel(true); // 设置每个消费者获取的最大消息数量 container.setprefetchcount(100); // 消费者的个数 container.setconcurrentconsumers(1); // 设置确认模式为自动确认 container.setacknowledgemode(acknowledgemode.auto); container.setmessagelistener(orderreceiver); return container; } /** * 在这里写获取订单队列名的具体过程 * @return */ public string orderqueuename(){ return "orderchannel"; } }
- 消息监听类
/** * @author laoliangliang * @date 2019/3/29 11:38 */ @component @slf4j @conditionalonproperty(name="websocket.enabled",havingvalue = "true") public class orderreceiver implements channelawaremessagelistener { @autowired private mywebsocket mywebsocket; @override public void onmessage(message message, channel channel) throws exception { byte[] body = message.getbody(); log.info("接收到消息:" + new string(body)); try { mywebsocket.sendmessage(new string(body)); } catch (ioexception e) { log.error("send rabbitmq message error", e); } } }
1.3.2. websocket部分
- 配置服务端点
@configuration @conditionalonproperty(name="websocket.enabled",havingvalue = "true") public class websocketconfig { @bean public serverendpointexporter serverendpointexporter() { return new serverendpointexporter(); } }
- 核心代码
/** * @author laoliangliang * @date 2019/3/28 14:40 */ public abstract class abstractwebsocket { protected static map<string, copyonwritearrayset<session>> sessionstore = new hashmap<>(); public void sendmessage(string message) throws ioexception { list<string> usercodes = beforesendmessage(); for (string usercode : usercodes) { copyonwritearrayset<session> sessions = sessionstore.get(usercode); //阻塞式的(同步的) if (sessions !=null && sessions.size() != 0) { for (session s : sessions) { if (s != null) { s.getbasicremote().sendtext(message); } } } } } /** * 删选给谁发消息 * @return */ protected abstract list<string> beforesendmessage(); protected void clearsession(session session) { collection<copyonwritearrayset<session>> values = sessionstore.values(); for (copyonwritearrayset<session> sessions : values) { for (session session1 : sessions) { if (session.equals(session1)) { sessions.remove(session); } } } } }
@serverendpoint(value = "/websocket") @component @conditionalonproperty(name="websocket.enabled",havingvalue = "true") public class mywebsocket extends abstractwebsocket { private static logger log = logmanager.getlogger(mywebsocket.class); @autowired private amqptemplate amqptemplate; @postconstruct public void init() { /*scheduledexecutorservice executorservice = executors.newscheduledthreadpool(1); executorservice.scheduleatfixedrate(new runnable() { int i = 0; @override public void run() { amqptemplate.convertandsend(rabbitfanout.exchangename, "",("msg num : " + i).getbytes()); i++; } }, 50, 1, timeunit.seconds);*/ } /** * 连接建立成功调用的方法 * * @param session 可选的参数。session为与某个客户端的连接会话,需要通过它来给客户端发送数据 */ @onopen public void onopen(session session) throws timeoutexception { log.info("websocket connect"); //10m session.setmaxtextmessagebuffersize(10485760); } /** * 连接关闭调用的方法 */ @onclose public void onclose(session session) { clearsession(session); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息 * @param session 可选的参数 */ @onmessage public void onmessage(string message, session session) { log.info("from client request:" + message); copyonwritearrayset<session> sessions = sessionstore.get(message); if (sessions == null) { sessions = new copyonwritearrayset<>(); } sessions.add(session); sessionstore.put(message, sessions); } /** * 发生错误时调用 * * @param session * @param error */ @onerror public void onerror(session session, throwable error) { clearsession(session); } /** * 这里返回需要给哪些用户发送消息 * @return */ @override protected list<string> beforesendmessage() { //todo 给哪些用户发送消息 return lists.newarraylist("6"); } }
1.3.3. 前端代码
var websocket = null; var reconnectcount = 0; function connectsocket(){ var data = basicconfig(); if(data.websocketenable !== "true"){ return; } //判断当前浏览器是否支持websocket if ('websocket' in window) { if(data.localip && data.localip !== "" && data.serverport && data.serverport !== ""){ websocket = new websocket("ws://"+data.localip+":"+data.serverport+data.servercontextpath+"/websocket"); }else{ return; } }else { alert('当前浏览器 不支持websocket') } //连接发生错误的回调方法 websocket.onerror = function () { console.log("连接发生错误"); }; //连接成功建立的回调方法 websocket.onopen = function () { reconnectcount = 0; console.log("连接成功"); }; //接收到消息的回调方法,此处添加处理接收消息方法,当前是将接收到的信息显示在网页上 websocket.onmessage = function (event) { console.log("receive message:" + event.data); }; //连接关闭的回调方法 websocket.onclose = function () { console.log("连接关闭,如需登录请刷新页面。"); if(reconnectcount === 3) { reconnectcount = 0; return; } connectsocket(); basicconfig(); reconnectcount++; }; //添加事件监听 websocket.addeventlistener('open', function () { websocket.send(data.usercode); }); //监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。 window.onbeforeunload = function () { console.log("closewebsocket"); }; } connectsocket(); function basicconfig(){ var result = {}; $.ajax({ type: "post", async: false, url: "${request.contextpath}/basicconfig", data: {}, success: function (data) { result = data; } }); return result; }
1.3.4. 后端提供接口
@apolloconfig private config config; @requestmapping(value = {"/basicconfig"}) @responsebody public map<string, object> getusercode(httpsession session) { map<string, object> map = new hashmap<>(2); map.put("usercode",string.valueof(session.getattribute("usercode"))); string websocketenable = config.getproperty("websocket.enabled", "false"); string servercontextpath = config.getproperty("server.context-path", ""); map.put("websocketenable", websocketenable); map.put("servercontextpath", servercontextpath); string localip = config.getproperty("local.ip", ""); string serverport = config.getproperty("server.port", "80"); map.put("localip", localip); map.put("serverport", serverport); return map; }
上一篇: 博客友情链接的风险和规避