欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

如何高效的触发设备离线

程序员文章站 2022-04-19 11:20:14
...

业务场景

业务开发过程中,我们经常会需要判断远程终端是否在线,当终端离线的时候我们需要发送消息告知相应的系统,
例如:设备通过mqtt与服务端进行交互,通过心跳来上报设备的运行状态,如果超过30秒未上报设备信息则判断设备离线关闭相应的连接通道

一般处理这种业务场景有3种比较常见的方法

轮询法
  1. 使用一个Map<uid, last_time>来记录设备每次心跳的最后时间,
  2. 当设备发送心跳包的时候记录这个设备的心跳时间到Map中
  3. 启动一个定时异步线程扫描这个map。寻找到超时心跳的设备id

简单做了一下测试,map中保存100W的数据。定时线程每秒都扫描一次map。每次扫描消耗大量的时间以及系统资源,一个设备过期基本要在几百毫秒后才会被发现

延时任务
  1. 每次接收到设备的心跳消息存储到map中
  2. 为每个设备都建立一个延时30秒的异步线程任务
  3. 到期设备状态未更新则判断设备离线

此方法在少量设备连接是可以做的设备状态及时更新,但在大量设备连接时会占用大量系统资源,并不可取。

环形队列
  1. 创建一个index从0到30的环形队列(本质是个数组)
  2. 环上每一个slot是一个Set,任务集合
  3. 同时还有一个Map<uid, index>,记录uid落在环上的哪个slot里
  4. 启动一个timer,每隔1s,在上述环形队列中移动一格,0->1->2->3…->29->30->0…
  5. 有一个Current Index指针来标识刚检测过的slot
  6. 接收到设备心跳后将寻找到原来uid的位置然后移动到当前指针的后一位,并删除原来slot里的uid
  7. 这样就可以快速获取超时的设备uid

网上找的环形队列示意图
如何高效的触发设备离线
环形队列实现

package com.iot.simulator.ipc.study;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CircleQueue<T> {
    //线程安全锁
    Lock lock = new ReentrantLock();

    //初始环形队列大小
    private int capacity = 30;

    //当前环形队列所在节点
    private volatile int currentIndex = 0;

    //数据所在节点
    private Map<T,Integer> dataIndex = new HashMap<>();

    //环形队列
    private Set<T>[] array;

    public CircleQueue(){
        array = new HashSet[capacity];
    }

    public CircleQueue(int capacity){
        this.capacity = capacity;
        array = new HashSet[capacity];
    }

    public void add(T t){
        try {
            lock.lock();
            //判断数据是否存在
            if(dataIndex.containsKey(t)){
                Set<T> old  =  array[dataIndex.get(t)];
                old.remove(t);
            }
            //获取当前节点的队列
            Set<T> set = null;
            int previous = currentIndex-1;
            if(previous<0){
                previous = array.length -1;
            }
            set = array[previous];
            if(null == set){
                set = new HashSet<>();
                array[previous] = set;
            }
            set.add(t);
            //更新新的节点位置
            dataIndex.put(t,previous);
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            lock.unlock();
        }

    }

    public Set<T> getData(){
        try {
            lock.lock();
            Set<T> set = array[currentIndex];
            currentIndex++;
            //将数组重新致为起始位置
            if(currentIndex==capacity){
                currentIndex = 0;
            }
            return set;
        }finally {
            lock.unlock();
        }
    }

    public int getIndex(T t){
        if(dataIndex.containsKey(t)){
            return dataIndex.get(t);
        }
        return -1;
    }
}

测试代码

public class DeviceOnlineCheck {

	ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
    
    List<String> deviceList = new CopyOnWriteArrayList();

    Long first = 0L;

    String lastUuid = null;
	@Test
    public void circleTest(){
        CircleQueue<String> circleQueue = new CircleQueue<>();
        addCircleDate(circleQueue);
        checkCircleTime(circleQueue);
        try {
            Thread.sleep(60000);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private void addCircleDate(CircleQueue circleQueue){
        for (int i=0;i<100000;i++){
            String uuid = UUID.randomUUID().toString();
            deviceList.add(uuid);
            circleQueue.add(uuid);
            if(i==(100000 -1)){
                lastUuid = uuid;
                first = System.currentTimeMillis();
                System.out.println(uuid+"初始时间"+first);
            }
        }
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if(!CollectionUtils.isEmpty(deviceList)){
                    for (String uuid:deviceList){
                        if(!uuid.equals(lastUuid)){
                            circleQueue.add(uuid);
                        }else {
                            System.out.println(uuid+"不移动位置"+circleQueue.getIndex(uuid));

                        }
                    }
                }
            }
        },5,10, TimeUnit.SECONDS);
    }

    private void checkCircleTime(CircleQueue circleQueue){
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                if(!CollectionUtils.isEmpty(deviceList)){
                    Long time = System.currentTimeMillis();
                    Set<String> set =circleQueue.getData();
                    if(!CollectionUtils.isEmpty(set)){
                        System.err.println(set.size());
                        System.err.println(lastUuid+"过期");
                        System.err.println("超时时间"+(time-first));
                    }

                }
            }
        },1,1,TimeUnit.SECONDS);
    }
}

测试结果

148aed53-a083-40a6-82d3-ede14e5e39c9初始时间1585571543739
当前位置:0数据大小:0
当前位置:1数据大小:0
当前位置:2数据大小:0
当前位置:3数据大小:0
当前位置:4数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:5数据大小:0
当前位置:6数据大小:0
当前位置:7数据大小:0
当前位置:8数据大小:0
当前位置:9数据大小:0
当前位置:10数据大小:0
当前位置:11数据大小:0
当前位置:12数据大小:0
当前位置:13数据大小:0
当前位置:14数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:15数据大小:0
当前位置:16数据大小:0
当前位置:17数据大小:0
当前位置:18数据大小:0
当前位置:19数据大小:0
当前位置:20数据大小:0
当前位置:21数据大小:0
当前位置:22数据大小:0
当前位置:23数据大小:0
当前位置:24数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9不移动位置29
当前位置:25数据大小:0
当前位置:26数据大小:0
当前位置:27数据大小:0
当前位置:28数据大小:0
148aed53-a083-40a6-82d3-ede14e5e39c9过期
超时时间30005

基本到时就扫描出过期的设备,而且只启动了一个线程,并且1秒执行一次。大大减少了系统的开支。

相关标签: 项目经验