TBSchedule源码学习笔记-线程组任务调度
根据上文的启动过程,找到了线程组的实现。com.taobao.pamirs.schedule.taskmanager.TBScheduleManager
/**
* 1、任务调度分配器的目标: 让所有的任务不重复,不遗漏的被快速处理。
* 2、一个Manager只管理一种任务类型的一组工作线程。
* 3、在一个JVM里面可能存在多个处理相同任务类型的Manager,也可能存在处理不同任务类型的Manager。
* 4、在不同的JVM里面可以存在处理相同任务的Manager
* 5、调度的Manager可以动态的随意增加和停止
*
* 主要的职责:
* 1、定时向集中的数据配置中心更新当前调度服务器的心跳状态
* 2、向数据配置中心获取所有服务器的状态来重新计算任务的分配。这么做的目标是避免集中任务调度中心的单点问题。
* 3、在每个批次数据处理完毕后,检查是否有其它处理服务器申请自己把持的任务队列,如果有,则释放给相关处理服务器。
*
* 其它:
* 如果当前服务器在处理当前任务的时候超时,需要清除当前队列,并释放已经把持的任务。并向控制主动中心报警。
*
* @author xuannan
*
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
abstract class TBScheduleManager implements IStrategyTask {
//.......//
}
这个类的构造方法是这样的
TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
//.......//
}
这里发现有两个老朋友,aFactory就是那个调度服务器对象,baseTaskType是设置的平台类型,aScheduleCenter调度配置中心客户端接口,那么ownSign是什么?
以下取自官方文档:
OwnSign环境区域
是对运行环境的划分,进行调度任务和数据隔离。例如:开发环境、测试环境、预发环境、生产环境。
不同的开发人员需要进行数据隔离也可以用OwnSign来实现,避免不同人员的数据冲突。缺省配置的环境区域OwnSign=’BASE’。
例如:TaskType=’DataDeal’,配置的队列是0、1、2、3、4、5、6、7、8、9。缺省的OwnSign=’BASE’。
此时如果再启动一个测试环境,则Schedule会动态生成一个TaskType=’DataDeal-Test’的任务类型,环境会作为一个变量传递给业务接口,
由业务接口的实现类,在读取数据和处理数据的时候进行确定。业务系统一种典型的做法就是在数据表中增加一个OWN_SIGN字段。
在创建数据的时候根据运行环境填入对应的环境名称,在Schedule中就可以环境的区分了。
com.taobao.pamirs.schedule.taskmanager.IScheduleDataManager
这个接口要好好看下,定义的方法有点多,但是一眼看下来更多的方法是为控制台页面提供服务了,例如创建任务。诸如任务和任务相等信息的维护和查询都在这个接口中定义。
那么TBScheduleManager 这个构造函数就用到了一个,继续打开构造函数看
TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
this.factory = aFactory;
//private static int nextSerialNumber = 0; 生成用户标志不同的线程序号,用于区分不同的线程组
this.currentSerialNumber = serialNumber();
this.scheduleCenter = aScheduleCenter;
//按照任务类型加载调度任务信息
this.taskTypeInfo = this.scheduleCenter.loadTaskTypeBaseInfo(baseTaskType);
log.info("create TBScheduleManager for taskType:"+baseTaskType);
//清除已经过期1天的TASK,OWN_SIGN的组合。超过一天没有活动server的视为过期
//为什么要清除?如果任务确实能跑一天怎么办
this.scheduleCenter.clearExpireTaskTypeRunningInfo(baseTaskType,ScheduleUtil.getLocalIP() + "清除过期OWN_SIGN信息",this.taskTypeInfo.getExpireOwnSignInterval());
//通过调度服务器提供的方法获得对应的bean
Object dealBean = aFactory.getBean(this.taskTypeInfo.getDealBeanName());
if (dealBean == null) {
throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 不存在");
}
//如果没有实现调度器对外的基础接口,肯定不能用啊
if (dealBean instanceof IScheduleTaskDeal == false) {
throw new Exception( "SpringBean " + this.taskTypeInfo.getDealBeanName() + " 没有实现 IScheduleTaskDeal接口");
}
this.taskDealBean = (IScheduleTaskDeal)dealBean;
//任务的配置校验,为什么要大于5倍 出发点是什么
if(this.taskTypeInfo.getJudgeDeadInterval() < this.taskTypeInfo.getHeartBeatRate() * 5){
throw new Exception("数据配置存在问题,死亡的时间间隔,至少要大于心跳线程的5倍。当前配置数据:JudgeDeadInterval = "
+ this.taskTypeInfo.getJudgeDeadInterval()
+ ",HeartBeatRate = " + this.taskTypeInfo.getHeartBeatRate());
}
//这个currenScheduleServer是类"com.taobao.pamirs.schedule.taskmanager.ScheduleServer"的实例,存储了当前调度服务的信息
this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber());
this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid());
//向调度中心客户端注册调度服务信息
scheduleCenter.registerScheduleServer(this.currenScheduleServer);
this.mBeanName = "pamirs:name=" + "schedule.ServerMananger." +this.currenScheduleServer.getUuid();
//又启动了一个定时任务,看这名字就知道是心跳
this.heartBeatTimer = new Timer(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-HeartBeat");
this.heartBeatTimer.schedule(new HeartBeatTimerTask(this),
new java.util.Date(System.currentTimeMillis() + 500),
this.taskTypeInfo.getHeartBeatRate());
//对象创建时需要做的初始化工作,模版方法。
initial();
}
目前能观察到的这个构造函数流程
1.按照任务名称加载任务配置
2.清除过期OWN_SIGN信息
3.检查任务配置是否正确
4.将调度服务信息注册到调度中心客户端
5.其他初始化操作
记录一下tbschedule对任务配置的存储方式(控制台页面”任务管理”)
控制台:
zk节点:
节点内容格式化之后
{
"baseTaskType": "commonSyncAdvertiserTask",
"heartBeatRate": 5000,
"judgeDeadInterval": 60000,
"sleepTimeNoData": 500,
"sleepTimeInterval": 0,
"fetchDataNumber": 500,
"executeNumber": 10,
"threadNumber": 5,
"processorType": "SLEEP",
"permitRunStartTime": "0 * * * * ?",
"expireOwnSignInterval": 1,
"dealBeanName": "commonSyncAdvertiserTask",
"taskParameter": "3",
"taskKind": "static",
"taskItems": [
"0",
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9"
],
"maxTaskItemsOfOneThreadGroup": 0,
"version": 0,
"sts": "resume"
}
第4步主要包含以下代码行
//这个currenScheduleServer是类"com.taobao.pamirs.schedule.taskmanager.ScheduleServer"的实例,存储了当前调度服务的信息
this.currenScheduleServer = ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber());
this.currenScheduleServer.setManagerFactoryUUID(this.factory.getUuid());
//向调度中心客户端注册调度服务信息
scheduleCenter.registerScheduleServer(this.currenScheduleServer);
当前调度服务信息都包含什么,有什么是状态量,向调度中心客户端注册的意图是什么?打开ScheduleServer.createScheduleServer(this.scheduleCenter,baseTaskType,ownSign,this.taskTypeInfo.getThreadNumber());
这个代码看一下,这个ScheduleServer实例融合了哪些信息
public static ScheduleServer createScheduleServer(IScheduleDataManager aScheduleCenter,String aBaseTaskType,
String aOwnSign, int aThreadNum)
throws Exception {
ScheduleServer result = new ScheduleServer();
//调度任务类型(beanName)
result.baseTaskType = aBaseTaskType;
result.ownSign = aOwnSign;
//会将调度类型和环境区域 按这种方式组合baseType+"$" + ownSign
result.taskType = ScheduleUtil.getTaskTypeByBaseAndOwnSign(
aBaseTaskType, aOwnSign);
result.ip = ScheduleUtil.getLocalIP();
result.hostName = ScheduleUtil.getLocalHostName();
//aScheduleCenter.getSystemTime() 从哪来
result.registerTime = new Timestamp(aScheduleCenter.getSystemTime());
//任务配置的线程数
result.threadNum = aThreadNum;
result.heartBeatTime = null;
result.dealInfoDesc = "调度初始化";
result.version = 0;
//厉害了这里生成了一个自己的uuid
result.uuid = result.ip
+ "$"
+ (UUID.randomUUID().toString().replaceAll("-", "")
.toUpperCase());
SimpleDateFormat DATA_FORMAT_yyyyMMdd = new SimpleDateFormat("yyMMdd");
String s = DATA_FORMAT_yyyyMMdd.format(new Date(aScheduleCenter.getSystemTime()));
//这个作用是啥
result.id = Long.parseLong(s) * 100000000
+ Math.abs(result.uuid.hashCode() % 100000000);
return result;
}
在这里就可以发现aOwnSign 的作用原理了,就是作为一个标记和任务类型拼接在一块,这样对于同样的任务不同的ownSign有不同的调度域,这个方法里有几个关键点:
result.registerTime = new Timestamp(aScheduleCenter.getSystemTime()); 这个属性的作用是?
result.uuid 这个作用是?
result.id 这个的作用是?为什么这么生成?为什么和aScheduleCenter.getSystemTime() 相关
aScheduleCenter.getSystemTime() 取到的是一个什么时间呢?
aScheduleCenter.getSystemTime() 取到的是个什么时间?之前只是初始化了IScheduleDataManager实现,估计构造函数里会有答案,看实现类com.taobao.pamirs.schedule.zk.ScheduleDataManager4ZK
public ScheduleDataManager4ZK(ZKManager aZkManager) throws Exception {
this.zkManager = aZkManager;
gson = new GsonBuilder().registerTypeAdapter(Timestamp.class,new TimestampTypeAdapter()).setDateFormat("yyyy-MM-dd HH:mm:ss").create();
this.PATH_BaseTaskType = this.zkManager.getRootPath() +"/baseTaskType";
if (this.getZooKeeper().exists(this.PATH_BaseTaskType, false) == null) {
//创建了一个永久节点
ZKTools.createPath(getZooKeeper(),this.PATH_BaseTaskType, CreateMode.PERSISTENT, this.zkManager.getAcl());
}
//当前服务器时间
loclaBaseTime = System.currentTimeMillis();
//Zookeeper服务器时间
String tempPath = this.zkManager.getZooKeeper().create(this.zkManager.getRootPath() + "/systime",null, this.zkManager.getAcl(), CreateMode.EPHEMERAL_SEQUENTIAL);
Stat tempStat = this.zkManager.getZooKeeper().exists(tempPath, false);
zkBaseTime = tempStat.getCtime();
ZKTools.deleteTree(getZooKeeper(), tempPath);
if(Math.abs(this.zkBaseTime - this.loclaBaseTime) > 5000){
log.error("请注意,Zookeeper服务器时间与本地时间相差 : " + Math.abs(this.zkBaseTime - this.loclaBaseTime) +" ms");
}
}
public long getSystemTime(){
//zk服务器时间+客户端启动时间
return this.zkBaseTime + ( System.currentTimeMillis() - this.loclaBaseTime);
}
可见这里返回了一个zk服务器时间+本地时间偏移量这样一个值,这里做法我是这么理解的,在分布式环境下可能存在各机器时钟不同步的情况(上下偏差),通过zk服务器时间加相对时间相当于把整个集群的时间都拨正了,这样时钟在tbschedule框架这里是可信的,所以可以把aScheduleCenter.getSystemTime()方法看作返回调度服务的集群可信时间,
目前为止以下两个问题还没有得到解答
result.uuid 这个作用是?
result.id 这个的作用是?为什么这么生成?为什么和aScheduleCenter.getSystemTime() 相关
只能接着看第4步里的scheduleCenter.registerScheduleServer(this.currenScheduleServer);
这个代码行,在对这个currenScheduleServer小宝贝做了些什么
public void registerScheduleServer(ScheduleServer server) throws Exception {
//简单
if(server.isRegister() == true){
throw new Exception(server.getUuid() + " 被重复注册");
}
//本例 rootPath /dsp_official_0928_wd/schedule/
//本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask
String zkPath = this.PATH_BaseTaskType + "/" + server.getBaseTaskType() +"/" + server.getTaskType();
if (this.getZooKeeper().exists(zkPath, false) == null) {
this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
}
//本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask/server
zkPath = zkPath +"/" + this.PATH_Server;
if (this.getZooKeeper().exists(zkPath, false) == null) {
this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
}
String realPath = null;
//此处必须增加UUID作为唯一性保障
String zkServerPath = zkPath + "/" + server.getTaskType() + "$"+ server.getIp() + "$"
+ (UUID.randomUUID().toString().replaceAll("-", "").toUpperCase())+"$";
//创建临时节点
realPath = this.getZooKeeper().create(zkServerPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT_SEQUENTIAL);
//之前已经生成过uuid,那么为什么这里要重新设置一次????
server.setUuid(realPath.substring(realPath.lastIndexOf("/") + 1));
//
Timestamp heartBeatTime = new Timestamp(this.getSystemTime());
server.setHeartBeatTime(heartBeatTime);
//server序列化存储到zk的节点上
String valueString = this.gson.toJson(server);
this.getZooKeeper().setData(realPath,valueString.getBytes(),-1);
server.setRegister(true);
}
这个registerScheduleServer方法看到这里,都是写zk的查询操作和设置操作,在这里也会发现zk维护的内容,就是说创建了zk节点用于维护调度服务数据,便于控制台页面查询和调度服务器间同步
在
rootPath/baseTaskType/ {策略管理>任务名称}/${任务管理>任务处理的SpringBean}/server 下每个任务组都有自己的节点,这个节点会存储调度服务数据,从数据内容上看该数据会因每一次调度而修改。
这里是一个调度节点的例子:
节点内容的格式化后如下:
{
"uuid": "commonSyncAdvertiserTask$127.0.0.1$35542D3EE3984FEAB15525C319358DE6$0000000060",
"id": 17120684352326,
"taskType": "commonSyncAdvertiserTask",
"baseTaskType": "commonSyncAdvertiserTask",
"ownSign": "BASE",
"ip": "127.0.0.1",
"hostName": "adm_adc02",
"threadNum": 5,
"registerTime": "2017-12-06 18:04:30",
"heartBeatTime": "2017-12-08 10:43:17",
"lastFetchDataTime": "2017-12-08 10:43:00",
"dealInfoDesc": "没有数据,暂停调度:FetchDataCount=2439,FetchDataNum=0,DealDataSucess=0,DealDataFail=0,DealSpendTime=0,otherCompareCount=0",
"nextRunStartTime": "2017-12-08 10:44:00",
"nextRunEndTime": "当不能获取到数据的时候pause",
"version": 34144,
"isRegister": true,
"managerFactoryUUID": "127.0.0.1$adm_adc02$AB1899D1EA4F4783B25EC4B4E04A6A79$0000000081"
}
控制台页面观察到的数据如下:
那么问题来了,存储调度服务数据内容的节点值是什么时候被修改的,调度任务是怎么被启动的?
回到com.taobao.pamirs.schedule.taskmanager.TBScheduleManager
构造方法,最后一行
TBScheduleManager(TBScheduleManagerFactory aFactory,String baseTaskType,String ownSign ,IScheduleDataManager aScheduleCenter) throws Exception{
//.......//
initial();
}
这个initial() 方法的实现有两个,分别位于com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic
和com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerDynamic
经过之前的学习,已经很明确的知道就是使用的com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic
实例,那么看一下这个方法的实现。这个方法里第一眼就看到一个关键字”start()”
public void initial() throws Exception{
//线程啊。。
new Thread(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-StartProcess"){
@SuppressWarnings("static-access")
public void run(){
try{
//唉呀妈呀,这个log老熟悉了
log.info("开始获取调度任务队列...... of " + currenScheduleServer.getUuid());
while (isRuntimeInfoInitial == false) {
if(isStopSchedule == true){
log.debug("外部命令终止调度,退出调度队列获取:" + currenScheduleServer.getUuid());
return;
}
//log.error("isRuntimeInfoInitial = " + isRuntimeInfoInitial);
try{
//初始化运行期信息
initialRunningInfo();
//运行期信息是否初始化成功
isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss(
currenScheduleServer.getBaseTaskType(),
currenScheduleServer.getOwnSign());
}catch(Throwable e){
//忽略初始化的异常
log.error(e.getMessage(),e);
}
//每隔一秒重试一次
if(isRuntimeInfoInitial == false){
Thread.currentThread().sleep(1000);
}
}
int count =0;
//获得框架系统时间
lastReloadTaskItemListTime = scheduleCenter.getSystemTime();
//这个代码的动作是什么?getCurrentScheduleTaskItemListNow();,好像是获取调度队列,如果没拿到就每隔一秒重试
while(getCurrentScheduleTaskItemListNow().size() <= 0){
if(isStopSchedule == true){
log.debug("外部命令终止调度,退出调度队列获取:" + currenScheduleServer.getUuid());
return;
}
Thread.currentThread().sleep(1000);
count = count + 1;
// log.error("尝试获取调度队列,第" + count + "次 ") ;
}
String tmpStr ="TaskItemDefine:";
//遍历调度队列
for(int i=0;i< currentTaskItemList.size();i++){
if(i>0){
tmpStr = tmpStr +",";
}
//哦?循环体内字符串拼接......
tmpStr = tmpStr + currentTaskItemList.get(i);
}
log.info("获取到任务处理队列,开始调度:" + tmpStr +" of "+ currenScheduleServer.getUuid());
//任务总量
taskItemCount = scheduleCenter.loadAllTaskItem(currenScheduleServer.getTaskType()).size();
//只有在已经获取到任务处理队列后才开始启动任务处理器
computerStart();
}catch(Exception e){
log.error(e.getMessage(),e);
String str = e.getMessage();
if(str.length() > 300){
str = str.substring(0,300);
}
startErrorInfo = "启动处理异常:" + str;
}
}
}.start();
}
看到这里疑问满满,初始化运行期信息都做了些什么??,这里貌似没有看到任务项?任务项在哪里初始化的?线程组怎么按照任务项拆分的??
看初始化运行期信息相关的代码,目前就两行
public void initial() throws Exception{
new Thread(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-StartProcess"){
@SuppressWarnings("static-access")
public void run(){
//......//
//初始化运行期信息
initialRunningInfo();
//运行期信息是否初始化成功
isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss(
currenScheduleServer.getBaseTaskType(),
currenScheduleServer.getOwnSign());
//......//
}
}.start();
}
initialRunningInfo() 在做些什么?打开代码看:
public void initialRunningInfo() throws Exception{
//清除过期的调度任务
scheduleCenter.clearExpireScheduleServer(this.currenScheduleServer.getTaskType(),this.taskTypeInfo.getJudgeDeadInterval());
//拿到任务数据节点名
List<String> list = scheduleCenter.loadScheduleServerNames(this.currenScheduleServer.getTaskType());
//如果当前是leader节点
if(scheduleCenter.isLeader(this.currenScheduleServer.getUuid(),list)){
//是第一次启动,先清楚所有的垃圾数据
log.debug(this.currenScheduleServer.getUuid() + ":" + list.size());
//初始化任务调度的域信息和静态任务信息
//如果是leader节点,做这样的操作
//清除zk任务项节点并重建
// //本例 rootPath /dsp_official_0928_wd/schedule/
//本例 /dsp_official_0928_wd/schedule/baseTaskType/commonSyncAdvertiserTask/commonSyncAdvertiserTask/taskItem
//调用public void createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception 方法创建任务项
this.scheduleCenter.initialRunningInfo4Static(this.currenScheduleServer.getBaseTaskType(), this.currenScheduleServer.getOwnSign(),this.currenScheduleServer.getUuid());
}
}
createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception 方法创建任务项就是说根据任务管理里面的任务项配置在zk下创建节点
/**
* 根据基础配置里面的任务项来创建各个域里面的任务项
* @param baseTaskType
* @param ownSign
* @param baseTaskItems
* @throws Exception
*/
public void createScheduleTaskItem(String baseTaskType, String ownSign,String[] baseTaskItems) throws Exception {
ScheduleTaskItem[] taskItems = new ScheduleTaskItem[baseTaskItems.length];
Pattern p = Pattern.compile("\\s*:\\s*\\{");
for (int i=0;i<baseTaskItems.length;i++){
taskItems[i] = new ScheduleTaskItem();
taskItems[i].setBaseTaskType(baseTaskType);
taskItems[i].setTaskType(ScheduleUtil.getTaskTypeByBaseAndOwnSign(baseTaskType, ownSign));
taskItems[i].setOwnSign(ownSign);
Matcher matcher = p.matcher(baseTaskItems[i]);
if(matcher.find()){
taskItems[i].setTaskItem(baseTaskItems[i].substring(0,matcher.start()).trim());
taskItems[i].setDealParameter(baseTaskItems[i].substring(matcher.end(),baseTaskItems[i].length()-1).trim());
}else{
taskItems[i].setTaskItem(baseTaskItems[i]);
}
taskItems[i].setSts(ScheduleTaskItem.TaskItemSts.ACTIVTE);
}
createScheduleTaskItem(taskItems);
}
/**
* 创建任务项,注意其中的 CurrentSever和RequestServer不会起作用
* @param taskItems
* @throws Exception
*/
public void createScheduleTaskItem(ScheduleTaskItem[] taskItems) throws Exception {
for (ScheduleTaskItem taskItem : taskItems){
String zkPath = this.PATH_BaseTaskType + "/" + taskItem.getBaseTaskType() + "/" + taskItem.getTaskType() +"/" + this.PATH_TaskItem;
if(this.getZooKeeper().exists(zkPath, false)== null){
ZKTools.createPath(this.getZooKeeper(), zkPath, CreateMode.PERSISTENT, this.zkManager.getAcl());
}
String zkTaskItemPath = zkPath + "/" + taskItem.getTaskItem();
this.getZooKeeper().create(zkTaskItemPath,null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/cur_server",null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/req_server",null, this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/sts",taskItem.getSts().toString().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/parameter",taskItem.getDealParameter().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
this.getZooKeeper().create(zkTaskItemPath + "/deal_desc",taskItem.getDealDesc().getBytes(), this.zkManager.getAcl(),CreateMode.PERSISTENT);
}
}
回到 public void initial() throws Exception 方法,方法最后调用了一个computerStart();方法,这个方法干嘛的?
public void initial() throws Exception{
new Thread(this.currenScheduleServer.getTaskType() +"-" + this.currentSerialNumber +"-StartProcess"){
@SuppressWarnings("static-access")
public void run(){
//......//
computerStart();
//......//
}
}.start();
}
/**
* 开始的时候,计算第一次执行时间
* @throws Exception
*/
public void computerStart() throws Exception{
//只有当存在可执行队列后再开始启动队列
//
boolean isRunNow = false;
if(this.taskTypeInfo.getPermitRunStartTime() == null){
isRunNow = true;
}else{
//获得任务"执行开始时间"的设置
//以startrun:开始,则表示开机立即启动调度.
String tmpStr = this.taskTypeInfo.getPermitRunStartTime();
if(tmpStr.toLowerCase().startsWith("startrun:")){
isRunNow = true;
tmpStr = tmpStr.substring("startrun:".length());
}
CronExpression cexpStart = new CronExpression(tmpStr);
//获得集群校准后的系统当前时间(zk服务器时间+时间偏移量)
Date current = new Date( this.scheduleCenter.getSystemTime());
//第一次的开始执行时间
Date firstStartTime = cexpStart.getNextValidTimeAfter(current);
//到达开始执行时间后,启动调度任务
this.heartBeatTimer.schedule(
new PauseOrResumeScheduleTask(this,this.heartBeatTimer,
PauseOrResumeScheduleTask.TYPE_RESUME,tmpStr),
firstStartTime);
//看到没有在这里改了ScheduleServer,用于后续调度使用。
this.currenScheduleServer.setNextRunStartTime(ScheduleUtil.transferDataToString(firstStartTime));
//如果没有设置任务的终止时间,代表"当不能获取到数据的时候pause"
if( this.taskTypeInfo.getPermitRunEndTime() == null
|| this.taskTypeInfo.getPermitRunEndTime().equals("-1")){
this.currenScheduleServer.setNextRunEndTime("当不能获取到数据的时候pause");
}else{ // 如果设置了任务的终止时间
try {
//拿到“执行结束时间”设置
String tmpEndStr = this.taskTypeInfo.getPermitRunEndTime();
CronExpression cexpEnd = new CronExpression(tmpEndStr);
//根据第一次执行开始时间,计算第一次执行结束时间
Date firstEndTime = cexpEnd.getNextValidTimeAfter(firstStartTime);
//根据当前时间计算一个执行结束时间
Date nowEndTime = cexpEnd.getNextValidTimeAfter(current);
//避免任务不被执行的情况
if(!nowEndTime.equals(firstEndTime) && current.before(nowEndTime)){
isRunNow = true;
firstEndTime = nowEndTime;
}
//到达执行结束时间后暂停任务
this.heartBeatTimer.schedule(
new PauseOrResumeScheduleTask(this,this.heartBeatTimer,
PauseOrResumeScheduleTask.TYPE_PAUSE,tmpEndStr),
firstEndTime);
this.currenScheduleServer.setNextRunEndTime(ScheduleUtil.transferDataToString(firstEndTime));
} catch (Exception e) {
log.error("计算第一次执行时间出现异常:" + currenScheduleServer.getUuid(), e);
throw new Exception("计算第一次执行时间出现异常:" + currenScheduleServer.getUuid(), e);
}
}
}
if(isRunNow == true){
this.resume("开启服务立即启动");
}
//重写zk节点信息,就是之前提到的server节点之后重写
this.rewriteScheduleInfo();
}
这里小心下PauseOrResumeScheduleTask ,理解起来有点讨厌,在run方法里又套了个调度任务,利用cron表达式计算一个下一次运行时间(启动或暂停),再去跑一个PauseOrResumeScheduleTask调度任务。如果不打开看这个类很容易懵逼,以下是这个类的源码,发现他会在run方法内利用cron再计算自己下一次的运行时间。然后再调用这样一个猥琐操作this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime);
class PauseOrResumeScheduleTask extends java.util.TimerTask {
private static transient Logger log = LoggerFactory
.getLogger(HeartBeatTimerTask.class);
public static int TYPE_PAUSE = 1;
public static int TYPE_RESUME = 2;
TBScheduleManager manager;
Timer timer;
int type;
String cronTabExpress;
public PauseOrResumeScheduleTask(TBScheduleManager aManager,Timer aTimer,int aType,String aCronTabExpress) {
this.manager = aManager;
this.timer = aTimer;
this.type = aType;
this.cronTabExpress = aCronTabExpress;
}
public void run() {
try {
Thread.currentThread().setPriority(Thread.MAX_PRIORITY);
this.cancel();//取消调度任务
Date current = new Date(System.currentTimeMillis());
CronExpression cexp = new CronExpression(this.cronTabExpress);
Date nextTime = cexp.getNextValidTimeAfter(current);
if(this.type == TYPE_PAUSE){
manager.pause("到达终止时间,pause调度");
this.manager.getScheduleServer().setNextRunEndTime(ScheduleUtil.transferDataToString(nextTime));
}else{
manager.resume("到达开始时间,resume调度");
this.manager.getScheduleServer().setNextRunStartTime(ScheduleUtil.transferDataToString(nextTime));
}
this.timer.schedule(new PauseOrResumeScheduleTask(this.manager,this.timer,this.type,this.cronTabExpress) , nextTime);
} catch (Throwable ex) {
log.error(ex.getMessage(), ex);
}
}
}
可以看到最后会重写调度服务信息到zk对应的节点上,方法大概处理流程。
1.该方法先判断是否设置有任务的开始执行时间,如果没有设置那么立即开始执行任务项任务
2.如果有设置 “执行开始时间”设置,以当前时间为基准计算下一次开始执行时间,设置一个定时任务,在指定时间开始任务线程
3.如果有设置“执行结束时间”设置,则计算一个执行结束时间,设置一个定时任务,在指定的时间暂停线程
这里使用的“开启服务”和“暂停服务”,分别使用了com.taobao.pamirs.schedule.taskmanager.TBScheduleManager
类的public void resume(String message) throws Exception
方法以及public void pause(String message) throws Exception
方法
/**
* 处在了可执行的时间区间,恢复运行
* @throws Exception
*/
public void resume(String message) throws Exception{
if (this.isPauseSchedule == true) {
if(log.isDebugEnabled()){
log.debug("恢复调度:" + this.currenScheduleServer.getUuid());
}
this.isPauseSchedule = false;
this.pauseMessage = message;
if (this.taskDealBean != null) {
if (this.taskTypeInfo.getProcessorType() != null &&
this.taskTypeInfo.getProcessorType().equalsIgnoreCase("NOTSLEEP")==true){
this.taskTypeInfo.setProcessorType("NOTSLEEP");
this.processor = new TBScheduleProcessorNotSleep(this,
taskDealBean,this.statisticsInfo);
}else{
this.processor = new TBScheduleProcessorSleep(this,
taskDealBean,this.statisticsInfo);
this.taskTypeInfo.setProcessorType("SLEEP");
}
}
rewriteScheduleInfo();
}
}
/**
* 超过运行的运行时间,暂时停止调度
* @throws Exception
*/
public void pause(String message) throws Exception{
if (this.isPauseSchedule == false) {
this.isPauseSchedule = true;
this.pauseMessage = message;
if (log.isDebugEnabled()) {
log.debug("暂停调度 :" + this.currenScheduleServer.getUuid()+":" + this.statisticsInfo.getDealDescription());
}
if (this.processor != null) {
this.processor.stopSchedule();
}
rewriteScheduleInfo();
}
}
发现resume()方法涉及tbschedule的SLEEP和NOTSLEEP模式的处理。先借用官方描述说明这两种模式有什么区别
现有的工作线程模式分为Sleep模式和NotSleep模式。缺省是缺省是NOTSLEEP模式。在通常模式下,在通常情况下用Sleep模式。
在一些特殊情况需要用NotSleep模式。两者之间的差异在后续进行描述。
4、Sleep模式和NotSleep模式的区别
1、ScheduleServer启动的工作线程组线程是共享一个任务池的。
2、在Sleep的工作模式:当某一个线程任务处理完毕,从任务池中取不到任务的时候,检查其它线程是否处于活动状态。如果是,则自己休眠;
如果其它线程都已经因为没有任务进入休眠,当前线程是最后一个活动线程的时候,就调用业务接口,获取需要处理的任务,放入任务池中,
同时唤醒其它休眠线程开始工作。
3、在NotSleep的工作模式:当一个线程任务处理完毕,从任务池中取不到任务的时候,立即调用业务接口获取需要处理的任务,放入任务池中。
4、Sleep模式在实现逻辑上相对简单清晰,但存在一个大任务处理时间长,导致其它线程不工作的情况。
5、在NotSleep模式下,减少了线程休眠的时间,避免大任务阻塞的情况,但为了避免数据被重复处理,增加了CPU在数据比较上的开销。
同时要求业务接口实现对象的比较接口。
6、如果对任务处理不允许停顿的情况下建议用NotSleep模式,其它情况建议用sleep模式。
估计是个大活,后续再继续整理。到这里先整理下业务流程,有点乱:
1.集成tbSchedule的应用,初始化一个com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory 对象
2.在这个调度服务器里,检查zk的连接状态,判断zk是否连接成功
3.如果zk连接成功,初始化调度策略管理器(com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK)和调度任务客户端(com.taobao.pamirs.schedule.zk.ScheduleDataManager4ZK)
4.启动一个定时任务,这个定时任务会以2秒为一个周期对当前的zk状态做检查,如果zk连接失败(重试次数5)会停止当前服务,否则刷新调度服务器,按照以下步骤做刷新
.1 取的本应用所有的可用策略,然后每个策略按照策略的总任务组数和机器数目分配任务组数目,并将计算的数目存储到策略的应用数据节点上。
.2 根据zk的机器节点的分配数目(requestNum)创建任务组(com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic)
5.任务组中根据factory和调度任务客户端 创建调度服务数据并存储在server节点
6.任务组里面启动一个任务线程,拉取控制台设置的数据,并按照配置设置调度任务到指定时间操作processor开始或停止,执行后将新信息回写到server节点以便于下次计算。
上一篇: 通过数组创建队列
下一篇: Numpy中关于axis的理解