调度平台YYSchedule-细节-心跳机制实现node节点管理
本文关键技术:
1、Sigar获取系统信息
2、node通过心跳机制向task发送信息
3、activemq获取队列信息
一、介绍
当taskmanager收到一个总的任务mission时,会按照任务类型以mission --> job --> task 的 方式进行切分(详见博客“新调度平台-taskmanager之任务下发”),接下来taskmanager会对task逐条进行下发,也就是说,taskmanager给node节点下发任务的单位是task。
taskmanager在对task进行下发之前,会在自己所管理的所有node节点中,找寻最为适合的node节点,再将task发给该节点。因此,想要实现这一步,有以下几个前提:
前提一:taskmanager必须知道,自己管理着哪些node节点。
前提二:taskmanager必须清楚自己所管理的节点在当前时刻的具体状态,例如node节点的cpu使用率,cpu性能,硬盘剩余空间,当前队列的长度等信息。
前提三:taskmanager在拥有所有node节点的具体状态信息后,需要有基本的算法,选出当前时刻最合适执行该task的的节点。
二、原理
1、taskmanager获知node节点
通常获取信息有两种方式:第一种是主动获取信息,即“我主动找你要”;而第二种是被动获取信息,即“我等你自己给我”。
而在调度平台YYSchedule中,我们采用了第二种方式,让taskmanager开启服务后,被动等待node向其进行注册。
node注册的方法的流程如下,node节点通过thrift RPC框架,将心跳信息NodePayload作为参数传递给taskmanager,taskmanager接受到NodePayload之后,将NodePayload再转化为nodeItem,最后将其放入一个专门存放nodeItem的容器类NodeMapper中。此时,taskmanager便清楚自己管理着哪些节点。具体请看最后的流程图。
2、 taskmanager获知node节点在当前时刻的具体状态
虽然我们可以通过注册的方式让taskmanager知道自己管理着哪些节点,但我们无法实时地掌控所有node节点在当前时刻的具体状态。在YYSchedule中,我们通过心跳机制来解决这一难题:
当node向taskmanager注册完成后,会再次通过thrift RPC框架,同时使用quartz框架,持续实时向taskmanager传递心跳信息NodePayload。从而使得taskmanager实时获取最近的节点信息NodeItem,并在容器中不断进行更新。
通过上述介绍,我们已经清楚taskmanager是如何解决前提一以及前提二。现在我们将对心跳机制的实现进行具体描述。
三、心跳具体实现
在上文中,我们已经介绍了taskmanager获取node节点信息的方法,而在本节我们将会对对心跳机制的实现进行具体描述。
1、信息类介绍
如下图所示:
nodePayload是心跳信息用于taskmanager与node节点之间进行传输的,nodeInfo表示node的具体信息,nodePayload表示node的心跳信息。同时,nodePayload还包含了nodeRuntime(node的实时系统信息)。最后,在taskmanager中还有一个NodeItem类,这个类是在包含nodePayload所有属性的基础上,再增加一个属性grade,这个属性表示node的分值,将用于负载均衡算法。
1.1、NodePayload
NodePayload有以下5个属性,注意第二个属性是一个java类NodeRuntime
public java.lang.String nodeId; // node的Ip地址
public NodeRuntime nodeRuntime; // 运行时参数
public int queueLimit; // 单条队列最大长度
public int queueLength; // 单条队列现在长度
public long expectedDelay; // 心跳间隔
获取心跳信息nodePayload其实很简单,无非就是将它所拥有的5个属性获取后进行封装。其中,NodeRunTime将在下文进行说明,而对于queueLength,也就是队列的实时长度,我们需要使用activemq工具的相关代码进行获取,具体请见博客“调度平台YYSchedule-activemq的使用”。剩余的获取都是通过配置文件获取。
具体获取代码如下:
/**
* generate heart beat information
*
* @return nodePayload
*/
public NodePayload generateHeartBeat() {
NodePayload nodePayload = new NodePayload();
String nodeId = config.getLocal_listener_domain() + ":" + config.getTask_call_node_port();
Queue queue = new Queue(nodeId);
// init payload id info
nodePayload.setNodeId(nodeId);
nodePayload.setNodeRuntime(NodeService.getNodeRuntime());
nodePayload.setQueueLimit(config.getMax_queue_size());
nodePayload.setQueueLength(ActiveMQUtils.getQueueSize(jmsTemplate, queue.getEquippedContextQueueName()));
nodePayload.setExpectedDelay(queue.getContextQueueExpectedDelay(jmsTemplate));
nodePayload.setTaskPhase(TaskPhase.valueOf(config.getTask_phase()));
return nodePayload;
}
1.2、NodeRuntime
NodeRuntime有以下6个属性,分别是cpu个数、cpu核数、cpu频率、cpu使用率、磁盘剩余空间以及jvm剩余空间,可以看到,这些都属于系统的相关信息。
public int cpuCount; // cpu个数
public int cpuCores; // cpu核数
public int cpuMhz; // cpu
public double cpuUsedPerc; // cpu使用率
public long freeMem; // 磁盘剩余空间
public long jvmFreeMem; // jvm剩余空间
我们无法直接获取这些系统信息,因此我们需要借用第三方工具sigar来进行获取。
sigar指的是一个第三方的api,org.hyperic.sigar.Sigar,我们可以通过maven直接进行获取,pom文件的依赖如下:
<!-- sigar util -->
<dependency>
<groupId>org.hyperic</groupId>
<artifactId>sigar</artifactId>
<version>1.6.4</version>
</dependency>
通过sigar获取nodeRuntime所需的信息的实现代码如下:
/**
* 通过sigar获得node的runtime信息
*
* @return NodeRuntime
*/
public static NodeRuntime getNodeRuntime() {
Sigar sigar = new Sigar();
NodeRuntime nodeRuntime = new NodeRuntime();
try {
//获取cpu信息
int cpuCount = sigar.getCpuInfoList().length;
CpuInfo cpuInfo = sigar.getCpuInfoList()[0];
nodeRuntime.setCpuCount(cpuCount);
nodeRuntime.setCpuCores(cpuInfo.getTotalCores());
nodeRuntime.setCpuMhz(cpuInfo.getMhz());
CpuPerc cpuPerc = sigar.getCpuPerc();
double cpuUsedPerc = 0;
if (cpuPerc.getCombined() == 0) {
cpuUsedPerc = 0;
} else {
cpuUsedPerc = Double.parseDouble(new DecimalFormat("#.00").format(cpuPerc.getCombined() * 100));
}
nodeRuntime.setCpuUsedPerc(cpuUsedPerc);
//获取剩余空间
nodeRuntime.setFreeMem(sigar.getMem().getFree() / 1024 / 1024);
nodeRuntime.setJvmFreeMem(Runtime.getRuntime().freeMemory() / 1024 / 1024);
} catch (SigarException se) {
throw new RuntimeException("Failed to get node heart beat [ " + nodeRuntime + " ] : " + se.getMessage(), se);
} finally {
sigar.close();
sigar = null;
}
return nodeRuntime;
}
四、NodeMapper,NodeItem
现在我们来了解NodeItem.java以及NodeMapper.java。
1、NodeItem.java
NodeItem的属性如下所示,包括cpu个数、cpu核数、cpu使用率、磁盘剩余空间等信息,它通过心跳信息NodePayload转化而来。在将心跳信息NodePayload转化成NodeItem类的同时,我们也会对NodeItem中的信息进行计算,对其进行打分,最后计算出来的分值便存在属性grade中。
public class NodeItem implements Comparable<NodeItem> {
private String nodeId;
private int cpuCount;
private int cpuCores;
private int cpuMhz;
private double cpuUsedPerc;
private long freeMem;
private long jvmFreeMem;
private int queueLimit;
private int queueLength;
private long expectedDelay;
private int grade;
private long updatedTime;
private TaskPhase taskPhase;
2、NodeMapper.java
taskmanager拥有一个存放并管理node节点信息的容器类,名为NodeMapper。通过在该类中设置一个map结构来实现其存储功能。
private Map<TaskPhase, ConcurrentSkipListSet<NodeItem>> nodeMap = new ConcurrentHashMap<TaskPhase, ConcurrentSkipListSet<NodeItem>>();
因为不同node节点可能执行的任务类型可能不同,因此nodeMap中以任务类型TaskPhase作为key值,对nodeItem进行存储。
实际上,NodeMapper与博客“taskmanager之生成id并保证唯一性”中的其他Mapper类大同小异,因为它们的作用都是在并发环境下存储某个属性。其具体代码如下所示:
package com.YYSchedule.task.mapper;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import com.YYSchedule.common.pojo.NodeItem;
import com.YYSchedule.common.rpc.domain.task.TaskPhase;
/**
* NodeMapper.java
*
* @author yubingtao
* @date 2018-7-2
* @description
*/
@Component("NodeMapper")
@Scope("singleton")
public class NodeMapper {
/**
* Map<TaskPhase, List<NodeItem>>
*/
private Map<TaskPhase, ConcurrentSkipListSet<NodeItem>> nodeMap = new ConcurrentHashMap<TaskPhase, ConcurrentSkipListSet<NodeItem>>();
private NodeMapper() {
}
public synchronized Map<TaskPhase, ConcurrentSkipListSet<NodeItem>> getNodeMap() {
return nodeMap;
}
/**
* init node map with taskPhase
*
* @param TaskPhase
*
*/
public synchronized ConcurrentSkipListSet<NodeItem> initNodeMapWithTaskPhase(TaskPhase taskPhase) {
if (nodeMap.get(taskPhase) == null) {
ConcurrentSkipListSet<NodeItem> nodeSet = new ConcurrentSkipListSet<NodeItem>();
nodeMap.put(taskPhase, nodeSet);
return nodeSet;
} else {
return null;
}
}
/**
* init general node map with node type and node item
*
* @param nodeTypeList
* @param nodeItem
*/
public synchronized void initNodeMapWithNodeItem(NodeItem nodeItem) {
ConcurrentSkipListSet<NodeItem> nodeSet = nodeMap.get(nodeItem.getTaskPhase());
if (nodeSet == null) {
initNodeMapWithTaskPhase(nodeItem.getTaskPhase());
} else if (!nodeSet.contains(nodeItem)) {
nodeSet.add(nodeItem);
}
}
/**
* update general node when received heart beat
*
* @param nodeItem
* @return nodeSet
*/
public synchronized void updateNode(NodeItem nodeItem) {
ConcurrentSkipListSet<NodeItem> nodeSet = nodeMap.get(nodeItem.getTaskPhase());
if (nodeSet == null) {
nodeSet = initNodeMapWithTaskPhase(nodeItem.getTaskPhase());
} else if (nodeSet.contains(nodeItem)) {
nodeSet.remove(nodeItem);
}
if (nodeSet != null) {
nodeSet.add(nodeItem);
}
}
public synchronized ConcurrentSkipListSet<NodeItem> getNodeSet(TaskPhase taskPhase) {
return nodeMap.get(taskPhase);
}
}
五、选出最优节点
通过阅读上一节,我们对NodeItem和NodeMapper有了基本的了解,现在我们开始讲解如何选出最优node节点。
1、打分机制
在将心跳信息NodePayload转化成NodeItem类时,我们在将各个属性赋值的同时,我们也会对NodeItem中的信息进行综合计算,对其进行打分,最后计算出来的分值便存在属性grade中。如代码所示:
public NodeItem(NodePayload nodePayload) {
this.nodeId = nodePayload.getNodeId();
this.cpuCount = nodePayload.getNodeRuntime().getCpuCount();
this.cpuCores = nodePayload.getNodeRuntime().getCpuCores();
this.cpuMhz = nodePayload.getNodeRuntime().getCpuMhz();
this.cpuUsedPerc = nodePayload.getNodeRuntime().getCpuUsedPerc();
this.freeMem = nodePayload.getNodeRuntime().getFreeMem();
this.jvmFreeMem = nodePayload.getNodeRuntime().getJvmFreeMem();
this.queueLimit = nodePayload.getQueueLimit();
this.queueLength = nodePayload.getQueueLength();
this.expectedDelay = nodePayload.getExpectedDelay();
this.taskPhase = nodePayload.getTaskPhase();
this.updatedTime = System.currentTimeMillis();
getGrade(this.cpuUsedPerc,this.queueLength);
}
getGrade() 方法如下:我们主要还是队列中任务的数量来作为评分依据,请注意,分数最低的节点才是最合适的node节点,因为分数越低,队列中任务数越少,就越适合接受任务。
/**
* node performance evaluate equation
*
* @param cpuUsedPerc
* @param queueNum
*/
private void getGrade(double cpuUsedPercint,int queueLength) {
if(cpuUsedPerc/100 > 0.8)
{
this.grade = Integer.MAX_VALUE;
}
else
{
this.grade = queueLength;
}
}
2、找出最优node节点
当NodeItem都拥有分值,并且统一存放在NodeMapper后。我们便可以去获得最优节点,代码如下:
/**
* 找出最为适合的nodeItem节点
* 即分数最低的nodeItem节点
* @param nodeSet
* @return
*/
public static NodeItem getNodeItem(ConcurrentSkipListSet<NodeItem> nodeSet)
{
NodeItem selectedNodeItem = null;
int minGrade = Integer.MAX_VALUE;
for (NodeItem node : nodeSet) {
if(minGrade > node.getQueueLength())
{
minGrade = node.getQueueLength();
selectedNodeItem = node;
}
}
return selectedNodeItem;
}
通过上述这一系列步骤,taskmanager便真正的管理了所有的node节点。