Big Data Enterprise Learning Series 03_3 ------ Hive Advanced
1. Hive compression
<1> Snappy is the codec most commonly used in production.
<2> Settings:
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
<3> Notes:
Compression reduces disk I/O.
Compression reduces network I/O.
The compression codec must be splittable.
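For reference, a minimal sketch of how these settings are typically combined in a Hive session; hive.exec.compress.intermediate and hive.exec.compress.output are standard Hive properties, and the values shown are illustrative:
-- compress intermediate (map) output with Snappy
set hive.exec.compress.intermediate=true;
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
-- optionally compress the final job output as well
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;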
2. Hive data storage
<1> Storage layouts:
* Row-oriented: textfile
* Column-oriented: orc, parquet
<2> How to use ORC (Parquet works the same way)
* Create the table
create table page_views_orc(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc ;
* Convert existing data into an ORC file
insert into table page_views_orc select * from page_views ;
* Check the file size on HDFS
dfs -du -h /user/hive/warehouse/page_views_orc/ ;
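For comparison, the original textfile table's size can be checked the same way (assuming it sits under the default warehouse path):
dfs -du -h /user/hive/warehouse/page_views/ ;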
Note: the compression codec can also be specified when the table is created (ORC defaults to ZLIB).
create table page_views_orc_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
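The Snappy-compressed table can then be populated and measured just as above (a sketch following the same pattern; the warehouse path is assumed):
insert into table page_views_orc_snappy select * from page_views ;
dfs -du -h /user/hive/warehouse/page_views_orc_snappy/ ;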
Summary:
In real-world project development, Hive table data typically uses:
* Storage format
orcfile / parquet
* Data compression
snappy
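For completeness, a sketch of the equivalent Parquet + Snappy table; the parquet.compression table property is assumed to be honored by the Hive version in use (older releases may require set parquet.compression=SNAPPY before the INSERT instead):
create table page_views_parquet_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS parquet tblproperties ("parquet.compression"="SNAPPY");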
3. Hive optimization in production
<1> Why do some SQL statements launch a MapReduce job while others do not?
Simple fetch-only queries can be answered without MapReduce; this behavior is controlled by hive.fetch.task.conversion, which in production is usually set to more.
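A minimal sketch of that setting in action, reusing the emp table from the EXPLAIN example below; the column names are taken from that example:
set hive.fetch.task.conversion=more;
-- a simple fetch-only query like this can now be answered without MapReduce
select deptno, sal from emp limit 5;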
<2> Use EXPLAIN to view the execution plan
EXPLAIN select deptno,avg(sal) avg_sal from emp group by deptno ;
EXPLAIN EXTENDED select deptno,avg(sal) avg_sal from emp group by deptno ;
<3> Advanced optimization
* Split large tables into sub-tables
create table page_views_par_snappy AS select ip,req_add from page_views ;
* Combine external tables with partitioned tables, using multi-level partitions (the common pattern in production)
create external table if not exists default.user(
id int,
name string,
phone string
)
partitioned by(year string,month string,day string)
row format delimited
fields terminated by '\t'
location 'hdfs://mycluster/user/hive/warehouse/' ;
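Data is then typically loaded partition by partition; a sketch using standard LOAD DATA ... PARTITION syntax (the local file path is hypothetical):
load data local inpath '/opt/datas/user_20170831.log'
into table default.user partition (year='2017', month='08', day='31') ;
-- confirm the partition is registered
show partitions default.user ;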
* Data
Storage format (textfile, orc, parquet)
Data compression (snappy)
* SQL optimization
Optimize the SQL statements themselves
join, filter: filter inside the subqueries before joining, e.g.
select e.a, e.b, d.h, d.f from
(select .... from emp where emp.filter) e
join
(select .... from dept where dept.filter) d
on(e.deptno = d.deptno);
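A concrete sketch of the same pattern, assuming the emp/dept tables from the EXPLAIN example above; the column names empno, sal, dname and the filter conditions are illustrative assumptions:
select e.empno, e.sal, d.dname
from (select empno, sal, deptno from emp where sal > 1000) e
join (select deptno, dname from dept where deptno != 40) d
on (e.deptno = d.deptno);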
* MapReduce
----- Number of reduce tasks
set mapreduce.job.reduces=1 (tune per job)
----- JVM reuse
set mapreduce.job.jvm.numtasks=1 (tune per job)
----- Speculative execution (settings sketched after this list)
----- Parallel execution (settings sketched after this list)
----- Number of map tasks
minSize=max{minSplitSize,mapred.min.split.size}
maxSize=mapred.max.split.size
splitSize=max{minSize,min{maxSize,blockSize}}
Conclusions:
1. To increase the number of map tasks, set mapred.map.tasks to a larger value.
2. To decrease the number of map tasks, set mapred.min.split.size to a larger value.
3. If the input contains many small files and you still want fewer map tasks, merge the small files into larger ones first, then apply rule 2.
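A consolidated sketch of the settings behind the items above; the speculative-execution and parallel-execution properties are standard Hadoop/Hive properties, and the values shown are illustrative rather than taken from the original notes:
set mapreduce.job.reduces=3;                  -- number of reduce tasks, tuned per job
set mapreduce.job.jvm.numtasks=10;            -- JVM reuse
set mapreduce.map.speculative=true;           -- speculative execution for map tasks
set mapreduce.reduce.speculative=true;        -- speculative execution for reduce tasks
set hive.mapred.reduce.tasks.speculative.execution=true;
set hive.exec.parallel=true;                  -- run independent job stages in parallel
set hive.exec.parallel.thread.number=8;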
Note:
Common/Shuffle/Reduce Join
The join is performed in the Reduce Task.
Suited to joining a large table with a large table.
Each table's data is read from files.
Map Join
The join is performed in the Map Task.
Suited to joining a small table with a large table.
* The large table's data is read from files.
* The small table's data is held in memory and distributed via DistributedCache.
Required setting:
set hive.auto.convert.join=true;
SMB Join
Sort-Merge-Bucket Join
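A sketch of how SMB join is typically enabled; it assumes both sides are bucketed and sorted on the join key, the emp_bucket table shown here is hypothetical, and hive.enforce.bucketing applies to older Hive releases:
set hive.enforce.bucketing=true;
set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
create table emp_bucket(
empno int,
ename string,
deptno int
)
clustered by (deptno) sorted by (deptno) into 4 buckets
row format delimited fields terminated by '\t';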
4. Hive streaming in practice: movie ratings analysis project
<1> Get the dataset
https://grouplens.org/datasets/movielens/
<2> Create the table
CREATE TABLE u_data (
userid INT,
movieid INT,
rating INT,
unixtime STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
<3> Unzip the data file
unzip ml-100k.zip
<4> Load the data
LOAD DATA LOCAL INPATH '/soft/datas/u.data'
OVERWRITE INTO TABLE u_data;
<5> Verify the data loaded correctly
SELECT COUNT(*) FROM u_data;
<6> Create the Python mapper script (weekday_mapper.py)
import sys
import datetime

# read tab-separated records from stdin and replace the unix timestamp with the weekday
for line in sys.stdin:
    line = line.strip()
    userid, movieid, rating, unixtime = line.split('\t')
    weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
    print('\t'.join([userid, movieid, rating, str(weekday)]))
<7> Use the mapper script
CREATE TABLE u_data_new (
userid INT,
movieid INT,
rating INT,
weekday INT)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t';
add FILE /soft/datas/u.data/weekday_mapper.py; -- register the Python script
INSERT OVERWRITE TABLE u_data_new
SELECT
TRANSFORM (userid, movieid, rating, unixtime) -- input columns from the source table
USING 'python weekday_mapper.py' -- process each row with the script
AS (userid, movieid, rating, weekday) -- output columns
FROM u_data; -- source table
-- Which weekday has the most ratings?
SELECT weekday, COUNT(*)
FROM u_data_new
GROUP BY weekday order by COUNT(*) desc;
5. Website user log analysis
<1> Project approach
* Raw source table
* Separate sub-tables for each business use case
* Storage format: orc/parquet
* Data compression: snappy
* Map output compression: snappy
* External tables
* Partitioned tables (partitioned by year, month, day)
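A sketch of what the external, partitioned source table described above might look like; the columns mirror the DDL in <2> below, the partition columns follow the year/month/day scheme from this list, and the table name and HDFS location are assumptions:
create external table if not exists default.web_log_src_part (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
partitioned by (year string, month string, day string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
stored as textfile
location '/user/hive/warehouse/web_log_src_part' ;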
<2> Business data tables
* Option 1: raw table web_log_src, load the data as-is (pre-processing turns out to be needed)
create table IF NOT EXISTS default.web_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
stored as textfile ;
load data local inpath '/opt/datas/moodle.web.access.log' into table default.web_log_src ;
select count(*) from web_log_src ;
select * from web_log_src limit 5 ; -- inspecting the rows shows the data is parsed incorrectly, so the raw data needs pre-processing
* Option 2: create the table with RegexSerDe
drop table if exists default.web_log_src ;
create table IF NOT EXISTS default.web_log_src (
remote_addr string,
remote_user string,
time_local string,
request string,
status string,
body_bytes_sent string,
request_body string,
http_referer string,
http_user_agent string,
http_x_forwarded_for string,
host string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
"input.regex" = "(\"[^ ]*\") (\"-|[^ ]*\") (\"[^\]]*\") (\"[^\"]*\") (\"[0-9]*\") (\"[0-9]*\") (-|[^ ]*) (\"[^ ]*\") (\"[^\"]*\") (-|[^ ]*) (\"[^ ]*\")"
)
STORED AS TEXTFILE;
load data local inpath '/opt/datas/moodle.web.access.log' into table default.web_log_src ;
References:
https://cwiki.apache.org/confluence/display/Hive/GettingStarted (official documentation)
http://wpjam.qiniudn.com/tool/regexpal/ (test regular expressions)
<3> Project requirements
<4> Data ETL
* Split out a sub-table and choose its storage format
drop table if exists default.web_log_comm ;
create table IF NOT EXISTS default.web_log_comm (
remote_addr string,
time_local string,
request string,
http_referer string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");
insert into table default.web_log_comm select remote_addr, time_local, request,http_referer from default.web_log_src ;
select * from web_log_comm limit 5 ;
* Data pre-processing / ETL (UDFs, Python)
Define UDFs to clean the data in the raw table.
First UDF:
Strip the surrounding quotes.
add jar /opt/datas/hiveudf2.jar ;
create temporary function my_removequotes as "com.xiaojiangshi.hive.udf.RemoveQuotesUDF" ;
insert overwrite table default.web_log_comm select my_removequotes(remote_addr), my_removequotes(time_local), my_removequotes(request), my_removequotes(http_referer) from default.web_log_src ;
select * from web_log_comm limit 5 ;
Second UDF:
Transform the date/time field, for example:
31/Aug/2017:00:04:37 +0800  ->  20170831000437
add jar /opt/datas/hiveudf3.jar ;
create temporary function my_datetransform as "com.xiaojiangshi.hive.udf.DateTransformUDF" ;
insert overwrite table default.web_log_comm select my_removequotes(remote_addr), my_datetransform(my_removequotes(time_local)), my_removequotes(request), my_removequotes(http_referer) from default.web_log_src ;
select * from web_log_comm limit 5 ;
<5> Data analysis HQL
desc function extended substring ;
substring('Facebook', 5, 1)
'b'
Indexing starts at 1.
select substring('20150831230437',9,2) hour from web_log_comm limit 1 ;
select t.hour, count(*) cnt from
(select substring(time_local,9,2) hour from web_log_comm ) t
group by t.hour order by cnt desc ;
----
select t.prex_ip, count(*) cnt from
(
select substring(remote_addr,1,7) prex_ip from web_log_comm
) t
group by t.prex_ip order by cnt desc limit 20 ;