欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

hadoop核心组件(二)

程序员文章站 2022-06-24 09:38:14
1、Hive概念: hive是数据仓库,由解释器、优化器和编译器组成;运行时,元数据存储在关系型数据库中。 2、Hive的架构: (1)用户接口主要有三个:CLi、Client和WUI。其中最常用的是CLi,CLi启动时候,会启动一个Hive副本。Client是hive的客户端,用户连接至Hive ......

1、Hive概念:

hive是数据仓库,由解释器、优化器和编译器组成;运行时,元数据存储在关系型数据库中。

2、Hive的架构:

hadoop核心组件(二)

(1)用户接口主要有三个:CLi、Client和WUI。其中最常用的是CLi,CLi启动时候,会启动一个Hive副本。Client是hive的客户端,用户连接至Hive Server。在启动Client模式的时候,需要指出Hive Server所在的节点,并且在该节点启动Hive Server;WEBUI是通过浏览器访问hive;

(2)Hive元数据存储在数据库中,如MySQL、Derby。Hive中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在的目录等;

(3)解释器、编译器、优化器完成HQL查询语句从词法解析、语法分析、编译、优化以及查询计划的产生。生成的查询计划存储在HDFS中,并在随后有MapReduce调用执行。

(4)Hive数据存储在HDFS中,大部分的查询、计算由Mapreduce完成(包含 * 的查询,比如select * from table不会产生mapreduce任务)

3、Hive的三种模式

(1)local模式,一般用于Unit Test

(2)单用户模式,通过网络连接到一个数据库中,是最经常使用的模式

(3)多用户模式,用于非java客户端访问元数据库,在服务器端启动MetaStoreServer,客户端利用Thrift协议通过MetaStoreServer访问元数据库

4、Hive建表

建表语句见官网:https://cwiki.apache.org/confluence/display/Hive/LanguageManual

创建内部表:

create table psn(
id int,
name string,
likes ARRAY <string>,
address MAP <string,string>
) 
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '-'
MAP KEYS TERMINATED BY ':';

注:插入数据不建议用insert语句,因为它会转成MapReduce,执行时间较长

LOAD DATA [LOCAL] INPATH '/data_path' INTO TABLE psn

 

创建外部表

create EXTERNAL table psn2(
id int,
name string,
likes ARRAY <string>,
address MAP <string,string>
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '-'
MAP KEYS TERMINATED BY ':'
LOCATION '/user/psn2';
EXTERNAL 表示创建外部表
LOCATION 表示数据存放在hdfs上的路径

删除表
drop table psn;               #内表
drop table psn2;               #外表

注:外部表不会删除数据

 

创建分区表

create table psn3(
id int,
name string,
likes ARRAY <string>,
address MAP <string,string>
)
PARTITIONED BY(sex string)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '-'
MAP KEYS TERMINATED BY ':';

注:分区表的字段不能是表中的字段,分区字段不存值

分区表插入数据

LOAD DATA LOCAL INPATH '/root/data_path' INTO TABLE psn3 partition (sex='boy');

导入数据第二种方式

from psn insert into table psn2 select id,name,likes,address;

 

动态分区

静态分区:固定的,通过路径(hdfs来标识)

动态分区:不固定的,可变的(对于一个文件中分区字段有不同的值,导入表的同时分区)

动态分区参数配置
set hive.exec.dynamic.partition=true;               #是否开启动态分区 默认false   
set hive.exec.dynamic.partition.mode=nostrict;      #设置非严格模式 默认strict(至少有一个是静态分区)
set hive.exec.max.dynamic.partitions.pernode;       #每个执行的mr节点上,允许创建的动态分区的最大数量(100set hive.exec.max.dynamic.partitions;               #所有执行的mr节点上,允许创建的所有动态分区的最大数量(1000set hive.exec.max.created.files;                    #所有的mr job允许创建的文件的最大数量(100000

注:动态分区导入数据不使用load data...

        建议使用 from tablle2 insert overwrite table table1 partition(age, sex)  select id, name, age, sex, likes, address distribute by age, sex;

5、创建自定义函数

官网网址:https://cwiki.apache.org/confluence/display/Hive/HivePlugins

package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
 
public final class Lower extends UDF {
  public Text evaluate(final Text s) {
    if (s == null) { return null; }
    return new Text(s.toString().toLowerCase());
  }
}
重点:写完之后打成jar上传至hiverserver上,而不是客户端,进入hive客户端添加jar包
           add jar地址 path:hiverserver主机的地址,而不是hdfs地址;
实践证明上传至客户端有效,只要创建成功,只要不退出本次会话,即使删除本地的jar,创建的临时函数也是有效的
hive> add jar /root/example.jar                                #后面为jar包上传的路径
hive> create  temporary function example as 'com.example.hive.udf.Lower';       #创建临时函数 example为hive sql执行的函数名;字符串为类路径
hive> select id,name,example(name),likes from psn1;                    #运行
hive> drop temporary function example;                            #删除临时函数

6、分桶
-----分桶表是对列值取hash的方式,将不同数据放到不同文件中存储
-----对于hive表中每一个表、分区都可以进一步进行分桶
-----由列的hash值除以桶的个数来决定每条数据划分在哪个桶中
使用场景:数据抽样(simaple)、mapjoin

首先要开启支持分桶
set hive.enforce.bucketing=true;      #默认为flase;设置之后,MR运行时会根据桶的个数自动分配reduce task个数

注:一次作业产生的桶(文件数量)和reduce task个数一致

创建分桶表

CREATE TABLE psnbucket( id INT, name STRING, sex string)
CLUSTERED BY (id) INTO 4 BUCKETS
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

hadoop核心组件(二)

查看创建的分桶表

hadoop核心组件(二)

加载数据

1.insert [into/overwrite] table psnbucket select id, name, sex from psn6;
2.from psn6 insert into psnbucket select id,name,sex;

桶的数量对应执行了4个reduce任务 

hadoop核心组件(二)

桶表抽样查询

TABLESAMPLE(BUCKET x OUT OF y)
x:表示从哪个bucket开始抽取数据
y:必须为该表总bucket数的倍数或因子
 
example:
select id,name,age from bucket_table tablesample(bucket 1 out of 4 on id);

计算抽取哪些数据:

当表总bucket数为32(total)时
      TABLESAMPLE(BUCKET 2 OUT OF 16),抽取哪些数据?
      共抽取2(32/16)个bucket的数据,抽取第2、第18(16+2)个bucket的数据
if(total /y >0){
   抽取第 x、y+ ( total /y )桶的数据
}else if(total /y < 0){
   抽取第x桶的( total /y )的数据  
}else{
   x、y数量有问题 
}
7、Hive索引
hive创建索引后如需使用的话必须先重建索引,索引才能生效

8、Hive优化
核心思想:把Hive SQL当做MapReduce程序去优化
一下SQL不会转成MapReduce来执行
  • select仅查询本表字段 
  • where仅对本表字段做条件过滤 

(1)Explain显示执行计划

 

EXPLAIN [EXTENDED] sql语句

 

(2)Hive的运行方式:本地模式和集群模式

  开启本地模式:

set hive.exec.mode.local.auto=true;

注意:hive.exec.mode.local.auto.inputbytes.max默认值为128M ,表示加载文件的最大值,若大于该配置仍会以集群方式来运行! 

(3)并行计算

set hive.exec.parallel=true;

注意:hive.exec.parallel.thread.number (一次SQL计算中允许并行执行的job个数的最大值) 

(4)严格模式

set hive.mapred.mode=strict;      #(默认为:nonstrict非严格模式) 
查询限制:
           1、对于分区表,必须添加where对于分区字段的条件过滤;
           2、order by语句必须包含limit输出限制;
           3、限制执行笛卡尔积的查询。
(5)Hive排序
   Order By - 对于查询结果做全排序,只允许有一个reduce处理
          (当数据量较大时,应慎用。严格模式下,必须结合limit来使用)
        Sort By - 对于单个reduce的数据进行排序
        Distribute By - 分区排序,经常和Sort By结合使用
        Cluster By - 相当于 Sort By + Distribute By
          (Cluster By不能通过asc、desc的方式指定排序规则;
          可通过 distribute by column sort by column asc|desc 的方式)
(6)Hive Join
  Join计算时,将小表(驱动表)放在join的左边,因为先加载左边(让它优先加载小表) 
  Map Join:在Map端完成Join 
  两种实现方式: 
    1、SQL方式,在SQL语句中添加MapJoin标记(mapjoin hint)
               语法:
                     SELECT  /*+ MAPJOIN(smallTable) */  smallTable.key,  bigTable.value
                     FROM  smallTable  JOIN  bigTable  ON  smallTable.key  =  bigTable.key;
               2、开启自动的MapJoin
set hive.auto.convert.join = true;      #该参数为true时,Hive自动对左边的表统计量,如果是小表就加入内存,即对小表使用Map join

相关配置参数: 

hive.mapjoin.smalltable.filesize;              #大表小表判断的阈值,如果表的大小小于该值则会被加载到内存中运行 
hive.ignore.mapjoin.hint;                  #默认值:true;是否忽略mapjoin hint 即mapjoin标记 
hive.auto.convert.join.noconditionaltask;         #默认值:true;将普通的join转化为普通的mapjoin时,是否将多个mapjoin转化为一个mapjoin 
hive.auto.convert.join.noconditionaltask.size;      #将多个mapjoin转化为一个mapjoin时,其表的最大值 

(7)Map-Side聚合

set hive.map.aggr=true;
hive.groupby.mapaggr.checkinterval              #map端group by执行聚合时处理的多少行数据(默认:100000)
hive.map.aggr.hash.min.reduction                #进行聚合的最小比例(预先对100000条数据做聚合,若聚合之后的数据量/100000的值大于该配置0.5,则不会聚合)
hive.map.aggr.hash.percentmemory                #map端聚合使用的内存的最大值
hive.map.aggr.hash.force.flush.memory.threshold      #map端做聚合操作是hash表的最大可用内容,大于该值则会触发flush
hive.groupby.skewindata                     #是否对GroupBy产生的数据倾斜做优化,默认为false

(8)控制hive中的Map以及Reduce数量

Map数量相关的参数 

mapred.max.split.size        #一个split的最大值,即每个map处理文件的最大值
mapred.min.split.size.per.node   #一个节点上split的最小值
mapred.min.split.size.per.rack   #一个机架上split的最小值

Reduce数量相关的参数 

mapred.reduce.tasks                #强制指定reduce任务的数量
hive.exec.reducers.bytes.per.reducer      #每个reduce任务处理的数据量
hive.exec.reducers.max                     #每个任务最大的reduce数

(9)Hive--JVM重用

适用场景: 1、小文件个数过多    2、task个数过多
set mapred.job.reuse.jvm.num.tasks=n    # 来设置(n为task插槽个数)
缺点:设置开启之后,task插槽会一直占用资源,不论是否有task运行,直到所有的task即整个job全部执行完成时,才会释放所有的task插槽资源!