大数据云计算——Azkaban组件
什么是 Azkaban
Azkaban 是由 Linkedin 公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以一个特定的顺序运行一组工作和流程,它的配置是通过简单的 key:value 对的方式,通过配置中的 Dependencies 来设置依赖关系。 Azkaban 使用 job 配置文件建立任务之间的依赖关系,并提供一个易于使用的 web 用户界面维护和跟踪你的工作流。1)简单的任务调度:直接使用 crontab 实现;2)复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如 ooize、 azkaban 等
Azkaban的适用场景
实际项目中经常有这些场景:每天有一个大任务,这个大任务可以分成A,B,C,D四个小任务,A,B任务之间没有依赖关系,C任务依赖A,B任务的结果,D任务依赖C任务的结果。一般的做法是,开两个终端同时执行A,B,两个都执行完了再执行C,最后再执行D。这样的话,整个的执行过程都需要人工参加,并且得盯着各任务的进度。但是我们的很多任务都是在深更半夜执行的,通过写脚本设置crontab执行。这样子很不好维维护。
其实,整个过程类似于一个有向无环图(DAG)。每个子任务相当于大任务中的一个流,任务的起点可以从没有度的节点开始执行,任何没有通路的节点之间可以同时执行,比如上述的A,B。总结起来的话,我们需要的就是一个工作流的调度器,而Azkaban就是能解决上述问题的一个调度器。
为什么需要工作流调度系统
1)一个完整的数据分析系统通常都是由大量任务单元组成:Shell 脚本程序, Java 程序, MapReduce 程序、 Hive 脚本等
2) 各任务单元之间存在时间先后及前后依赖关系
3) 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行;例如,我们可能有这样一个需求,某个业务系统每天产生 20G 原始数据,我们每天都要对其进行处理,处理步骤如下所示:
- 1) 通过 Hadoop 先将原始数据上传到 HDFS 上(HDFS 的操作);
- 2) 使用 MapReduce 对原始数据进行清洗(MapReduce 的操作);
- 3) 将清洗后的数据导入到 hive 表中(hive 的导入操作) ;
- 4) 对 Hive 中多个表的数据进行 JOIN 处理, 得到一张 hive 的明细表(创建中间表) ;
- 5) 通过对明细表的统计和分析,得到结果报表信息(hive 的查询操作) ;
Azkaban 特点
1)兼容任何版本的 hadoop
2) 易于使用的 Web 用户界面
3) 简单的工作流的上传
4) 方便设置任务之间的关系
5) 调度工作流
6) 模块化和可插拔的插件机制
7) 认证/授权(权限的工作)
8) 能够杀死并重新启动工作流
9) 有关失败和成功的电子邮件提醒
Azkaban 的架构
Azkaban 由三个关键组件构成:
1) AzkabanWebServer: AzkabanWebServer是整个Azkaban工作流系统的主要管理者,它负责project管理、用户登录认证、定时执行工作流、跟踪工作流执行进度等一系列任务。同时,它还提供Web服务操作的接口,利用该接口,用户可以使用curl或其他ajax的方式,来执行azkaban的相关操作。操作包括:用户登录、创建project、上传workflow、执行workflow、查询workflow的执行进度、杀掉workflow等一系列操作,且这些操作的返回结果均是json的格式。并且Azkaban使用方便,Azkaban使用以.job为后缀名的键值属性文件来定义工作流中的各个任务,以及使用dependencies属性来定义作业间的依赖关系链。这些作业文件和关联的代码最终以*.zip的方式通过Azkaban UI上传到Web服务器上。
- 项目管理:项目、项目权限以及上传的文件。
- 执行流状态:跟踪执行流程以及执行程序正在运行的流程。
- 以前的流程/作业:通过以前的作业和流程执行以及访问其日志文件进行搜索。
- 计划程序:保留计划作业的状态。
- SLA:保持所有的SLA规则
2) AzkabanExecutorServer: 以前版本的Azkaban在单个服务中具有AzkabanWebServer和AzkabanExecutorServer功能,目前Azkaban已将AzkabanExecutorServer分离成独立的服务器,拆分AzkabanExecutorServer的原因有如下几点:1 某个任务流失败后,可以更方便的将其重新执行2 便于Azkaban升级。AzkabanExecutorServer主要负责具体的工作流的提交、执行,可以启动多个执行服务器,它们通过mysql数据库来协调任务的执行以及实现高可用性
- 访问项目:从数据库检索项目文件。
- 执行流程/作业:检索和更新正在执行的作业流的数据
- 日志:将作业和工作流的输出日志存储到数据库中。
- 交互依赖关系:如果一个工作流在不同的执行器上运行,它将从数据库中获取状态。
3) 关系型数据库( MySQL) : 存储大部分执行流状态, AzkabanWebServer 和AzkabanExecutorServer 都需要访问数据库。
Azkaban作业流执行过程
Webserver根据内存中缓存的各Executor的资源状态(Webserver有一个线程会遍历各个active executor,去发送http请求获取其资源状态信息缓存到内存中),按照选择策略(包括executor资源状态、最近执行流个数等)选择一个executor下发作业流;executor判断是否设置作业粒度分配,如果未设置作业粒度分配,则在当前executor执行所有作业;如果设置了作业粒度分配,则当前节点会成为作业分配的决策者,即分配节点;分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor分配作业;被分配到作业的executor即成为执行节点,执行作业,然后更新数据库。
Azkaban架构的三种运行模式
在版本3.0中,Azkaban提供了以下三种模式:
solo server mode:最简单的模式,数据库内置的H2数据库,AzkabanWebServer和AzkabanExecutorServer都在一个进程中运行,任务量不大项目可以采用此模式。
two server mode:数据库为MySQL,管理服务器和执行服务器在不同进程,这种模式下,AzkabanWebServer和AzkabanExecutorServer互不影响。
multiple executor mode:该模式下,AzkabanWebServer和AzkabanExecutorServer运行在不同主机上,且AzkabanExecutorServer可以有多个。
大数据平台要求其具有高可用性,所以目前我们采用的是multiple executor mode方式,分别在不同的主机上部署多个AzkabanExecutorServer以应对高并发定时任务执行的情况,从而减轻单个服务器的压力。 下面是集群架构图:
Azkaban WebServer需要根据Executor Server的运行状态信息,选择一个合适的Executor Server来运行WorkFlow,然后会将提交到队列中的WorkFlow调度到选定的Executor Server上运行。我们整理了与核心调度相关的各个组件,主要包括Azkaban WebServer端和Azkaban ExecutorServer端,他们之间的关系如下图所示:
其实,从调度层面来看,Azkaban WebServer与Executor Server之间的交互方式非常简单,是通过REST API的方式来进行交互,基本的模式是,Azkaban WebServer根据调度的需要,主动调用Executor Server暴露的REST API来获取相应的资源信息,比如Executor Server的状态信息、分配WorkFlow到指定Executor Server上运行,等等。
private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
private static final String MEMORY_COMPARATOR_NAME = "Memory";
private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";
final Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
for (final FactorComparator<T> comparator : comparatorList) {
final int result = comparator.compare(object1, object2);
result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
comparator.getFactorName(), result, result1, result2));
}
在选择调度一个WorkFlow到Azkaban集群中的某个Executor Server时,需要比较Executor Server的如下4个指标:
- 能够运行WorkFlow的剩余容量,数值越大越优先
- 剩余内存用量,数值越大越优先
- 最近分配Flow的时间,数值越大越优先
- CPU使用用量,数值越小越优先
Azkaban 安装部署
配置文件
Web 服务器配置
执行服务器配置
启动 Executor 服务器
启动 Web 服务器
jps 查看进程
Azkaban 实战
Azkaban 内置的任务类型支持 command、 java
单一 job 案例
邮件通知配置案例
1) 修改配置文件 修改 server 的 conf 下的 azkaban.properties 文件
多 job 工作流案例
Java 操作任务
HDFS 操作任务
MapReduce 任务
Hive 脚本任务
本文地址:https://blog.csdn.net/weixin_41605937/article/details/107524126