欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

TBSchedule源码学习笔记-线程组任务调度

程序员文章站 2024-03-02 16:04:46
...

根据上文的启动过程,找到了线程组的实现。com.taobao.pamirs.schedule.taskmanager.TBScheduleManager

/**
 * 1、任务调度分配器的目标:    让所有的任务不重复,不遗漏的被快速处理。
 * 2、一个Manager只管理一种任务类型的一组工作线程。
 * 3、在一个JVM里面可能存在多个处理相同任务类型的Manager,也可能存在处理不同任务类型的Manager。
 * 4、在不同的JVM里面可以存在处理相同任务的Manager 
 * 5、调度的Manager可以动态的随意增加和停止
 * 
 * 主要的职责:
 * 1、定时向集中的数据配置中心更新当前调度服务器的心跳状态
 * 2、向数据配置中心获取所有服务器的状态来重新计算任务的分配。这么做的目标是避免集中任务调度中心的单点问题。
 * 3、在每个批次数据处理完毕后,检查是否有其它处理服务器申请自己把持的任务队列,如果有,则释放给相关处理服务器。
 *  
 * 其它:
 *   如果当前服务器在处理当前任务的时候超时,需要清除当前队列,并释放已经把持的任务。并向控制主动中心报警。
 * 
 * @author xuannan
 *
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
abstract class TBScheduleManager implements IStrategyTask {
    //.......//
}

这个类的构造方法是这样的

TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
        //.......//
    }  

这里发现有两个老朋友,aFactory就是那个调度服务器对象,baseTaskType是设置的平台类型,aScheduleCenter调度配置中心客户端接口,那么ownSign是什么?
以下取自官方文档:

OwnSign环境区域
是对运行环境的划分,进行调度任务和数据隔离。例如:开发环境、测试环境、预发环境、生产环境。
不同的开发人员需要进行数据隔离也可以用OwnSign来实现,避免不同人员的数据冲突。缺省配置的环境区域OwnSign=’BASE’。
例如:TaskType=’DataDeal’,配置的队列是0、1、2、3、4、5、6、7、8、9。缺省的OwnSign=’BASE’。
此时如果再启动一个测试环境,则Schedule会动态生成一个TaskType=’DataDeal-Test’的任务类型,环境会作为一个变量传递给业务接口,
由业务接口的实现类,在读取数据和处理数据的时候进行确定。业务系统一种典型的做法就是在数据表中增加一个OWN_SIGN字段。
在创建数据的时候根据运行环境填入对应的环境名称,在Schedule中就可以环境的区分了。

com.taobao.pamirs.schedule.taskmanager.IScheduleDataManager 这个接口要好好看下,定义的方法有点多,但是一眼看下来更多的方法是为控制台页面提供服务了,例如创建任务。诸如任务和任务相等信息的维护和查询都在这个接口中定义。

那么TBScheduleManager 这个构造函数就用到了一个,继续打开构造函数看

TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
        this.factory = aFactory;
        //private static int nextSerialNumber = 0; 生成用户标志不同的线程序号,用于区分不同的线程组
        this.currentSerialNumber = serialNumber();
        this.scheduleCenter = aScheduleCenter;
        //按照任务类型加载调度任务信息
        this.taskTypeInfo = this.scheduleCenter.loadTaskTypeBaseInfo(baseTaskType);
        log.info("create TBScheduleManager for taskType:"+baseTaskType);
        //清除已经过期1天的TASK,OWN_SIGN的组合。超过一天没有活动server的视为过期
        //为什么要清除?如果任务确实能跑一天怎么办
        this.scheduleCenter.clearExpireTaskTypeRunningInfo(baseTaskType,ScheduleUtil.getLocalIP() + "清除过期OWN_SIGN信息",this.taskTypeInfo.getExpireOwnSignInterval());
        //通过调度服务器提供的方法获得对应的bean
        Object dealBean = aFactory.getBean(this.taskTypeInfo.getDealBeanName());
        if (dealBean == null) {
            throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 不存在");
        }
        //如果没有实现调度器对外的基础接口,肯定不能用啊
        if (dealBean instanceof IScheduleTaskDeal == false) {
            throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 没有实现 IScheduleTaskDeal接口");
        }
        this.taskDealBean = (IScheduleTaskDeal)dealBean;
        //任务的配置校验,为什么要大于5倍 出发点是什么
        if(this.taskTypeInfo.getJudgeDeadInterval() < this.taskTypeInfo.getHeartBeatRate() * 5){
            throw new Exception("数据配置存在问题,死亡的时间间隔,至少要大于心跳线程的5倍。当前配置数据:JudgeDeadInterval = "
                    + this.taskTypeInfo.getJudgeDeadInterval() 
                    + ",HeartBeatRate = " + this.taskTypeInfo.getHeartBeatRate());
        }
        //这个currenScheduleServer是类"com.taobao.pamirs.schedule.taskmanager.ScheduleServer"的实例,存储了当前调度服务的信息
        this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber());
        this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid());
        //向调度中心客户端注册调度服务信息
        scheduleCenter.registerScheduleServer(this.currenScheduleServer);
        this.mBeanName = "pamirs:name=" + "schedule.ServerMananger." +this.currenScheduleServer.getUuid();
        //又启动了一个定时任务,看这名字就知道是心跳
        this.heartBeatTimer = new Timer(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-HeartBeat");
        this.heartBeatTimer.schedule(new HeartBeatTimerTask(this),
                new java.util.Date(System.currentTimeMillis() + 500),
                this.taskTypeInfo.getHeartBeatRate());
        //对象创建时需要做的初始化工作,模版方法。
        initial();
    }  

目前能观察到的这个构造函数流程

1.按照任务名称加载任务配置
2.清除过期OWN_SIGN信息
3.检查任务配置是否正确
4.将调度服务信息注册到调度中心客户端
5.其他初始化操作

记录一下tbschedule对任务配置的存储方式(控制台页面”任务管理”)
控制台:
TBSchedule源码学习笔记-线程组任务调度
zk节点:
TBSchedule源码学习笔记-线程组任务调度

节点内容格式化之后

{
    "baseTaskType": "commonSyncAdvertiserTask",
    "heartBeatRate": 5000,
    "judgeDeadInterval": 60000,
    "sleepTimeNoData": 500,
    "sleepTimeInterval": 0,
    "fetchDataNumber": 500,
    "executeNumber": 10,
    "threadNumber": 5,
    "processorType": "SLEEP",
    "permitRunStartTime": "0 * * * * ?",
    "expireOwnSignInterval": 1,
    "dealBeanName": "commonSyncAdvertiserTask",
    "taskParameter": "3",
    "taskKind": "static",
    "taskItems": [
        "0",
        "1",
        "2",
        "3",
        "4",
        "5",
        "6",
        "7",
        "8",
        "9"
    ],
    "maxTaskItemsOfOneThreadGroup": 0,
    "version": 0,
    "sts": "resume"
}

第4步主要包含以下代码行

//这个currenScheduleServer是类"com.taobao.pamirs.schedule.taskmanager.ScheduleServer"的实例,存储了当前调度服务的信息
this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber());
        this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid());
        //向调度中心客户端注册调度服务信息
        scheduleCenter.registerScheduleServer(this.currenScheduleServer);

当前调度服务信息都包含什么,有什么是状态量,向调度中心客户端注册的意图是什么?打开ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber()); 这个代码看一下,这个ScheduleServer实例融合了哪些信息

public static ScheduleServer createScheduleServer(IScheduleDataManager aScheduleCenter,String aBaseTaskType,
            String aOwnSign, int aThreadNum)
            throws Exception {
        ScheduleServer result = new ScheduleServer();
        //调度任务类型(beanName)
        result.baseTaskType = aBaseTaskType;
        result.ownSign = aOwnSign;
        //会将调度类型和环境区域 按这种方式组合baseType+"$" + ownSign
        result.taskType = ScheduleUtil.getTaskTypeByBaseAndOwnSign(
                aBaseTaskType, aOwnSign);
        result.ip = ScheduleUtil.getLocalIP();
        result.hostName = ScheduleUtil.getLocalHostName();
        //aScheduleCenter.getSystemTime() 从哪来
        result.registerTime = new Timestamp(aScheduleCenter.getSystemTime());
        //任务配置的线程数
        result.threadNum = aThreadNum;
        result.heartBeatTime = null;
        result.dealInfoDesc = "调度初始化";
        result.version = 0;
        //厉害了这里生成了一个自己的uuid
        result.uuid = result.ip
                + "$"
                + (UUID.randomUUID().toString().replaceAll("-", "")
                        .toUpperCase());
        SimpleDateFormat DATA_FORMAT_yyyyMMdd = new SimpleDateFormat("yyMMdd");
        String s = DATA_FORMAT_yyyyMMdd.format(new Date(aScheduleCenter.getSystemTime()));
        //这个作用是啥
        result.id = Long.parseLong(s) * 100000000
                + Math.abs(result.uuid.hashCode() % 100000000);
        return result;
    }

在这里就可以发现aOwnSign 的作用原理了,就是作为一个标记和任务类型拼接在一块,这样对于同样的任务不同的ownSign有不同的调度域,这个方法里有几个关键点:

result.registerTime = new Timestamp(aScheduleCenter.getSystemTime()); 这个属性的作用是?
result.uuid 这个作用是?
result.id 这个的作用是?为什么这么生成?为什么和aScheduleCenter.getSystemTime() 相关
aScheduleCenter.getSystemTime() 取到的是一个什么时间呢?

aScheduleCenter.getSystemTime() 取到的是个什么时间?之前只是初始化了IScheduleDataManager实现,估计构造函数里会有答案,看实现类com.taobao.pamirs.schedule.zk.ScheduleDataManager4ZK

public ScheduleDataManager4ZK(ZKManager aZkManager) throws Exception {
        this.zkManager = aZkManager;
        gson = new GsonBuilder().registerTypeAdapter(Timestamp.class,new TimestampTypeAdapter()).setDateFormat("yyyy-MM-dd HH:mm:ss").create();

        this.PATH_BaseTaskType = this.zkManager.getRootPath() +"/baseTaskType";

        if (this.getZooKeeper().exists(this.PATH_BaseTaskType, false) == null) {
            //创建了一个永久节点
            ZKTools.createPath(getZooKeeper(),this.PATH_BaseTaskType, CreateMode.PERSISTENT, this.zkManager.getAcl());
        }
        //当前服务器时间
        loclaBaseTime = System.currentTimeMillis();
        //Zookeeper服务器时间
        String tempPath = this.zkManager.getZooKeeper().create(this.zkManager.getRootPath() + "/systime",null, this.zkManager.getAcl(), CreateMode.EPHEMERAL_SEQUENTIAL);
        Stat tempStat = this.zkManager.getZooKeeper().exists(tempPath, false);

        zkBaseTime = tempStat.getCtime();
        ZKTools.deleteTree(getZooKeeper(), tempPath);
        if(Math.abs(this.zkBaseTime - this.loclaBaseTime) > 5000){
            log.error("请注意,Zookeeper服务器时间与本地时间相差 : " + Math.abs(this.zkBaseTime - this.loclaBaseTime) +" ms");
        }   
    }   
    public long getSystemTime(){
            //zk服务器时间+客户端启动时间
            return this.zkBaseTime + ( System.currentTimeMillis() - this.loclaBaseTime);
        }

可见这里返回了一个zk服务器时间+本地时间偏移量这样一个值,这里做法我是这么理解的,在分布式环境下可能存在各机器时钟不同步的情况(上下偏差),通过zk服务器时间加相对时间相当于把整个集群的时间都拨正了,这样时钟在tbschedule框架这里是可信的,所以可以把aScheduleCenter.getSystemTime()方法看作返回调度服务的集群可信时间,

目前为止以下两个问题还没有得到解答

result.uuid 这个作用是?
result.id 这个的作用是?为什么这么生成?为什么和aScheduleCenter.getSystemTime() 相关

只能接着看第4步里的scheduleCenter.registerScheduleServer(this.currenScheduleServer); 这个代码行,在对这个currenScheduleServer小宝贝做了些什么

public void registerScheduleServer(ScheduleServer server) throws Exception {
        //简单
        if(server.isRegister() == true){
            throw new Exception(server.getUuid() + " 被重复注册");
        }
        //本例 rootPath /dsp_official_0928_wd/schedule/
        //本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask
        String zkPath = this.PATH_BaseTaskType + "/" + server.getBaseTaskType() +"/" + server.getTaskType();
        if (this.getZooKeeper().exists(zkPath, false) == null) {
            this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
        }
        //本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask/server
        zkPath = zkPath +"/" + this.PATH_Server;
        if (this.getZooKeeper().exists(zkPath, false) == null) {
            this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
        }
        String realPath = null;
        //此处必须增加UUID作为唯一性保障
        String zkServerPath = zkPath + "/" + server.getTaskType() + "$"+ server.getIp() + "$"
                + (UUID.randomUUID().toString().replaceAll("-", "").toUpperCase())+"$";

        //创建临时节点
        realPath = this.getZooKeeper().create(zkServerPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT_SEQUENTIAL);

        //之前已经生成过uuid,那么为什么这里要重新设置一次????
        server.setUuid(realPath.substring(realPath.lastIndexOf("/") + 1));

        //
        Timestamp heartBeatTime = new Timestamp(this.getSystemTime());
        server.setHeartBeatTime(heartBeatTime);

        //server序列化存储到zk的节点上
        String valueString = this.gson.toJson(server);      
        this.getZooKeeper().setData(realPath,valueString.getBytes(),-1);
        server.setRegister(true);
    }

这个registerScheduleServer方法看到这里,都是写zk的查询操作和设置操作,在这里也会发现zk维护的内容,就是说创建了zk节点用于维护调度服务数据,便于控制台页面查询和调度服务器间同步

rootPath/baseTaskType/{策略管理>任务名称}/${任务管理>任务处理的SpringBean}/server 下每个任务组都有自己的节点,这个节点会存储调度服务数据,从数据内容上看该数据会因每一次调度而修改。

这里是一个调度节点的例子:
TBSchedule源码学习笔记-线程组任务调度

节点内容的格式化后如下:

{
    "uuid": "commonSyncAdvertiserTask$127.0.0.1$35542D3EE3984FEAB15525C319358DE6$0000000060",
    "id": 17120684352326,
    "taskType": "commonSyncAdvertiserTask",
    "baseTaskType": "commonSyncAdvertiserTask",
    "ownSign": "BASE",
    "ip": "127.0.0.1",
    "hostName": "adm_adc02",
    "threadNum": 5,
    "registerTime": "2017-12-06 18:04:30",
    "heartBeatTime": "2017-12-08 10:43:17",
    "lastFetchDataTime": "2017-12-08 10:43:00",
    "dealInfoDesc": "没有数据,暂停调度:FetchDataCount=2439,FetchDataNum=0,DealDataSucess=0,DealDataFail=0,DealSpendTime=0,otherCompareCount=0",
    "nextRunStartTime": "2017-12-08 10:44:00",
    "nextRunEndTime": "当不能获取到数据的时候pause",
    "version": 34144,
    "isRegister": true,
    "managerFactoryUUID": "127.0.0.1$adm_adc02$AB1899D1EA4F4783B25EC4B4E04A6A79$0000000081"
}

控制台页面观察到的数据如下:
TBSchedule源码学习笔记-线程组任务调度

那么问题来了,存储调度服务数据内容的节点值是什么时候被修改的,调度任务是怎么被启动的?
回到com.taobao.pamirs.schedule.taskmanager.TBScheduleManager 构造方法,最后一行

TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
        //.......//
        initial();
    }  

这个initial() 方法的实现有两个,分别位于com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStaticcom.taobao.pamirs.schedule.taskmanager.TBScheduleManagerDynamic 经过之前的学习,已经很明确的知道就是使用的com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic实例,那么看一下这个方法的实现。这个方法里第一眼就看到一个关键字”start()”

public void initial() throws Exception{
        //线程啊。。
        new Thread(this.currenScheduleServer.getTaskType()  +"-" + this.currentSerialNumber +"-StartProcess"){
            @SuppressWarnings("static-access")
            public void run(){
                try{
                   //唉呀妈呀,这个log老熟悉了
                   log.info("开始获取调度任务队列...... of " + currenScheduleServer.getUuid());
                   while (isRuntimeInfoInitial == false) {
                      if(isStopSchedule == true){
                          log.debug("外部命令终止调度,退出调度队列获取:" + currenScheduleServer.getUuid());
                          return;
                      }
                      //log.error("isRuntimeInfoInitial = " + isRuntimeInfoInitial);
                      try{
                          //初始化运行期信息
                      initialRunningInfo();
                      //运行期信息是否初始化成功
                      isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss(
                                        currenScheduleServer.getBaseTaskType(),
                                        currenScheduleServer.getOwnSign());
                      }catch(Throwable e){
                          //忽略初始化的异常
                          log.error(e.getMessage(),e);
                      }
                      //每隔一秒重试一次
                      if(isRuntimeInfoInitial == false){
                          Thread.currentThread().sleep(1000);
                      }
                   }

                   int count =0;
                   //获得框架系统时间
                   lastReloadTaskItemListTime = scheduleCenter.getSystemTime();
                   //这个代码的动作是什么?getCurrentScheduleTaskItemListNow();,好像是获取调度队列,如果没拿到就每隔一秒重试
                   while(getCurrentScheduleTaskItemListNow().size() <= 0){
                          if(isStopSchedule == true){
                              log.debug("外部命令终止调度,退出调度队列获取:" + currenScheduleServer.getUuid());
                              return;
                          }
                          Thread.currentThread().sleep(1000);
                          count = count + 1;
                         // log.error("尝试获取调度队列,第" + count + "次 ") ;
                   }

                   String tmpStr ="TaskItemDefine:";
                   //遍历调度队列
                   for(int i=0;i< currentTaskItemList.size();i++){
                       if(i>0){
                           tmpStr = tmpStr +",";                           
                       }
                       //哦?循环体内字符串拼接......
                       tmpStr = tmpStr + currentTaskItemList.get(i);
                   }

                   log.info("获取到任务处理队列,开始调度:" + tmpStr +"  of  "+ currenScheduleServer.getUuid());

                    //任务总量
                    taskItemCount = scheduleCenter.loadAllTaskItem(currenScheduleServer.getTaskType()).size();
                    //只有在已经获取到任务处理队列后才开始启动任务处理器                
                    computerStart();
                }catch(Exception e){
                    log.error(e.getMessage(),e);
                    String str = e.getMessage();
                    if(str.length() > 300){
                        str = str.substring(0,300);
                    }
                    startErrorInfo = "启动处理异常:" + str;
                }
            }
        }.start();
    }

看到这里疑问满满,初始化运行期信息都做了些什么??,这里貌似没有看到任务项?任务项在哪里初始化的?线程组怎么按照任务项拆分的??
看初始化运行期信息相关的代码,目前就两行

public void initial() throws Exception{
    new Thread(this.currenScheduleServer.getTaskType()  +"-" + this.currentSerialNumber +"-StartProcess"){
        @SuppressWarnings("static-access")
        public void run(){
            //......//
            //初始化运行期信息
            initialRunningInfo();
            //运行期信息是否初始化成功
            isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss(
                                                    currenScheduleServer.getBaseTaskType(),
                                                    currenScheduleServer.getOwnSign());
            //......//  
        }
    }.start();
}

initialRunningInfo() 在做些什么?打开代码看:

public void initialRunningInfo() throws Exception{
        //清除过期的调度任务
        scheduleCenter.clearExpireScheduleServer(this.currenScheduleServer.getTaskType(),this.taskTypeInfo.getJudgeDeadInterval());
        //拿到任务数据节点名
        List<String> list = scheduleCenter.loadScheduleServerNames(this.currenScheduleServer.getTaskType());
        //如果当前是leader节点
        if(scheduleCenter.isLeader(this.currenScheduleServer.getUuid(),list)){
            //是第一次启动,先清楚所有的垃圾数据
            log.debug(this.currenScheduleServer.getUuid() + ":" + list.size());
            //初始化任务调度的域信息和静态任务信息
            //如果是leader节点,做这样的操作
            //清除zk任务项节点并重建 
            //      //本例 rootPath /dsp_official_0928_wd/schedule/
                    //本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask/taskItem
                    //调用public void createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception 方法创建任务项
            this.scheduleCenter.initialRunningInfo4Static(this.currenScheduleServer.getBaseTaskType(), this.currenScheduleServer.getOwnSign(),this.currenScheduleServer.getUuid());
        }
     }

createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception 方法创建任务项就是说根据任务管理里面的任务项配置在zk下创建节点
TBSchedule源码学习笔记-线程组任务调度

/**
     * 根据基础配置里面的任务项来创建各个域里面的任务项
     * @param baseTaskType
     * @param ownSign
     * @param baseTaskItems
     * @throws Exception
     */
    public void createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception {
        ScheduleTaskItem[] taskItems = new ScheduleTaskItem[baseTaskItems.length];
        Pattern p = Pattern.compile("\\s*:\\s*\\{");

        for (int i=0;i<baseTaskItems.length;i++){
            taskItems[i] = new ScheduleTaskItem();
            taskItems[i].setBaseTaskType(baseTaskType);
            taskItems[i].setTaskType(ScheduleUtil.getTaskTypeByBaseAndOwnSign(baseTaskType, ownSign));
            taskItems[i].setOwnSign(ownSign);
            Matcher matcher = p.matcher(baseTaskItems[i]);
            if(matcher.find()){
                taskItems[i].setTaskItem(baseTaskItems[i].substring(0,matcher.start()).trim());
                taskItems[i].setDealParameter(baseTaskItems[i].substring(matcher.end(),baseTaskItems[i].length()-1).trim());
            }else{
                taskItems[i].setTaskItem(baseTaskItems[i]);
            }
            taskItems[i].setSts(ScheduleTaskItem.TaskItemSts.ACTIVTE);
        }
        createScheduleTaskItem(taskItems);
    }   

    /**
         * 创建任务项,注意其中的 CurrentSever和RequestServer不会起作用
         * @param taskItems
         * @throws Exception
         */
        public void createScheduleTaskItem(ScheduleTaskItem[] taskItems) throws Exception {
            for (ScheduleTaskItem taskItem : taskItems){
               String zkPath = this.PATH_BaseTaskType + "/" + taskItem.getBaseTaskType() + "/" + taskItem.getTaskType() +"/" + this.PATH_TaskItem;
               if(this.getZooKeeper().exists(zkPath, false)== null){
                   ZKTools.createPath(this.getZooKeeper(), zkPath, CreateMode.PERSISTENT, this.zkManager.getAcl());
               }
               String zkTaskItemPath = zkPath + "/" + taskItem.getTaskItem();
               this.getZooKeeper().create(zkTaskItemPath,null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
               this.getZooKeeper().create(zkTaskItemPath + "/cur_server",null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
               this.getZooKeeper().create(zkTaskItemPath + "/req_server",null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
               this.getZooKeeper().create(zkTaskItemPath + "/sts",taskItem.getSts().toString().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
               this.getZooKeeper().create(zkTaskItemPath + "/parameter",taskItem.getDealParameter().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
               this.getZooKeeper().create(zkTaskItemPath + "/deal_desc",taskItem.getDealDesc().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
            }
        }

回到 public void initial() throws Exception 方法,方法最后调用了一个computerStart();方法,这个方法干嘛的?

public void initial() throws Exception{
    new Thread(this.currenScheduleServer.getTaskType()  +"-" + this.currentSerialNumber +"-StartProcess"){
        @SuppressWarnings("static-access")
        public void run(){
            //......//
            computerStart();
            //......//  
        }
    }.start();
}

/**
     * 开始的时候,计算第一次执行时间
     * @throws Exception
     */
    public void computerStart() throws Exception{
        //只有当存在可执行队列后再开始启动队列
        //
        boolean isRunNow = false;
        if(this.taskTypeInfo.getPermitRunStartTime() == null){
            isRunNow = true;
        }else{
            //获得任务"执行开始时间"的设置
            //以startrun:开始,则表示开机立即启动调度.
            String tmpStr = this.taskTypeInfo.getPermitRunStartTime();
            if(tmpStr.toLowerCase().startsWith("startrun:")){
                isRunNow = true;
                tmpStr = tmpStr.substring("startrun:".length());
            }


            CronExpression cexpStart = new CronExpression(tmpStr);
            //获得集群校准后的系统当前时间(zk服务器时间+时间偏移量)
            Date current = new Date( this.scheduleCenter.getSystemTime());
            //第一次的开始执行时间
            Date firstStartTime = cexpStart.getNextValidTimeAfter(current);
            //到达开始执行时间后,启动调度任务
            this.heartBeatTimer.schedule(
                    new PauseOrResumeScheduleTask(this,this.heartBeatTimer,
                            PauseOrResumeScheduleTask.TYPE_RESUME,tmpStr), 
                            firstStartTime);
            //看到没有在这里改了ScheduleServer,用于后续调度使用。
            this.currenScheduleServer.setNextRunStartTime(ScheduleUtil.transferDataToString(firstStartTime));   
            //如果没有设置任务的终止时间,代表"当不能获取到数据的时候pause"
            if( this.taskTypeInfo.getPermitRunEndTime() == null
               || this.taskTypeInfo.getPermitRunEndTime().equals("-1")){
                this.currenScheduleServer.setNextRunEndTime("当不能获取到数据的时候pause");                
            }else{ // 如果设置了任务的终止时间
                try {
                    //拿到“执行结束时间”设置
                    String tmpEndStr = this.taskTypeInfo.getPermitRunEndTime();
                    CronExpression cexpEnd = new CronExpression(tmpEndStr);
                    //根据第一次执行开始时间,计算第一次执行结束时间
                    Date firstEndTime = cexpEnd.getNextValidTimeAfter(firstStartTime);
                    //根据当前时间计算一个执行结束时间
                    Date nowEndTime = cexpEnd.getNextValidTimeAfter(current);
                    //避免任务不被执行的情况
                    if(!nowEndTime.equals(firstEndTime) && current.before(nowEndTime)){
                        isRunNow = true;
                        firstEndTime = nowEndTime;
                    }
                    //到达执行结束时间后暂停任务
                    this.heartBeatTimer.schedule(
                            new PauseOrResumeScheduleTask(this,this.heartBeatTimer,
                                    PauseOrResumeScheduleTask.TYPE_PAUSE,tmpEndStr), 
                                    firstEndTime);
                    this.currenScheduleServer.setNextRunEndTime(ScheduleUtil.transferDataToString(firstEndTime));
                } catch (Exception e) {
                    log.error("计算第一次执行时间出现异常:" + currenScheduleServer.getUuid(), e);
                    throw new Exception("计算第一次执行时间出现异常:" + currenScheduleServer.getUuid(), e);
                }
            }
        }
        if(isRunNow == true){
            this.resume("开启服务立即启动");
        }
        //重写zk节点信息,就是之前提到的server节点之后重写
        this.rewriteScheduleInfo();

    }

这里小心下PauseOrResumeScheduleTask ,理解起来有点讨厌,在run方法里又套了个调度任务,利用cron表达式计算一个下一次运行时间(启动或暂停),再去跑一个PauseOrResumeScheduleTask调度任务。如果不打开看这个类很容易懵逼,以下是这个类的源码,发现他会在run方法内利用cron再计算自己下一次的运行时间。然后再调用这样一个猥琐操作this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime);

class PauseOrResumeScheduleTask extends java.util.TimerTask {
    private static transient Logger log = LoggerFactory
            .getLogger(HeartBeatTimerTask.class);
    public static int TYPE_PAUSE  = 1;
    public static int TYPE_RESUME = 2;  
    TBScheduleManager manager;
    Timer timer;
    int type;
    String cronTabExpress;
    public PauseOrResumeScheduleTask(TBScheduleManager aManager,Timer aTimer,int aType,String aCronTabExpress) {
        this.manager = aManager;
        this.timer = aTimer;
        this.type = aType;
        this.cronTabExpress = aCronTabExpress;
    }
    public void run() {
        try {
            Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
            this.cancel();//取消调度任务
            Date current = new Date(System.currentTimeMillis());
            CronExpression cexp = new CronExpression(this.cronTabExpress);
            Date nextTime = cexp.getNextValidTimeAfter(current);
            if(this.type == TYPE_PAUSE){
                manager.pause("到达终止时间,pause调度");
                this.manager.getScheduleServer().setNextRunEndTime(ScheduleUtil.transferDataToString(nextTime));
            }else{
                manager.resume("到达开始时间,resume调度");
                this.manager.getScheduleServer().setNextRunStartTime(ScheduleUtil.transferDataToString(nextTime));
            }
            this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime);
        } catch (Throwable ex) {
            log.error(ex.getMessage(), ex);
        }
    }
}

可以看到最后会重写调度服务信息到zk对应的节点上,方法大概处理流程。

1.该方法先判断是否设置有任务的开始执行时间,如果没有设置那么立即开始执行任务项任务
2.如果有设置 “执行开始时间”设置,以当前时间为基准计算下一次开始执行时间,设置一个定时任务,在指定时间开始任务线程
3.如果有设置“执行结束时间”设置,则计算一个执行结束时间,设置一个定时任务,在指定的时间暂停线程

这里使用的“开启服务”和“暂停服务”,分别使用了com.taobao.pamirs.schedule.taskmanager.TBScheduleManager类的public void resume(String message) throws Exception方法以及public void pause(String message) throws Exception方法


/**
     * 处在了可执行的时间区间,恢复运行
     * @throws Exception 
     */
    public void resume(String message) throws Exception{
        if (this.isPauseSchedule == true) {
            if(log.isDebugEnabled()){
                log.debug("恢复调度:" + this.currenScheduleServer.getUuid());
            }
            this.isPauseSchedule = false;
            this.pauseMessage = message;
            if (this.taskDealBean != null) {
                if (this.taskTypeInfo.getProcessorType() != null &&
                    this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){
                    this.taskTypeInfo.setProcessorType("NOTSLEEP");
                    this.processor = new TBScheduleProcessorNotSleep(this,
                            taskDealBean,this.statisticsInfo);
                }else{
                    this.processor = new TBScheduleProcessorSleep(this,
                            taskDealBean,this.statisticsInfo);
                    this.taskTypeInfo.setProcessorType("SLEEP");
                }
            }
            rewriteScheduleInfo();
        }
    }   
/**
     * 超过运行的运行时间,暂时停止调度
     * @throws Exception 
     */
    public void pause(String message) throws Exception{
        if (this.isPauseSchedule == false) {
            this.isPauseSchedule = true;
            this.pauseMessage = message;
            if (log.isDebugEnabled()) {
                log.debug("暂停调度 :" + this.currenScheduleServer.getUuid()+":" + this.statisticsInfo.getDealDescription());
            }
            if (this.processor != null) {
                this.processor.stopSchedule();
            }
            rewriteScheduleInfo();
        }
    }

发现resume()方法涉及tbschedule的SLEEP和NOTSLEEP模式的处理。先借用官方描述说明这两种模式有什么区别

现有的工作线程模式分为Sleep模式和NotSleep模式。缺省是缺省是NOTSLEEP模式。在通常模式下,在通常情况下用Sleep模式。
在一些特殊情况需要用NotSleep模式。两者之间的差异在后续进行描述。
4、Sleep模式和NotSleep模式的区别
1、ScheduleServer启动的工作线程组线程是共享一个任务池的。
2、在Sleep的工作模式:当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。如果是,则自己休眠;
如果其它线程都已经因为没有任务进入休眠,当前线程是最后一个活动线程的时候,就调用业务接口,获取需要处理的任务,放入任务池中,
同时唤醒其它休眠线程开始工作。
3、在NotSleep的工作模式:当一个线程任务处理完毕,从任务池中取不到任务的时候,立即调用业务接口获取需要处理的任务,放入任务池中。
4、Sleep模式在实现逻辑上相对简单清晰,但存在一个大任务处理时间长,导致其它线程不工作的情况。
5、在NotSleep模式下,减少了线程休眠的时间,避免大任务阻塞的情况,但为了避免数据被重复处理,增加了CPU在数据比较上的开销。
同时要求业务接口实现对象的比较接口。
6、如果对任务处理不允许停顿的情况下建议用NotSleep模式,其它情况建议用sleep模式。

估计是个大活,后续再继续整理。到这里先整理下业务流程,有点乱:

1.集成tbSchedule的应用,初始化一个com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory 对象
2.在这个调度服务器里,检查zk的连接状态,判断zk是否连接成功
3.如果zk连接成功,初始化调度策略管理器(com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK)和调度任务客户端(com.taobao.pamirs.schedule.zk.ScheduleDataManager4ZK)
4.启动一个定时任务,这个定时任务会以2秒为一个周期对当前的zk状态做检查,如果zk连接失败(重试次数5)会停止当前服务,否则刷新调度服务器,按照以下步骤做刷新
.1 取的本应用所有的可用策略,然后每个策略按照策略的总任务组数和机器数目分配任务组数目,并将计算的数目存储到策略的应用数据节点上。
.2 根据zk的机器节点的分配数目(requestNum)创建任务组(com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic)
5.任务组中根据factory和调度任务客户端 创建调度服务数据并存储在server节点
6.任务组里面启动一个任务线程,拉取控制台设置的数据,并按照配置设置调度任务到指定时间操作processor开始或停止,执行后将新信息回写到server节点以便于下次计算。