欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

基于数据库锁实现springtask的集群  

程序员文章站 2024-02-10 11:27:46
...
   目前项目使用大量spring-task,spring-task有个足的地方是缺失对集群的支持。quartz可以支持定时任务集群,我们项目没有用,所以就自己实现了。我们设计的定时任务有三类。1、节点间不允许并发,2、节点间允许并发,节点内不允许并发,3.节点间允许并发,节点内允许多线程并发。

首先实现任务接口
public interface Task {

	/**
	 * 定时任务被调用入口,此方法中异常应捕获,不应往外面抛出
	 */
    public void excut();
	
    /**
     * 定时业务任务实现方法
     * @throws Exception
     */
	public void doExcut() throws Exception;
	
	
	/**
	 * 是否允许多节点并发运行
	 * @return
	 */
	public boolean isConcurrent();
	
	
	/**
	 * 是否是单节中配置了多线程执行任务
	 * @return
	 */
	public boolean isMulitiThread();
	
	
	/**
	 * 设置任务数据分片信息,当多节点并发取数时,需通过此方法设置分片信息
	 * @param dataSliceInfo  每个节点的取数分片信息
	 */
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo);
	
	/**
	 * 获取单节点下,任务的并发数目 。当节点下任务为多线程并发时,返回此值
	 * @return
	 */
	public int getLocalMulitiThreadNum();
}



创建一个任务基类
public abstract class ClusterBaseTask implements Task {

	protected Logger logger = Logger.getLogger(getClass());
	
	
	@Autowired
	ClusterTaskExcutor excutor;
	
	/**
	 * 暴露在外的方法
	 * @throws Exception 
	 */
	@Override
	public void excut() {
		try {
			excutor.excute(this);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	
	@Override
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo) {

	}

	@Override
	public int getLocalMulitiThreadNum() {
		return 0;
	}

	
	

}



创建一个 ClusterTaskExcutor 进行任务控制
public interface ClusterTaskExcutor {

	
	void excute(Task task) throws Exception;
	
}


实现类
@Service
public class ClusterTaskExcutorImpl implements ClusterTaskExcutor{

	protected Logger logger = Logger.getLogger(getClass());
	
	@Autowired
    TaskLockerDAO taskLockerDAO;
	
	@Autowired
	TaskRuntimeInfoService taskRuntimeInfoService;
	
	//spring中具有线程池管理的调度
	@Autowired
	SchedulingTaskExecutor schedulingTaskExecutor;
	
	
	@Autowired
	HeartbeatService  heartbeatService;
	
	
	@Autowired
	private DataSourceTransactionManager txManager;
	
	

	@Override
	public void excute(final Task task) throws Exception {
		
		logger.info("ClusterTaskExcutorImpl begin ");
		
		if(task.isConcurrent()){//节点间可以并发执行
			
			if(task.isMulitiThread()){//一个节点下多线程执行任务
				doConcurrent(task);
			}else{//一个节点下单线程执行
				task.doExcut();
			}
			
		}else{//节点间互斥执行
	
			doSync(task);
		}
		
		logger.info("ClusterTaskExcutorImpl end ");
		
	}


	private void doConcurrent(final Task task) throws Exception {
		
	  
		DataSliceInfo dataSliceInfo=getDataSliceInfo();
		
		if(dataSliceInfo!=null){//获取到分片信息
			task.setDataSliceInfo(dataSliceInfo);
			
			int size=task.getLocalMulitiThreadNum();
			
			List<Future<?>> futures = new ArrayList<Future<?>>(size);
			
			for (int i = 0; i < size; i++) {
				
				RunableTask runableTask=new RunableTask(task);
				Future<?> f=schedulingTaskExecutor.submit(runableTask);
				futures.add(f);
			}
			
			//主线程等待,让所有线程执行完成后,主线程才执行
			for (Future<?> f : futures) {
				if (!f.isDone()) {
					try {
						f.get();
					} catch (CancellationException ignore) {
					} catch (ExecutionException ignore) {
					}
				}
			}
		}else{//未获取到分片信息
			logger.error("未获取到分片信息,任务退出执行");
		}
		
		
		
	}


	/**
	 * 获取本机的分片信息
	 * @return
	 * @throws UnknownHostException
	 * @throws Exception
	 */
	private DataSliceInfo getDataSliceInfo() {
		String nodeIp=null ;
		try {
			DataSliceInfo dataSliceInfo=new DataSliceInfo();
			List<String> ipList=heartbeatService.getAliveHostList();
			nodeIp= InetAddress.getLocalHost().getHostAddress();
			int index=MapUtil.getHashIndex(nodeIp, ipList);
			int size=ipList.size();
			dataSliceInfo.setIndex(index);
			dataSliceInfo.setSize(size);
			return dataSliceInfo;
		} catch (UnknownHostException e) {
			logger.error("getDataSliceInfo错误", e);
			return null;
		} catch (BusinessServiceException e) {
			logger.error("getDataSliceInfo错误;ip:"+nodeIp+"未激活");
			return null;
		}
	}


	/**
	 * 方法同步执行
	 * @param task
	 * @throws InterruptedException
	 */
	private void doSync(final Task task) throws InterruptedException {
		//手动提交事务
		DefaultTransactionDefinition def = new DefaultTransactionDefinition();
		def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);// 事物隔离级别,开启新事务
		TransactionStatus txStatus = txManager.getTransaction(def); // 获得事务状态
		
		try {
			String taskName=task.getClass().getName();
			if(taskLockerDAO.lockTask(taskName)){//获取任务锁
				
				final TaskRuntimeInfoDTO runtimeInfoDTO = getTaskRunTimeInfo(taskName);
				
				
				Thread thread=new Thread(new Runnable() {
					
					@Override
					public void run() {
						
						taskRuntimeInfoService.beginTask(runtimeInfoDTO);
						try {
							task.doExcut();
						} catch (Exception e) {
							logger.error(e.getMessage(), e);
						}finally{
							taskRuntimeInfoService.endTask(runtimeInfoDTO);
						}
					}
				});
				thread.start();
				thread.join();
			}else{
				logger.warn("任务:["+taskName+"],已经在执行,此线程退出执行");
			}
			
		} finally{
			//操作完成后手动提交事务
			txManager.commit(txStatus);
		}
	}


	private TaskRuntimeInfoDTO getTaskRunTimeInfo(String taskName)
			 {
		
		final TaskRuntimeInfoDTO   runtimeInfoDTO=new TaskRuntimeInfoDTO();
		runtimeInfoDTO.setcTaskName(taskName);
		String nodeIp;
		try {
			nodeIp = InetAddress.getLocalHost().getHostAddress();
		} catch (UnknownHostException e) {
			nodeIp="UnknownHost";
			logger.error("获取IP地址失败", e);
		}
		runtimeInfoDTO.setcRunNodeIp(nodeIp);
		runtimeInfoDTO.setcUpdCde(nodeIp);
		return runtimeInfoDTO;
	}

}


数据库表设计

create table TASK_LOCKER
(
  c_task_name VARCHAR2(256) not null
);
-- Add comments to the table 
comment on table TASK_LOCKER
  is '定时任务locker';
-- Add comments to the columns 
comment on column TASK_LOCKER.c_task_name
  is '任务名称';
-- Create/Recreate primary, unique and foreign key constraints 
alter table TASK_LOCKER
  add constraint TASK_LOCKER_PK_ID primary key (C_TASK_NAME);


-- Create table
create table TASK_RUNTIME_INFO
(
  c_task_name   VARCHAR2(256) not null,
  c_is_run      VARCHAR2(1) not null,
  c_run_node_ip VARCHAR2(256),
  c_crt_cde     VARCHAR2(128) not null,
  t_crt_date    DATE not null,
  c_upd_cde     VARCHAR2(128) not null,
  t_upd_date    DATE not null
);
-- Add comments to the table 
comment on table TASK_RUNTIME_INFO
  is '定时任务运行情况表';
-- Add comments to the columns 
comment on column TASK_RUNTIME_INFO.c_task_name
  is '任务名称';
comment on column TASK_RUNTIME_INFO.c_is_run
  is '是否正在运行';
comment on column TASK_RUNTIME_INFO.c_run_node_ip
  is '运行此任务的节点ip,此任务正在运行时有此值';
comment on column TASK_RUNTIME_INFO.c_crt_cde
  is '创建者';
comment on column TASK_RUNTIME_INFO.t_crt_date
  is '创建时间';
comment on column TASK_RUNTIME_INFO.c_upd_cde
  is '更新者';
comment on column TASK_RUNTIME_INFO.t_upd_date
  is '更新时间';
-- Create/Recreate primary, unique and foreign key constraints 
alter table TASK_RUNTIME_INFO
  add constraint TASK_RUNTIME_INFO_PK_ID primary key (C_TASK_NAME);


获取数据库sql
SELECT 1 FROM  task_locker t WHERE t.c_task_name=#taskName# FOR UPDATE NOWAIT


1、节点间不允许并发定时任务
@Service
public class ClusterTaskMock extends ClusterBaseTask implements TaskMock {

	
	
	@Override
	public void doExcut() throws Exception {
		
		logger.info("doExcut() begin");
		
		Thread.currentThread().sleep(60*1000);
		
		logger.info("doExcut() end");
		
	}

	@Override
	public boolean isConcurrent() {
		return false;
	}

	@Override
	public boolean isMulitiThread() {
		return false;
	}

	
}



2、节点间可以并发定时任务
@Service
public class ConcurrentClusterTaskMock extends ClusterBaseTask implements ConcurrentTaskMock {

	
	
	@Override
	public void doExcut() throws Exception {
		
		logger.info("doExcut() begin");
		
		System.out.println(Thread.currentThread().isDaemon());
		Thread.currentThread().sleep(60*1000);
		
		logger.info("doExcut() end");
		
	}

	@Override
	public boolean isConcurrent() {
		return   true;
	}

	@Override
	public boolean isMulitiThread() {
		return true;
	}

	@Override
	public void setDataSliceInfo(DataSliceInfo dataSliceInfo) {
		
	}

	@Override
	public int getLocalMulitiThreadNum() {
		
		return 5;
	}
	


DataSliceInfo是多线程执行任务时,我们采用对任务id取模的方法分配每个线程所需要执行的任务,避免了在不加锁情况下,多个线程取到相同的任务

public class DataSliceInfo {

	/**
	 * 总分片数
	 */
	private Integer size;
	
	/**
	 * 分片下标
	 */
	private Integer index;
	

	public Integer getSize() {
		return size;
	}

	public void setSize(Integer size) {
		this.size = size;
	}

	public Integer getIndex() {
		return index;
	}

	public void setIndex(Integer index) {
		this.index = index;
	}
	
	
	
	
}