SpringAOP+RabbitMQ+WebSocket实战详解
背景
最近公司的客户要求,分配给员工的任务除了有微信通知外,还希望pc端的网页也能实时收到通知。管理员分配任务是在我们的系统a,而员工接受任务是在系统b。两个系统都是现在已投入使用的系统。
技术选型
根据需求我们最终选用springaop+rabbitmq+websocket。
springaop可以让我们不修改原有代码,直接将原有service作为切点,加入切面。rabbitmq可以让a系统和b系统解耦。websocket则可以达到实时通知的要求。
springaop
aop称为面向切面编程,在程序开发中主要用来解决一些系统层面上的问题,比如日志,事务,权限等待。是spring的核心模块,底层是通过动态代理来实现(动态代理将在之后的文章重点介绍)。
基本概念
aspect(切面):通常是一个类,里面可以定义切入点和通知。
jointpoint(连接点):程序执行过程中明确的点,一般是方法的调用。
advice(通知):aop在特定的切入点上执行的增强处理,有before,after,afterreturning,afterthrowing,around。
pointcut(切入点):就是带有通知的连接点,在程序中主要体现为书写切入点表达式。
通知类型
before:在目标方法被调用之前做增强处理。
@before只需要指定切入点表达式即可
afterreturning:在目标方法正常完成后做增强。
@afterreturning除了指定切入点表达式后,还可以指定一个返回值形参名returning,代表目标方法的返回值
afterthrowing:主要用来处理程序中未处理的异常。
@afterthrowing除了指定切入点表达式后,还可以指定一个throwing的返回值形参名,可以通过该形参名
来访问目标方法中所抛出的异常对象
after:在目标方法完成之后做增强,无论目标方法时候成功完成。
@after可以指定一个切入点表达式
around:环绕通知,在目标方法完成前后做增强处理,环绕通知是最重要的通知类型,像事务,日志等都是环绕通知,注意编程中核心是一个proceedingjoinpoint。
rabbitmq
从图中我们可以看到rabbitmq主要的结构有:routing、binding、exchange、queue。
queue
queue(队列)rabbitmq的作用是存储消息,队列的特性是先进先出。
exchange
生产者产生的消息并不是直接发送给消息队列queue的,而是要经过exchange(交换器),由exchange再将消息路由到一个或多个queue,还会将不符合路由规则的消息丢弃。
routing
用于标记或生产者寻找exchange。
binding
用于exchange和queue做关联。
exchange type fanout
fanout类型的exchange路由规则非常简单,它会把所有发送到该exchange的消息路由到所有与它绑定的queue中。
direct
direct会把消息路由到那些binding key与routing key完全匹配的queue中。
topic
direct规则是严格意义上的匹配,换言之routing key必须与binding key相匹配的时候才将消息传送给queue,那么topic这个规则就是模糊匹配,可以通过通配符满足一部分规则就可以传送。
headers
headers类型的exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
websocket
了解websocket必须先知道几个常用的web通信技术及其区别。
短轮询
短轮询的基本思路就是浏览器每隔一段时间向浏览器发送http请求,服务器端在收到请求后,不论是否有数据更新,都直接进行响应。这种方式实现的即时通信,本质上还是浏览器发送请求,服务器接受请求的一个过程,通过让客户端不断的进行请求,使得客户端能够模拟实时地收到服务器端的数据的变化。
这种方式的优点是比较简单,易于理解,实现起来也没有什么技术难点。缺点是显而易见的,这种方式由于需要不断的建立http连接,严重浪费了服务器端和客户端的资源。尤其是在客户端,距离来说,如果有数量级想对比较大的人同时位于基于短轮询的应用中,那么每一个用户的客户端都会疯狂的向服务器端发送http请求,而且不会间断。人数越多,服务器端压力越大,这是很不合理的。
因此短轮询不适用于那些同时在线用户数量比较大,并且很注重性能的web应用。
长轮询/ comet
comet指的是,当服务器收到客户端发来的请求后,不会直接进行响应,而是先将这个请求挂起,然后判断服务器端数据是否有更新。如果有更新,则进行响应,如果一直没有数据,则到达一定的时间限制(服务器端设置)后关闭连接。
长轮询和短轮询比起来,明显减少了很多不必要的http请求次数,相比之下节约了资源。长轮询的缺点在于,连接挂起也会导致资源的浪费。
sse
sse是html5新增的功能,全称为server-sent events。它可以允许服务推送数据到客户端。sse在本质上就与之前的长轮询、短轮询不同,虽然都是基于http协议的,但是轮询需要客户端先发送请求。而sse最大的特点就是不需要客户端发送请求,可以实现只要服务器端数据有更新,就可以马上发送到客户端。
sse的优势很明显,它不需要建立或保持大量的客户端发往服务器端的请求,节约了很多资源,提升应用性能。并且sse的实现非常简单,不需要依赖其他插件。
websocket
websocket是html5定义的一个新协议,与传统的http协议不同,该协议可以实现服务器与客户端之间全双工通信。简单来说,首先需要在客户端和服务器端建立起一个连接,这部分需要http。连接一旦建立,客户端和服务器端就处于平等的地位,可以相互发送数据,不存在请求和响应的区别。
websocket的优点是实现了双向通信,缺点是服务器端的逻辑非常复杂。现在针对不同的后台语言有不同的插件可以使用。
四种web即时通信技术比较
从兼容性角度考虑,短轮询>长轮询>长连接sse>websocket;
从性能方面考虑,websocket>长连接sse>长轮询>短轮询。
实战
项目使用springboot搭建。rabbitmq的安装这里不讲述。
rabbitmq配置
两个系统a、b都需要操作rabbitmq,其中a生产消息,b消费消息。故都需要配置。
1、首先引入rabbitmq的dependency:
<dependency> <groupid>org.springframework.boot</groupid> <artifactid>spring-boot-starter-amqp</artifactid> </dependency>
这个dependency中包含了rabbitmq相关dependency。
2、在项目的配置文件里配置为使用rabbitmq及其参数。
application-pro.yml
#消息队列 message.queue.type: rabbitmq ## rabbit mq properties rabbitmq: host: localhost port: 5672 username: guest password: guest
application.properties
#将要使用的队列名 rabbitmq.websocket.msg.queue=websocket_msg_queue
3、创建配置文件。队列的创建交给spring。
rabbitmqconfig.java
@configuration @enablerabbit public class rabbitmqconfig { @value("${rabbitmq.host}") private string host; @value("${rabbitmq.port}") private string port; @value("${rabbitmq.username}") private string username; @value("${rabbitmq.password}") private string password; @value("${rabbitmq.websocket.msg.queue}") private string websocketmsgqueue; @bean public connectionfactory connectionfactory() throws ioexception { cachingconnectionfactory factory = new cachingconnectionfactory(); factory.setusername(username); factory.setpassword(password); // factory.setvirtualhost("test"); factory.sethost(host); factory.setport(integer.valueof(port)); factory.setpublisherconfirms(true); //设置队列参数,是否持久化、队列ttl、队列消息ttl等 factory.createconnection().createchannel(false).queuedeclare(websocketmsgqueue, true, false, false, null); return factory; } @bean public messageconverter messageconverter() { return new jackson2jsonmessageconverter(); } @bean @scope(configurablebeanfactory.scope_prototype) // 必须是prototype类型 public rabbittemplate rabbittemplate() throws ioexception { return new rabbittemplate(connectionfactory()); } @bean public simplerabbitlistenercontainerfactory rabbitlistenercontainerfactory() throws ioexception { simplerabbitlistenercontainerfactory factory = new simplerabbitlistenercontainerfactory(); factory.setconnectionfactory(connectionfactory()); factory.setconcurrentconsumers(3); factory.setmaxconcurrentconsumers(10); factory.setacknowledgemode(acknowledgemode.manual); return factory; } }
4、系统b中创建队列监听,当队列有消息时,发送websocket通知。
rabbitmqlistener.java
@component public class rabbitmqlistener { @autowired private rabbitmqservice mqservice; /** * websocket推送监听器 * @param socketentity * @param deliverytag * @param channel */ @rabbitlistener(queues = "websocket_msg_queue") public void websocketmsglistener(@payload websocketmsgentity socketmsgentity, @header(amqpheaders.delivery_tag) long deliverytag, channel channel) throws ioexception { mqservice.handlewebsocketmsg(socketmsgentity, deliverytag, channel); } }
rabbitmqservice.java
public class rabbitmqservice { @autowired private messagewebsockethandler messagewebsockethandler; /** * @param socketmsgentity * @param deliverytag * @param channel * @throws ioexception */ void handlewebsocketmsg(websocketmsgentity socketmsgentity, long deliverytag, channel channel) throws ioexception { try { messagewebsockethandler.sendmessagetousers(socketmsgentity.tojsonstring(), socketmsgentity.gettouserids()); channel.basicack(deliverytag, false); } catch (exception e) { channel.basicnack(deliverytag, false, false); } } }
websocketmsgentity为mq中传送的实体。
public class websocketmsgentity implements serializable { public enum ordertype{ repair("维修"), maintain("保养"), measure("计量"); ordertype(string value){ this.value = value; } string value; public string getvalue() { return value; } } //设备名称 private string equname; //设备编号 private string equid; //工单类型 private ordertype ordertype; //工单单号 private string orderid; //工单状态 private string orderstatus; //创建时间 private date createtime; //消息接收人id private list<string> touserids; public string getequname() { return equname; } public void setequname(string equname) { equname = equname; } public string getorderid() { return orderid; } public void setorderid(string orderid) { this.orderid = orderid; } public string getequid() { return equid; } public void setequid(string equid) { equid = equid; } public string getorderstatus() { return orderstatus; } public void setorderstatus(string orderstatus) { this.orderstatus = orderstatus; } public ordertype getordertype() { return ordertype; } public void setordertype(ordertype ordertype) { this.ordertype = ordertype; } public date getcreatetime() { return createtime; } public void setcreatetime(date createtime) { this.createtime = createtime; } public list<string> gettouserids() { return touserids; } public void settouserids(list<string> touserids) { this.touserids = touserids; } public string tojsonstring(){ return json.tojsonstring(this); } }
springaop
1、系统a中创建一个切面类datainterceptor.java
@aspect @component public class datainterceptor { @autowired private messagequeueservice queueservice; //维修工单切点 @pointcut("execution(* com.zhishang.hes.common.service.impl.repairserviceimpl.executeflow(..))") private void repairmsg() { } /** * 返回通知,方法执行正常返回时触发 * * @param joinpoint * @param result */ @afterreturning(value = "repairmsg()", returning = "result") public void afterreturning(joinpoint joinpoint, object result) { //此处可以获得切点方法名 //string methodname = joinpoint.getsignature().getname(); equipmentrepair equipmentrepair = (equipmentrepair) result; websocketmsgentity websocketmsgentity = this.generaterepairmsgentity(equipmentrepair); if (websocketmsgentity == null) { return; } queueservice.send(websocketmsgentity); } /** * 生成发送到mq的维修消息 * * @param equipmentrepair * @return */ private websocketmsgentity generaterepairmsgentity(equipmentrepair equipmentrepair) { websocketmsgentity websocketmsgentity = generaterepairmsgfromtasks(equipmentrepair); return websocketmsgentity; } /** * 从任务中生成消息 * * @param equipmentrepair * @return */ private websocketmsgentity generaterepairmsgfromtasks(equipmentrepair equipmentrepair) { //业务代码略 } }
2、发送消息到mq。这里只贴了发送的核心代码
public class rabbitmessagequeue extends abstractmessagequeue { @value("${rabbitmq.websocket.msg.queue}") private string websocketmsgqueue; @autowired private rabbittemplate rabbittemplate; @override public void send(websocketmsgentity entity) { //没有指定exchange,则使用默认名为“”的exchange,binding名与queue名相同 rabbittemplate.convertandsend(websocketmsgqueue, entity); } }
websocket
1、 系统b中引入websocket服务端dependency
<dependency> <groupid>org.springframework</groupid> <artifactid>spring-websocket</artifactid> <version>4.3.10.release</version> </dependency>
2、 配置websocket,添加处理类
websocketconfigurer.java
@configuration @enablewebsocket public class websocketconfig extends webmvcconfigureradapter implements websocketconfigurer { private static logger logger = loggerfactory.getlogger(websocketconfig.class); @override public void registerwebsockethandlers(websockethandlerregistry registry) { //配置websocket路径 registry.addhandler(messagewebsockethandler(),"/msg-websocket").addinterceptors(new myhandshakeinterceptor()).setallowedorigins("*"); //配置websocket路径 支持前端使用socketjs registry.addhandler(messagewebsockethandler(), "/sockjs/msg-websocket").setallowedorigins("*").addinterceptors(new myhandshakeinterceptor()).withsockjs(); } @bean public messagewebsockethandler messagewebsockethandler() { logger.info("......创建messagewebsockethandler......"); return new messagewebsockethandler(); } }
messagewebsockethandler.java 主要用于websocket连接及消息发送处理。配置中还使用了连接握手时的处理,主要是取用户登陆信息,这里不多讲述。
public class messagewebsockethandler extends textwebsockethandler { private static logger logger = loggerfactory.getlogger(systemwebsockethandler.class); private static concurrenthashmap<string, copyonwritearrayset<websocketsession>> users = new concurrenthashmap<>(); @override public void afterconnectionestablished(websocketsession session) throws exception { string userid = session.getattributes().get("websocket_userid").tostring(); logger.info("......afterconnectionestablished......"); logger.info("session.getid:" + session.getid()); logger.info("session.getlocaladdress:" + session.getlocaladdress().tostring()); logger.info("userid:" + userid); //websocket连接后记录连接信息 if (users.keyset().contains(userid)) { copyonwritearrayset<websocketsession> websocketsessions = users.get(userid); websocketsessions.add(session); } else { copyonwritearrayset<websocketsession> websocketsessions = new copyonwritearrayset<>(); websocketsessions.add(session); users.put(userid, websocketsessions); } } @override public void handletransporterror(websocketsession session, throwable throwable) throws exception { removeusersession(session); if (session.isopen()) { session.close(); } logger.info("异常出现handletransporterror" + throwable.getmessage()); } @override public void afterconnectionclosed(websocketsession session, closestatus closestatus) throws exception { removeusersession(session); logger.info("关闭afterconnectionclosed" + closestatus.getreason()); } @override public boolean supportspartialmessages() { return false; } /** * 给符合要求的在线用户发送消息 * * @param message */ public void sendmessagetousers(string message, list<string> userids) throws ioexception{ if (stringutils.isempty(message) || collectionutils.isempty(userids)) { return; } if (users.isempty()) { return; } for (string userid : userids) { if (!users.keyset().contains(userid)) { continue; } copyonwritearrayset<websocketsession> websocketsessions = users.get(userid); if (websocketsessions == null) { continue; } for (websocketsession websocketsession : websocketsessions) { if (websocketsession.isopen()) { try { websocketsession.sendmessage(new textmessage(message)); } catch (ioexception e) { logger.error(" websocket server send message error " + e.getmessage()); try { throw e; } catch (ioexception e1) { e1.printstacktrace(); } } } } } } /** * websocket清除连接信息 * * @param session */ private void removeusersession(websocketsession session) { string userid = session.getattributes().get("websocket_userid").tostring(); if (users.keyset().contains(userid)) { copyonwritearrayset<websocketsession> websocketsessions = users.get(userid); websocketsessions.remove(session); if (websocketsessions.isempty()) { users.remove(userid); } } } }
整个功能完成后,a系统分配任务时,系统b登陆用户收到的消息如图:
总体流程:
1、对于系统b,每个登陆的用户都会和服务器建立websocket长连接。
2、系统a生成任务,aop做出响应,将封装的消息发送给mq。
3、系统b中的mq监听发现队列有消息到达,消费消息。
4、系统b通过websocket长连接将消息发给指定的登陆用户。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。