通用任务分发框架(TaskDispatcher),基于生产者消费者模式 dispatcher
程序员文章站
2022-07-03 08:37:27
...
TaskDispatcher:通用的任务分发和处理框架,基于生产者消费者模式,底层使用阻塞队列实现。
如果需要使用生产者消费者模式,不需要再手写阻塞队列,只需要启动该服务,并编写对应的 process 就可以了。
除了使用简单外,还增加任务状态维护,处理结果追踪,以及任务处理方式等功能
核心代码如下
转发器实现
处理器管理器,注册指定任务的处理器
TaskProcessManager
以上是主体流程,以下是一些辅助 POJO 和处理相关的类
结果数据
事件类型定义
基类
一个实现类:UDP 任务
处理相关类
接口
抽象类
一个实现:UdpTaskProcess
以上是必要的处理模块,同时还可以自定义一些 post-action 和 pre-action 扩展
post-action/pre-action模块
接口
适配器
一个实现
pre-action 跟post类似,仅仅写出接口
客户端使用
自定义事件类型
自定义事件的process
如果需要,扩展pre/post action
开启服务即可工作,不需要关注线程和阻塞队列。
如果需要使用生产者消费者模式,不需要再手写阻塞队列,只需要启动该服务,并编写对应的 process 就可以了。
除了使用简单外,还增加任务状态维护,处理结果追踪,以及任务处理方式等功能
核心代码如下
服务接口
package com.gbcom.ccsv3.transport.dispatcher;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * Dispatcher contract: accepts tasks and exposes start/stop life-cycle control.
 *
 * @author syz
 * @date 2014-9-30, 02:29:52 PM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskDispatcherItf
 */
public interface TaskDispatcherItf {

    /**
     * Adds a task.
     *
     * @param task   the task to add
     * @param unique whether the task is subject to the uniqueness check
     */
    void addTask(TaskBase task, boolean unique);

    /**
     * Adds a task.
     *
     * @param task the task to add
     */
    void addTask(TaskBase task);

    /**
     * Starts the dispatcher.
     */
    void start();

    /**
     * Stops the dispatcher.
     */
    void stop();
}
转发器实现
package com.gbcom.ccsv3.transport.dispatcher;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * Asynchronous task dispatcher based on the producer/consumer model and a blocking queue.
 *
 * <p>Producers enqueue tasks through {@link #addTask(TaskBase, boolean)}; worker threads take
 * them from the queue and pass them to the {@link TaskProcessManager}.
 *
 * <p>Duplicates are allowed by default. With {@code keyUnique = true} a task is rejected while
 * another task holding the same key is queued or being processed.
 *
 * <p>Processing is asynchronous; a caller that needs the outcome has to provide a callback.
 *
 * @author syz
 * @date 2014-9-30, 03:09:12 PM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskDispatcher
 */
public class TaskDispatcher implements TaskDispatcherItf {
    private static final Logger LOG = Logger.getLogger(TaskDispatcher.class);
    private static final int THREAD_NUM = 40;
    private static final int BLOCK_QUEUE_MAX_SIZE = 10000;
    private static final int BLOCK_QUEUE_CLEAR_SIZE = 2000;

    /** Executor that runs the worker threads. */
    private ExecutorService executor = null;
    private TaskProcessManager taskManager = null;
    private boolean isRunning = false;

    /** Bounded blocking queue holding the pending tasks. */
    private final BlockingQueue<TaskBase> taskQueue = new LinkedBlockingQueue<TaskBase>(
            BLOCK_QUEUE_MAX_SIZE);

    /**
     * Keys currently reserved by unique tasks. Producers and worker threads both touch this
     * map, so every access goes through {@link #reserveKey} / {@link #releaseKey}, which lock
     * on the map itself.
     */
    private final Map<String, TaskBase> keyMap = new HashMap<String, TaskBase>();

    private static class TaskDispatcherHolder {
        private static final TaskDispatcher INSTANCE = new TaskDispatcher();
    }

    /**
     * Returns the singleton instance.
     *
     * @return TaskDispatcher
     */
    public static TaskDispatcher getInstance() {
        return TaskDispatcherHolder.INSTANCE;
    }

    private TaskDispatcher() {
        init();
        start();
    }

    private void init() {
        isRunning = false;
        taskManager = new TaskProcessManager();
    }

    /**
     * Enqueues a task.
     *
     * <p>When the queue already holds {@code BLOCK_QUEUE_CLEAR_SIZE} or more tasks, the queued
     * tasks are discarded (and their keys released) before the new task is added.
     *
     * @param task      the task to enqueue
     * @param keyUnique if {@code true}, the task is rejected when its key
     *                  ({@link TaskBase#getTaskKey()}) is already reserved
     */
    @Override
    public synchronized void addTask(TaskBase task, boolean keyUnique) {
        if (!isRunning) {
            LOG.error("TaskDispatcher is not running, the Task below may not process");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("add Task to Queue " + task.getTaskName() + " " + task.getType());
        }
        // True while this call owns a key reservation that no worker will release.
        boolean seatHeld = false;
        try {
            if (keyUnique) {
                if (!reserveKey(task)) {
                    LOG.info("can not add Task to Queue,the ele exist!!!!" + task.getTaskName()
                            + " , size = " + taskQueue.size());
                    return;
                }
                seatHeld = true;
            }
            LOG.info("add Task to Queue " + task.getTaskName() + " , size = " + taskQueue.size());
            if (taskQueue.size() >= BLOCK_QUEUE_CLEAR_SIZE) {
                LOG.info(" *****cleart request Task***** trap queue size is more than "
                        + BLOCK_QUEUE_CLEAR_SIZE + ";; CLEAR BlockingQueue");
                discardQueuedTasks();
                if (seatHeld) {
                    // A discarded task may have shared this key; make sure the seat is still taken.
                    reserveKey(task);
                }
            }
            taskQueue.put(task);
            // From here on the worker that processes the task releases the key.
            seatHeld = false;
        } catch (InterruptedException e) {
            LOG.info("/******* add Task InterruptedException*********/");
            LOG.error("add Task to queue interrupted", e);
            LOG.info("/******* add Task InterruptedException *********/");
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            LOG.error("Other Exception ", e);
        } finally {
            if (seatHeld) {
                // The task never reached the queue, so nobody else would free its key.
                releaseKey(task);
            }
        }
    }

    /**
     * Enqueues a task without the uniqueness check.
     *
     * @param task the task to enqueue
     */
    @Override
    public synchronized void addTask(TaskBase task) {
        addTask(task, false);
    }

    /**
     * Stops the worker threads by shutting the executor down immediately.
     */
    @Override
    public synchronized void stop() {
        if (executor != null) {
            executor.shutdownNow();
        }
        isRunning = false;
    }

    /**
     * Starts {@code THREAD_NUM} worker threads. Calling it while the dispatcher is already
     * running does nothing, so the running executor is never replaced and leaked.
     */
    @Override
    public synchronized void start() {
        if (isRunning) {
            LOG.warn("task Dispatcher already running, start ignored");
            return;
        }
        executor = Executors.newCachedThreadPool();
        for (int i = 0; i < THREAD_NUM; i++) {
            executor.execute(new DispatcherTask());
        }
        isRunning = true;
        LOG.info("task Dispatcher task start , current thread size = " + THREAD_NUM);
    }

    /** Reserves the task's key; returns {@code false} when the key is already reserved. */
    private boolean reserveKey(TaskBase task) {
        synchronized (keyMap) {
            if (keyMap.containsKey(task.getTaskKey())) {
                return false;
            }
            keyMap.put(task.getTaskKey(), task);
            return true;
        }
    }

    /** Releases the task's key; a no-op when the key was never reserved. */
    private void releaseKey(TaskBase task) {
        synchronized (keyMap) {
            keyMap.remove(task.getTaskKey());
        }
    }

    /** Empties the queue and releases the keys of the discarded tasks. */
    private void discardQueuedTasks() {
        TaskBase dropped;
        while ((dropped = taskQueue.poll()) != null) {
            releaseKey(dropped);
        }
    }

    class DispatcherTask implements Runnable {
        /**
         * Worker loop: takes tasks from the queue and processes them until interrupted.
         */
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                // Reset per iteration so a stale task is never released twice.
                TaskBase bean = null;
                try {
                    // The measured time includes the wait for the next task.
                    long begin = System.currentTimeMillis();
                    bean = taskQueue.take();
                    taskManager.taskProcess(bean);
                    LOG.info("process Task success, thread="
                            + Thread.currentThread().getName()
                            + " ;spend time :total= "
                            + ((System.currentTimeMillis() - begin) / 1000)
                            + "s || the queue size is not actually:" + taskQueue.size());
                } catch (InterruptedException e) {
                    LOG.info("/******* Task Dispatcher InterruptedException*********/");
                    LOG.error("Task Dispatcher thread interrupted ;; tread = "
                            + Thread.currentThread().getName(), e);
                    LOG.info("/******* Task Dispatcher InterruptedException*********/");
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) {
                    LOG.error("Task Dispatcher thread exception", e);
                } finally {
                    // Nothing was taken when take() was interrupted; otherwise free the key.
                    // Removing a key that was never reserved (non-unique task) is harmless.
                    if (bean != null) {
                        releaseKey(bean);
                    }
                }
            }
        }
    }
}
处理器管理器,注册指定任务的处理器
TaskProcessManager
package com.gbcom.ccsv3.transport.dispatcher;

import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.transport.dispatcher.process.CfgTplTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.DBTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.HTTPTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.SNMPTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.TaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.UdpTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.task.CfgTplTask;
import com.gbcom.ccsv3.transport.dispatcher.task.DBTask;
import com.gbcom.ccsv3.transport.dispatcher.task.HTTPTask;
import com.gbcom.ccsv3.transport.dispatcher.task.SNMPTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;
import com.gbcom.ccsv3.transport.dispatcher.task.UdpTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskType;

/**
 * Registry that maps a {@link TaskType} to the {@link TaskProcess} handling tasks of that type.
 *
 * <p>NOTE(review): the registry is a plain {@code HashMap}; registering or removing processors
 * while tasks are being processed on other threads is not synchronized here.
 *
 * @author syz
 * @date 2014-9-30, 03:07:25 PM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskProcessManager
 */
public class TaskProcessManager {
    /**
     * Logger.
     */
    protected static final Logger LOG = Logger.getLogger(TaskProcessManager.class);

    @SuppressWarnings("rawtypes")
    private Map<TaskType, TaskProcess> taskProcessors = new HashMap<TaskType, TaskProcess>();

    /**
     * Creates a manager with the default processors for UDP, DB, HTTP, SNMP and CFGTPL tasks.
     */
    public TaskProcessManager() {
        taskProcessors.put(TaskType.UDP, new UdpTaskProcess<UdpTask>());
        taskProcessors.put(TaskType.DB, new DBTaskProcess<DBTask>());
        taskProcessors.put(TaskType.HTTP, new HTTPTaskProcess<HTTPTask>());
        taskProcessors.put(TaskType.SNMP, new SNMPTaskProcess<SNMPTask>());
        taskProcessors.put(TaskType.CFGTPL, new CfgTplTaskProcess<CfgTplTask>());
    }

    /**
     * Creates a manager backed by the given registry; the map is used as-is, not copied.
     *
     * @param map processors keyed by task type
     */
    @SuppressWarnings("rawtypes")
    public TaskProcessManager(Map<TaskType, TaskProcess> map) {
        this.taskProcessors = map;
    }

    /**
     * Registers a processor for the type of the given task, replacing any previous one.
     *
     * @param task    task whose type is used as the registry key
     * @param process processor to register
     */
    @SuppressWarnings("rawtypes")
    public void put(TaskBase task, TaskProcess process) {
        taskProcessors.put(task.getType(), process);
    }

    /**
     * Removes the processor registered for the type of the given task.
     *
     * @param task task whose type is used as the registry key
     */
    public void remove(TaskBase task) {
        taskProcessors.remove(task.getType());
    }

    /**
     * Processes a task with the processor registered for its type. A task without a
     * registered processor is not processed; this is logged instead of being ignored silently.
     *
     * @param task task to process
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public void taskProcess(TaskBase task) {
        TaskProcess process = taskProcessors.get(task.getType());
        if (process == null) {
            LOG.warn("no TaskProcess registered for task type " + task.getType()
                    + ", task " + task.getTaskName() + " is not processed");
            return;
        }
        process.process(task);
    }
}
以上是主体流程,以下是一些辅助 POJO 和处理相关的类
结果数据
package com.gbcom.ccsv3.transport.dispatcher;

import java.io.Serializable;

/**
 * Result of processing a task: wraps the original task together with the outcome.
 *
 * @param <T> type of the processed task (the origin)
 * @author syz
 * @date 2014-9-30, 11:16:48 AM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskProResult
 */
public class TaskProResult<T> implements Serializable {
    /**
     * serialVersionUID: long
     */
    private static final long serialVersionUID = -6165298377784981884L;

    /**
     * The processed source object.
     */
    private T origin;

    // Processing result payload (for UDP: the Message).
    private Object result;

    /**
     * Whether the source was processed successfully.
     */
    private boolean successful;

    /**
     * Cause of the failure, if processing failed.
     */
    private Throwable throwable;

    private Object extraInfo;

    /**
     * Creates a successful result without an origin.
     */
    public TaskProResult() {
        this.successful = true;
    }

    /**
     * Creates a successful result for the given origin.
     *
     * @param t the processed source object
     */
    public TaskProResult(T t) {
        this.origin = t;
        this.successful = true;
    }

    /**
     * Creates a result with an explicit outcome.
     *
     * @param origin       the processed source object
     * @param isSuccessful whether processing succeeded
     * @param throwable    the exception that caused the failure, if processing failed
     */
    public TaskProResult(T origin, boolean isSuccessful, Throwable throwable) {
        this.origin = origin;
        this.successful = isSuccessful;
        this.throwable = throwable;
    }

    /**
     * @return Returns the extraInfo.
     */
    public Object getExtraInfo() {
        return extraInfo;
    }

    /**
     * @param extraInfo The extraInfo to set.
     */
    public void setExtraInfo(Object extraInfo) {
        this.extraInfo = extraInfo;
    }

    /**
     * @return Returns the origin.
     */
    public T getOrigin() {
        return origin;
    }

    /**
     * @param origin The origin to set.
     */
    public void setOrigin(T origin) {
        this.origin = origin;
    }

    /**
     * @return Returns the successful flag.
     */
    public boolean isSuccessful() {
        return successful;
    }

    /**
     * @param successful The successful flag to set.
     */
    public void setSuccessful(boolean successful) {
        this.successful = successful;
    }

    /**
     * @return Returns the throwable.
     */
    public Throwable getThrowable() {
        return throwable;
    }

    /**
     * @param throwable The throwable to set.
     */
    public void setThrowable(Throwable throwable) {
        this.throwable = throwable;
    }

    /**
     * @param result the result payload to set
     */
    public void setResult(Object result) {
        this.result = result;
    }

    /**
     * @return the result payload
     */
    public Object getResult() {
        return result;
    }
}
事件类型定义
基类
//off checkstyle
package com.gbcom.ccsv3.transport.dispatcher.task;

import java.util.Date;

/**
 * Base class of all tasks.
 *
 * <p>Per the original author's note, a task in state {@code QUEUE} goes through every
 * processing step, while a task in any other state only gets the post step.
 *
 * @author syz
 * @date 2014-9-30, 10:02:07 AM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.task.TaskBase
 */
public class TaskBase {

    /** Life-cycle state of a task, each carrying a numeric value. */
    public enum TaskState {
        QUEUE(0), RUNNING(1), RUNNED(2), INVALID(3);

        private int value;

        TaskState(int i) {
            this.setValue(i);
        }

        /**
         * Maps a numeric value to a state; unknown values map to {@code INVALID}.
         *
         * @param i numeric value (0..3)
         * @return the matching state
         */
        public TaskState valueOf(int i) {
            switch (i) {
            case 0:
                return TaskState.QUEUE;
            case 1:
                return TaskState.RUNNING;
            case 2:
                return TaskState.RUNNED;
            case 3:
                return TaskState.INVALID;
            default:
                return TaskState.INVALID;
            }
        }

        public void setValue(int value) {
            this.value = value;
        }

        public int getValue() {
            return value;
        }
    }

    /** Kind of task; selects the processor that handles it. */
    public enum TaskType {
        DB, UDP, HTTP, SNMP, CFGTPL
    }

    // Key identifying the task; required when the task is added to the dispatcher with the
    // uniqueness check enabled. Defaults to the object's hash code when not set explicitly.
    private String taskKey = "" + hashCode();
    private String taskName;
    @SuppressWarnings("unchecked")
    private Class clzProc;
    @SuppressWarnings("unchecked")
    private Class postProc;
    @SuppressWarnings("unchecked")
    private Class preProc;
    private Date begin;
    private Date end;
    private TaskState state;
    private TaskType type;

    public String getTaskKey() {
        return taskKey;
    }

    public void setTaskKey(String tastObj) {
        this.taskKey = tastObj;
    }

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    @SuppressWarnings("unchecked")
    public Class getClzProc() {
        return clzProc;
    }

    @SuppressWarnings("unchecked")
    public void setClzProc(Class clzProc) {
        this.clzProc = clzProc;
    }

    public Date getBegin() {
        return begin;
    }

    public void setBegin(Date begin) {
        this.begin = begin;
    }

    public Date getEnd() {
        return end;
    }

    public void setEnd(Date end) {
        this.end = end;
    }

    public TaskState getState() {
        return state;
    }

    public void setState(TaskState state) {
        this.state = state;
    }

    public TaskType getType() {
        return type;
    }

    public void setType(TaskType type) {
        this.type = type;
    }

    @SuppressWarnings("unchecked")
    public void setPostProc(Class postProc) {
        this.postProc = postProc;
    }

    @SuppressWarnings("unchecked")
    public Class getPostProc() {
        return postProc;
    }

    @SuppressWarnings("unchecked")
    public void setPreProc(Class preProc) {
        this.preProc = preProc;
    }

    @SuppressWarnings("unchecked")
    public Class getPreProc() {
        return preProc;
    }
}
一个实现类:UDP 任务
//off checkstyle package com.gbcom.ccsv3.transport.dispatcher.task; /** * upd任务 * * @author syz * @date 2014-9-30,上午10:33:37 * @version v1.0.0 * @see com.gbcom.smvc.dispatcher.task.UdpTask */ public class UdpTask extends TaskBase{ private OperType operType; private String item; private String json; public enum OperType{ REQUEST,INFOR//不带receive相应 } public UdpTask(String item){ super(); setType(TaskType.UDP); setState(TaskState.QUEUE); } public UdpTask(String item, OperType operType) { super(); setType(TaskType.UDP); setState(TaskState.QUEUE); this.item = item; this.operType = operType; } public UdpTask() { super(); setState(TaskState.QUEUE); setType(TaskType.UDP); } public String getJson() { return json; } public void setJson(String values) { this.json = values; } public String getItem() { return item; } public void setItem(String item) { this.item = item; } public OperType getOperType() { return operType; } public void setOperType(OperType operType) { this.operType = operType; } }
处理相关类
接口
package com.gbcom.ccsv3.transport.dispatcher.process;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;

/**
 * A processor for tasks of type {@code T}.
 *
 * @param <T> type of task this processor accepts
 * @author syz
 * @date 2014-9-30, 10:47:39 AM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.TaskProcess
 */
public interface TaskProcess<T> {

    /**
     * Processes one task.
     *
     * @param task the task object
     * @return the processing outcome wrapped in a {@link TaskProResult}
     */
    TaskProResult<T> process(T task);
}
抽象类
package com.gbcom.ccsv3.transport.dispatcher.process;

import java.util.Date;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.common.exception.TaskProException;
import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import com.gbcom.ccsv3.transport.dispatcher.process.post.PostProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.post.PreProcess;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskState;

/**
 * Template for task processors. {@link #process} runs, in order: the task's optional
 * pre-processor (only for tasks in state {@code QUEUE}), {@link #before()},
 * {@link #processCall} (only for tasks in state {@code QUEUE}), {@link #after()} and the
 * task's optional post-processor. Subclasses implement the abstract methods.
 *
 * @param <T> task type handled by the processor
 * @author syz
 * @date 2014-12-4, 01:20:13 PM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.AbstractTaskProcess
 */
public abstract class AbstractTaskProcess<T extends TaskBase> implements TaskProcess<T> {
    protected static final Logger LOG = Logger.getLogger(AbstractTaskProcess.class);

    /**
     * Hook executed before {@link #processCall}.
     */
    public abstract void before();

    /**
     * Hook executed after {@link #processCall}.
     */
    public abstract void after();

    /**
     * Performs the actual work.
     *
     * @param result result holder; its origin is the task being processed
     * @throws TaskProException when processing fails
     */
    public abstract void processCall(TaskProResult<T> result) throws TaskProException;

    /**
     * Instantiates and runs the task's pre-processor, if one is configured. A configured
     * class that does not implement {@link PreProcess} is logged and skipped instead of
     * failing with a {@code ClassCastException}.
     *
     * @param task the task about to be processed
     * @throws InstantiationException when the pre-processor cannot be instantiated
     * @throws IllegalAccessException when the pre-processor constructor is not accessible
     */
    private void preProcess(T task) throws InstantiationException, IllegalAccessException {
        Class<?> pre = task.getPreProc();
        if (pre == null) {
            return;
        }
        if (!PreProcess.class.isAssignableFrom(pre)) {
            LOG.error("pre-processor " + pre.getName() + " does not implement PreProcess, skipped");
            return;
        }
        PreProcess obj = (PreProcess) pre.newInstance();
        obj.preProcess(task);
    }

    /**
     * Instantiates and runs the task's post-processor, if one is configured. A configured
     * class that does not implement {@link PostProcess} is logged and skipped instead of
     * failing with a {@code ClassCastException}.
     *
     * @param result the processing result handed to the post-processor
     * @throws InstantiationException when the post-processor cannot be instantiated
     * @throws IllegalAccessException when the post-processor constructor is not accessible
     */
    private void postProcess(TaskProResult<T> result) throws InstantiationException,
            IllegalAccessException {
        Class<?> post = result.getOrigin().getPostProc();
        if (post == null) {
            return;
        }
        if (!PostProcess.class.isAssignableFrom(post)) {
            LOG.error("post-processor " + post.getName()
                    + " does not implement PostProcess, skipped");
            return;
        }
        PostProcess obj = (PostProcess) post.newInstance();
        obj.postProcess(result);
    }

    /**
     * Runs the full processing sequence for one task and reports the outcome.
     *
     * @param task the task to process
     * @return the result; {@code successful} is false when {@link #processCall} threw
     */
    @Override
    public TaskProResult<T> process(T task) {
        TaskProResult<T> result = new TaskProResult<T>();
        result.setOrigin(task);
        // The end time is recorded below; record the start as well unless the caller set it.
        if (task.getBegin() == null) {
            task.setBegin(new Date());
        }
        try {
            if (task.getState() == TaskState.QUEUE) {
                preProcess(task);
            }
        } catch (InstantiationException e) {
            LOG.error("InstantiationException", e);
        } catch (IllegalAccessException e) {
            LOG.error("IllegalAccessException", e);
        }
        before();
        try {
            // Re-check the state: the pre-processor may have changed it.
            if (task.getState() == TaskState.QUEUE) {
                processCall(result);
            }
            task.setState(TaskState.RUNNED);
            task.setEnd(new Date());
            result.setSuccessful(true);
            result.setExtraInfo("success");
            result.setThrowable(null);
        } catch (TaskProException e) {
            LOG.error("TaskProException", e);
            result.setSuccessful(false);
            result.setExtraInfo(e.getClass());
            result.setThrowable(e);
        } catch (Exception e) {
            LOG.error("other", e);
            result.setSuccessful(false);
            result.setExtraInfo(e.getClass());
            result.setThrowable(e);
        }
        after();
        try {
            postProcess(result);
        } catch (InstantiationException e) {
            LOG.error("InstantiationException", e);
        } catch (IllegalAccessException e) {
            LOG.error("IllegalAccessException", e);
        }
        return result;
    }
}
一个实现:UdpTaskProcess
package com.gbcom.ccsv3.transport.dispatcher.process; import com.gbcom.ccsv3.common.exception.TaskProException; import com.gbcom.ccsv3.transport.dispatcher.TaskProResult; import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase; /** * 执行UDP 操作:发送UDP消息, * * @param <T> * @author syz * @date 2014-9-30,下午12:55:45 * @version v1.0.0 * @see com.gbcom.smvc.dispatcher.process.UdpTaskProcess */ public class UdpTaskProcess<T extends TaskBase> extends AbstractTaskProcess<T> { /** * 后处理 */ @Override public void after() { LOG.info("UdpTaskProcess --after---"); } /** * 前处理 */ @Override public void before() { LOG.info("UdpTaskProcess --before---"); } /** * 处理 * * @param result * TaskProResult<T> * @throws TaskProException * Exception */ @Override public void processCall(TaskProResult<T> result) throws TaskProException { } /** * 私有snmp辅助类 result == {@link ProcessResult} * * @author SunYanzheng * @date 2014-9-30,下午12:59:57 * @version v1.0.0 * @see com.gbcom.smvc.dispatcher.process.UdpTaskHolder */ @SuppressWarnings("unused") private static class UdpTaskHolder { } }
以上是必要的处理模块,同时还可以自定义一些 post-action 和 pre-action 扩展
post-action/pre-action模块
接口
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;

/**
 * User-defined post-processor: a custom action that receives the processing result of a task.
 *
 * @author syz
 * @date 2014-9-30, 01:36:40 PM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.PostProcess
 */
public interface PostProcess {

    /**
     * Handles the result of a processed task.
     *
     * @param result the processing result
     */
    @SuppressWarnings("unchecked")
    void postProcess(TaskProResult result);
}
适配器
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import org.apache.log4j.Logger;

/**
 * Default post-processor: writes the fields of the result to the log and does nothing else.
 *
 * @author syz
 * @date 2014-9-30, 01:38:53 PM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.PostProcessAdapter
 */
public class PostProcessAdapter implements PostProcess {
    private static final Logger LOG = Logger.getLogger("PostProcessAdapter");

    @SuppressWarnings("unused")
    private Object process;

    /**
     * Logs success flag, result payload, extra info and exception of the given result.
     *
     * @param result the processing result
     */
    @SuppressWarnings("unchecked")
    @Override
    public void postProcess(TaskProResult result) {
        logField("SUCCESS", result.isSuccessful());
        logField("RESULT", result.getResult());
        logField("EXTINFO", result.getExtraInfo());
        logField("EXCEPTION", result.getThrowable());
    }

    /** Writes one {@code result----LABEL:value} line at info level. */
    private static void logField(String label, Object value) {
        LOG.info("result----" + label + ":" + value);
    }
}
一个实现
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import com.gbcom.ccsv3.transport.dispatcher.task.UdpTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskState;
import com.gbcom.ccsv3.util.HttpClientUtil;
import com.gbcom.system.utils.JsonUtil;

/**
 * Post-processor for UDP tasks. For the item {@code forceoffline} it posts the task data to
 * a remote endpoint and marks the task {@code INVALID} when the endpoint does not answer
 * with {@code result = success}; for {@code gwebOper} it only logs.
 *
 * <p>Per the original author's note: used from a controller when the cloud platform calls
 * the Java module asynchronously and expects a callback once the operation has finished.
 *
 * @author syz
 * @date 2014-11-5, 06:31:59 PM
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.InformPostProcess
 */
public class InformPostProcess implements PostProcess {
    private static final Logger LOG = Logger.getLogger(InformPostProcess.class);

    /**
     * Dispatches on the item of the UDP task; results whose origin is not a {@link UdpTask}
     * are ignored.
     *
     * @param result the processing result
     */
    @SuppressWarnings("unchecked")
    @Override
    public void postProcess(TaskProResult result) {
        if (!(result.getOrigin() instanceof UdpTask)) {
            return;
        }
        UdpTask udpTask = (UdpTask) result.getOrigin();
        // Literal first: the item may be null when the task was created without one.
        if ("forceoffline".equalsIgnoreCase(udpTask.getItem())) {
            notifyForceOfflien(result);
        }
        if ("gwebOper".equalsIgnoreCase(udpTask.getItem())) {
            ignore(result);
        }
    }

    @SuppressWarnings("unchecked")
    private void ignore(TaskProResult result) {
        LOG.info("gwebOper return ,,this in not advance process ,can ignore");
    }

    /**
     * Posts type, key and JSON payload of the task to the remote endpoint and invalidates the
     * task when the response does not report success.
     */
    @SuppressWarnings("unchecked")
    private void notifyForceOfflien(TaskProResult result) {
        UdpTask udpTask = (UdpTask) result.getOrigin();
        String obj = udpTask.getTaskKey();
        String item = udpTask.getItem(); // method name
        String json = udpTask.getJson(); // JSON payload
        Map<String, String> postmap = new HashMap<String, String>();
        postmap.put("type", item);
        postmap.put("gwid", obj);
        postmap.put("json", json);
        String responseData = HttpClientUtil.post("replace urll", postmap);
        if (!isSuccessResponse(responseData)) {
            // An INVALID task no longer sends the UDP message when processed again, but its
            // pre/post steps still run; a retry limit could be added as well.
            // Re-queueing via TaskDispatcher.getInstance().addTask(udpTask) is currently disabled.
            udpTask.setState(TaskState.INVALID);
        }
    }

    /**
     * Returns whether the response carries {@code result = success} (case-insensitive).
     * A missing, unparsable or unexpected response counts as failure.
     */
    private boolean isSuccessResponse(String responseData) {
        if (responseData == null) {
            return false;
        }
        try {
            Object rst = JsonUtil.jsonToMap(responseData).get("result");
            return rst instanceof String && ((String) rst).equalsIgnoreCase("success");
        } catch (RuntimeException e) {
            LOG.error("can not parse forceoffline response: " + responseData, e);
            return false;
        }
    }
}
pre-action 跟post类似,仅仅写出接口
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * User-defined pre-processor: a custom action that receives a task.
 *
 * @author syz
 * @date 2014-9-30, 01:36:40 PM
 * @version v1.0.0
 * @see PreProcess
 */
public interface PreProcess {

    /**
     * Handles the given task.
     *
     * @param task the task
     */
    void preProcess(TaskBase task);
}
客户端使用
自定义事件类型
自定义事件的process
如果需要,扩展pre/post action
开启服务即可工作,不需要关注线程和阻塞队列。