欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

调度平台YYSchedule-细节-心跳机制实现node节点管理

程序员文章站 2022-05-04 15:05:14
...

本文关键技术:

1、Sigar获取系统信息

2、node通过心跳机制向task发送信息

3、activemq获取队列信息

 

一、介绍

当taskmanager收到一个总的任务mission时,会按照任务类型以mission --> job --> task 的  方式进行切分(详见博客“新调度平台-taskmanager之任务下发”),接下来taskmanager会对task逐条进行下发,也就是说,taskmanager给node节点下发任务的单位是task。

taskmanager在对task进行下发之前,会在自己所管理的所有node节点中,找寻最为适合的node节点,再将task发给该节点。因此,想要实现这一步,有以下几个前提:

前提一:taskmanager必须知道,自己管理着哪些node节点。

前提二:taskmanager必须清楚自己所管理的节点在当前时刻的具体状态,例如node节点的cpu使用率,cpu性能,硬盘剩余空间,当前队列的长度等信息。

前提三:taskmanager在拥有所有node节点的具体状态信息后,需要有基本的算法,选出当前时刻最合适执行该task的的节点。

 

二、原理

1、taskmanager获知node节点

通常获取信息有两种方式:第一种是主动获取信息,即“我主动找你要”;而第二种是被动获取信息,即“我等你自己给我”。

而在调度平台YYSchedule中,我们采用了第二种方式,让taskmanager开启服务后,被动等待node向其进行注册。

node注册的方法的流程如下,node节点通过thrift RPC框架,将心跳信息NodePayload作为参数传递给taskmanager,taskmanager接受到NodePayload之后,将NodePayload再转化为nodeItem,最后将其放入一个专门存放nodeItem的容器类NodeMapper中。此时,taskmanager便清楚自己管理着哪些节点。具体请看最后的流程图。

2、  taskmanager获知node节点在当前时刻的具体状态

虽然我们可以通过注册的方式让taskmanager知道自己管理着哪些节点,但我们无法实时地掌控所有node节点在当前时刻的具体状态。在YYSchedule中,我们通过心跳机制来解决这一难题:

当node向taskmanager注册完成后,会再次通过thrift RPC框架,同时使用quartz框架,持续实时向taskmanager传递心跳信息NodePayload。从而使得taskmanager实时获取最近的节点信息NodeItem,并在容器中不断进行更新。

    调度平台YYSchedule-细节-心跳机制实现node节点管理

通过上述介绍,我们已经清楚taskmanager是如何解决前提一以及前提二。现在我们将对心跳机制的实现进行具体描述。

 

三、心跳具体实现

 在上文中,我们已经介绍了taskmanager获取node节点信息的方法,而在本节我们将会对对心跳机制的实现进行具体描述。

1、信息类介绍

如下图所示:

nodePayload是心跳信息用于taskmanager与node节点之间进行传输的,nodeInfo表示node的具体信息,nodePayload表示node的心跳信息。同时,nodePayload还包含了nodeRuntime(node的实时系统信息)。最后,在taskmanager中还有一个NodeItem类,这个类是在包含nodePayload所有属性的基础上,再增加一个属性grade,这个属性表示node的分值,将用于负载均衡算法。

调度平台YYSchedule-细节-心跳机制实现node节点管理

  

1.1、NodePayload

  NodePayload有以下5个属性,注意第二个属性是一个java类NodeRuntime

  public java.lang.String nodeId; // node的Ip地址
  public NodeRuntime nodeRuntime; // 运行时参数
  public int queueLimit; // 单条队列最大长度
  public int queueLength; // 单条队列现在长度
  public long expectedDelay; // 心跳间隔

获取心跳信息nodePayload其实很简单,无非就是将它所拥有的5个属性获取后进行封装。其中,NodeRunTime将在下文进行说明,而对于queueLength,也就是队列的实时长度,我们需要使用activemq工具的相关代码进行获取,具体请见博客“调度平台YYSchedule-activemq的使用”。剩余的获取都是通过配置文件获取。

具体获取代码如下:

​
	/**
	 * generate heart beat information
	 * 
	 * @return nodePayload
	 */
	public NodePayload generateHeartBeat() {
		NodePayload nodePayload = new NodePayload();
		String nodeId = config.getLocal_listener_domain() + ":" + config.getTask_call_node_port();
		Queue queue = new Queue(nodeId);
		
		// init payload id info
		nodePayload.setNodeId(nodeId);
		
		nodePayload.setNodeRuntime(NodeService.getNodeRuntime());
		nodePayload.setQueueLimit(config.getMax_queue_size());
		nodePayload.setQueueLength(ActiveMQUtils.getQueueSize(jmsTemplate, queue.getEquippedContextQueueName()));
		nodePayload.setExpectedDelay(queue.getContextQueueExpectedDelay(jmsTemplate));
		nodePayload.setTaskPhase(TaskPhase.valueOf(config.getTask_phase()));
		
		return nodePayload;
	}

​

1.2、NodeRuntime

NodeRuntime有以下6个属性,分别是cpu个数、cpu核数、cpu频率、cpu使用率、磁盘剩余空间以及jvm剩余空间,可以看到,这些都属于系统的相关信息。

  public int cpuCount; // cpu个数
  public int cpuCores; // cpu核数
  public int cpuMhz; // cpu
  public double cpuUsedPerc; // cpu使用率
  public long freeMem; // 磁盘剩余空间
  public long jvmFreeMem; // jvm剩余空间

我们无法直接获取这些系统信息,因此我们需要借用第三方工具sigar来进行获取。

sigar指的是一个第三方的api,org.hyperic.sigar.Sigar,我们可以通过maven直接进行获取,pom文件的依赖如下:

<!-- sigar util -->
    <dependency>
    <groupId>org.hyperic</groupId>
    <artifactId>sigar</artifactId>
    <version>1.6.4</version>
</dependency>

 通过sigar获取nodeRuntime所需的信息的实现代码如下:

​
        /**
	 * 通过sigar获得node的runtime信息
	 * 
	 * @return NodeRuntime
	 */
	public static NodeRuntime getNodeRuntime() {

		Sigar sigar = new Sigar();
		NodeRuntime nodeRuntime = new NodeRuntime();

		try {
			//获取cpu信息
			int cpuCount = sigar.getCpuInfoList().length;
			CpuInfo cpuInfo = sigar.getCpuInfoList()[0];
			nodeRuntime.setCpuCount(cpuCount);
			nodeRuntime.setCpuCores(cpuInfo.getTotalCores());
			nodeRuntime.setCpuMhz(cpuInfo.getMhz());

			CpuPerc cpuPerc = sigar.getCpuPerc();
			double cpuUsedPerc = 0;
			if (cpuPerc.getCombined() == 0) {
				cpuUsedPerc = 0;
			} else {
				cpuUsedPerc = Double.parseDouble(new DecimalFormat("#.00").format(cpuPerc.getCombined() * 100));
			}
			nodeRuntime.setCpuUsedPerc(cpuUsedPerc);
			
			//获取剩余空间
			nodeRuntime.setFreeMem(sigar.getMem().getFree() / 1024 / 1024);
			nodeRuntime.setJvmFreeMem(Runtime.getRuntime().freeMemory() / 1024 / 1024);

		} catch (SigarException se) {
			throw new RuntimeException("Failed to get node heart beat [ " + nodeRuntime + " ] : " + se.getMessage(), se);
		} finally {
			sigar.close();
			sigar = null;
		}

		return nodeRuntime;
	}

​

 

四、NodeMapper,NodeItem

现在我们来了解NodeItem.java以及NodeMapper.java。

1、NodeItem.java

NodeItem的属性如下所示,包括cpu个数、cpu核数、cpu使用率、磁盘剩余空间等信息,它通过心跳信息NodePayload转化而来。在将心跳信息NodePayload转化成NodeItem类的同时,我们也会对NodeItem中的信息进行计算,对其进行打分,最后计算出来的分值便存在属性grade中。

public class NodeItem implements Comparable<NodeItem> {

	private String nodeId;
	
	private int cpuCount;

	private int cpuCores;

	private int cpuMhz;

	private double cpuUsedPerc;

	private long freeMem;

	private long jvmFreeMem;

	private int queueLimit;

	private int queueLength;

	private long expectedDelay;

	private int grade;

	private long updatedTime;
	
	private TaskPhase taskPhase;

2、NodeMapper.java

taskmanager拥有一个存放并管理node节点信息的容器类,名为NodeMapper。通过在该类中设置一个map结构来实现其存储功能。

private Map<TaskPhase, ConcurrentSkipListSet<NodeItem>> nodeMap = new ConcurrentHashMap<TaskPhase, ConcurrentSkipListSet<NodeItem>>();

因为不同node节点可能执行的任务类型可能不同,因此nodeMap中以任务类型TaskPhase作为key值,对nodeItem进行存储。

实际上,NodeMapper与博客“taskmanager之生成id并保证唯一性”中的其他Mapper类大同小异,因为它们的作用都是在并发环境下存储某个属性。其具体代码如下所示:

package com.YYSchedule.task.mapper;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import com.YYSchedule.common.pojo.NodeItem;
import com.YYSchedule.common.rpc.domain.task.TaskPhase;

/**
 * NodeMapper.java
 * 
 * @author yubingtao
 * @date 2018-7-2
 * @description
 */
@Component("NodeMapper")
@Scope("singleton")
public class NodeMapper {


	/**
	 * Map<TaskPhase, List<NodeItem>>
	 */
	private Map<TaskPhase, ConcurrentSkipListSet<NodeItem>> nodeMap = new ConcurrentHashMap<TaskPhase, ConcurrentSkipListSet<NodeItem>>();

	private NodeMapper() {
	}

	public synchronized Map<TaskPhase, ConcurrentSkipListSet<NodeItem>> getNodeMap() {
		return nodeMap;
	}

	/**
	 * init node map with taskPhase
	 * 
	 * @param TaskPhase
	 * 
	 */
	public synchronized ConcurrentSkipListSet<NodeItem> initNodeMapWithTaskPhase(TaskPhase taskPhase) {
		if (nodeMap.get(taskPhase) == null) {
			ConcurrentSkipListSet<NodeItem> nodeSet = new ConcurrentSkipListSet<NodeItem>();
			nodeMap.put(taskPhase, nodeSet);
			return nodeSet;
		} else {
			return null;
		}
	}

	/**
	 * init general node map with node type and node item
	 * 
	 * @param nodeTypeList
	 * @param nodeItem
	 */
	public synchronized void initNodeMapWithNodeItem(NodeItem nodeItem) {

		ConcurrentSkipListSet<NodeItem> nodeSet = nodeMap.get(nodeItem.getTaskPhase());
		if (nodeSet == null) {
			initNodeMapWithTaskPhase(nodeItem.getTaskPhase());
		} else if (!nodeSet.contains(nodeItem)) {
			nodeSet.add(nodeItem);
		}
		
	}
	

	/**
	 * update general node when received heart beat
	 * 
	 * @param nodeItem
	 * @return nodeSet
	 */
	public synchronized void updateNode(NodeItem nodeItem) {
		ConcurrentSkipListSet<NodeItem> nodeSet = nodeMap.get(nodeItem.getTaskPhase());
		if (nodeSet == null) {
			nodeSet = initNodeMapWithTaskPhase(nodeItem.getTaskPhase());
		} else if (nodeSet.contains(nodeItem)) {
			nodeSet.remove(nodeItem);
		} 
	
		if (nodeSet != null) {
			nodeSet.add(nodeItem);
		}
	}
	
	public synchronized ConcurrentSkipListSet<NodeItem> getNodeSet(TaskPhase taskPhase) {
		return nodeMap.get(taskPhase);
	}

	
}

五、选出最优节点

通过阅读上一节,我们对NodeItem和NodeMapper有了基本的了解,现在我们开始讲解如何选出最优node节点。

1、打分机制

在将心跳信息NodePayload转化成NodeItem类时,我们在将各个属性赋值的同时,我们也会对NodeItem中的信息进行综合计算,对其进行打分,最后计算出来的分值便存在属性grade中。如代码所示:

public NodeItem(NodePayload nodePayload) {

	this.nodeId = nodePayload.getNodeId();
		
	this.cpuCount = nodePayload.getNodeRuntime().getCpuCount();
	this.cpuCores = nodePayload.getNodeRuntime().getCpuCores();
	this.cpuMhz = nodePayload.getNodeRuntime().getCpuMhz();
	this.cpuUsedPerc = nodePayload.getNodeRuntime().getCpuUsedPerc();
	this.freeMem = nodePayload.getNodeRuntime().getFreeMem();
	this.jvmFreeMem = nodePayload.getNodeRuntime().getJvmFreeMem();

	this.queueLimit = nodePayload.getQueueLimit();
	this.queueLength = nodePayload.getQueueLength();
	this.expectedDelay = nodePayload.getExpectedDelay();
	this.taskPhase = nodePayload.getTaskPhase();
		
	this.updatedTime = System.currentTimeMillis();
		

	getGrade(this.cpuUsedPerc,this.queueLength);
}

getGrade() 方法如下:我们主要还是队列中任务的数量来作为评分依据,请注意,分数最低的节点才是最合适的node节点,因为分数越低,队列中任务数越少,就越适合接受任务。

	/**
	 * node performance evaluate equation
	 * 
	 * @param cpuUsedPerc
	 * @param queueNum
	 */
	private void getGrade(double cpuUsedPercint,int queueLength) {

		if(cpuUsedPerc/100 > 0.8)
		{
			this.grade = Integer.MAX_VALUE;
		}
		else
		{
			this.grade = queueLength;
		}
	}

2、找出最优node节点

当NodeItem都拥有分值,并且统一存放在NodeMapper后。我们便可以去获得最优节点,代码如下:

	/**
	 * 找出最为适合的nodeItem节点
	 * 即分数最低的nodeItem节点
	 * @param nodeSet
	 * @return
	 */
	public static NodeItem getNodeItem(ConcurrentSkipListSet<NodeItem> nodeSet)
	{
		NodeItem selectedNodeItem = null;
		int minGrade = Integer.MAX_VALUE;
		
		for (NodeItem node : nodeSet) {
			if(minGrade > node.getQueueLength())
			{
				minGrade = node.getQueueLength();
				selectedNodeItem = node;
			}
		}
		
		return selectedNodeItem;
	}

 

通过上述这一系列步骤,taskmanager便真正的管理了所有的node节点。