如何高效的触发设备离线
程序员文章站
2022-04-19 11:20:14
...
业务场景
业务开发过程中,我们经常会需要判断远程终端是否在线,当终端离线的时候我们需要发送消息告知相应的系统,
例如:设备通过mqtt与服务端进行交互,通过心跳来上报设备的运行状态,如果超过30秒未上报设备信息则判断设备离线关闭相应的连接通道
一般处理这种业务场景有3种比较常见的方法
轮询法
- 使用一个Map<uid, last_time>来记录设备每次心跳的最后时间,
- 当设备发送心跳包的时候记录这个设备的心跳时间到Map中
- 启动一个定时异步线程扫描这个map。寻找到超时心跳的设备id
简单做了一下测试,map中保存100W的数据。定时线程每秒都扫描一次map。每次扫描消耗大量的时间以及系统资源,一个设备过期基本要在几百毫秒后才会被发现
延时任务
- 每次接收到设备的心跳消息存储到map中
- 为每个设备都建立一个延时30秒的异步线程任务
- 到期设备状态未更新则判断设备离线
此方法在少量设备连接是可以做的设备状态及时更新,但在大量设备连接时会占用大量系统资源,并不可取。
环形队列
- 创建一个index从0到30的环形队列(本质是个数组)
- 环上每一个slot是一个Set,任务集合
- 同时还有一个Map<uid, index>,记录uid落在环上的哪个slot里
- 启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…
- 有一个Current Index指针来标识刚检测过的slot
- 接收到设备心跳后将寻找到原来uid的位置然后移动到当前指针的后一位,并删除原来slot里的uid
- 这样就可以快速获取超时的设备uid
网上找的环形队列示意图
环形队列实现
package com.iot.simulator.ipc.study;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class CircleQueue<T> {
//线程安全锁
Lock lock = new ReentrantLock();
//初始环形队列大小
private int capacity = 30;
//当前环形队列所在节点
private volatile int currentIndex = 0;
//数据所在节点
private Map<T,Integer> dataIndex = new HashMap<>();
//环形队列
private Set<T>[] array;
public CircleQueue(){
array = new HashSet[capacity];
}
public CircleQueue(int capacity){
this.capacity = capacity;
array = new HashSet[capacity];
}
public void add(T t){
try {
lock.lock();
//判断数据是否存在
if(dataIndex.containsKey(t)){
Set<T> old = array[dataIndex.get(t)];
old.remove(t);
}
//获取当前节点的队列
Set<T> set = null;
int previous = currentIndex-1;
if(previous<0){
previous = array.length -1;
}
set = array[previous];
if(null == set){
set = new HashSet<>();
array[previous] = set;
}
set.add(t);
//更新新的节点位置
dataIndex.put(t,previous);
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public Set<T> getData(){
try {
lock.lock();
Set<T> set = array[currentIndex];
currentIndex++;
//将数组重新致为起始位置
if(currentIndex==capacity){
currentIndex = 0;
}
return set;
}finally {
lock.unlock();
}
}
public int getIndex(T t){
if(dataIndex.containsKey(t)){
return dataIndex.get(t);
}
return -1;
}
}
测试代码
public class DeviceOnlineCheck {
ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
List<String> deviceList = new CopyOnWriteArrayList();
Long first = 0L;
String lastUuid = null;
@Test
public void circleTest(){
CircleQueue<String> circleQueue = new CircleQueue<>();
addCircleDate(circleQueue);
checkCircleTime(circleQueue);
try {
Thread.sleep(60000);
}catch (Exception e){
e.printStackTrace();
}
}
private void addCircleDate(CircleQueue circleQueue){
for (int i=0;i<100000;i++){
String uuid = UUID.randomUUID().toString();
deviceList.add(uuid);
circleQueue.add(uuid);
if(i==(100000 -1)){
lastUuid = uuid;
first = System.currentTimeMillis();
System.out.println(uuid+"初始时间"+first);
}
}
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if(!CollectionUtils.isEmpty(deviceList)){
for (String uuid:deviceList){
if(!uuid.equals(lastUuid)){
circleQueue.add(uuid);
}else {
System.out.println(uuid+"不移动位置"+circleQueue.getIndex(uuid));
}
}
}
}
},5,10, TimeUnit.SECONDS);
}
private void checkCircleTime(CircleQueue circleQueue){
service.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if(!CollectionUtils.isEmpty(deviceList)){
Long time = System.currentTimeMillis();
Set<String> set =circleQueue.getData();
if(!CollectionUtils.isEmpty(set)){
System.err.println(set.size());
System.err.println(lastUuid+"过期");
System.err.println("超时时间"+(time-first));
}
}
}
},1,1,TimeUnit.SECONDS);
}
}
测试结果
148aed53-a083-40a6-82d3-ede14e5e39c9初始时间1585571543739
当前位置:0数据大小:0
当前位置:1数据大小:0
当前位置:2数据大小:0
当前位置:3数据大小:0
当前位置:4数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:5数据大小:0
当前位置:6数据大小:0
当前位置:7数据大小:0
当前位置:8数据大小:0
当前位置:9数据大小:0
当前位置:10数据大小:0
当前位置:11数据大小:0
当前位置:12数据大小:0
当前位置:13数据大小:0
当前位置:14数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:15数据大小:0
当前位置:16数据大小:0
当前位置:17数据大小:0
当前位置:18数据大小:0
当前位置:19数据大小:0
当前位置:20数据大小:0
当前位置:21数据大小:0
当前位置:22数据大小:0
当前位置:23数据大小:0
当前位置:24数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:25数据大小:0
当前位置:26数据大小:0
当前位置:27数据大小:0
当前位置:28数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9过期
超时时间30005
基本到时就扫描出过期的设备,而且只启动了一个线程,并且1秒执行一次。大大减少了系统的开支。