欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

通用任务分发框架(TaskDispatcher),基于生产者消费者模式 dispatcher 

程序员文章站 2022-07-03 08:37:27
...
TaskDispatcher:通用的任务分发和处理框架,基于生产者消费者模式,底层使用阻塞队列实现。

如果需要使用生产者消费者 模式,不需要再手写阻塞队列,只需要启动该服务,并写对应的process 就可以了。
除了使用简单外,还增加任务状态维护,处理结果追踪,以及任务处理方式等功能


核心代码如下


服务接口
package com.gbcom.ccsv3.transport.dispatcher;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;


/**
 * 转发器接口。
 * 
 * @author syz
 * @date 2014-9-30,下午02:29:52
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskDispatcherItf
 */
public interface TaskDispatcherItf {
	/**
	 * 新增
	 * @param task TaskBase
	 * @param unique 唯一性队列
	 */
	public void addTask(TaskBase task,boolean unique);
	/**
	 * 新增
	 * @param task TaskBase
	 */
	public void addTask(TaskBase task);
	/**
	 * start
	 */
	public void start();
	/**
	 * stop
	 */
	public void stop();
}




转发器实现
package com.gbcom.ccsv3.transport.dispatcher;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * task 转发器:异步方式,生产者消费者模型 : 基于阻塞队列方式
 * 
 * 是否支持重复添加:默认是允许重复的,unique
 * 
 * 可替换为 框架,后者更方便,且无需关注细节。
 * 
 * 如果需要同步,需提供回调函数
 * @modify add unique ,see method public synchronized void addTask(TaskBase task, boolean keyUnique) 
 * @author syz
 * @date 2014-9-30,下午03:09:12
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskDispatcher
 */
public class TaskDispatcher implements TaskDispatcherItf {
	private static final Logger LOG = Logger.getLogger(TaskDispatcher.class);
	private static final int THREAD_NUM = 40;
	private static final int BLOCK_QUEUE_MAX_SIZE = 10000;
	private static final int BLOCK_QUEUE_CLEAR_SIZE = 2000;

	/**
	 * 线程的执行器
	 */
	private ExecutorService executor = null;

	private TaskProcessManager taskManager = null;

	private boolean isRunning = false;
	/**
	 * 上报Trap消息的队列 :SIZE , 阻塞队列。
	 */
	private BlockingQueue<TaskBase> taskQueue = new LinkedBlockingQueue<TaskBase>(
			BLOCK_QUEUE_MAX_SIZE);
	// 唯一性队列
	private Map<String, TaskBase> keyMap = new HashMap<String, TaskBase>();

	private static class TaskDispatcherHolder {
		private static final TaskDispatcher INSTANCE = new TaskDispatcher();
	}

	/**
	 * 获取单例对象
	 * 
	 * @return TaskDispatcher
	 */
	public static TaskDispatcher getInstance() {
		return TaskDispatcherHolder.INSTANCE;
	}

	private TaskDispatcher() {
		init();
		start();
	}

	private void init() {
		isRunning = false;
		taskManager = new TaskProcessManager();
	}

	@Override
	public synchronized void addTask(TaskBase task, boolean keyUnique) {
		if (!isRunning) {
			LOG
					.error("TaskDispatcher  is not running, the Task below may not process");
		}
		if (LOG.isDebugEnabled()) {
			LOG.debug("add Task to Queue " + task.getTaskName() + " "
					+ task.getType());
		}
		try {
			if (keyUnique) {// 如果需要唯一性校验,TaskKey default is hashcode,allow  define it
				if (keyMap.containsKey(task.getTaskKey())) {
					LOG.info("can not add Task to Queue,the ele exist!!!!"
							+ task.getTaskName() + " , size = "
							+ taskQueue.size());
					return;
				} else {
					keyMap.put(task.getTaskKey(), task);// seize a seat 
				}
			}
			LOG.info("add Task to Queue " + task.getTaskName() + " , size = "
					+ taskQueue.size());
			if (taskQueue.size() >= BLOCK_QUEUE_CLEAR_SIZE) {
				LOG.info(" *****cleart request Task***** trap queue size is more than "
								+ BLOCK_QUEUE_CLEAR_SIZE
								+ ";;  CLEAR BlockingQueue");
				taskQueue.clear();
			}
			taskQueue.put(task);
		} catch (InterruptedException e) {
			LOG.info("/******* add Task InterruptedException*********/");
			LOG.error("add Task to queue interrupted", e);
			LOG.info("/******* add Task InterruptedException  *********/");
		} catch (Exception e) {
			LOG.error("Other Exception  ", e);
		}

	}

	/**
	 * 添加任务
	 * 
	 * @param task
	 *            TaskBase
	 */
	@Override
	public synchronized void addTask(TaskBase task) {
		addTask(task, false);
	}

	/**
	 * 停止
	 */
	@Override
	public void stop() {
		executor.shutdownNow();
		isRunning = false;
	}

	/**
	 * 开始
	 */
	@Override
	public void start() {
		executor = Executors.newCachedThreadPool();
		for (int i = 0; i < THREAD_NUM; i++) {
			executor.execute(new DispatcherTask());
		}
		isRunning = true;
		LOG.info("task Dispatcher task start  , current thread size =  "
				+ THREAD_NUM);

	}

	class DispatcherTask implements Runnable {

		/**
		 * 线程执行方法
		 */
		@Override
		public void run() {
			TaskBase bean = null;
			while (!Thread.currentThread().isInterrupted()) {
				try {
					long begin = System.currentTimeMillis();
					bean = taskQueue.take();
					taskManager.taskProcess(bean);
					LOG.info("process Task  success, thread="
							+ Thread.currentThread().getName()
							+ "  ;spend time :total= "
							+ ((System.currentTimeMillis() - begin) / 1000)
							+ "s  || the queue size is not actually:"
							+ taskQueue.size());
				} catch (InterruptedException e) {
					LOG
							.info("/******* Task Dispatcher  InterruptedException*********/");
					LOG.error("Task Dispatcher thread interrupted ;; tread = "
							+ Thread.currentThread().getName(), e);
					LOG
							.info("/******* Task Dispatcher  InterruptedException*********/");
					Thread.currentThread().interrupt();
					break;
				} catch (Exception e) {
					LOG.error("Task Dispatcher thread exception", e);
					continue;
				} finally {
					// addTask(task,false) keymap has not the key , remove is ok
					keyMap.remove(bean.getTaskKey());// remove the key, pair
														// addTask(task,true);
				}
			}

		}

	}

}



处理器管理器,注册指定任务的处理器
TaskProcessManager
package com.gbcom.ccsv3.transport.dispatcher;

import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.transport.dispatcher.process.CfgTplTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.DBTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.HTTPTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.SNMPTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.TaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.UdpTaskProcess;
import com.gbcom.ccsv3.transport.dispatcher.task.CfgTplTask;
import com.gbcom.ccsv3.transport.dispatcher.task.DBTask;
import com.gbcom.ccsv3.transport.dispatcher.task.HTTPTask;
import com.gbcom.ccsv3.transport.dispatcher.task.SNMPTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;
import com.gbcom.ccsv3.transport.dispatcher.task.UdpTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskType;

/**
 * Task 处理器管理器。
 * 
 * 
 * @author syz
 * @date 2014-9-30,下午03:07:25
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskProcessManager
 */
public class TaskProcessManager{
	/**
	 * 日志记录器
	 */
	protected static final Logger LOG = Logger.getLogger(TaskProcessManager.class);
	@SuppressWarnings("unchecked")
	private Map<TaskType,TaskProcess> taskProcessors = new HashMap<TaskType,TaskProcess>();

	/**
	 * TaskProcessManager
	 */
	public TaskProcessManager(){
		//注册
		taskProcessors.put(TaskType.UDP, new UdpTaskProcess<UdpTask>());
		taskProcessors.put(TaskType.DB, new DBTaskProcess<DBTask>());
		taskProcessors.put(TaskType.HTTP, new HTTPTaskProcess<HTTPTask>());
		taskProcessors.put(TaskType.SNMP, new SNMPTaskProcess<SNMPTask>());
		taskProcessors.put(TaskType.CFGTPL, new CfgTplTaskProcess<CfgTplTask>());
	}
	
	/**
	 * TaskProcessManager
	 * @param map Map<TaskType,TaskProcess>
	 */
	@SuppressWarnings("unchecked")
	public TaskProcessManager(Map<TaskType,TaskProcess> map){
		this.taskProcessors = map;
	}
	/**
	 * 新增处理任务
	 * @param task TaskBase
	 * @param process TaskProcess
	 */
	@SuppressWarnings("unchecked")
	public void put(TaskBase task, TaskProcess process){
		taskProcessors.put(task.getType(), process);
	}
	/**
	 * 删除指定task 的处理器,,根据类型进行。
	 * @param task TaskBase
	 */
	public void remove(TaskBase task){
		taskProcessors.remove(task.getType());
	}
	
	
	/**
	 * 处理任务
	 * @param task task
	 */
	@SuppressWarnings("unchecked")
	public void taskProcess(TaskBase task) {
		TaskProcess process =taskProcessors.get(task.getType());
		if(process !=null){
			process.process(task);
		}
	}

}

以上是主题流程,以下是一些辅助pojo和处理相关

结果数据
package com.gbcom.ccsv3.transport.dispatcher;

import java.io.Serializable;

/**
 * 返回结果。<code>task</code>
 *  封装了 任务和 操作结果,,
 * @param <T>
 * @author syz
 * @date 2014-9-30,上午11:16:48
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.TaskProResult
 */
public class TaskProResult<T> implements Serializable{

	/**
	 * serialVersionUID: long
	 */
	private static final long serialVersionUID = -6165298377784981884L;
	/**
	 * constructor
	 */
	public TaskProResult() {
		this.successful = true;
	}

	/**
	 * constructor
	 * @param t T
	 */
	public TaskProResult(T t) {
		this.origin = t;
		this.successful = true;
	}


	/**
	 * 构造函数

	 * 
	 * @param origin
	 *            数据源

	 * @param isSuccessful
	 *            数据源处理是否成功

	 * @param throwable
	 *            如果数据源处理失败,那导致失败的异常是什么

	 */
	public TaskProResult(T origin, boolean isSuccessful, Throwable throwable) {
		this.origin = origin;
		this.successful = isSuccessful;
		this.throwable = throwable;
	}
	
	/**
	 * 数据源

	 */
	private T origin;
	
	
	//UDP:Message
	private Object result;

	/**
	 * 数据源处理结果

	 */
	private boolean successful;

	/**
	 * 导致失败的原因,如果数据源处理失败

	 */
	private Throwable throwable;

	private Object extraInfo;

	/**
	 * @return Returns the extraInfo.
	 */
	public Object getExtraInfo() {
		return extraInfo;
	}

	/**
	 * @param extraInfo
	 *            The extraInfo to set.
	 */
	public void setExtraInfo(Object extraInfo) {
		this.extraInfo = extraInfo;
	}


	/**
	 * @return Returns the origin.
	 */
	public T getOrigin() {
		return origin;
	}

	/**
	 * @param origin
	 *            The origin to set.
	 */
	public void setOrigin(T origin) {
		this.origin = origin;
	}

	/**
	 * @return Returns the successful.
	 */
	public boolean isSuccessful() {
		return successful;
	}

	/**
	 * @param successful
	 *            The successful to set.
	 */
	public void setSuccessful(boolean successful) {
		this.successful = successful;
	}

	/**
	 * @return Returns the throwable.
	 */
	public Throwable getThrowable() {
		return throwable;
	}

	/**
	 * @param throwable
	 *            The throwable to set.
	 */
	public void setThrowable(Throwable throwable) {
		this.throwable = throwable;
	}

	/**
	 * 
	 * @param result Object
	 */
	public void setResult(Object result) {
		this.result = result;
	}

	/**
	 * 
	 * @return getResult
	 */
	public Object getResult() {
		return result;
	}

}



事件类型定义
基类
//off checkstyle
package com.gbcom.ccsv3.transport.dispatcher.task;

import java.util.Date;

/**
 * 任务基类  两种状态有用  queue:会执行所有操作
 * 
 * 其它状态 都值执行post方法
 * 
 * @author syz
 * @date 2014-9-30,上午10:02:07
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.task.TaskBase
 */
public class TaskBase {
	public enum TaskState{
		QUEUE(0),RUNNING(1),RUNNED(2),INVALID(3);
		
		private int value;
		TaskState(int i){
			this.setValue(i);
		}
		public TaskState valueOf(int i){
			switch(i){
			case 0:
				return TaskState.QUEUE;
			case 1:
				return TaskState.RUNNING;
			case 2:
				return TaskState.RUNNED;
			case 3:
				return TaskState.INVALID;
				default :
					return TaskState.INVALID;
			}
		}
		public void setValue(int value) {
			this.value = value;
		}
		public int getValue() {
			return value;
		}
	}
	
	
	public enum TaskType{
		DB,UDP,HTTP,SNMP,CFGTPL
	}
	//key,,can unique respons of Object ,,suggess String
	//if you want unique queue ,this is must!!!
	//if not set  the default value is object's hashcode
	//this field can be delete ,the logic remove to TaskDispactcher :public synchronized void addTask(TaskBase task,boolean keyUnique){
	private String taskKey = ""+hashCode();
	
	private String taskName;
	@SuppressWarnings("unchecked")
	private Class clzProc;
	@SuppressWarnings("unchecked")
	private Class postProc;
	@SuppressWarnings("unchecked")
	private Class preProc;
	private Date begin;
	private Date end;
	private TaskState state;
	private TaskType type;
	
	
	public String getTaskKey() {
		return taskKey;
	}
	public void setTaskKey(String tastObj) {
		this.taskKey = tastObj;
	}
	public String getTaskName() {
		return taskName;
	}
	public void setTaskName(String taskName) {
		this.taskName = taskName;
	}
	@SuppressWarnings("unchecked")
	public Class getClzProc() {
		return clzProc;
	}
	@SuppressWarnings("unchecked")
	public void setClzProc(Class clzProc) {
		this.clzProc = clzProc;
	}
	public Date getBegin() {
		return begin;
	}
	public void setBegin(Date begin) {
		this.begin = begin;
	}
	public Date getEnd() {
		return end;
	}
	public void setEnd(Date end) {
		this.end = end;
	}
	public TaskState getState() {
		return state;
	}
	public void setState(TaskState state) {
		this.state = state;
	}
	public TaskType getType() {
		return type;
	}
	public void setType(TaskType type) {
		this.type = type;
	}
	@SuppressWarnings("unchecked")
	public void setPostProc(Class postProc) {
		this.postProc = postProc;
	}
	@SuppressWarnings("unchecked")
	public Class getPostProc() {
		return postProc;
	}
	@SuppressWarnings("unchecked")
	public void setPreProc(Class preProc) {
		this.preProc = preProc;
	}
	@SuppressWarnings("unchecked")
	public Class getPreProc() {
		return preProc;
	}
	
}


一个实现类:upd任务
//off checkstyle
package com.gbcom.ccsv3.transport.dispatcher.task;


/**
 * upd任务
 * 
 * @author syz
 * @date 2014-9-30,上午10:33:37
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.task.UdpTask
 */
public class UdpTask extends TaskBase{
	private OperType operType;
	private String item;
	private String json;
	
	public enum OperType{
		REQUEST,INFOR//不带receive相应
	}
	
	public UdpTask(String item){
		super();
		setType(TaskType.UDP);
		setState(TaskState.QUEUE);
	}
	public UdpTask(String item, OperType operType) {
		super();
		setType(TaskType.UDP);
		setState(TaskState.QUEUE);
		this.item = item;
		this.operType = operType;
	}
	public UdpTask() {
		super();
		setState(TaskState.QUEUE);
		setType(TaskType.UDP);
	}
	
	public String getJson() {
		return json;
	}
	public void setJson(String values) {
		this.json = values;
	}

	
	public String getItem() {
		return item;
	}
	public void setItem(String item) {
		this.item = item;
	}
	public OperType getOperType() {
		return operType;
	}
	public void setOperType(OperType operType) {
		this.operType = operType;
	}


}



处理相关类


接口
package com.gbcom.ccsv3.transport.dispatcher.process;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;

/**
 * 任务处理器
 * 
 * @author syz
 * @date 2014-9-30,上午10:47:39
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.TaskProcess
 */
public interface TaskProcess <T>{
	

	/**
	 * 任务处理接口。
	 * @param task 任务对象
	 * @return 任务处理结果封装
	 */
	public TaskProResult<T> process(T task);

}



抽象类
package com.gbcom.ccsv3.transport.dispatcher.process;

import java.util.Date;

import org.apache.log4j.Logger;

import com.gbcom.ccsv3.common.exception.TaskProException;
import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import com.gbcom.ccsv3.transport.dispatcher.process.post.PostProcess;
import com.gbcom.ccsv3.transport.dispatcher.process.post.PreProcess;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskState;

/**
 * 抽象事件处理:事件处理器,实现该抽象类的抽象方法,
 * @param <T>
 * @author syz
 * @date 2014-12-4,下午01:20:13
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.AbstractTaskProcess
 */
public abstract class AbstractTaskProcess<T extends TaskBase> implements TaskProcess<T> {
	protected static final Logger LOG = Logger.getLogger(AbstractTaskProcess.class);
	/**
	 * before 方式
	 */
	public abstract void before();
	/**
	 * after 方法
	 */
	public abstract void after();
	
	
	/**
	 * 处理方法
	 * @param result result:处理结果的封装,保留source
	 * @throws TaskProException TaskProException
	 */
	public abstract void processCall(TaskProResult<T> result)throws TaskProException;
	
	/**
	 * 
	 * 前置处理
	 * @param task  task
	 * @throws InstantiationException  InstantiationException
	 * @throws IllegalAccessException IllegalAccessException
	 */
	@SuppressWarnings("unchecked")
	private void preProcess(T task) throws InstantiationException, IllegalAccessException{
		Class<PreProcess> pre = task.getPreProc();
		if(pre != null){
			PreProcess obj = pre.newInstance();
			obj.preProcess(task);
		}
	}
	@SuppressWarnings("unchecked")
	private void postProcess(TaskProResult<T>result ) throws InstantiationException, IllegalAccessException{
		Class<PostProcess> post = result.getOrigin().getPostProc();
		if(post != null){
			PostProcess obj = post.newInstance();
			obj.postProcess(result);
		}
	}
	/**
	 * @param task T
	 * @return TaskProResult<T>
	 */
	@Override
	public TaskProResult<T> process(T task) {
		TaskProResult<T> result = new TaskProResult<T>();
		
		result.setOrigin(task);
		try {
			if(task.getState() == TaskState.QUEUE){
				preProcess(task);
			}
		} catch (InstantiationException e) {
			LOG.error("InstantiationException", e);
		} catch (IllegalAccessException e) {
			LOG.error("IllegalAccessException", e);
		}
		
		before();
		try {
			if(task.getState() == TaskState.QUEUE){
				processCall(result);
			}
			task.setState(TaskState.RUNNED);
			task.setEnd(new Date());
			result.setSuccessful(true);
			result.setExtraInfo("success");
			result.setThrowable(null);
		} catch (TaskProException e) {
			LOG.error("TaskProException", e);
			result.setSuccessful(false);
			result.setExtraInfo(e.getClass());
			result.setThrowable(e);
		} catch (Exception e){
			LOG.error("other",e);
			result.setSuccessful(false);
			result.setExtraInfo(e.getClass());
			result.setThrowable(e);
		}
		after();
		try {
			postProcess(result);
		} catch (InstantiationException e) {
			LOG.error("InstantiationException", e);
		} catch (IllegalAccessException e) {
			LOG.error("IllegalAccessException", e);
		}
		
		return result;
	}


}



一个实现:UdpTaskProcess
package com.gbcom.ccsv3.transport.dispatcher.process;

import com.gbcom.ccsv3.common.exception.TaskProException;
import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * 执行UDP 操作:发送UDP消息,
 * 
 * @param <T>
 * @author syz
 * @date 2014-9-30,下午12:55:45
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.UdpTaskProcess
 */
public class UdpTaskProcess<T extends TaskBase> extends AbstractTaskProcess<T> {
	/**
	 * 后处理
	 */
	@Override
	public void after() {
		LOG.info("UdpTaskProcess  --after---");
	}

	/**
	 * 前处理
	 */
	@Override
	public void before() {
		LOG.info("UdpTaskProcess  --before---");

	}

	/**
	 * 处理
	 * 
	 * @param result
	 *            TaskProResult<T>
	 * @throws TaskProException
	 *             Exception
	 */
	@Override
	public void processCall(TaskProResult<T> result) throws TaskProException {
	}

	/**
	 * 私有snmp辅助类 result == {@link ProcessResult}
	 * 
	 * @author SunYanzheng
	 * @date 2014-9-30,下午12:59:57
	 * @version v1.0.0
	 * @see com.gbcom.smvc.dispatcher.process.UdpTaskHolder
	 */
	@SuppressWarnings("unused")
	private static class UdpTaskHolder {

	}

}


以上 必要的处理模块完毕,,同时可以自定义一些post-action和pre-action的扩展

post-action/pre-action模块
接口
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;


/**
 * 后处理器,,用户自定义自己的处理方法,,在after方法中执行。
 * 
 * @author syz
 * @date 2014-9-30,下午01:36:40
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.PostProcess
 */
public interface PostProcess {
	/**
	 * @param result TaskProResult
	 */
	@SuppressWarnings("unchecked")
	public void postProcess(TaskProResult result);

}


适配器
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import org.apache.log4j.Logger;

/**
 * 默认的后处理 适配器,作为默认处理。
 * 
 * @author syz
 * @date 2014-9-30,下午01:38:53
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.PostProcessAdapter
 */
public class PostProcessAdapter implements PostProcess {
	private static final Logger LOG = Logger.getLogger("PostProcessAdapter");
	@SuppressWarnings("unused")
	private Object process;

	/**
	 * 后处理 适配器
	 * 
	 * @param result
	 *            TaskProResult
	 */
	@SuppressWarnings("unchecked")
	@Override
	public void postProcess(TaskProResult result) {
		LOG.info("result----SUCCESS:" + result.isSuccessful());
		LOG.info("result----RESULT:" + result.getResult());
		LOG.info("result----EXTINFO:" + result.getExtraInfo());
		LOG.info("result----EXCEPTION:" + result.getThrowable());

	}

}



一个实现
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import java.util.HashMap;
import java.util.Map;
import org.apache.log4j.Logger;
import com.gbcom.ccsv3.transport.dispatcher.TaskProResult;
import com.gbcom.ccsv3.transport.dispatcher.task.UdpTask;
import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase.TaskState;
import com.gbcom.ccsv3.util.HttpClientUtil;
import com.gbcom.system.utils.JsonUtil;

/**
 * UDP.INFORM type 后续post接口,,
 * 
 * 
 * 在 controller中使用,, 当云平台 异步访问java模块,当操作完成是,需要调用云平台的回调函数。
 * 
 * 
 * 需要考虑源。
 * 
 * @author syz
 * @date 2014-11-5,下午06:31:59
 * @version v1.0.0
 * @see com.gbcom.smvc.dispatcher.process.post.InformPostProcess
 */
public class InformPostProcess implements PostProcess {
	private static final Logger LOG = Logger.getLogger(InformPostProcess.class);

	/**
	 * 心跳报文后处理
	 * 
	 * @param result
	 *            TaskProResult
	 */
	@SuppressWarnings("unchecked")
	@Override
	public void postProcess(TaskProResult result) {

		if (!(result.getOrigin() instanceof UdpTask)) {
			return;
		}
		UdpTask udpTask = (UdpTask) result.getOrigin();
		if (udpTask.getItem().equalsIgnoreCase("forceoffline")) {
			notifyForceOfflien(result);
		}
		if (udpTask.getItem().equalsIgnoreCase("gwebOper")) {
			ignore(result);
		}
	}

	@SuppressWarnings("unchecked")
	private void ignore(TaskProResult result) {
		LOG.info("gwebOper return ,,this in not advance process ,can ignore");
	}

	@SuppressWarnings("unchecked")
	private void notifyForceOfflien(TaskProResult result) {
		UdpTask udpTask = (UdpTask) result.getOrigin();
		String obj = udpTask.getTaskKey();
		String item = udpTask.getItem();// 方法
		String json = udpTask.getJson();// json
		Map<String, String> postmap = new HashMap<String, String>();
		postmap.put("type", item);
		postmap.put("gwid", obj);
		postmap.put("json", json);

		String responseData = HttpClientUtil.post("replace urll", postmap);

		String rst = (String) JsonUtil.jsonToMap(responseData).get("result");

		if (rst != null && rst.equalsIgnoreCase("success")) {

		} else {
			udpTask.setState(TaskState.INVALID);// 无效的task 不会再发送udp报文,但会有post
												// 和pre 也可以有次数限制
			// TaskDispatcher.getInstance().addTask(udpTask);

		}
	}

}


pre-action 跟post类似,仅仅写出接口
package com.gbcom.ccsv3.transport.dispatcher.process.post;

import com.gbcom.ccsv3.transport.dispatcher.task.TaskBase;

/**
 * 前处理器,,用户自定义自己的处理方法,,在after方法中执行。
 * 
 * @author syz
 * @date 2014-9-30,下午01:36:40
 * @version v1.0.0
 * @see PreProcess
 */
public interface PreProcess {
	/**
	 * @param task TaskBase
	 */
	public void preProcess(TaskBase task);
}




客户端使用


自定义事件类型
自定义事件的process
如果需要,扩展pre/post action
开启服务器 就可以工作,,不需要关注线程和阻塞队列。

相关标签: dispatcher