欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

大数据云计算——Azkaban组件

程序员文章站 2022-04-15 20:12:28
什么是 AzkabanAzkaban 是由 Linkedin 公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以一个特定的顺序运行一组工作和流程,它的配置是通过简单的 key:value 对的方式,通过配置中的 Dependencies 来设置依赖关系。 Azkaban 使用 job 配置文件建立任务之间的依赖关系,并提供一个易于使用的 web 用户界面维护和跟踪你的工作流。为什么需要工作流调度系统1)一个完整的数据分析系统通常都是由大量任务单元组成:Shell 脚本程序, Java...

什么是 Azkaban

Azkaban 是由 Linkedin 公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以一个特定的顺序运行一组工作和流程,它的配置是通过简单的 key:value 对的方式,通过配置中的 Dependencies 来设置依赖关系。 Azkaban 使用 job 配置文件建立任务之间的依赖关系,并提供一个易于使用的 web 用户界面维护和跟踪你的工作流。1)简单的任务调度:直接使用 crontab 实现;2)复杂的任务调度:开发调度平台或使用现成的开源调度系统,比如 ooize、 azkaban 等

Azkaban的适用场景

实际项目中经常有这些场景:每天有一个大任务,这个大任务可以分成A,B,C,D四个小任务,A,B任务之间没有依赖关系,C任务依赖A,B任务的结果,D任务依赖C任务的结果。一般的做法是,开两个终端同时执行A,B,两个都执行完了再执行C,最后再执行D。这样的话,整个的执行过程都需要人工参加,并且得盯着各任务的进度。但是我们的很多任务都是在深更半夜执行的,通过写脚本设置crontab执行。这样子很不好维维护。

其实,整个过程类似于一个有向无环图(DAG)。每个子任务相当于大任务中的一个流,任务的起点可以从没有度的节点开始执行,任何没有通路的节点之间可以同时执行,比如上述的A,B。总结起来的话,我们需要的就是一个工作流的调度器,而Azkaban就是能解决上述问题的一个调度器。

大数据云计算——Azkaban组件

 为什么需要工作流调度系统

1)一个完整的数据分析系统通常都是由大量任务单元组成:Shell 脚本程序, Java 程序, MapReduce 程序、 Hive 脚本等
2) 各任务单元之间存在时间先后及前后依赖关系
3) 为了很好地组织起这样的复杂执行计划,需要一个工作流调度系统来调度执行;例如,我们可能有这样一个需求,某个业务系统每天产生 20G 原始数据,我们每天都要对其进行处理,处理步骤如下所示:

  1. 1) 通过 Hadoop 先将原始数据上传到 HDFS 上(HDFS 的操作);
  2. 2) 使用 MapReduce 对原始数据进行清洗(MapReduce 的操作);
  3. 3) 将清洗后的数据导入到 hive 表中(hive 的导入操作) ;
  4. 4) 对 Hive 中多个表的数据进行 JOIN 处理, 得到一张 hive 的明细表(创建中间表) ;
  5. 5) 通过对明细表的统计和分析,得到结果报表信息(hive 的查询操作) ;

大数据云计算——Azkaban组件

Azkaban 特点

1)兼容任何版本的 hadoop
2) 易于使用的 Web 用户界面
3) 简单的工作流的上传
4) 方便设置任务之间的关系
5) 调度工作流
6) 模块化和可插拔的插件机制
7) 认证/授权(权限的工作)
8) 能够杀死并重新启动工作流
9) 有关失败和成功的电子邮件提醒

Azkaban 的架构

Azkaban 由三个关键组件构成:

大数据云计算——Azkaban组件

1) AzkabanWebServer: AzkabanWebServer是整个Azkaban工作流系统的主要管理者,它负责project管理、用户登录认证、定时执行工作流、跟踪工作流执行进度等一系列任务。同时,它还提供Web服务操作的接口,利用该接口,用户可以使用curl或其他ajax的方式,来执行azkaban的相关操作。操作包括:用户登录、创建project、上传workflow、执行workflow、查询workflow的执行进度、杀掉workflow等一系列操作,且这些操作的返回结果均是json的格式。并且Azkaban使用方便,Azkaban使用以.job为后缀名的键值属性文件来定义工作流中的各个任务,以及使用dependencies属性来定义作业间的依赖关系链。这些作业文件和关联的代码最终以*.zip的方式通过Azkaban UI上传到Web服务器上。

  1. 项目管理:项目、项目权限以及上传的文件。
  2. 执行流状态:跟踪执行流程以及执行程序正在运行的流程。
  3. 以前的流程/作业:通过以前的作业和流程执行以及访问其日志文件进行搜索。
  4. 计划程序:保留计划作业的状态。
  5. SLA:保持所有的SLA规则

2) AzkabanExecutorServer: 以前版本的Azkaban在单个服务中具有AzkabanWebServer和AzkabanExecutorServer功能,目前Azkaban已将AzkabanExecutorServer分离成独立的服务器,拆分AzkabanExecutorServer的原因有如下几点:1 某个任务流失败后,可以更方便的将其重新执行2    便于Azkaban升级。AzkabanExecutorServer主要负责具体的工作流的提交、执行,可以启动多个执行服务器,它们通过mysql数据库来协调任务的执行以及实现高可用性

  1. 访问项目:从数据库检索项目文件。
  2. 执行流程/作业:检索和更新正在执行的作业流的数据
  3. 日志:将作业和工作流的输出日志存储到数据库中。
  4. 交互依赖关系:如果一个工作流在不同的执行器上运行,它将从数据库中获取状态。

3) 关系型数据库( MySQL) : 存储大部分执行流状态, AzkabanWebServer 和AzkabanExecutorServer 都需要访问数据库。

Azkaban作业流执行过程

Webserver根据内存中缓存的各Executor的资源状态(Webserver有一个线程会遍历各个active executor,去发送http请求获取其资源状态信息缓存到内存中),按照选择策略(包括executor资源状态、最近执行流个数等)选择一个executor下发作业流;executor判断是否设置作业粒度分配,如果未设置作业粒度分配,则在当前executor执行所有作业;如果设置了作业粒度分配,则当前节点会成为作业分配的决策者,即分配节点;分配节点从zookeeper获取各个executor的资源状态信息,然后根据策略选择一个executor分配作业;被分配到作业的executor即成为执行节点,执行作业,然后更新数据库。

 Azkaban架构的三种运行模式

在版本3.0中,Azkaban提供了以下三种模式:

    solo server mode:最简单的模式,数据库内置的H2数据库,AzkabanWebServer和AzkabanExecutorServer都在一个进程中运行,任务量不大项目可以采用此模式。
    two server mode:数据库为MySQL,管理服务器和执行服务器在不同进程,这种模式下,AzkabanWebServer和AzkabanExecutorServer互不影响。
    multiple executor mode:该模式下,AzkabanWebServer和AzkabanExecutorServer运行在不同主机上,且AzkabanExecutorServer可以有多个。
    大数据平台要求其具有高可用性,所以目前我们采用的是multiple executor mode方式,分别在不同的主机上部署多个AzkabanExecutorServer以应对高并发定时任务执行的情况,从而减轻单个服务器的压力。 下面是集群架构图:

大数据云计算——Azkaban组件

Azkaban WebServer需要根据Executor Server的运行状态信息,选择一个合适的Executor Server来运行WorkFlow,然后会将提交到队列中的WorkFlow调度到选定的Executor Server上运行。我们整理了与核心调度相关的各个组件,主要包括Azkaban WebServer端和Azkaban ExecutorServer端,他们之间的关系如下图所示:
大数据云计算——Azkaban组件

其实,从调度层面来看,Azkaban WebServer与Executor Server之间的交互方式非常简单,是通过REST API的方式来进行交互,基本的模式是,Azkaban WebServer根据调度的需要,主动调用Executor Server暴露的REST API来获取相应的资源信息,比如Executor Server的状态信息、分配WorkFlow到指定Executor Server上运行,等等。

private static final String NUMOFASSIGNEDFLOW_COMPARATOR_NAME = "NumberOfAssignedFlowComparator";
private static final String MEMORY_COMPARATOR_NAME = "Memory";
private static final String LSTDISPATCHED_COMPARATOR_NAME = "LastDispatched";
private static final String CPUUSAGE_COMPARATOR_NAME = "CpuUsage";


final Collection<FactorComparator<T>> comparatorList = this.factorComparatorList.values();
for (final FactorComparator<T> comparator : comparatorList) {
  final int result = comparator.compare(object1, object2);
  result1 = result1 + (result > 0 ? comparator.getWeight() : 0);
  result2 = result2 + (result < 0 ? comparator.getWeight() : 0);
  logger.debug(String.format("[Factor: %s] compare result : %s (current score %s vs %s)",
      comparator.getFactorName(), result, result1, result2));
}

在选择调度一个WorkFlow到Azkaban集群中的某个Executor Server时,需要比较Executor Server的如下4个指标:

  1.     能够运行WorkFlow的剩余容量,数值越大越优先
  2.     剩余内存用量,数值越大越优先
  3.     最近分配Flow的时间,数值越大越优先
  4.     CPU使用用量,数值越小越优先

Azkaban 安装部署
大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

配置文件

Web 服务器配置

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

执行服务器配置

大数据云计算——Azkaban组件

启动 Executor 服务器
大数据云计算——Azkaban组件 启动 Web 服务器
大数据云计算——Azkaban组件

jps 查看进程
大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

Azkaban 实战

Azkaban 内置的任务类型支持 command、 java

单一 job 案例

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

邮件通知配置案例

1) 修改配置文件       修改 server 的 conf 下的 azkaban.properties 文件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

多 job 工作流案例

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

Java 操作任务

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

HDFS 操作任务

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

MapReduce 任务

大数据云计算——Azkaban组件

Hive 脚本任务

大数据云计算——Azkaban组件

大数据云计算——Azkaban组件

 

本文地址:https://blog.csdn.net/weixin_41605937/article/details/107524126