欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

大数据私房菜--azkaban实战

程序员文章站 2024-03-22 12:25:22
...

Azkaban调度实战

01 azkaban调度需求

现有用户点击行为数据文件,每天产生会上传到hdfs目录,按天区分目录,现在我们需要每天凌晨两点定时导入Hive表指定分区中,并统计出今日活跃用户数插入指标表中。

日志文件

clicklog

userId   click_time             index
uid1	2020-06-21	12:10:10	a.html 
uid2	2020-06-21	12:15:10	b.html 
uid1	2020-06-21	13:10:10	c.html 
uid1	2020-06-21	15:10:10	d.html 
uid2	2020-06-21	18:10:10	e.html

用户点击行为数据,三个字段是用户id,点击时间,访问页面

hdfs目录会以日期划分文件,例如:

/user_clicks/20200621/clicklog.dat
/user_clicks/20200622/clicklog.dat
/user_clicks/20200623/clicklog.dat
...

Hive表

原始数据分区表

create table user_clicks(id string,click_time string index string) partitioned by(dt string) row format delimited fields terminated by '\t' ;

需要开发一个import.job每日从hdfs对应日期目录下同步数据到该表指定分区。(日期格式同上或者自定义)

指标表

create table user_info(active_num string,date string) row format delimited fields terminated by '\t' ;

需要开发一个analysis.job依赖import.job执行,统计出每日活跃用户(一个用户出现多次算作一次)数并插入user_inof表中。

02 azkaban需求分析

开发两个job:analysis.job依赖import.job执行

import.job

因为虚拟机性能不高,hive执行hql的时长较长,将import操作拆分为两个job:

import1为生成对应hql的shell

import2为执行hql并挂在后台

两个job可以通过&&将命令一起调度,但是挂后台可以一直等待该进程完成,使得analysis.job可以成功执行而不至于数据未导入就开始汇总

具体实现

import1.job

type=command
command=/usr/bin/sh /root/job/import.sh

improt.sh

前面一段有一个时间处理来模拟不同天分的执行,通过返回日期,通过load data到指定的日期目录将上游的点击数据input到指定表homework.user_clicks 的对应分区当中,将sql写为sql文件供import2.job调度

#!/bin/bash

time=$(date "+%M")
sub=$[$time/3]

if [ $sub -eq 0 ];then
  echo "$sub, 20200621"
  dt="20200621"
elif [ $sub -eq 1 ];then
  echo "$sub, 20200622"
  dt="20200622"
else
  dt="20200623"
  echo "$sub, 20200623"
fi

sql='load data inpath "/data/'$dt'/clicklog.dat" into table homework.user_clicks PARTITION(dt='$dt');'
echo $sql > /root/job/import.sql

import2.job

依赖于import1的完成通过hive -f命令完成装数

type=command
dependencies=import1
command=nohup /opt/hive/hive-2.3.7/bin/hive -f /root/job/import.sql > /root/job/import.log &

analysis1.job

依赖于import2的完成生成对应的sql文件

type=command
dependencies=import2
command=/usr/bin/sh /root/job/analysis.sh

analysis.sh

#!/bin/bash

time=$(date "+%M")
sub=$[$time/3]

if [ $sub -eq 0 ];then
  echo "$sub, 20200621"
  dt="20200621"
elif [ $sub -eq 1 ];then
  echo "$sub, 20200622"
  dt="20200622"
else
  dt="20200623"
  echo "$sub, 20200623"
fi

sql="insert into table homework.user_info select count(distinct id),'$dt' from homework.user_clicks where dt='$dt'"
echo $sql > /root/job/analysis.sql

analysis2.job

依赖于analysis1.job对已装载的数据进行数据汇总并插入到homework.user_info汇总表当中

type=command
dependencies=analysis1
command=nohup /opt/hive/hive-2.3.7/bin/hive -f /root/job/analysis.sql > /root/job/analysis.log &

03 需求实现

任务依赖关系为:

大数据私房菜--azkaban实战

通过azkaban进行任务调度:

大数据私房菜--azkaban实战

import2.job完成装数

大数据私房菜--azkaban实战

analysis2.job完成数据汇总

大数据私房菜--azkaban实战

查询homework.user_clicks数据为:

大数据私房菜--azkaban实战

查询汇总数据如下:

大数据私房菜--azkaban实战

相关标签: 大数据 hive