欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

大数据企业学习篇03_3------hive 高级

程序员文章站 2022-04-28 23:46:49
...

一、hive的压缩

<1>企业中使用比较多的是Snappy
<2>设置:

set mapreduce.map.output.compress=true
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec 

<3>注意事项:
可以减少磁盘IO
可以减少网络IO
压缩算法必须是可分割

二、hive数据存储

<1>数据存储:
*按行存储—-textfile
*按列存储—-orc、parquet
<2>如何使用orc(parquet同理)
*创建表

create table page_views_orc(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
**STORED AS orc ;**

*将已存在数据转换为orcFile

insert into table page_views_orc select * from page_views ;

*查看HDFS文件大小

dfs -du -h /user/hive/warehouse/page_views_orc/ ;

注意:可以在创建表的时候指定压缩算法(默认zlib)

create table page_views_orc_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");

总结:
在实际的项目开发当中,hive表的数据
* 存储格式
orcfile / qarquet
* 数据压缩
snappy

三、hive 企业优化

<1>为什么有的SQL执行MapReduce,有的不呢?
大数据企业学习篇03_3------hive 高级
一般企业优化可以将值改为more
<2>可以通过explain查看执行计划

EXPLAIN select deptno,avg(sal) avg_sal from emp group by deptno ;

EXPLAIN EXTENDED select deptno,avg(sal) avg_sal from emp group by deptno ; 

<3>高级优化
*大表拆分——子表

create table page_views_par_snappy AS select ip,req_add from page_views ;

*外部表,分区表结合使用,多级分区(企业中基本这么使用)

create external table if not exists default.user(
id int,
name string,
phone string
)
partitioned by(year string,month string,day string)
row format delimited
field terminated by '\t'
locaton 'hdfs://mycluter/user/hive/warehouse/'

*数据
存储格式(textfile、orc、parquet)
数据压缩(snappy)

*SQL优化
优化SQL语句
join,filter

select e.a, e,b ,d.h,d.f from 
(select .... from emp where emp.filter) e 
join 
(select .... from dept where emdeptp.filter)  d 
on(e.deptno = d.deptno);

*MapReduce
—–Reduce Number

set mapreduce.job.reduces=1 (改)

—–JVM重用

set mapreduce.job.jvm.numtasks=1(改)

—–推测执行
大数据企业学习篇03_3------hive 高级

—–并行执行
大数据企业学习篇03_3------hive 高级

—–map的数目
minSize=max{minSplitSize,mapred.min.split.size}
maxSize=mapred.max.split.size
splitSize=max{minSize,min{maxSize,blockSize}}

结论:
1.如果想增加map个数,则设置mapred.map.tasks 为一个较大的值。
2.如果想减小map个数,则设置mapred.min.split.size 为一个较大的值。
3.如果输入中有很多小文件,依然想减少map个数,则需要将小文件merger为大文件,然后使用准则2。

<4>大数据企业学习篇03_3------hive 高级
<5>
大数据企业学习篇03_3------hive 高级
<6>大数据企业学习篇03_3------hive 高级
注意:
Common/Shuffle/Reduce Join
连接发生的阶段,发生在 Reduce Task
大表对大表
每个表的数据都是从文件中读取的

Map Join
连接发生的阶段,发生在 Map Task
小表对大表
* 大表的数据放从文件中读取 cid
* 小表的数据内存中 id
DistributedCache
需要设置属性:

set hive.auto.convert.join=true

SMB Join
Sort-Merge-BUCKET Join
大数据企业学习篇03_3------hive 高级

四、hive streaming实战—电影评分分析项目

<1>获取数据集
https://grouplens.org/datasets/movielens/
<2>创建表

CREATE TABLE u_data (
  userid INT,
  movieid INT,
  rating INT,
  unixtime STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

<3>解压数据文件

unzip ml-100k.zip

<4>加载数据

LOAD DATA LOCAL INPATH '/soft/datas/u.data'
OVERWRITE INTO TABLE u_data;

<5>验证数据是否正确

SELECT COUNT(*) FROM u_data;

<6>创建python脚本文件(weekday_mapper.py)

import sys
import datetime

for line in sys.stdin:
  line = line.strip()
  userid, movieid, rating, unixtime = line.split('\t')
  weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
  print '\t'.join([userid, movieid, rating, str(weekday)])

<7>使用mapper脚本

CREATE TABLE u_data_new (
  userid INT,
  movieid INT,
  rating INT,
  weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';

add FILE /soft/datas/u.data/weekday_mapper.py;//加载python脚本

INSERT OVERWRITE TABLE u_data_new
SELECT
  TRANSFORM (userid, movieid, rating, unixtime)--原表输入字段
  USING 'python weekday_mapper.py' --使用脚本处理
  AS (userid, movieid, rating, weekday) --输出字段
FROM u_data; --原表

#分析星期几电影人数最多
SELECT weekday, COUNT(*)
FROM u_data_new
GROUP BY weekday order by COUNT(*) desc;

五、某网站用户日志分析

<1>项目思路
* 原表
* 针对不同的业务创建不同的子表
* 数据存储格式orc/parquet
* 数据压缩 snappy
* map output 数据压缩 snappy
* 外部表
* 分区表(按year,month,day分区)
<2>依据业务数据表
大数据企业学习篇03_3------hive 高级
*方式一、原始表bf_log_src,加载数据(预先处理)

create table IF NOT EXISTS default.web_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
stored as textfile ;  

load data local inpath '/opt/datas/moodle.web.access.log' into table default.web_log_src ;

select count(*) from bf_log_src ;

select * from  bf_log_src limit 5 ; //查看发现数据有问题,需要对原始数据进行预处理

*方式二、创建正则表RegexSerDe

drop table if exists default.web_log_src ;
create table IF NOT EXISTS default.web_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  "input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
)
STORED AS TEXTFILE;

load data local inpath '/opt/datas/moodle.web.access.log' into table default.web_log_src ;

参考链接:
https://cwiki.apache.org/confluence/display/Hive/GettingStarted //官方文档
http://wpjam.qiniudn.com/tool/regexpal/ //检验正则表达式

<3>项目需求
大数据企业学习篇03_3------hive 高级
大数据企业学习篇03_3------hive 高级
大数据企业学习篇03_3------hive 高级
大数据企业学习篇03_3------hive 高级

<4>数据ETL
*拆分表(子表)、数据存储格式

drop table if exists default.web_log_comm ;
create table IF NOT EXISTS default.web_log_comm (
remote_addr string,
time_local string,
request string,
http_referer string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");

insert into table default.web_log_comm select remote_addr, time_local, request,http_referer from  default.web_log_src ;

select * from web_log_comm limit 5 ;

*数据预处理ETL(udf、python)
定义UDF,对原表数据进行清洗
第一个udf
去除引号

add jar /opt/datas/hiveudf2.jar ;
create temporary function my_removequotes as "com.xiaojiangshi.hive.udf.RemoveQuotesUDF" ;

insert overwrite table default.web_log_comm select my_removequotes(remote_addr), my_removequotes(time_local), my_removequotes(request), my_removequotes(http_referer) from  default.web_log_src ;

select * from bf_log_comm limit 5 ;

第二个 UDF
处理日期时间字段

31/Aug/2017:00:04:37 +0800

20170831000437

add jar /opt/datas/hiveudf3.jar ;
create temporary function my_datetransform as "com.xiaojiangshi.hive.udf.DateTransformUDF" ;

insert overwrite table default.web_log_comm select my_removequotes(remote_addr), my_datetransform(my_removequotes(time_local)), my_removequotes(request), my_removequotes(http_referer) from  default.web_log_src ;

select * from web_log_comm limit 5 ;

<5>数据分析HQL

desc function extended substring ;
substring('Facebook', 5, 1)
'b'
下标从1开始计数

select substring('20150831230437',9,2) hour from bf_log_comm limit 1 ;

select t.hour, count(*) cnt from
(select substring(time_local,9,2) hour from bf_log_comm ) t
group by t.hour order by  cnt desc ;

----
select t.prex_ip, count(*) cnt from
(
select substring(remote_addr,1,7) prex_ip from bf_log_comm
) t
group by t.prex_ip order by  cnt desc limit 20 ;