基于zookeeper动态扩展处理分类数据
程序员文章站
2022-03-01 22:30:03
...
背景:
日终处理分户账记录的转逾期,数据量越来越大,单机处理时间已经不能忍受,考虑重构批处理逻辑。
场景特性:分户账转逾期处理,每条记录和别的记录互不相干
因此可以考虑把分户账信息分类处理,该方案可以方便的让执行程序随着分户账数据的不断增加,任意扩展到多个虚拟机,或者在同一个JVM内使用多线程处理。
待完善部分:某个任务处理失败,需要在回调函数增加处理,记录失败的Id号,因为是跑批另外最后两个类里面还可以加上工作日信息,在处理逻辑中加一层校验;在后台修复数据后,增加添加任务接口,让剩余的这个ID的创建新的节点,并重新执行。
第一步;
数据预处理,基于分户账记录的主键,hash后对128(数值可以取大点)取模,把数据分成128份,在此字段建索引
第二步:
利用zookeeper,建立阻塞消息队列
第三部:
任务分发系统MissionMaker定时执行,从数据库取出128这个值和任务名称,在/Queue/operation_yuqi节点下,以此创建128个持久化排序队列,把i值放到节点取值里面
第四步:
任务执行系统TaskExecuter监听/Queue/operation_yuqi节点的队列变化情况,发生变更,就在里面取出一个节点,并读出节点的数据(0-127),然后执行自己管理的部分分户账数据
阻塞队列
//创建任务
//执行任务
//回调接口
序列化到zookeeper的类
获取数据库信息的类
日终处理分户账记录的转逾期,数据量越来越大,单机处理时间已经不能忍受,考虑重构批处理逻辑。
场景特性:分户账转逾期处理,每条记录和别的记录互不相干
因此可以考虑把分户账信息分类处理,该方案可以方便的让执行程序随着分户账数据的不断增加,任意扩展到多个虚拟机,或者在同一个JVM内使用多线程处理。
待完善部分:某个任务处理失败,需要在回调函数增加处理,记录失败的Id号,因为是跑批另外最后两个类里面还可以加上工作日信息,在处理逻辑中加一层校验;在后台修复数据后,增加添加任务接口,让剩余的这个ID的创建新的节点,并重新执行。
第一步;
数据预处理,基于分户账记录的主键,hash后对128(数值可以取大点)取模,把数据分成128份,在此字段建索引
第二步:
利用zookeeper,建立阻塞消息队列
第三部:
任务分发系统MissionMaker定时执行,从数据库取出128这个值和任务名称,在/Queue/operation_yuqi节点下,以此创建128个持久化排序队列,把i值放到节点取值里面
第四步:
任务执行系统TaskExecuter监听/Queue/operation_yuqi节点的队列变化情况,发生变更,就在里面取出一个节点,并读出节点的数据(0-127),然后执行自己管理的部分分户账数据
阻塞队列
public class DistributedBlockingQueue<T> { protected final ZkClient zkClient; protected final String root; protected static final String Node_NAME = "n_"; public DistributedBlockingQueue(ZkClient zkClient, String root,String taskName) { this.zkClient = zkClient; this.root = root.concat("/").concat(taskName); } public boolean offer(T element) throws Exception{ String nodeFullPath = root .concat( "/" ).concat( Node_NAME ); try { zkClient.createPersistentSequential(nodeFullPath , element); }catch (ZkNoNodeException e) { zkClient.createPersistent(root); offer(element); } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } return true; } public T poll(TaskCallBack back) throws Exception { while (true){ final CountDownLatch latch = new CountDownLatch(1); final IZkChildListener childListener = new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { System.out.println(Thread.currentThread().getName()+",发现任务队列长度发生变化!"); latch.countDown(); } }; zkClient.subscribeChildChanges(root, childListener); try{ List<String> list = zkClient.getChildren(root); T node = null; if (list.size() == 0) { }else{ Collections.sort(list, new Comparator<String>() { public int compare(String lhs, String rhs) { return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME)); } }); } String nodeN = ""; for ( String nodeName : list ){ String nodeFullPath = root.concat("/").concat(nodeName); try { node = (T) zkClient.readData(nodeFullPath); Boolean bb = zkClient.delete(nodeFullPath); if(bb){ nodeN = nodeName; } } catch (ZkNoNodeException e) { node=null; // ignore } break; } if (node != null && null!=nodeN && !"".equals(nodeN)){ back.doTask(node); }else{ latch.await(); } }finally{ zkClient.unsubscribeChildChanges(root, childListener); } } } public T getPoll() throws Exception { try { List<String> list = zkClient.getChildren(root); if (list.size() == 0) { return null; } Collections.sort(list, new Comparator<String>() { public int compare(String lhs, String rhs) { return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME)); } }); for ( String nodeName : list ){ String nodeFullPath = root.concat("/").concat(nodeName); try { T node = (T) zkClient.readData(nodeFullPath); zkClient.delete(nodeFullPath); return node; } catch (ZkNoNodeException e) { // ignore } } return null; } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } } private String getNodeNumber(String str, String nodeName) { int index = str.lastIndexOf(nodeName); if (index >= 0) { index += Node_NAME.length(); return index <= str.length() ? str.substring(index) : ""; } return str; } }
//创建任务
public class MissionMaker implements Runnable{ private String root; private ZkClient zkClient; public MissionMaker(String url,String root){ this.zkClient = new ZkClient(url, 5000, 5000, new SerializableSerializer()); this.root = root; } public TaskBean getTaskBean(){ TaskBean taskBean = new TaskBean(); taskBean.setTaskName("operation_yuqi");//逾期 taskBean.setHashNum(16); return taskBean; } //创建任务 public void createTasks() throws Exception { TaskBean taskBean = getTaskBean(); DistributedBlockingQueue queueMaker = new DistributedBlockingQueue<TaskBean>(zkClient,root,taskBean.getTaskName()); System.out.println("create mission:"+taskBean.getHashNum()); for(int i=0;i<taskBean.getHashNum();i++){ OprBean oprBean = new OprBean(); oprBean.setHashId(i+""); queueMaker.offer(oprBean); } } public void run(){ try { while (true) { createTasks(); Thread.sleep(30000); } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
//执行任务
public class TaskExecuter implements Runnable{ private String root; private ZkClient zkClient; private String taskName; private DistributedBlockingQueue queueMaker; public TaskExecuter(String url,String root,String taskName){ this.zkClient = new ZkClient(url, 5000, 5000, new SerializableSerializer()); this.root = root; this.taskName = taskName; this.queueMaker = new DistributedBlockingQueue<TaskBean>(zkClient,root,taskName); } public void run(){ try { queueMaker.poll(new TaskCallBack<OprBean>() { @Override public void doTask(OprBean oprBean) { // TODO Auto-generated method stub try { System.out.println("线程"+Thread.currentThread().getName()+"执行计算任务,taskName="+taskName+",hashId = "+oprBean.getHashId()); Thread.sleep(2000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void afterTask(String hashId) { // TODO Auto-generated method stub System.out.println("更新数据库记录,taskName="+taskName+",hashId = "+hashId); } }); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
//回调接口
public interface TaskCallBack<T> { void doTask(T oprBean); void afterTask(String hashId); }
序列化到zookeeper的类
public class OprBean implements Serializable{ /** * */ private static final long serialVersionUID = 2840329402832770757L; private String hashId; private String oprResult; public String getHashId() { return hashId; } public void setHashId(String hashId) { this.hashId = hashId; } public String getOprResult() { return oprResult; } public void setOprResult(String oprResult) { this.oprResult = oprResult; } }
获取数据库信息的类
public class TaskBean { private String taskName; private Integer hashNum; public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public Integer getHashNum() { return hashNum; } public void setHashNum(Integer hashNum) { this.hashNum = hashNum; } }