Zeus Source Code Analysis: The Event Handling Mechanism
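Zeus's event handling mechanism takes the observer pattern about as far as it can go. So before analyzing it, let's briefly review the observer design pattern. Put simply, in the observer pattern one object watches another, and when the watched object changes, the watcher reacts.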
Observer Pattern
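Take a simple example: the playground game "eagle catches chicks" (老鹰抓小鸡), in which a hen shields her chicks from the eagle. The eagle is the object being observed: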
package designPatterns.observer.eagle;
/**
* @time 2017/10/30
* @desc Eagle (the observed subject)
*/
public class Eagle {
/**
* The eagle's current action (shared, read by every observer)
*/
public static String action;
private String name;
public Eagle(String name){
this.name = name;
}
public void move(){
System.out.println(this.name + action);
}
}
The hen is an observer that watches the eagle's actions:
package designPatterns.observer.eagle;
/**
* @time 2017/10/30
* @desc Hen (observer)
*/
public class Hen {
private String name;
private Eagle eagle;
public Hen(Eagle eagle, String name){
this.eagle = eagle;
this.name = name;
}
public void move() {
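if(Eagle.action.equals(" flies away")){
System.out.println(this.name + " clucks");
}else{
System.out.println(this.name + Eagle.action);
}
}
}
The chick, also an observer, responds along with the hen:
package designPatterns.observer.eagle;
/**
* @author <a href="mailto:aaa@qq.com">凌霄</a>
* @time 2017/10/30
* @desc Chick (observer)
*/
public class Chick {
private String name;
public Chick(String name) {
this.name = name;
}
public void move(){
if(Eagle.action.equals(" flies away")){
System.out.println(this.name + " cheeps");
}else{
System.out.println(this.name + Eagle.action);
}
}
}
Finally, a test class: each round the eagle makes a random move, and the hen and chick react to it:
package designPatterns.observer.eagle;
import java.util.Random;
/**
* @author <a href="mailto:aaa@qq.com">凌霄</a>
* @time 2017/10/30
* @desc Test driver: the eagle moves at random, the hen and chick react
*/
public class Test {
private static final String[] actions = {" moves one step left", " moves two steps left", " moves three steps left",
" moves one step right", " moves two steps right", " moves three steps right", " flies away"};
public static void main(String[] args) {
Eagle eagle = new Eagle("Eagle");
Hen hen = new Hen(eagle, "Hen");
Chick chick = new Chick("Chick");
Random random = new Random();
int ran;
for(int i=0;i<10;i++){
ran = random.nextInt(actions.length);
Eagle.action = actions[ran];
eagle.move();
hen.move();
chick.move();
}
}
}
Sample output (the moves are random, so each run differs):
Eagle flies away
Hen clucks
Chick cheeps
Eagle moves three steps right
Hen moves three steps right
Chick moves three steps right
Eagle moves three steps right
Hen moves three steps right
Chick moves three steps right
Eagle flies away
Hen clucks
Chick cheeps
Eagle moves three steps right
Hen moves three steps right
Chick moves three steps right
Eagle moves one step left
Hen moves one step left
Chick moves one step left
Eagle moves one step right
Hen moves one step right
Chick moves one step right
Eagle moves two steps left
Hen moves two steps left
Chick moves two steps left
Eagle moves one step left
Hen moves one step left
Chick moves one step left
Eagle flies away
Hen clucks
Chick cheeps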
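Strictly speaking, the hen and chick in this toy example never subscribe to anything: Test sets the static Eagle.action and then calls each move() by hand. A more conventional observer pattern lets the subject keep a list of registered observers and push every change to them. The sketch below illustrates that shape; the names EagleObserver, ObservableEagle and Bird are hypothetical and not part of the original example.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of a push-style observer pattern (not the original author's code).
public class ObserverDemo {
    public static void main(String[] args) {
        ObservableEagle eagle = new ObservableEagle();
        eagle.addObserver(new Bird("Hen", " clucks"));
        eagle.addObserver(new Bird("Chick", " cheeps"));
        eagle.move(" moves one step left");
        eagle.move(" flies away");
    }
}

// Hypothetical observer interface: anything that wants to hear about the eagle's moves.
interface EagleObserver {
    void onAction(String action);
}

// Hypothetical subject: owns its observer list and notifies every observer on each move.
class ObservableEagle {
    private final List<EagleObserver> observers = new ArrayList<>();

    void addObserver(EagleObserver observer) {
        observers.add(observer);
    }

    void move(String action) {
        System.out.println("Eagle" + action);
        for (EagleObserver observer : observers) {
            observer.onAction(action); // push the change instead of letting observers poll
        }
    }
}

// Hypothetical concrete observer: copies the move, or makes its own sound when the eagle leaves.
class Bird implements EagleObserver {
    private final String name;
    private final String sound;
    final List<String> log = new ArrayList<>();

    Bird(String name, String sound) {
        this.name = name;
        this.sound = sound;
    }

    @Override
    public void onAction(String action) {
        String line = " flies away".equals(action) ? name + sound : name + action;
        log.add(line);
        System.out.println(line);
    }
}
```

Running main prints "Eagle moves one step left", "Hen moves one step left", "Chick moves one step left", then "Eagle flies away", "Hen clucks", "Chick cheeps". The subject no longer needs to know what kind of observer it is talking to, which is the same decoupling Zeus relies on between events, dispatchers and controllers.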
Zeus's Event Response Mechanism
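The above is only a simple implementation of the observer pattern. Now let's turn to Zeus's event handling. The Zeus architecture diagram shown earlier clearly marks the front-end technology used by the Zeus client: the dreaded GWT, a Swing-like Java wrapper for building web pages. We won't dig into its implementation details, since it is no longer a mainstream web technology, but to read the open-source Zeus code we still need a basic understanding of it. The Zeus client (its web pages) is built on GWT RPC; a tutorial on how to use it is available at http://www.tutorialspoint.com/gwt/gwt_rpc_communication.htm.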
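Using this technology effectively reduces web development to a two-layer "MV" architecture: the view and controller layers are merged, every page operation is wrapped as an Event, and requests reach the backend asynchronously through GWT's built-in AJAX calls. That's enough about this rather baffling technology. The one thing to remember is that GWT lets you write pages in Java, wraps asynchronous requests for you, and merges the V and C layers of MVC.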
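Every operation performed on the Zeus client is turned into an Event that the server picks up. First, let's look at the core classes in Zeus's event abstraction and their inheritance relationships: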
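An event plays the role of the eagle in the observer example above. So what plays the chick? Zeus abstracts it as Dispatcher, the event dispatcher. Its class hierarchy is as follows: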
The source of Dispatcher:
package com.taobao.zeus.mvc;
import com.taobao.zeus.schedule.mvc.ScheduleInfoLog;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class Dispatcher extends BaseObservable {
public static final EventType BeforeDispatch = new EventType();
public static final EventType AfterDispatch = new EventType();
private Map<String, AppEvent> history;
private List<Controller> controllers;
/**
* Forwards an application event to the dispatcher.
*
* @param event the application event
*/
public void forwardEvent(AppEvent event) {
dispatch(event);
}
/**
* Creates and forwards an application event to the dispatcher.
*
* @param eventType the application event type
*/
public void forwardEvent(EventType eventType) {
dispatch(eventType);
}
/**
* Creates and forwards an application event to the dispatcher.
*
* @param eventType the application event type
* @param data the event data
*/
public void forwardEvent(EventType eventType, Object data) {
dispatch(new AppEvent(eventType, data));
}
/**
* Creates and forwards an application event to the dispatcher.
*
* @param eventType the application event type
* @param data the event data
* @param historyEvent true to mark event as a history event
*/
public void forwardEvent(EventType eventType, Object data, boolean historyEvent) {
AppEvent ae = new AppEvent(eventType, data);
ae.setHistoryEvent(historyEvent);
dispatch(ae);
}
public Dispatcher() {
controllers = new ArrayList<Controller>();
}
/**
* Adds a controller.
*
* @param controller the controller to be added
*/
public void addController(Controller controller) {
if (!controllers.contains(controller)) {
controllers.add(controller);
}
}
/**
* Adds a listener to receive dispatch events.
*
* @param listener the listener to add
*/
public void addDispatcherListener(DispatcherListener listener) {
addListener(BeforeDispatch, listener);
addListener(AfterDispatch, listener);
}
/**
* The dispatcher will query its controllers and pass the application event to
* controllers that can handle the particular event type.
*
* @param type the event type
*/
public void dispatch(EventType type) {
dispatch(new AppEvent(type));
}
/**
* The dispatcher will query its controllers and pass the application event to
* controllers that can handle the particular event type.
*
* @param type the event type
* @param data the app event data
*/
public void dispatch(EventType type, Object data) {
dispatch(new AppEvent(type, data));
}
/**
* Returns all controllers.
*
* @return the list of controllers
*/
public List<Controller> getControllers() {
return controllers;
}
/**
* Returns the dispatcher's history cache.
*
* @return the history
*/
public Map<String, AppEvent> getHistory() {
return history;
}
/**
* Removes a controller.
*
* @param controller the controller to be removed
*/
public void removeController(Controller controller) {
controllers.remove(controller);
}
/**
* Removes a previously added listener.
*
* @param listener the listener to be removed
*/
public void removeDispatcherListener(DispatcherListener listener) {
removeListener(BeforeDispatch, listener);
removeListener(AfterDispatch, listener);
}
private void dispatch(AppEvent event) {
try {
MvcEvent e = new MvcEvent(this, event);
e.setAppEvent(event);
if (fireEvent(BeforeDispatch, e)) {
List<Controller> copy = new ArrayList<Controller>(controllers);
for (Controller controller : copy) {
if (controller.canHandle(event)) {
if (!controller.initialized) {
controller.initialized = true;
controller.initialize();
}
controller.handleEvent(event);
}
}
fireEvent(AfterDispatch, e);
}
} catch (Exception e) {
ScheduleInfoLog.error("dispatch error", e);
}
}
}
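Stripped of the BaseObservable/MvcEvent machinery, the control flow of dispatch(AppEvent) can be sketched as follows. This is a simplified, hypothetical reconstruction (MiniEvent, MiniController and MiniDispatcher are invented names): a "before" hook that returns false cancels the dispatch, controllers are iterated over a copy of the list, and each matching controller is lazily initialized once before handleEvent is called. Unlike Zeus, it stops at the first vetoing hook, fires no AfterDispatch event and does not catch exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical demo of the dispatch loop; not Zeus code.
public class DispatcherSketch {
    public static void main(String[] args) {
        MiniDispatcher dispatcher = new MiniDispatcher();
        dispatcher.addController(new MiniController() {
            @Override
            protected void initialize() { System.out.println("initialize"); }
            @Override
            boolean canHandle(MiniEvent e) { return "JobSuccess".equals(e.type); }
            @Override
            void handleEvent(MiniEvent e) { System.out.println("handled " + e.data); }
        });
        dispatcher.dispatch(new MiniEvent("JobSuccess", "job-1")); // initialize, then handled job-1
        dispatcher.dispatch(new MiniEvent("JobSuccess", "job-2")); // handled job-2 (already initialized)
        dispatcher.dispatch(new MiniEvent("JobFailed", "job-3"));  // no controller accepts it
    }
}

// Hypothetical stand-in for AppEvent: an event type name plus an optional payload.
class MiniEvent {
    final String type;
    final Object data;

    MiniEvent(String type, Object data) {
        this.type = type;
        this.data = data;
    }
}

// Hypothetical stand-in for Controller: lazy initialize(), canHandle(), handleEvent().
abstract class MiniController {
    boolean initialized;

    protected void initialize() {}

    abstract boolean canHandle(MiniEvent e);

    abstract void handleEvent(MiniEvent e);
}

class MiniDispatcher {
    private final List<MiniController> controllers = new ArrayList<>();
    // "Before dispatch" hooks; returning false plays the role of mvce.setCancelled(true).
    private final List<Predicate<MiniEvent>> beforeHooks = new ArrayList<>();

    void addController(MiniController controller) {
        if (!controllers.contains(controller)) controllers.add(controller);
    }

    void addBeforeHook(Predicate<MiniEvent> hook) {
        beforeHooks.add(hook);
    }

    void dispatch(MiniEvent event) {
        for (Predicate<MiniEvent> hook : beforeHooks) {
            if (!hook.test(event)) return; // cancelled before any controller sees it
        }
        // Iterate over a copy so handlers may add or remove controllers safely.
        for (MiniController controller : new ArrayList<>(controllers)) {
            if (controller.canHandle(event)) {
                if (!controller.initialized) {
                    controller.initialized = true;
                    controller.initialize();
                }
                controller.handleEvent(event);
            }
        }
    }
}
```

Iterating over a copy mirrors Zeus's new ArrayList<Controller>(controllers): a listener such as AddJobListener may add controllers while an event is being dispatched, and the copy avoids a ConcurrentModificationException.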
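The source is not complicated. The key point is that Dispatcher holds List<Controller> controllers, the controllers of all jobs, and all job control is triggered through it. Interestingly, who in turn holds this Dispatcher for all jobs? MasterContext, a core class of Zeus's job scheduling and dispatching. Its source: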
package com.taobao.zeus.socket.master;
import java.util.Comparator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.util.internal.ConcurrentHashMap;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.impl.StdSchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import com.taobao.zeus.model.HostGroupCache;
import com.taobao.zeus.mvc.Dispatcher;
import com.taobao.zeus.schedule.mvc.ScheduleInfoLog;
import com.taobao.zeus.store.DebugHistoryManager;
import com.taobao.zeus.store.FileManager;
import com.taobao.zeus.store.GroupManager;
import com.taobao.zeus.store.GroupManagerOld;
import com.taobao.zeus.store.JobHistoryManager;
import com.taobao.zeus.store.ProfileManager;
import com.taobao.zeus.store.HostGroupManager;
public class MasterContext {
private static Logger log = LoggerFactory.getLogger(MasterContext.class);
private Map<Channel, MasterWorkerHolder> workers=new ConcurrentHashMap<Channel, MasterWorkerHolder>();
private ApplicationContext applicationContext;
private Master master;
private Scheduler scheduler;
private Dispatcher dispatcher;// dispatcher: routes events to all registered listeners and controllers
private Map<String,HostGroupCache> hostGroupCache;
private Queue<JobElement> queue=new PriorityBlockingQueue<JobElement>(10000, new Comparator<JobElement>() {
public int compare(JobElement je1, JobElement je2) {
int numbera = je1.getPriorityLevel();
int numberb = je2.getPriorityLevel();
if (numberb > numbera) {
return 1;
} else if (numberb < numbera) {
return -1;
} else {
return 0;
}
}
});
private Queue<JobElement> exceptionQueue = new LinkedBlockingQueue<JobElement>();
// debug jobs, identified by debugId
private Queue<JobElement> debugQueue=new ArrayBlockingQueue<JobElement>(1000);
// manually triggered jobs, identified by historyId
private Queue<JobElement> manualQueue=new ArrayBlockingQueue<JobElement>(1000);
private MasterHandler handler;
private MasterServer server;
private ExecutorService threadPool=Executors.newCachedThreadPool();
private ScheduledExecutorService schedulePool=Executors.newScheduledThreadPool(12);
public MasterContext(ApplicationContext applicationContext){
this.applicationContext=applicationContext;
}
public void init(int port){
log.info("init begin");
try {
StdSchedulerFactory stdSchedulerFactory = new StdSchedulerFactory();
stdSchedulerFactory.initialize("zeusQuartz.properties");
scheduler = stdSchedulerFactory.getScheduler();
scheduler.start();
} catch (SchedulerException e) {
ScheduleInfoLog.error("schedule start fail", e);
}
dispatcher=new Dispatcher();
handler=new MasterHandler(this);
server=new MasterServer(handler);
server.start(port);
master=new Master(this);
log.info("init finish");
}
public void destory(){
threadPool.shutdown();
schedulePool.shutdown();
if(server!=null){
server.shutdown();
}
if(scheduler!=null){
try {
scheduler.shutdown();
} catch (SchedulerException e) {
e.printStackTrace();
}
}
ScheduleInfoLog.info("destory finish");
}
public Map<Channel, MasterWorkerHolder> getWorkers() {
return workers;
}
public void setWorkers(Map<Channel, MasterWorkerHolder> workers) {
this.workers = workers;
}
public Scheduler getScheduler() {
return scheduler;
}
public void setScheduler(Scheduler scheduler) {
this.scheduler = scheduler;
}
public Dispatcher getDispatcher() {
return dispatcher;
}
public void setDispatcher(Dispatcher dispatcher) {
this.dispatcher = dispatcher;
}
public HostGroupManager getHostGroupManager(){
return (HostGroupManager) applicationContext.getBean("hostGroupManager");
}
public JobHistoryManager getJobHistoryManager() {
return (JobHistoryManager) applicationContext.getBean("jobHistoryManager");
}
public DebugHistoryManager getDebugHistoryManager(){
return (DebugHistoryManager)applicationContext.getBean("debugHistoryManager");
}
public FileManager getFileManager(){
return (FileManager) applicationContext.getBean("fileManager");
}
public ProfileManager getProfileManager(){
return (ProfileManager) applicationContext.getBean("profileManager");
}
public Queue<JobElement> getQueue() {
return queue;
}
public void setQueue(Queue<JobElement> queue) {
this.queue = queue;
}
public GroupManager getGroupManager() {
return (GroupManager) applicationContext.getBean("groupManager");
}
public GroupManagerOld getGroupManagerOld() {
return (GroupManagerOld) applicationContext.getBean("groupManagerOld");
}
public MasterHandler getHandler() {
return handler;
}
public void setHandler(MasterHandler handler) {
this.handler = handler;
}
public ApplicationContext getApplicationContext() {
return applicationContext;
}
public MasterServer getServer() {
return server;
}
public void setServer(MasterServer server) {
this.server = server;
}
public ExecutorService getThreadPool() {
return threadPool;
}
public Master getMaster() {
return master;
}
public void setMaster(Master master) {
this.master = master;
}
public ScheduledExecutorService getSchedulePool() {
return schedulePool;
}
public Queue<JobElement> getDebugQueue() {
return debugQueue;
}
public void setDebugQueue(Queue<JobElement> debugQueue) {
this.debugQueue = debugQueue;
}
public Queue<JobElement> getManualQueue() {
return manualQueue;
}
public synchronized void refreshHostGroupCache(){
try {
hostGroupCache = getHostGroupManager().getAllHostGroupInfomations();
} catch (Exception e) {
ScheduleInfoLog.error("refresh hostgroupcache error", e);
}
}
public synchronized Map<String,HostGroupCache> getHostGroupCache() {
return hostGroupCache;
}
public Queue<JobElement> getExceptionQueue() {
return exceptionQueue;
}
}
In short, MasterContext holds essentially all of Zeus's core state for scheduling and distributed communication.
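One detail worth checking in MasterContext is the ordering of the main job queue. The anonymous comparator returns 1 when the second element's priorityLevel is larger, so jobs with a higher priorityLevel are polled first. Also note that the 10000 passed to PriorityBlockingQueue is only an initial capacity (that queue is unbounded), whereas the ArrayBlockingQueue(1000) used for debugQueue and manualQueue is a hard bound. The sketch below reproduces the same ordering with Comparator.comparingInt(...).reversed(); Job is a hypothetical stand-in for JobElement.

```java
import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityQueueSketch {
    // Hypothetical stand-in for Zeus's JobElement, keeping only what the ordering needs.
    static final class Job {
        final String id;
        final int priorityLevel;

        Job(String id, int priorityLevel) {
            this.id = id;
            this.priorityLevel = priorityLevel;
        }
    }

    // Same ordering as MasterContext's anonymous comparator: higher priorityLevel first.
    static final Comparator<Job> HIGHEST_FIRST =
            Comparator.comparingInt((Job j) -> j.priorityLevel).reversed();

    // Enqueues the jobs, then drains the queue and returns the ids in poll order.
    static String drainOrder(Job... jobs) {
        PriorityBlockingQueue<Job> queue = new PriorityBlockingQueue<>(16, HIGHEST_FIRST);
        for (Job job : jobs) {
            queue.add(job);
        }
        StringBuilder order = new StringBuilder();
        Job next;
        while ((next = queue.poll()) != null) {
            order.append(next.id);
        }
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(drainOrder(new Job("a", 1), new Job("b", 3), new Job("c", 2))); // prints bca
    }
}
```

PriorityBlockingQueue makes no ordering guarantee among elements that compare as equal, so jobs with the same priorityLevel may come out in any order, in Zeus as well as in this sketch.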
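Back to the event-triggering mechanism and the observer analogy: the eagle and the chick have appeared, so who plays the hen? Let's look at another class hierarchy: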
Let's focus on the source of AddJobListener:
package com.taobao.zeus.schedule.mvc;
import java.util.ArrayList;
import org.quartz.Scheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
import com.taobao.zeus.mvc.AppEvent;
import com.taobao.zeus.mvc.Controller;
import com.taobao.zeus.mvc.Dispatcher;
import com.taobao.zeus.mvc.DispatcherListener;
import com.taobao.zeus.mvc.MvcEvent;
import com.taobao.zeus.schedule.mvc.event.Events;
import com.taobao.zeus.schedule.mvc.event.JobMaintenanceEvent;
import com.taobao.zeus.socket.master.Master;
import com.taobao.zeus.socket.master.MasterContext;
import com.taobao.zeus.store.GroupManager;
/**
* Handles job-creation events: if no controller exists for the job yet, adds one
* @author zhoufang
*
*/
public class AddJobListener extends DispatcherListener{
private static Logger log=LoggerFactory.getLogger(AddJobListener.class);
private Master master;
private MasterContext context;
public AddJobListener(MasterContext context,Master master){
this.master=master;
this.context=context;
}
@Override
public void beforeDispatch(MvcEvent mvce) {
if(mvce.getAppEvent() instanceof JobMaintenanceEvent){
JobMaintenanceEvent event=(JobMaintenanceEvent)mvce.getAppEvent();
if (event.getType() != Events.UpdateActions) {
String jobId=event.getId();
boolean exist=false;
for(Controller c:new ArrayList<Controller>(context.getDispatcher().getControllers())){
if(c instanceof JobController){
JobController jc=(JobController)c;
if(jc.getJobId().equals(jobId)){
exist=true;
break;
}
}
}
if(!exist){// new job: no controller exists yet
JobController controller=new JobController(context,master, jobId);
context.getDispatcher().addController(controller);
controller.handleEvent(new AppEvent(Events.Initialize));
mvce.setCancelled(true);
log.error("schedule add job with jobId:"+jobId);
}
}
}
}
}
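beforeDispatch watches for job-maintenance events (other than UpdateActions) triggered by manual operations. If no JobController exists yet for the event's jobId, it creates one, registers it with the Dispatcher, sends it an Initialize event, and marks the MvcEvent as cancelled.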
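The get-or-create logic in beforeDispatch can be isolated into the small sketch below. JobCtl and JobRegistry are hypothetical stand-ins for JobController and the Dispatcher's controller list, the boolean return value models mvce.setCancelled(true), and the UpdateActions check is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo of AddJobListener's get-or-create check; not Zeus code.
public class AddJobSketch {
    public static void main(String[] args) {
        JobRegistry registry = new JobRegistry();
        System.out.println(registry.beforeDispatch("101")); // true: a controller was created
        System.out.println(registry.beforeDispatch("101")); // false: the job is already managed
    }
}

// Hypothetical stand-in for JobController: just a job id and an "initialized" flag.
class JobCtl {
    final String jobId;
    boolean initialized;

    JobCtl(String jobId) {
        this.jobId = jobId;
    }

    // Plays the role of controller.handleEvent(new AppEvent(Events.Initialize)).
    void initialize() {
        initialized = true;
    }
}

// Hypothetical stand-in for the Dispatcher's controller list plus AddJobListener's check.
class JobRegistry {
    final List<JobCtl> controllers = new ArrayList<>();

    // Returns true when a new controller was created, i.e. the normal dispatch is cancelled.
    boolean beforeDispatch(String jobId) {
        // Linear scan over a copy of the list, as AddJobListener does.
        for (JobCtl c : new ArrayList<>(controllers)) {
            if (c.jobId.equals(jobId)) {
                return false; // controller already exists: let the normal dispatch run
            }
        }
        JobCtl created = new JobCtl(jobId);
        controllers.add(created);
        created.initialize();
        return true;
    }
}
```

As in the original, every maintenance event costs a linear scan over all controllers. A Map from jobId to controller would make the lookup constant-time, at the price of keeping that map in sync with addController and removeController.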
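From the analysis above, Zeus's event mechanism works like this: an event is first intercepted by a listener and then handed to the Dispatcher, while the class that actually handles jobs is Controller, an abstract class. Its source: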
package com.taobao.zeus.mvc;
import java.util.ArrayList;
import java.util.List;
public abstract class Controller {
protected List<Controller> children;
protected boolean initialized;
protected Controller parent;
private List<EventType> supportedEvents;
/**
* Add a child controller.
*
* @param controller the controller to added
*/
public void addChild(Controller controller) {
if (children == null) children = new ArrayList<Controller>();
children.add(controller);
controller.parent = this;
}
/**
* Determines if the controller can handle the particular event. Default
* implementation checks against registered event types then queries all child
* controllers.
*
* @param event the event
* @return <code>true</code> if event can be handled, <code>false</code>
* otherwise
*/
public boolean canHandle(AppEvent event) {
return canHandle(event, true);
}
/**
* Determines if the controller can handle the particular event. Default
* implementation checks against registered event types then queries all child
* controllers if bubbleDown is set to true.
*
* @param event the event
* @param bubbleDown true to bubble down children controllers
* @return <code>true</code> if event can be handled, <code>false</code>
* otherwise
*/
public boolean canHandle(AppEvent event, boolean bubbleDown) {
if (supportedEvents != null && supportedEvents.contains(event.getType())) return true;
if (children != null && bubbleDown) {
for (Controller c : children) {
if (c.canHandle(event, bubbleDown)) return true;
}
}
return false;
}
/**
* Forwards an event to any child controllers who can handle the event.
*
* @param event the event to forward
*/
public void forwardToChild(AppEvent event) {
if (children != null) {
for (Controller c : children) {
if (!c.initialized) {
c.initialize();
c.initialized = true;
}
if (c.canHandle(event)) {
c.handleEvent(event);
}
}
}
}
/**
* Forward an event to a view. Ensures the view is initialized before
* forwarding the event.
*
* @param view the view to forward the event
* @param event the event to be forwarded
*/
public void forwardToView(View view, AppEvent event) {
if (!view.initialized) {
view.initialize();
view.initialized = true;
}
view.handleEvent(event);
}
/**
* Forward an event to a view. Ensures the view is initialized before
* forwarding the event.
*
* @param view the view to forward the event
* @param type the event type
* @param data the event data
*/
public void forwardToView(View view, EventType type, Object data) {
AppEvent e = new AppEvent(type, data);
forwardToView(view, e);
}
/**
* Processes the event.
*
* @param event the current event
*/
public abstract void handleEvent(AppEvent event);
/**
* Called once prior to handleEvent being called.
*/
protected void initialize() {
}
protected void destory(){
}
/**
* Registers the event type.
*
* @param types the event types
*/
protected void registerEventTypes(EventType... types) {
if (supportedEvents == null) {
supportedEvents = new ArrayList<EventType>();
}
if (types != null) {
for (EventType type : types) {
if (!supportedEvents.contains(type)) {
supportedEvents.add(type);
}
}
}
}
}
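The recursive canHandle(event, bubbleDown) is the least obvious part of Controller: a controller first checks the event types registered through registerEventTypes, and only if bubbleDown is true does it ask each child in turn. Below is a minimal sketch with event types modeled as plain strings; Node and its methods are hypothetical simplifications, not Zeus APIs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo of Controller's child-bubbling canHandle; not Zeus code.
public class ControllerTreeSketch {
    public static void main(String[] args) {
        Node root = new Node("root");
        Node child = new Node("child");
        child.register("JobSuccess");
        root.addChild(child);
        System.out.println(root.canHandle("JobSuccess", true));  // true, found via the child
        System.out.println(root.canHandle("JobSuccess", false)); // false, root registered nothing
    }
}

// Hypothetical simplified Controller: registered types, children, and a parent link.
class Node {
    final String name;
    private final Set<String> supported = new HashSet<>(); // like supportedEvents, without duplicates
    private final List<Node> children = new ArrayList<>();
    Node parent;

    Node(String name) {
        this.name = name;
    }

    // Counterpart of registerEventTypes(EventType... types).
    void register(String... types) {
        for (String type : types) {
            supported.add(type);
        }
    }

    // Counterpart of addChild: also wires the parent back-reference.
    void addChild(Node child) {
        children.add(child);
        child.parent = this;
    }

    // Own registered types first, then (optionally) a depth-first search of the children.
    boolean canHandle(String type, boolean bubbleDown) {
        if (supported.contains(type)) return true;
        if (bubbleDown) {
            for (Node child : children) {
                if (child.canHandle(type, true)) return true;
            }
        }
        return false;
    }
}
```

JobController, shown next, registers only Events.Initialize this way and widens acceptance in its own canHandle(AppEvent) override with instanceof checks on the concrete event classes.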
Controller is abstract, and its core responsibility is responding to events. It is extended by JobController:
package com.taobao.zeus.schedule.mvc;
import com.taobao.zeus.client.ZeusException;
import com.taobao.zeus.jobs.JobContext;
import com.taobao.zeus.jobs.sub.tool.CancelHadoopJob;
import com.taobao.zeus.model.JobDescriptor;
import com.taobao.zeus.model.JobDescriptor.JobScheduleType;
import com.taobao.zeus.model.JobHistory;
import com.taobao.zeus.model.JobStatus;
import com.taobao.zeus.model.JobStatus.Status;
import com.taobao.zeus.model.JobStatus.TriggerType;
import com.taobao.zeus.mvc.AppEvent;
import com.taobao.zeus.mvc.Controller;
import com.taobao.zeus.mvc.Dispatcher;
import com.taobao.zeus.schedule.hsf.CacheJobDescriptor;
import com.taobao.zeus.schedule.mvc.event.*;
import com.taobao.zeus.socket.master.Master;
import com.taobao.zeus.socket.master.MasterContext;
import com.taobao.zeus.store.GroupManager;
import com.taobao.zeus.store.JobBean;
import com.taobao.zeus.store.JobHistoryManager;
import com.taobao.zeus.util.DateUtil;
import com.taobao.zeus.util.PropertyKeys;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.quartz.*;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
public class JobController extends Controller {
private final String jobId;
private CacheJobDescriptor cache;
private JobHistoryManager jobHistoryManager;
private GroupManager groupManager;
private Master master;
private MasterContext context;
private static Logger log = LogManager.getLogger(JobController.class);
public JobController(MasterContext context, Master master, String jobId) {
this.jobId = jobId;
this.jobHistoryManager = context.getJobHistoryManager();
groupManager = context.getGroupManager();
this.cache = new CacheJobDescriptor(jobId, groupManager);
this.master = master;
this.context = context;
registerEventTypes(Events.Initialize);
}
private final Date getForver(){
try {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2099-12-31 23:59:59");
} catch (ParseException e) {
e.printStackTrace();
return null;
}
}
@Override
public boolean canHandle(AppEvent event, boolean bubbleDown) {
if (super.canHandle(event, bubbleDown)) {
JobDescriptor jd = cache.getJobDescriptor();
if (jd == null) {
autofix();
return false;
}
return jd.getAuto();
}
return false;
}
@Override
public void handleEvent(AppEvent event) {
try {
if (event instanceof JobSuccessEvent) {
successEventHandle((JobSuccessEvent) event);
} else if (event instanceof JobFailedEvent) {
failedEventHandle((JobFailedEvent) event);
} else if (event instanceof ScheduleTriggerEvent) {
triggerEventHandle((ScheduleTriggerEvent) event);
} else if (event instanceof JobMaintenanceEvent) {
maintenanceEventHandle((JobMaintenanceEvent) event);
} else if (event instanceof JobLostEvent) {
lostEventHandle((JobLostEvent) event);
} else if (event.getType() == Events.Initialize) {
initializeEventHandle();
}
} catch (Exception e) {
// catch every exception so that an error in this job does not affect other jobs
ScheduleInfoLog.error("JobId:" + jobId + " handleEvent error", e);
}
}
private void initializeEventHandle() {
JobStatus jobStatus = groupManager.getJobStatus(jobId);
// System.out.println("jobId: "+jobId+" jobStatus:"+jobStatus.getStatus());
if (jobStatus != null) {
// found in RUNNING state at startup: the previous run's result was lost, so retry immediately
if (jobStatus.getStatus() == Status.RUNNING) {
log.error("jobId=" + jobId
+ " 处于RUNNING状态,说明该JOB状态丢失,立即进行重试操作...");
// look up the previous run's history and kill the Hadoop job it started before retrying
if (jobStatus.getHistoryId() != null) {
JobHistory history = jobHistoryManager
.findJobHistory(jobStatus.getHistoryId());
// in special cases the history lookup may return null
if (history != null && history.getStatus() == Status.RUNNING){
try {
JobContext temp = JobContext.getTempJobContext(JobContext.MANUAL_RUN);
history.setIllustrate("启动服务器发现正在running状态,判断状态已经丢失,进行重试操作");
temp.setJobHistory(history);
new CancelHadoopJob(temp).run();
master.run(history);
} catch (Exception e) {
// ignore
}
}else if(history != null
&& history.getStatus() == Status.FAILED
&& history.getIllustrate().equals("worker断开连接,主动取消该任务")){
try {
JobContext temp = JobContext.getTempJobContext(JobContext.MANUAL_RUN);
history.setIllustrate("启动服务器发现worker与master断开连接,worker主动取消该任务,进行重试操作");
temp.setJobHistory(history);
new CancelHadoopJob(temp).run();
master.run(history);
} catch (Exception e) {
// ignore
}
}
}else{
JobHistory history = new JobHistory();
history.setIllustrate("启动服务器发现正在running状态,判断状态已经丢失,进行重试操作");
history.setTriggerType(TriggerType.MANUAL_RECOVER);
history.setJobId(jobId);
JobDescriptor jobDescriptor = groupManager.getUpstreamJobBean(jobId).getJobDescriptor();
// check for null before the first dereference
if(jobDescriptor != null){
history.setToJobId(jobDescriptor.getToJobId());
history.setOperator(jobDescriptor.getOwner());
history.setHostGroupId(jobDescriptor.getHostGroupId());
}
context.getJobHistoryManager().addJobHistory(history);
master.run(history);
}
}
}
JobDescriptor jd = cache.getJobDescriptor();
// for a timed (Independent) job, register a Quartz cron trigger
if (jd.getAuto() && jd.getScheduleType() == JobScheduleType.Independent) {
String cronExpression = jd.getCronExpression();
try {
CronTrigger trigger = new CronTrigger(jd.getId(), "zeus",
cronExpression);
JobDetail detail = new JobDetail(jd.getId(), "zeus",
TimerJob.class);
detail.getJobDataMap().put("jobId", jd.getId());
detail.getJobDataMap().put("dispatcher",
context.getDispatcher());
context.getScheduler().scheduleJob(detail, trigger);
} catch (Exception e) {
if (e instanceof SchedulerException
&& "Based on configured schedule, the given trigger will never fire."
.equals(e.getMessage())) {
// the trigger will never fire again, so turn off automatic scheduling for this job
jd.setAuto(false);
try {
groupManager.updateJob(jd.getOwner(), jd);
} catch (ZeusException e1) {
log.error("JobId:" + jobId + " 更新失败", e1);
}
cache.refresh();
} else {
log.error("JobId:" + jobId + " 定时程序启动失败", e);
}
}
}
// cycle job with no dependencies: schedule it directly at its start time
if (jd.getAuto()
&& jd.getScheduleType() == JobScheduleType.CyleJob
&& (jd.getDependencies() == null || jd.getDependencies()
.isEmpty())) {
initCycleJob(jd);
}
}
private void initCycleJob(JobDescriptor jd) {
Date date = null;
try {
date = DateUtil.timestamp2Date(jd.getStartTimestamp(),
DateUtil.getDefaultTZStr());
} catch (ParseException e) {
date = new Date();
log.error("parse job start timestamp to date failed,", e);
}
// one-shot trigger: fires once at the job's start time (repeatCount 0)
SimpleTrigger simpleTrigger = new SimpleTrigger(jd.getId(), "zeus",
date, null, 0, 0L);
JobDetail detail = null;
// if a schedule for this job already exists, delete it first
try {
detail = context.getScheduler().getJobDetail(jd.getId(), "zeus");
} catch (SchedulerException e) {
log.error(e);
}
if (detail != null) {
try {
context.getScheduler().deleteJob(jobId, "zeus");
log.error("schedule remove job with jobId:" + jobId);
} catch (SchedulerException e) {
log.error(e);
}
detail = null;
}
detail = new JobDetail(jd.getId(), "zeus", TimerJob.class);
detail.getJobDataMap().put("jobId", jd.getId());
detail.getJobDataMap().put("dispatcher", context.getDispatcher());
try {
context.getScheduler().scheduleJob(detail, simpleTrigger);
ScheduleInfoLog.info("Add job JobId:" + jobId
+ " to schedule");
} catch (SchedulerException e) {
log.error("schedule simple job failed,", e);
}
}
@Override
protected void destory() {
try {
JobDetail detail = context.getScheduler().getJobDetail(jobId,
"zeus");
if (detail != null) {
context.getScheduler().deleteJob(jobId, "zeus");
}
} catch (SchedulerException e) {
log.error(e);
}
}
@Override
public boolean canHandle(AppEvent event) {
if (super.canHandle(event)) {
return true;
}
if (event instanceof JobSuccessEvent || event instanceof JobFailedEvent
|| event instanceof ScheduleTriggerEvent
|| event instanceof JobMaintenanceEvent
|| event instanceof JobLostEvent) {
return true;
}
return false;
}
/**
* Maintenance: when a job is updated, the scheduler must be adjusted accordingly
*
* @param event
*/
private void maintenanceEventHandle(JobMaintenanceEvent event) {
if (event.getType() == Events.UpdateJob
&& jobId.equals(event.getId())) {
autofix();
}
// batch-update actions by job id
if (event.getType() == Events.UpdateActions && isBelongTo(event.getId())) {
autofix();
}
}
private boolean isBelongTo(String id){
String substr = jobId.substring(12);
Integer id1 = Integer.valueOf(substr);
Integer id2 = Integer.valueOf(id);
return id1.equals(id2);
}
/**
* Missed job: reschedule it according to its dependencies
*
* @param event
*/
private void lostEventHandle(JobLostEvent event) {
if (event.getType() == Events.UpdateJob
&& jobId.equals(event.getJobId())) {
//cache.refresh();
JobDescriptor jd = cache.getJobDescriptor();
if(jd!=null && jd.getAuto()){
JobStatus jobStatus = groupManager.getJobStatus(jobId);
if(jobStatus != null){
if(jobStatus.getStatus() == null || jobStatus.getStatus() == Status.WAIT){
Date now = new Date();
SimpleDateFormat df=new SimpleDateFormat("yyyyMMddHHmmss");
String currentDateStr = df.format(now)+"0000";
if(Long.parseLong(jobId) < Long.parseLong(currentDateStr)){
JobHistory history = new JobHistory();
history.setIllustrate("漏跑任务,自动恢复执行");
history.setTriggerType(TriggerType.SCHEDULE);
history.setJobId(jobId);
history.setToJobId(jd.getToJobId());
// history.setExecuteHost(jd.getHost());
history.setHostGroupId(jd.getHostGroupId());
if(jd != null){
history.setOperator(jd.getOwner() == null ? null : jd.getOwner());
}
context.getJobHistoryManager().addJobHistory(history);
master.run(history);
ScheduleInfoLog.info("JobId:" + jobId + " roll lost back lost ");
}
}
}
}
}
}
/**
* Handles an incoming job-success event
*
* @param event
*/
private void successEventHandle(JobSuccessEvent event) {
if (event.getTriggerType() == TriggerType.MANUAL) {
return;
}
String eId = event.getJobId();
JobDescriptor jobDescriptor = cache.getJobDescriptor();
if (jobDescriptor == null) {
autofix();
return;
}
if (!jobDescriptor.getAuto()) {
return;
}
if (jobDescriptor.getScheduleType() == JobScheduleType.Independent) {
return;
}
if (jobDescriptor.getScheduleType() == JobScheduleType.CyleJob) {
cycleJobSuccessHandle(event);
return;
}
if (!jobDescriptor.getDependencies().contains(eId)) {
return;
}
JobStatus jobStatus = null;
synchronized (this) {
jobStatus = groupManager.getJobStatus(jobId);
JobBean bean = groupManager.getUpstreamJobBean(jobId);
String cycle = bean.getHierarchyProperties().getProperty(
PropertyKeys.DEPENDENCY_CYCLE);
if (cycle != null && !"".equals(cycle)) {
Map<String, String> dep = jobStatus.getReadyDependency();
// dependency cycle "sameday": drop any dependency whose completion date is not today
if ("sameday".equals(cycle)) {
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
String now = format.format(new Date());
for (String key : new HashSet<String>(dep.keySet())) {
String d = format.format(new Date(Long.valueOf(dep
.get(key))));
if (!now.equals(d)) {
jobStatus.getReadyDependency().remove(key);
ScheduleInfoLog.info("JobId:" + jobId
+ " remove overdue dependency " + key);
}
}
}
}
ScheduleInfoLog.info("JobId:" + jobId
+ " received a successed dependency job with jobId:"
+ event.getJobId());
ScheduleInfoLog.info("JobId:" + jobId + " the dependency jobId:"
+ event.getJobId() + " record it");
jobStatus.getReadyDependency().put(eId,
String.valueOf(new Date().getTime()));
groupManager.updateJobStatus(jobStatus);
}
boolean allComplete = true;
for (String key : jobDescriptor.getDependencies()) {
if (jobStatus.getReadyDependency().get(key) == null) {
allComplete = false;
break;
}
}
if (allComplete) {
ScheduleInfoLog.info("JobId:" + jobId
+ " all dependency jobs is ready,run!");
startNewJob(event.getTriggerType(), jobDescriptor, jobId);
} else {
ScheduleInfoLog.info("JobId:" + jobId
+ " some of dependency is not ready,waiting!");
}
}
private void startNewJob(TriggerType type, JobDescriptor jobDescriptor,
String jobID) {
JobHistory history = new JobHistory();
history.setIllustrate("依赖任务全部到位,开始执行");
history.setTriggerType(TriggerType.SCHEDULE);
history.setJobId(jobId);
// System.out.println("依赖任务执行的operator :"+jobDescriptor.getOwner());
history.setOperator(jobDescriptor.getOwner() == null ? null : jobDescriptor.getOwner());
history.setToJobId(jobDescriptor.getToJobId() == null ? null : jobDescriptor.getToJobId());
// history.setExecuteHost(jobDescriptor.getHost());
history.setHostGroupId(jobDescriptor.getHostGroupId());
context.getJobHistoryManager().addJobHistory(history);
history = master.run(history);
if (history.getStatus() == Status.FAILED) {
ZeusJobException exception = new ZeusJobException(
history.getJobId(), history.getLog().getContent());
JobFailedEvent jfe = new JobFailedEvent(jobDescriptor.getId(),
type, history, exception);
ScheduleInfoLog.info("JobId:" + jobId
+ " is fail,dispatch the fail event");
// broadcast the failure event
context.getDispatcher().forwardEvent(jfe);
}
}
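/*
* Handles a success event for a cycle job: either this job itself just finished
* (compute its next run), or one of its dependencies did (record it and check readiness)
*/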
private void cycleJobSuccessHandle(JobSuccessEvent event) {
String eId = event.getJobId();
JobDescriptor jobDescriptor = cache.getJobDescriptor();
JobDescriptor jd = jobDescriptor.getCopy();
JobDescriptor eIobDescriptor = groupManager.getJobDescriptor(eId)
.getX();
String nextStartTime = null;
String nextSSTime = null;
String nextSETime = null;
long nextTS = 0;
// standalone cycle job (no dependencies): compute the next start time
if (eId.equals(jobId)
&& (jobDescriptor.getDependencies() == null || jobDescriptor
.getDependencies().isEmpty())) {
try {
if (jobDescriptor.getCycle().equals("hour")) {
nextStartTime = DateUtil.getDelayTime(1,
jobDescriptor.getStartTime());
nextSSTime = DateUtil.getDelayTime(1,
jobDescriptor.getStatisStartTime());
nextSETime = DateUtil.getDelayTime(1,
jobDescriptor.getStatisEndTime());
nextTS = jobDescriptor.getStartTimestamp() + 60 * 60 * 1000;
}
if (jobDescriptor.getCycle().equals("day")) {
nextStartTime = DateUtil.getDelayTime(24,
jobDescriptor.getStartTime());
nextSSTime = DateUtil.getDelayTime(24,
jobDescriptor.getStatisStartTime());
nextSETime = DateUtil.getDelayTime(24,
jobDescriptor.getStatisEndTime());
nextTS = jobDescriptor.getStartTimestamp() + 24 * 60 * 60
* 1000;
}
} catch (ParseException e) {
log.error("JobId:" + jobId + " failed to compute the next cycle time", e);
}
jd.setStartTime(nextStartTime);
jd.setStatisEndTime(nextSETime);
jd.setStatisStartTime(nextSSTime);
jd.setStartTimestamp(nextTS);
JobStatus js = new JobStatus();
js.setJobId(eId);
js.setStatus(JobStatus.Status.WAIT);
try {
groupManager.updateJob(jd.getOwner(), jd);
groupManager.updateJobStatus(js);
} catch (ZeusException e) {
log.error("", e);
e.printStackTrace();
}
//initCycleJob(jd);
cache.refresh();
return;
}
// if this job depends on the job that just finished, record the completed dependency
// and check whether this job can now run
if (!jobDescriptor.getDependencies().contains(eId)) {
return;
}
JobStatus jobStatus = null;
synchronized (this) {
jobStatus = groupManager.getJobStatus(jobId);
ScheduleInfoLog.info("JobId:" + jobId
+ " received a successed dependency job with jobId:" + eId
+ " statisTime:" + event.getStatisEndTime());
jobStatus.getReadyDependency().put(eId, event.getStatisEndTime());
groupManager.updateJobStatus(jobStatus);
}
boolean allComplete = true;
for (String key : jobDescriptor.getDependencies()) {
if (jobStatus.getReadyDependency().get(key) == null
|| !jobStatus.getReadyDependency().get(key)
.equals(jobDescriptor.getStatisEndTime())) {
allComplete = false;
break;
}
}
// All dependencies must have the same cycle, and their statis end times must also match
String cycle = jobDescriptor.getDepdCycleJob().get(eId);
for (Entry<String, String> entry : jobDescriptor.getDepdCycleJob()
.entrySet()) {
if (!entry.getValue().equals(cycle)) {
ScheduleInfoLog.error("JobId:" + jobId
+ " has different cycle dependence", null);
allComplete = false;
break;
}
}
if (allComplete) {
// Same cycle as the completed dependency
if (eIobDescriptor.getCycle().equals(jobDescriptor.getCycle())) {
jd.setStatisEndTime(jobDescriptor.getStatisEndTime());
jd.setStartTime(jobDescriptor.getStartTime());
jd.setStatisStartTime(jobDescriptor.getStatisStartTime());
jd.setId(jobId);
jd.setCycle(jobDescriptor.getCycle());
try {
if (jobDescriptor.getCycle().equals("hour")) {
jobDescriptor.setStatisStartTime(DateUtil.getDelayTime(
1, jobDescriptor.getStatisStartTime()));
jobDescriptor.setStatisEndTime(DateUtil.getDelayTime(
1, jobDescriptor.getStatisEndTime()));
jobDescriptor.setStartTime(DateUtil.getDelayTime(
1, jobDescriptor.getStartTime()));
}
if(jobDescriptor.getCycle().equals("day")){
jobDescriptor.setStatisStartTime(DateUtil.getDelayTime(
24, jobDescriptor.getStatisStartTime()));
jobDescriptor.setStatisEndTime(DateUtil.getDelayTime(
24, jobDescriptor.getStatisEndTime()));
jobDescriptor.setStartTime(DateUtil.getDelayTime(
24, jobDescriptor.getStartTime()));
}
groupManager.updateJob(jobDescriptor.getOwner(), jobDescriptor);
cache.refresh();
} catch (ParseException e) {
ScheduleInfoLog.error("parse date failed", e);
} catch (ZeusException e) {
ScheduleInfoLog.error("update job failed", e);
}
ScheduleInfoLog.info("JobId:"+jobId+" all dependence for "+jd.getStatisEndTime()+" is ready,start");
runJob(jd);
} else {
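// Different cycles: only daily jobs depend on hourly jobs (never the other way round), so this job must be a daily job
// and the completed one an hourly job; it is enough to check whether that hour is the 23:00 run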
if (event.getStatisEndTime().equals(
jobDescriptor.getStatisEndTime())) {
jd.setStatisEndTime(jobDescriptor.getStatisEndTime());
jd.setStartTime(jobDescriptor.getStartTime());
jd.setStatisStartTime(jobDescriptor.getStatisStartTime());
jd.setId(jobId);
jd.setCycle(jobDescriptor.getCycle());
try {
jobDescriptor.setStatisStartTime(DateUtil.getDelayTime(
24, jobDescriptor.getStatisStartTime()));
jobDescriptor.setStatisEndTime(DateUtil.getDelayTime(
24, jobDescriptor.getStatisEndTime()));
jobDescriptor.setStartTime(DateUtil.getDelayTime(
24, jobDescriptor.getStartTime()));
groupManager.updateJob(jobDescriptor.getOwner(), jobDescriptor);
cache.refresh();
} catch (ParseException e) {
ScheduleInfoLog.error("parse date failed", e);
} catch (ZeusException e) {
ScheduleInfoLog.error("update job failed", e);
}
ScheduleInfoLog.info("JobId:"+jobId+" all dependence for "+jd.getStatisEndTime()+" is ready,start");
runJob(jd);
}
}
} else {
ScheduleInfoLog.info("JobId:" + jobId + " is not ready,waiting!");
}
}
/**
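* Handling flow for a job-failed event
*
* ? Open question: when one of the jobs this job depends on fails, this job automatically fails too. Is the state of the other jobs it depends on still kept? 1. 2. Emit a failure message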
*
* @param event
*/
private void failedEventHandle(JobFailedEvent event) {
JobDescriptor jobDescriptor = cache.getJobDescriptor();
if (jobDescriptor == null) {
autofix();
return;
}
if (!jobDescriptor.getAuto()) {
return;
}
}
/**
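* Auto-fix: many abnormal situations can occur, e.g. a job that should have been deleted was not, or a job that should have been updated was not.
* They are all handled here in one place; once processing finishes, the schedule is guaranteed to be consistent with the database settings.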
*/
private void autofix() {
cache.refresh();
JobDescriptor jd = cache.getJobDescriptor();
if (jd == null) {// jd is null here when this was a delete operation
// The job has been deleted, so clean up
context.getDispatcher().removeController(this);
destory();
ScheduleInfoLog.info("schedule remove job with jobId:" + jobId);
return;
}
JobDetail detail = null;
try {
detail = context.getScheduler().getJobDetail(jobId, "zeus");
} catch (SchedulerException e) {
log.error(e);
}
// Check the auto-schedule switch
if (!jd.getAuto()) {
if (detail != null) {
try {
context.getScheduler().deleteJob(jobId, "zeus");
log.error("schedule remove job with jobId:" + jobId);
} catch (SchedulerException e) {
log.error(e);
}
}
return;
}
if (jd.getScheduleType() == JobScheduleType.Dependent) {// Dependent job
if (detail != null) {// It used to be an independent job and is now dependent, so remove its old timer schedule
try {
context.getScheduler().deleteJob(jobId, "zeus");
ScheduleInfoLog
.info("JobId:"
+ jobId
+ " from independent to dependent ,remove from schedule");
} catch (SchedulerException e) {
log.error(e);
}
}
} else if (jd.getScheduleType() == JobScheduleType.Independent) {// Independent job
ScheduleInfoLog.info("JobId:" + jobId + " independent job,update");
try {
if (detail != null) {
// context.getScheduler().deleteJob(jobId, "zeus");
// ScheduleInfoLog.info("JobId:" + jobId
// + " remove from schedule");
return;
}
CronTrigger trigger = new CronTrigger(jd.getId(), "zeus",
jd.getCronExpression());
detail = new JobDetail(jd.getId(), "zeus", TimerJob.class);
detail.getJobDataMap().put("jobId", jd.getId());
detail.getJobDataMap().put("dispatcher",
context.getDispatcher());
context.getScheduler().scheduleJob(detail, trigger);
ScheduleInfoLog.info("JobId:" + jobId
+ " add job to schedule ");
} catch (SchedulerException e) {
log.error(e);
} catch (ParseException e) {
log.error(e);
}
} else if (jd.getScheduleType() == JobScheduleType.CyleJob
&& (jd.getDependencies() == null || jd.getDependencies()
.isEmpty())) {
initCycleJob(jd);
}
}
/**
* Handling flow for a timer-trigger event
*
* @param event
*/
private void triggerEventHandle(ScheduleTriggerEvent event) {
String eId = event.getJobId();
JobDescriptor jobDescriptor = cache.getJobDescriptor();
if (jobDescriptor == null) {// The job has been deleted, which is an abnormal state, so autofix
autofix();
return;
}
if (!eId.equals(jobDescriptor.getId())) {
return;
}
ScheduleInfoLog.info("JobId:" + jobId
+ " receive a timer trigger event,statisTime is:"
+ jobDescriptor.getStatisEndTime());
runJob(jobDescriptor);
}
private void runJob(JobDescriptor jobDescriptor) {
JobHistory history = new JobHistory();
history.setJobId(jobDescriptor.getId());
history.setToJobId(jobDescriptor.getToJobId());
history.setTriggerType(TriggerType.SCHEDULE);
history.setStatisEndTime(jobDescriptor.getStatisEndTime());
history.setTimezone(jobDescriptor.getTimezone());
history.setCycle(jobDescriptor.getCycle());
history.setHostGroupId(jobDescriptor.getHostGroupId());
history.setOperator(jobDescriptor.getOwner());
context.getJobHistoryManager().addJobHistory(history);
master.run(history);
}
public String getJobId() {
return jobId;
}
@Override
public boolean equals(Object obj) {
if (!(obj instanceof JobController)) {
return false;
}
JobController jc = (JobController) obj;
return jobId.equals(jc.getJobId());
}
@Override
public int hashCode() {
return jobId.hashCode();
}
public static class TimerJob implements Job {
@Override
public void execute(JobExecutionContext context)
throws JobExecutionException {
String jobId = context.getJobDetail().getJobDataMap()
.getString("jobId");
Dispatcher dispatcher = (Dispatcher) context.getJobDetail()
.getJobDataMap().get("dispatcher");
ScheduleInfoLog.info("start the triggerEvent, the jobId = " + jobId);
ScheduleTriggerEvent ste = new ScheduleTriggerEvent(jobId);
dispatcher.forwardEvent(ste);
}
}
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
JobDescriptor jd = cache.getJobDescriptor();
if (jd == null) {
sb.append("JobId:" + jobId + " query returned null, abnormal state");
} else {
sb.append("JobId:" + jobId).append(
" auto:" + cache.getJobDescriptor().getAuto());
sb.append(" dependency:"
+ cache.getJobDescriptor().getDependencies());
}
JobDetail detail = null;
try {
detail = context.getScheduler().getJobDetail(jobId, "zeus");
} catch (SchedulerException e) {
}
if (detail == null) {
sb.append(" job not in scheduler");
} else {
sb.append(" job is in scheduler");
}
return sb.toString();
}
}
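The two rules at the heart of cycleJobSuccessHandle above can be distilled into a small, self-contained sketch. It is not Zeus code: nextPeriod and allDependenciesReady are hypothetical helpers, and the "yyyy-MM-dd HH:mm:ss" time format is an assumption, since the source does not show what format DateUtil.getDelayTime expects. It mirrors what the method does: an "hour" cycle advances by 1 hour and a "day" cycle by 24 hours, and a dependent job may run only when every dependency has reported success for the job's current statis end time.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the two rules in cycleJobSuccessHandle; not Zeus code.
// Assumption: times are "yyyy-MM-dd HH:mm:ss" strings (DateUtil's real format is not shown).
public class CycleJobSketch {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Advance a time string by one period: "hour" -> +1 h, "day" -> +24 h
    static String nextPeriod(String cycle, String time) {
        int hours;
        switch (cycle) {
            case "hour":
                hours = 1;
                break;
            case "day":
                hours = 24;
                break;
            default:
                // The original code silently leaves the times null here; the sketch fails fast instead
                throw new IllegalArgumentException("unknown cycle: " + cycle);
        }
        return LocalDateTime.parse(time, FMT).plusHours(hours).format(FMT);
    }

    // A dependent job is ready only if every dependency reported success
    // for exactly the job's current statis end time
    static boolean allDependenciesReady(List<String> deps, Map<String, String> ready, String statisEndTime) {
        for (String dep : deps) {
            if (!statisEndTime.equals(ready.get(dep))) {
                return false; // not reported yet, or reported for a different period
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(nextPeriod("hour", "2017-10-30 23:00:00")); // prints 2017-10-31 00:00:00
        System.out.println(nextPeriod("day", "2017-10-30 00:00:00"));  // prints 2017-10-31 00:00:00

        List<String> deps = Arrays.asList("a", "b");
        Map<String, String> ready = new HashMap<>();
        ready.put("a", "2017-10-30 23:00:00");
        System.out.println(allDependenciesReady(deps, ready, "2017-10-30 23:00:00")); // prints false
        ready.put("b", "2017-10-30 23:00:00");
        System.out.println(allDependenciesReady(deps, ready, "2017-10-30 23:00:00")); // prints true
    }
}
```

Keying the ready dependencies by dependency id and storing the statis end time as the value, as the readyDependency map in JobStatus does, means a success left over from an earlier period does not count toward the current one.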
From its method list, we can see that JobController handles all of Zeus's event responses.
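To tie this back to the observer pattern from the opening example: a dispatcher plays the subject, each JobController is an observer, and Quartz's TimerJob merely turns a timer firing into a ScheduleTriggerEvent passed to dispatcher.forwardEvent. The following is a simplified, hypothetical model of that flow, not Zeus's implementation. SketchDispatcher, SketchController and TriggerEvent are invented names, and the assumption that the dispatcher hands every event to every controller (each one filtering by jobId) is inferred from the early returns in the handlers above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of observer-style event dispatch; class names only
// mirror the roles of Zeus's Dispatcher, JobController and ScheduleTriggerEvent.
public class DispatchSketch {

    // Event that carries the id of the job it concerns
    static class TriggerEvent {
        final String jobId;

        TriggerEvent(String jobId) {
            this.jobId = jobId;
        }
    }

    // Subject: holds the observers and forwards every event to all of them
    static class SketchDispatcher {
        // Copy-on-write so an observer can unregister itself during forwardEvent
        private final List<SketchController> controllers = new CopyOnWriteArrayList<>();

        void addController(SketchController c) {
            controllers.add(c);
        }

        void removeController(SketchController c) {
            controllers.remove(c);
        }

        int controllerCount() {
            return controllers.size();
        }

        void forwardEvent(TriggerEvent e) {
            for (SketchController c : controllers) {
                c.handle(e);
            }
        }
    }

    // Observer: one controller per job
    static class SketchController {
        final String jobId;
        final SketchDispatcher dispatcher;
        final List<String> runs = new ArrayList<>();
        boolean deleted; // stand-in for "the job descriptor is gone from the cache"

        SketchController(String jobId, SketchDispatcher dispatcher) {
            this.jobId = jobId;
            this.dispatcher = dispatcher;
        }

        void handle(TriggerEvent e) {
            if (deleted) {
                // Like autofix(): a deleted job unregisters its own controller
                dispatcher.removeController(this);
                return;
            }
            if (!e.jobId.equals(jobId)) {
                return; // not our job: same early return as triggerEventHandle
            }
            runs.add(e.jobId); // stand-in for runJob(...)
        }
    }

    public static void main(String[] args) {
        SketchDispatcher dispatcher = new SketchDispatcher();
        SketchController a = new SketchController("a", dispatcher);
        SketchController b = new SketchController("b", dispatcher);
        dispatcher.addController(a);
        dispatcher.addController(b);

        // What TimerJob.execute does when Quartz fires: wrap the jobId in an event and forward it
        dispatcher.forwardEvent(new TriggerEvent("a"));
        System.out.println(a.runs + " " + b.runs); // prints [a] []

        // Job b is deleted; its controller removes itself while the dispatcher is iterating
        b.deleted = true;
        dispatcher.forwardEvent(new TriggerEvent("a"));
        System.out.println(a.runs + " " + dispatcher.controllerCount()); // prints [a, a] 1
    }
}
```

The copy-on-write list is a design choice of this sketch: autofix() can call removeController from inside an event handler, and if the dispatcher were iterating a plain ArrayList at that moment, the loop could throw ConcurrentModificationException. Whether Zeus's own Dispatcher guards against this, and how, is not shown in the source.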
Conclusion
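The above is only my personal understanding from analyzing the source code; the implementation details can only be followed by tracing the source itself. In short, Zeus's event-response mechanism likely goes well beyond what this article covers. Events, for example, can be further divided into MVCEvent and Zeus's own scheduling events, and these can only be analyzed by reading the source.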