牛客高级项目课(8)
一、异步队列
1、同步、异步的概念
多线程中同步的意思大概是这样:线程访问资源时一直在等待,直到资源访问结束。所以,有同步的概念,我们可以大概理解与之相对的异步的概念:线程在访问资源(或者处理耗时较长的数据)时,不必一直等待资源访问完成或者数据处理完,在等待期间线程可以做其他事情,而当资源访问完成之后,会采取回调的方式执行相应的代码。
例如,在IO读写中,同步的方式就是在IO 操作的阻塞过程一直阻塞,直到IO操作完成;而异步的意思就是在io操作阻塞过程线程去做其他事情,当IO操作完成后,采取回调的方式执行相应的操作。
2.异步框架的模型原理
1)生产者–消费者模式
大概的思路如下示例图:
大概意思就是:生产者负责数据的产生,它将数据放到内存中去(一般是一个队列),而消费者则负责处理内存中的数据,处理完成后,可以通过回调的方式进行响应。上面的图比较粗略,下面是具体的实现示意图:
上面示意图具体说明了生产者消费者的具体实现方式:
eventProducer(当然,也可以是dataProducer等)是生产者,它会将前端传输过来的数据或者说需要处理的事件封装好,然后将这些封装好的数据放进一个队列里面去;
而eventConsumer是消费者,它会读取队列里面的数据,然后进行处理。
在这个过程,程序是以异步的方式运行的:生产者无需等待消费者处理完成,它的职责只是将数据推到内存里面去,然后就可以进行响应;而消费者只需要处理数据即可,它不用管数据是哪来的。显然,这样的方式可以提高响应的速度,同时使得异步的实现方式变得简单起来。
2)web开发中的异步框架思路
上面的生产者–消费者为我们实现web的异步框架提供了一种很好的思路:在复杂的业务操作或者耗时比较长的业务中,我们可以采用异步的方式提高程序的响应速度,而生产者消费者的模式正是我们实现异步框架的参考模型–复杂业务的service层使对应的生产者,它只需要将要处理的数据放进一个队列里面,然后即可相应用户;而相应的handler类则负责具体的数据处理。
3)为什么用异步?
显然,在上面描述的思路中,我们大概可以知道什么时候应该使用异步框架:对相应速度要求比较高请求,但是该请求的相关业务操作允许一定的延迟。
举个具体的例子:在一个社交网站中,很多时候会有点赞的操作,A给B点赞,一般来说会包含两个操作,第一个操作是告诉A点赞成功了,第二个操作是告诉B他被A点赞了;如果不采用异步的方式,那就需要在在这两个操作都完成后,才响应A说点赞成功,但是第二个操作显然会耗时很长(例如需要发邮件通知),所以不采用异步方式时A就会有这样一种感觉:怎么点个赞要等半天才响应的,什么垃圾系统!所以,这时候为了提高对A的相应速度,我们可以采用异步的方式:A点赞请求发出之后,程序不需要等到B收到A的点赞通知了,才告诉A说你点赞成功了,因为B收到A的点赞通知相对于A知道自己点赞成功来说,是允许延迟的。
4)什么是消息队列?
所谓消息队列,就是一个以队列数据结构为基础的一个实体,这个实体是真实存在的,比如程序中的数组,数据库中的表,或者redis等等,都可以。
5)首先我们说说为什么要使用队列,什么情况下才会使用队列?
实时性要求不高,且比较耗时的任务,是队列的最佳应用场景。比如说我在某网站注册一个账号,当我的信息入库注册成功后,网站需要发送一封**邮件,让我**账号,而这个发邮件的操作并不是需要实时响应的,没有必要卡在那个注册界面,等待邮件发送成功。再说发送邮件本来就是一个耗时的操作(需要调用第三方smtp服务器)。此时,选择消息队列去处理。注册完成,我只要向队列投递一个消息,消息的内容中包含我要发送邮件的一些设置,以及发送时间,重试次数等消息属性。这里的投递操作(可以是入库,写入缓存等)是要消息进入一个实体的队列。其中应该有一进程(消费者)一直在后台运行,他不断的去轮训队列中的消息(按照时间正序,队列是先进先出),看有没有达到执行条件的,如果有就取出一条,根据消息配置,执行任务,如果成功,则销毁这条消息,继续轮训,如果失败,则重试,知道达到重试次数。这时用户已经收到注册成功的提示,但是已经去做其他事了,邮件也来了,用户点击邮件,注册成功。这就是消息队列的一个典型应用。
再说一个场景,点赞,这个在高并发的情况下,很容易造成数据库连接数占满,到时整个网站响应缓慢,才是就是想到要解决数据库的压力问题,一般就是两种方案,一是提高数据库本身的能力(如增加连接数,读写分离等),但是数据库总是有极限的,到达了极限是没有办法在提升了的,此时就要考虑第二种方案,释放数据库的压力,将压力转移到缓存里面。就拿实际的点赞来说吧,用户的点赞请求到来,我只是将点赞请求投递到消息队列里面,后续的点赞请求可以将消息合并,即只更新点赞数,不产生新的任务,此时有个进行再不断的轮训消息队列,将点赞消息消耗,并将值更新到数据库里面,这样就有效的降低了数据库的压力,因为在缓存层将数个数据库更新请求合并成一个,大大提高了效率,降低了负载。
3.事件处理异步框架
1)框架的大体模型
主要是包括三个部分:生产者producer类,消费者comsumer类,事件处理的handler接口以及对应的实现类,具体的事件eventModel类(对应数据)。
在这里,producer类会将前端传输过来的eventModel对象进行序列化,将它加入到一个异步队列中,这里采用redis的list数据结构实现。
消费者comsumer则负责将redis中队列的数据读取出来,反序列化后,根据eventModel中的eventType来调用相应的handler具体实现类(handler实现类存储在一个map结构里面,key对应的是eventType,value对应的是具体handler实现类)进行业务处理。
handler实现类负责具体事件的处理,它需要实现一个handler接口(该接口是通过spring进行自动注册的关键,具体后面会讲)。
eventModel是事件模型,它主要存储与事件有关的数据,包括事件类型,时间触发者,事件所属者等数据。具体的后面会讲解。
2)代码实现
eventType表明事件是不一样的,获得活动的类型,可以有点赞,评论,登陆等待
eventModel是事件模型,它主要存储与事件有关的数据,包括事件类型,时间触发者,事件所属者等数据。
①eventType:
package com.nowcoder.async;
public enum EventType {//表明事件是不一样的,获得活动的类型,可以有点赞,评论,登陆等待
LIKE(0),
COMMENT(1),
LOGIN(2),
MAIL(3);
private int value;
EventType(int value) { this.value = value; }
public int getValue() { return value; }
}
②eventModel:
package com.nowcoder.async;
import java.util.HashMap;
import java.util.Map;
public class EventModel {
/**
* 事件类型,用于标识事件,同时在comsumer中根据这个值确定handler的具体实现类,一般可用一个枚举类型实现
* 例如点赞通知对应的事件类型和注册发邮件进行**的事件就应该属于不同的eventType,应该对应不同的handler实现类
*/
private EventType type;//评论
/**
* 事件触发者,例如用户A给用户B点赞,A就是事件触发者
*/
private int actorId;//谁评论了
private int entityType;//下面这几个是触发的对象的一些信息//评论类型
private int entityId;//评论id
private int entityOwnerId;//这些信息是与某个人相关的
/**
* 扩展字段exts
* 事件发生的时候会有各种各样的信息,为了将信息保留下来存到map,读取方便,类似viewobject
* 事件处理需要的额外的数据,采用map的方式可以保证程序的扩展性
* 例如注册发送邮件的操作需要的数据和点赞通知需要的数据并不一样,所以用map存储最大程度地保证程序的灵活性
*/
private Map<String, String> exts = new HashMap<>();
public EventType getType() {
return type;
}
public EventModel setType(EventType type) {//为了链式调用,所以返回类型改为EventModel
this.type = type;
return this;
}
public int getActorId() {
return actorId;
}
public EventModel setActorId(int actorId) {
this.actorId = actorId;
return this;
}
public int getEntityType() {
return entityType;
}
public EventModel setEntityType(int entityType) {
this.entityType = entityType;
return this;
}
public int getEntityId() {
return entityId;
}
public EventModel setEntityId(int entityId) {
this.entityId = entityId;
return this;
}
public int getEntityOwnerId() {
return entityOwnerId;
}
public EventModel setEntityOwnerId(int entityOwnerId) {
this.entityOwnerId = entityOwnerId;
return this;
}
public Map<String, String> getExts() {
return exts;
}
public EventModel setExts(Map<String, String> exts) {
this.exts = exts;
return this;
}
public EventModel setExt(String key,String value){
exts.put(key,value);
return this;
}
public String getExt(String key) {
return exts.get(key);
}
}
在组织eventModel时,我们应该保证灵活性,将必须的变量抽取出来之余,用一个map结构来存储具体业务可能需要的额外数据。
③producer类:EventProducer
producer的功能较为简单,只是将eventModel进行序列化,然后将它添加进相应的队列,具体代码如下:
package com.nowcoder.async;
import com.alibaba.fastjson.JSONObject;
import com.nowcoder.util.JedisAdapter;
import com.nowcoder.util.RedisKeyUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class EventProducer {
@Autowired
JedisAdapter jedisAdapter;
public boolean fireEvent(EventModel eventModel){
try {
String json= JSONObject.toJSONString(eventModel);//对象转化成json字符串
String key= RedisKeyUtil.getEventQueueKey();
jedisAdapter.lpush(key,json);//推到队列里
return true;
}catch (Exception e){
return false;
}
}
}
④handler接口设计:EventHandler
package com.nowcoder.async;
import java.util.List;
/**
* 事件处理器:用于处理事件队列里面的事件,被eventConsumer调用
* doHandler:model是具体的事件模型,它需要由调用者(一般是comsumer)传进来
*/
public interface EventHandler {
void doHandle(EventModel model);
/**
*
* @return 表明该接口是什么类型的handler,list表明handler可以支持多个业务,也就是说,一个handler可以对应多个eventType
*例如说,sendEmailHandler,邮件发送handler,具体业务例如注册**的事件类型,点赞的发邮件通知时间类型都会需要这个handler,
*所以一个handler是有必要对应多个eventType的,所以,具体handler实现类中必须有一个list变量来存储它对应的事件类型
*/
List<EventType> getSupportEventTypes();
}
⑤comsumer类:EventComsumer
在这个异步事件处理框架中,comsumer主要负责以下的职责:
- 读取事件队列中的eventModel对象,将它反序列化后,根据eventType负责调用具体的handler实现类;
- 在初始化的时候利用spring框架自动对handler具体实现类进行注册操作,并将之存储在一个map的数据结构中,key是eventType,value是handler具体实现类的对象。
具体的实现方式:
package com.nowcoder.async;
import com.alibaba.fastjson.JSON;
import com.nowcoder.util.JedisAdapter;
import com.nowcoder.util.RedisKeyUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* 事件处理类,该类负责调用handler,分发事件,把handler和事件的关系建立起来,对事件进行处理。需要实现spring的两个接口,InitializingBean接口是初始化时自动注册handler要用;
*ApplicationContextAware则是调用spring的applicationContext(该applicationContext中存储着handler具体实现类的bean对象)需要实现
* 的接口,通过applicationContext获取handler对应的beans,然后就可以将handler自动注册到下面的config对象中了(是一个map)
*/
@Service
public class EventComsumer implements InitializingBean, ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(EventComsumer.class);
/**
* config:该变量用于存储EventHandler和eventType的映射关系,在消费时,可以直接根据config中你的映射关系进行handler调用
* 注意,这里为了保证程序的灵活性,eventHandler用一个list进行存储,因为有可能一个EventType事件类型可能对应多个
* handler事件处理对象,例如点赞通知这个事件类型可能需要通知被点赞的人以及通知系统管理员,所以应该对应两个事件handler
*/
private Map<EventType, List<EventHandler>> config=new HashMap<>();
/**
* spring上下文对象,该对象存储着handler bean对象,必须通过setApplicationContext(ApplicationContextAware接口的实现方法)
* 进行初始化,这样才能获取spring中的handler具体实现类的beans
*/
private ApplicationContext applicationContext;
@Autowired
JedisAdapter jedisAdapter;
/**
* spring对该对象进行初始化的时候,将所有的handler具体对象注册到config对象中
*/
@Override
public void afterPropertiesSet() throws Exception {
Map<String, EventHandler> beans = applicationContext.getBeansOfType(EventHandler.class);//获取所有handler类
//把handler对象注册到config中
if(beans!=null){
for (Map.Entry<String,EventHandler> entry:beans.entrySet()){
List<EventType> eventTypes=entry.getValue().getSupportEventTypes();
//由于一个handler也可能对应多个事件类型,所以一个handler要注册到所有的eventType中去
for (EventType type:eventTypes){
if (!config.containsKey(type)){
config.put(type,new ArrayList<EventHandler>());
}
config.get(type).add(entry.getValue());
}
}
}
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
while (true){
String key= RedisKeyUtil.getEventQueueKey();
List<String> events=jedisAdapter.brpop(0,key);//从队列中取事件,操作时间是0,如果队列中没有时间,就一直卡着。当取到event,需要去找关联的各种handler去处理
for(String message:events){
if(message.equals(key)){//第一个值可能是key,这是返回值的原因.过滤掉
continue;
}
//反序列化
EventModel eventModel= JSON.parseObject(message,EventModel.class);
if (!config.containsKey(eventModel.getType())){
logger.error("不能识别的事件");
continue;
}
//执行handler
for (EventHandler handler:config.get(eventModel.getType())){
handler.doHandle(eventModel);
}
}
}
}
});
thread.start();
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext=applicationContext;
}
}
参考:https://www.cnblogs.com/lcplcpjava/p/6884420.html