Netty中HashWheelTimer的使用
最近在写项目的时候, 需要用到延迟任务. 需求如下: 用户通过微信绑定一个设备的开关机时间, 可以选择一周内哪几天需要开启这个定时任务, 就像我们得手机闹钟一样. 因此用到了netty的HashedWheelTimer
时间轮计时器来处理这个问题.
什么是时间轮? 简单来说, 就像我们的时钟一样,上面有很多格子, 本质上一个wheel是一个哈希表,每个延时任务通过散列函数放入对应的位置. 每个格子中的延时任务是一个双向链表, 当"指针"指到哪个格子中的时候, 格子中的第一个任务便开始执行, 这样的设计方便取消和添加任务.
一个Timer的构造函数有几个重要的参数:
1.tickDuration: tick一次需要的时间, 默认100ms
2.tickPerWheel: 每个wheel需要tick多少次, 即每个wheel有多少个格子,默认512个
3.timeUntil: tickDuration的时间单位
4.ThreadFactory: 负责创建worker线程
除了构造函数, 还有一个比较重要的概念.
轮(round): 一轮的时长 tickDuration*tickPerWheel,也就是转一圈的时长. 其中Worker线程是HashedWheelTimer的核心,主要负责每过tickDuration时间就累加一次tick. 同时, 也负责执行到期的timeout任务并添加timeout任务到指定的wheel中. 当添加timeout任务的时候, 会根据设置的时间, 来计算需要等待的时间长度, 根据时间长度,进而计算出要经过多少次tick,然后根据tick的次数来计算进过多少轮,最终得出任务在wheel中的位置.
例如: 如果任务设置为在100s后执行, 按照默认的配置tickDuration=100ms, tickPerWheel=512.
任务需要经过的tick = (100 * 1000) / 100 = 1000次
任务需要经过的轮数 = 1000 / 512 = 1.....488
这表明这个定时任务会放到第一轮的第488的索引的位置.
大致了解了时间轮的原理, 看看我们的需求应该如何实现呢? 首先,用户通过微信绑定了设备的开关机时间,以及一周内哪几天需要执行开关机任务,按照面向对象的思想,我们需要建立两个类, 一个是用户开关机的数据UserTimer
, 还有一个是用户的开关机任务UserTask
.
/**
* 用户定时任务数据
*/
@Data
public class UserTimer {
/**
* 设备id
*/
private int eId;
/**
* 每周哪几天需要定时(数据:1100111 周日~周六)
* 1代表需要开启定时 0代表不需要开启定时
*/
private String week;
/**
* 开机时间 例如:08:00
*/
private String openTime;
/**
* 关机时间 例如:23:23
*/
private String closeTime;
}
/**
* 用户定时任务
*/
@Data
public class UserTask {
/**
* 开机任务
*/
private TimerTask openTask;
/**
* 关机任务
*/
private TimerTask closeTask;
public TimerTask getOpenTask() {
return openTask;
}
public TimerTask getCloseTask() {
return closeTask;
}
public UserTask(TimerTask openTask, TimerTask closeTask) {
this.openTask = openTask;
this.closeTask = closeTask;
}
}
用户的开关机定时任务和数据已经封装好, 开始写我们的接口Controller.
url=server/client/device/usertimer?Mac=9999&closetime=20:30&opentime=20:22&week=0111001
首先我们需要思考: 如果用户操作失误或者的确需要修改设备的开关机时间的话,上一次的开关机任务如何取消?因为此时任务已经添加到队列中了.
我的做法是: 将用户的开关机任务和开关机数据放到一个map中, key为设备的id, value是开关机数据或者开关机任务:
private static Map<Integer, UserTask> taskMap = new ConcurrentHashMap<>();
private static Map<Integer, UserTimer> timerMap = new ConcurrentHashMap<>();
这样每当用户修改的时候这个map可以随时更新 (这种做法其实并不好, 因为如果程序重启, 数据就会丢失, 最好用redis存储) , 但是这样还不够, 因为我们还是不能取消上次的任务, 通过分析源码, 我们可以发现, 每次启动一个延时任务的时候,都会返回一个绑定这个任务的句柄timeout, 通过这个句柄我们可以取消定时任务, 因此再创建一个map保存用户任务的句柄:
private static Map<Integer, UserTimeout> userTimeoutMap = new ConcurrentHashMap<>();
用户延迟任务的句柄UserTimeout:
/**
* 用户延时任务句柄,用来取消延时任务
*/
public class UserTimeout {
/**
* 设备id
*/
private int eId;
/**
* 开机任务句柄
*/
private Timeout startTimeout;
/**
* 关机任务句柄
*/
private Timeout closeTimeout;
public int geteId() {
return eId;
}
public Timeout getStartTimeout() {
return startTimeout;
}
public Timeout getCloseTimeout() {
return closeTimeout;
}
public UserTimeout(int eId, Timeout startTimeout, Timeout closeTimeout) {
this.eId = eId;
this.startTimeout = startTimeout;
this.closeTimeout = closeTimeout;
}
}
这些变量设置好之后, 我们开始实现主要功能:
public ResMsg deviceOnOffTimerController(@RequestParam("Mac") int Mac, @RequestParam("closetime")String closetime,@RequestParam("opentime")
String opentime, @RequestParam("week")String week) throws ParseException {
opentime += ":00";
closetime += ":00";
UserTimer userTimer = new UserTimer(Mac, week, opentime, closetime);
System.out.println("用户设置的定时任务:" + userTimer);
timerMap.put(Mac, userTimer); //保存用户定时数据
Water water = JedisUtil6379.getaDevice(Mac + "");
Map<String, String> mapWater = WaterToMap.toMapWater(water);
TimerTask openTask = openTask(Mac, mapWater); //开机任务
TimerTask closeTask = closeTask(Mac, mapWater); //关机任务
UserTask userTask = new UserTask(openTask, closeTask); //用户定时任务
if (null != taskMap.get(Mac)) { //用户以前设置过定时任务,更新
cancel(Mac); //取消上次的定时任务
taskMap.put(Mac, userTask); //保存用户定时任务
startUserTask(timerMap.get(Mac), userTask);//启动定时任务
} else { //用户第一次设置定时
taskMap.put(Mac, userTask);
startUserTask(timerMap.get(Mac), userTask);
}
return new ResMsg("success");
/**
* 取消用户定时任务
* 用户绑定定时任务可能需要需改,此时需要取消上次的定时任务
* 将最新的定时任务放入时间轮
*/
private void cancel(int eId) {
UserTimeout userTimeout = userTimeoutMap.get(eId);
//根据设备号,得到开关机任务的句柄
Timeout startTimeout = userTimeout.getStartTimeout();
Timeout closeTimeout = userTimeout.getCloseTimeout();
if (startTimeout != null) { //如果今天开关机任务都要执行,全部取消
startTimeout.cancel();
closeTimeout.cancel();
} else { //如果今天只执行关机任务,只取消关机任务
closeTimeout.cancel();
}
}
/**
* 启动用户定时任务
* @param userTimer 用户设置的定时数据
* @param userTask 用户开关机定时任务
*/
private void startUserTask(UserTimer userTimer, UserTask userTask) throws ParseException {
String week = userTimer.getWeek();
String startTime = userTimer.getOpenTime();
String endTime = userTimer.getCloseTime();
int[] weekArray = weekArray(week);
for (int i : weekArray) {
int weekOfDate = DateUtils.getWeekOfDate(new Date()); //获得当前是星期几
long c = System.currentTimeMillis();
String s = DateUtils.getShotDate(new Date()) + " " + startTime;
long s1 = DateUtils.dateToStamp(s);
String e = DateUtils.getShotDate(new Date()) + " " + endTime;
long e1 = DateUtils.dateToStamp(e);
if (weekOfDate == i) {
if (s1 - c > 0 && e1 - c > 0) { //绑定时间在开关机之前,那么今天就要执行开关机
Timeout startTimeout = timer.newTimeout(userTask.getOpenTask(), s1 - c, TimeUnit.MILLISECONDS);
Timeout closeTimeout = timer.newTimeout(userTask.getCloseTask(), e1 - c, TimeUnit.MILLISECONDS);
UserTimeout userTimeout = new UserTimeout(userTimer.getEId(), startTimeout, closeTimeout);
userTimeoutMap.put(userTimer.getEId(), userTimeout); //保存用户定时任务的句柄
} else if (s1 - c < 0 && e1 - c > 0) { //绑定时间在开关机中间,那没今天只执行关机
Timeout closeTimeout = timer.newTimeout(userTask.getCloseTask(), e1 - c, TimeUnit.MILLISECONDS);
UserTimeout userTimeout = new UserTimeout(userTimer.getEId(), null, closeTimeout);
userTimeoutMap.put(userTimer.getEId(), userTimeout);
}
break;
}
}
}
这样我们就完成了,用户绑定设备的开关机操作.
HashWheelTimeer源码