欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

hive调优

程序员文章站 2022-04-16 10:01:24
...

1、fetch抓取(hive可以避免进行mapreduce) hive.fetch.task.conversion
Hive中对某些情况的查询可以不必使用MapReduce计算。例如:SELECT * FROM employees;在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。
hive调优
该属性修改为more以后,在全局查找、字段查找、limit查找等都不走mapreduce。

none : disable hive.fetch.task.conversion
minimal : SELECT STAR, FILTER on partition columns, LIMIT only
more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
2、本地模式 hive.exec.mode.local.auto
大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过,有时Hive的输入数据量是非常小的。在这种情况下,为查询触发执行任务时消耗可能会比实际job的执行时间要多的多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。

//开启本地mr
set hive.exec.mode.local.auto=true; 
//设置local mr的最大输入数据量,当输入数据量小于这个值时采用local  mr的方式,默认为134217728,即128M
set hive.exec.mode.local.auto.inputbytes.max=51234560;
//设置local mr的最大输入文件个数,当输入文件个数小于这个值时采用local mr的方式,默认为4
set hive.exec.mode.local.auto.input.files.max=10;

3、表的优化
3-1 Join
原则:
(1)小表join大表,将key相对分散,数据量小的表放在左边。再进一步,可以使用group让小的维度表(1000条以下的记录条数)先进内存。在map端完成reduce。
select count(distinct s_id) from student;尽量改成select count(1) from (select s_id group by s_if from student)temp;
因为前者只能有一个reduce task。
(2)多个表关联时,最好分拆成小段,避免大sql(无法控制中间job)
(3)大表join大表时,尽量减少输入(过滤空值等),如果要求无法过滤空id,那么就将他们转换成随机字符串,这样避免了数据倾斜。

SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;
SELECT a.* FROM nullidtable a LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;

3-2 MapJoin hive.auto.convert.join
如果不指定MapJoin或者不符合MapJoin的条件,那么Hive解析器会将Join操作转换成Common Join,即:在Reduce阶段完成join。容易发生数据倾斜。可以用MapJoin把小表全部加载到内存在map端进行join,避免reducer处理。

//设置自动选择Mapjoin,默认为true
set hive.auto.convert.join = true; 
//大表小表的阈值设置(默认25M以下认为是小表)
set hive.mapjoin.smalltable.filesize=25123456;

hive调优
首先是Task A,它是一个Local Task(在客户端本地执行的Task),负责扫描小表b的数据,将其转换成一个HashTable的数据结构,并写入本地的文件中,之后将该文件加载到DistributeCache中。
接下来是Task B,该任务是一个没有Reduce的MR,启动MapTasks扫描大表a,在Map阶段,根据a的每一条记录去和DistributeCache中b表对应的HashTable关联,并直接输出结果。
由于MapJoin没有Reduce,所以由Map直接输出结果文件,有多少个Map Task,就有多少个结果文件。

3-3 GroupBy
开启Map端聚合参数设置:

//是否在Map端进行聚合,默认为True
set hive.map.aggr = true;
//在Map端进行聚合操作的条目数目
set hive.groupby.mapaggr.checkinterval = 100000;
//有数据倾斜的时候进行负载均衡(默认是false)
set hive.groupby.skewindata = true;

生成的查询计划会有两个MR Job。第一个MR Job中,Map的输出结果会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的Group By Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;第二个MR Job再根据预处理的数据结果按照Group By Key分布到Reduce中(这个过程可以保证相同的Group By Key被分布到同一个Reduce中),最后完成最终的聚合操作。

3-4 使用分区剪裁、列剪裁
1、先过滤,再聚合
2、尽量少用select *
3、用不到的分区不要输入

3-5 动态分区调整 hive.exec.dynamic.partition
以第一个表的分区规则,来对应第二个表的分区规则,将第一个表的所有分区,全部拷贝到第二个表中来,第二个表在加载数据的时候,不需要指定分区了,直接用第一个表的分区即可。

//开启动态分区功能(默认true,开启)
set hive.exec.dynamic.partition=true;
//设置为非严格模式(动态分区的模式,默认strict,表示必须指定至少一个分区为静态分区,nonstrict模式表示允许所有的分区字段都可以使用动态分区。)
set hive.exec.dynamic.partition.mode=nonstrict;
//在所有执行MR的节点上,最大一共可以创建多少个动态分区。
set  hive.exec.max.dynamic.partitions=1000;
//在每个执行MR的节点上,最大可以创建多少个动态分区。该参数需要根据实际的数据来设定。比如:源数据中包含了一年的数据,即day字段有365个值,那么该参数就需要设置成大于365,如果使用默认值100,则会报错。
set hive.exec.max.dynamic.partitions.pernode=100
//整个MR Job中,最大可以创建多少个HDFS文件。
在linux系统当中,每个linux用户最多可以开启1024个进程,每一个进程最多可以打开2048个文件,即持有2048个文件句柄,下面这个值越大,就可以打开文件句柄越大
set hive.exec.max.created.files=100000;
//当有空分区生成时,是否抛出异常。一般不需要设置。
set hive.error.on.empty.partition=false;
INSERT overwrite TABLE ori_partitioned_target PARTITION (p_time)
SELECT id, time, uid, keyword, url_rank, click_num, click_url, p_time
FROM ori_partitioned;

4、数据倾斜
4-1 map task数量
1、map task数量由文件总个数,文件大小,block大小决定。其中set dfs.block.size可以查看block大小。
2、是不是map数越多越好?
不是,如果一个任务有很多小文件(远远小于块大小128m),则每个小文件也会被当做一个块,用一个map任务来完成,而一个map任务启动和初始化的时间远远大于逻辑处理的时间,就会造成很大的资源浪费。而且,同时可执行的map数是受限的。需要我们手动的减少map task的数量(小文件合并)。
3、是不是保证每个map处理接近128m的文件块,就高枕无忧了?
也不一定,比如有一个127m的文件,正常会用一个map去完成,但这个文件只有一个或者两个小字段,却有几千万的记录,如果map处理的逻辑比较复杂,用一个map任务去做,肯定也比较耗时。需要我们手动增加map task的数量。

//小文件合并
set mapred.max.split.size=112345600;
set mapred.min.split.size.per.node=112345600;
set mapred.min.split.size.per.rack=112345600;
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

前面三个参数确定合并文件块的大小,大于文件块大小128m的,按照128m来分隔,小于128m,大于100m的,按照100m来分隔,把那些小于100m的(包括小文件和分隔大文件剩下的),进行合并。

//增加map task数量
et mapreduce.job.reduces =10;
create table a_1 as
select * from a
distribute by rand(123);

这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。

4-2 reduce task数量
1、调整reduce task数量的两种方法:
(1)

//每个Reduce处理的数据量默认是256MB
hive.exec.reducers.bytes.per.reducer=256123456
//每个任务最大的reduce数,默认为1009
hive.exec.reducers.max=1009
//计算reducer数的公式
N=min(参数2,总输入数据量/参数1)

(2)

set mapreduce.job.reduces = 15;

2、reduce task数量并不是越多越好
1)过多的启动和初始化reduce也会消耗时间和资源。
2)另外,有多少个reduce,就会有多少个输出文件,如果生成了很多个小文件,那么如果这些小文件作为下一个任务的输入,则也会出现小文件过多的问题。

5、执行计划(explain)
基本语法:explain [extended] select * from student;

6、并行执行 hive.exec.parallel

set hive.exec.parallel=true;              //打开任务并行执行
set hive.exec.parallel.thread.number=16;  //同一个sql允许最大并行度,默认为8。

9、严格模式 == hive.mapred.mode==
严格模式下有以下三个限制:
1)对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。换句话说,就是用户不允许扫描所有分区。进行这个限制的原因是,通常分区表都拥有非常大的数据集,而且数据增加迅速。没有进行分区限制的查询可能会消耗令人不可接受的巨大资源来处理这个表。
2)对于使用了order by语句的查询,要求必须使用limit语句。因为order by为了执行排序过程会将所有的结果数据分发到同一个Reducer中进行处理,强制要求用户增加这个LIMIT语句可以防止Reducer额外执行很长一段时间。
3)限制笛卡尔积的查询。对关系型数据库非常了解的用户可能期望在执行JOIN查询的时候不使用ON语句而是使用where语句,这样关系数据库的执行优化器就可以高效地将WHERE语句转化成那个ON语句。不幸的是,Hive并不会执行这种优化,因此,如果表足够大,那么这个查询就会出现不可控的情况。

10、JVM重用 mapred.job.reuse.jvm.num.tasks
省略资源回收再利用。

11、推测执行 hive.mapred.reduce.tasks.speculative.execution
如果用户对于运行时的偏差非常敏感的话,那么可以将这些功能关闭掉。如果用户因为输入数据量很大而需要执行长时间的map或者Reduce task的话,那么启动推测执行造成的浪费是非常巨大大。

相关标签: hive 调优