
Common Join and Map Join mechanisms in Hive (with a detailed example)

程序员文章站 2022-04-28 23:37:07

Broadly speaking, joins in Hive fall into two categories: the Common Join (the join is performed in the Reduce stage) and the Map Join (the join is performed in the Map stage). This article briefly introduces the principles and mechanisms of both.
1. Hive Common Join

If a map join is not specified and the query does not meet the map-join conditions, the Hive parser converts the join operation into a Common Join, i.e. the join is completed in the Reduce stage.
The whole process consists of Map, Shuffle, and Reduce stages.

Map stage
The source tables are read, and the Map output uses the column(s) in the join ON condition as the key; if the join has multiple join keys, the combination of those keys is used as the key.
The Map output value holds the columns needed after the join (those referenced in SELECT or WHERE). The value also carries a table tag that marks which table the value came from.
The output is sorted by key.

Shuffle stage
Records are hashed on the key, and key/value pairs are sent to reducers according to the hash value. This guarantees that rows with the same key from both tables end up in the same reducer.

Reduce stage
The join is completed per key; the tag is used to tell rows from the different tables apart.
Take the following HQL as an example:

SELECT a.id, a.dept, b.age
FROM a JOIN b
ON (a.id = b.id);
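The three stages above can be sketched as a toy Python simulation. This is a hypothetical illustration of the tag-based mechanism, not Hive source code; the sample tables `table_a` and `table_b` are made up:

```python
# Toy simulation of a common join: Map emits (key, (tag, value)),
# Shuffle partitions by hash(key), Reduce groups by key and joins via tags.
from collections import defaultdict

# Hypothetical sample data mirroring the HQL above: a(id, dept), b(id, age)
table_a = [(1, "sales"), (2, "hr")]
table_b = [(1, 30), (3, 25)]

def map_phase():
    """Emit (join_key, (tag, value)) pairs; the tag marks the source table."""
    for row_id, dept in table_a:
        yield row_id, (0, dept)   # tag 0 = table a
    for row_id, age in table_b:
        yield row_id, (1, age)    # tag 1 = table b

def shuffle_phase(pairs, num_reducers=2):
    """Partition by hash(key) so equal keys land on the same reducer."""
    reducers = defaultdict(list)
    for key, tagged in pairs:
        reducers[hash(key) % num_reducers].append((key, tagged))
    return reducers

def reduce_phase(reducers):
    """Group by key, separate rows by tag, and emit the joined rows."""
    out = []
    for partition in reducers.values():
        grouped = defaultdict(lambda: ([], []))
        for key, (tag, value) in partition:
            grouped[key][tag].append(value)
        for key, (a_vals, b_vals) in grouped.items():
            for dept in a_vals:
                for age in b_vals:
                    out.append((key, dept, age))
    return out

result = reduce_phase(shuffle_phase(map_phase()))
print(result)  # only id=1 appears in both tables
```

Note how the reducer only needs the tag to know which side of the join each value belongs to; this is exactly the role of the tag carried in the Map output value described above.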

2. Hive Map Join
A map join is typically used when a very small table is joined with a large table. How small counts as "small" is decided by the parameter hive.mapjoin.smalltable.filesize, which sets the maximum total size of the small table; the default is 25000000 bytes, i.e. about 25 MB.
Before Hive 0.7, the hint /*+ MAPJOIN(table) */ was required to trigger a map join; otherwise a common join was executed. From 0.7 on, Hive converts joins into map joins automatically, controlled by the parameter hive.auto.convert.join, which defaults to true.

Assume a is a large table, b is a small table, and hive.auto.convert.join=true; Hive will then automatically convert the join into a map join at execution time.

The execution flow is:
1. A MapReduce local task reads the small table into memory, generates HashTableFiles, and uploads them to the Distributed Cache; the HashTableFiles are compressed in this step.
2. In the Map stage of the MapReduce job, each mapper reads the HashTableFiles from the Distributed Cache into memory, scans the large table sequentially, performs the join directly in the Map stage, and passes the data on to the next MapReduce task.
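The two steps above can be sketched in Python. Again a hypothetical toy model, not Hive internals; the sample tables and the `stream_big_table` generator are made up:

```python
# Toy model of a map join: the small table lives in an in-memory hash table
# (standing in for the broadcast HashTableFiles), and the big table is
# streamed once and joined in the map phase -- no shuffle, no reduce.

small_table = {1: "zs", 2: "wl"}   # hypothetical department -> leader, fits in memory

def stream_big_table():
    """Stands in for a mapper scanning the big fact table row by row."""
    yield ("xm", 2)
    yield ("ls", 1)
    yield ("yy", 2)

def map_join():
    # Each "mapper" probes the in-memory hash table for every big-table row.
    for name, dept in stream_big_table():
        leader = small_table.get(dept)   # left join: None if no match
        yield (name, dept, leader)

print(list(map_join()))
```

Because every mapper holds a full copy of the small table, equal keys never need to be co-located, which is why the shuffle and reduce phases disappear entirely.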
3. Test
Prepare the data:

hive (test)> select * from stu_190802;
stu_190802.id	stu_190802.name	stu_190802.sex	stu_190802.department	stu_190802.age
16	xm	m	2	25
16	xm	m	2	25
16	xm	m	2	25
1	zs	m	1	18
2	ls	m	1	19
3	ww	m	1	20
4	zq	f	1	18
5	ll	f	1	21
6	hl	f	1	19
7	xh	f	1	20
8	cl	f	1	22
9	fj	m	1	19
10	wb	m	2	23
11	wf	f	2	24
12	jj	m	2	21
13	yy	m	2	20
14	ld	f	2	18
15	ch	f	2	22
1	zs	m	1	17
1	zs	m	1	19

hive (test)> select * from department;
department.id	department.leader
1	zs
2	wl

hive.auto.convert.join defaults to true, which enables map joins for small tables:

hive (test)> set hive.auto.convert.join;
hive.auto.convert.join=true
hive (test)> set hive.mapjoin.smalltable.filesize;
hive.mapjoin.smalltable.filesize=25000000

Joining the two tables therefore executes a map join by default:

select 
       t1.*
      ,t2.leader 
from stu_190802 t1 
left join department t2 
on t1.department =t2.id;
Query ID = finup_20191225144340_00ff1ec6-8d8a-44ca-b92f-08e7b2a1074e
Total jobs = 1
SLF4J: Found binding in [jar:file:/Users/finup/opt/hive3.1.1/lib/log4j-slf4j-impl-2.10.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
============ The small table is read into memory and HashTableFiles are generated ====================================
2019-12-25 14:43:54	Dump the side-table for tag: 1 with group count: 2 into file: file:/Users/finup/opt/hive3.1.1/iotmp/4f121d43-3a97-4a8f-aff7-5cc8e02eb80b/hive_2019-12-25_14-43-40_204_201189786632654257-1/-local-10004/HashTable-Stage-3/MapJoin-mapfile01--.hashtable
2019-12-25 14:43:54	End of local task; Time Taken: 1.186 sec.
Execution completed successfully
MapredLocal task succeeded
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
================== Only a map phase; no reduce or shuffle ===============
Job running in-process (local Hadoop)
2019-12-25 14:43:58,652 Stage-3 map = 100%,  reduce = 0%
Ended Job = job_local166338284_0073
MapReduce Jobs Launched:
Stage-Stage-3:  HDFS Read: 25833140 HDFS Write: 7705 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
t1.id	t1.name	t1.sex	t1.department	t1.age	t2.leader
16	xm	m	2	25	wl
16	xm	m	2	25	wl
16	xm	m	2	25	wl
1	zs	m	1	18	zs
2	ls	m	1	19	zs
3	ww	m	1	20	zs
4	zq	f	1	18	zs
5	ll	f	1	21	zs
6	hl	f	1	19	zs
7	xh	f	1	20	zs
8	cl	f	1	22	zs
9	fj	m	1	19	zs
10	wb	m	2	23	wl
11	wf	f	2	24	wl
12	jj	m	2	21	wl
13	yy	m	2	20	wl
14	ld	f	2	18	wl
15	ch	f	2	22	wl
1	zs	m	1	17	zs
1	zs	m	1	19	zs
Time taken: 18.452 seconds, Fetched: 20 row(s)

Inspect the execution plan with explain:

explain 
select t1.*,t2.leader 
from stu_190802 t1 
left join department t2 
on t1.department =t2.id;
hive (test)> explain select t1.*,t2.leader from stu_190802 t1 left join department t2 on t1.department =t2.id;
OK
================== Three stages are generated ======================
Explain
STAGE DEPENDENCIES:
  Stage-4 is a root stage
  Stage-3 depends on stages: Stage-4
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-4
    Map Reduce Local Work
      Alias -> Map Local Tables:
        $hdt$_1:t2
          Fetch Operator
            limit: -1
      Alias -> Map Local Operator Tree:
        $hdt$_1:t2
          TableScan
            alias: t2
            Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: int), leader (type: string)
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              HashTable Sink Operator
                keys:
                  0 UDFToDouble(_col3) (type: double)
                  1 UDFToDouble(_col0) (type: double)

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: t1
            Statistics: Num rows: 20 Data size: 229 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: bigint), name (type: string), sex (type: string), department (type: string), age (type: bigint)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4
              Statistics: Num rows: 20 Data size: 229 Basic stats: COMPLETE Column stats: NONE
              Map Join Operator
                condition map:
                     Left Outer Join 0 to 1
                keys:
                  0 UDFToDouble(_col3) (type: double)
                  1 UDFToDouble(_col0) (type: double)
                outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6
                Statistics: Num rows: 22 Data size: 251 Basic stats: COMPLETE Column stats: NONE
                Select Operator
                  expressions: _col0 (type: bigint), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: bigint), _col6 (type: string)
                  outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
                  Statistics: Num rows: 22 Data size: 251 Basic stats: COMPLETE Column stats: NONE
                  File Output Operator
                    compressed: false
                    Statistics: Num rows: 22 Data size: 251 Basic stats: COMPLETE Column stats: NONE
                    table:
                        input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                        output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                        serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
      Execution mode: vectorized
      Local Work:
        Map Reduce Local Work

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

Time taken: 0.329 seconds, Fetched: 65 row(s)

Disable the automatic map join:

set hive.auto.convert.join=false;
hive (test)> select t1.*,t2.leader from stu_190802 t1 left join department t2 on t1.department =t2.id;
Query ID = finup_20191225144559_4682d858-ef03-4189-b048-031b4659f7ac
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Defaulting to jobconf value of: 3
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Job running in-process (local Hadoop)
=================== Both map and reduce phases run ======================
2019-12-25 14:46:01,259 Stage-1 map = 100%,  reduce = 100%
Ended Job = job_local751812336_0074
MapReduce Jobs Launched:
Stage-Stage-1:  HDFS Read: 129166985 HDFS Write: 38525 SUCCESS
Total MapReduce CPU Time Spent: 0 msec
OK
t1.id	t1.name	t1.sex	t1.department	t1.age	t2.leader
1	zs	m	1	19	zs
1	zs	m	1	17	zs
9	fj	m	1	19	zs
8	cl	f	1	22	zs
7	xh	f	1	20	zs
6	hl	f	1	19	zs
5	ll	f	1	21	zs
4	zq	f	1	18	zs
3	ww	m	1	20	zs
2	ls	m	1	19	zs
1	zs	m	1	18	zs
15	ch	f	2	22	wl
14	ld	f	2	18	wl
13	yy	m	2	20	wl
12	jj	m	2	21	wl
11	wf	f	2	24	wl
10	wb	m	2	23	wl
16	xm	m	2	25	wl
16	xm	m	2	25	wl
16	xm	m	2	25	wl
Time taken: 1.718 seconds, Fetched: 20 row(s)

View the execution plan with the map join disabled:

hive (test)> explain select t1.*,t2.leader from stu_190802 t1 left join department t2 on t1.department =t2.id;
OK
================== Only two stages are generated ======================
Explain
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: t1
            Statistics: Num rows: 20 Data size: 229 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: bigint), name (type: string), sex (type: string), department (type: string), age (type: bigint)
              outputColumnNames: _col0, _col1, _col2, _col3, _col4
              Statistics: Num rows: 20 Data size: 229 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: UDFToDouble(_col3) (type: double)
                sort order: +
                Map-reduce partition columns: UDFToDouble(_col3) (type: double)
                Statistics: Num rows: 20 Data size: 229 Basic stats: COMPLETE Column stats: NONE
                value expressions: _col0 (type: bigint), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: bigint)
          TableScan
            alias: t2
            Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: NONE
            Select Operator
              expressions: id (type: int), leader (type: string)
              outputColumnNames: _col0, _col1
              Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: UDFToDouble(_col0) (type: double)
                sort order: +
                Map-reduce partition columns: UDFToDouble(_col0) (type: double)
                Statistics: Num rows: 2 Data size: 8 Basic stats: COMPLETE Column stats: NONE
                value expressions: _col1 (type: string)
      Reduce Operator Tree:
        Join Operator
          condition map:
               Left Outer Join 0 to 1
          keys:
            0 UDFToDouble(_col3) (type: double)
            1 UDFToDouble(_col0) (type: double)
          outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col6
          Statistics: Num rows: 22 Data size: 251 Basic stats: COMPLETE Column stats: NONE
          Select Operator
            expressions: _col0 (type: bigint), _col1 (type: string), _col2 (type: string), _col3 (type: string), _col4 (type: bigint), _col6 (type: string)
            outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5
            Statistics: Num rows: 22 Data size: 251 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              Statistics: Num rows: 22 Data size: 251 Basic stats: COMPLETE Column stats: NONE
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

With a common join, the map phase sorts its output (sort order: +); with a map join, the map phase does not sort.

Summary
1. A map join can be viewed as a broadcast join: the small table is loaded into memory and there is no shuffle phase, which speeds up processing. But if the small table is too large, loading it into memory may cause an OOM.
2. A common join produces a shuffle, which hurts efficiency (data transfer), and can also suffer from data skew (if one key has too many rows, the task processing that key becomes very slow).
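The data-skew point can be made concrete with a small sketch. The key distribution below is hypothetical, chosen so that one key is "hot"; counting how many records each hash partition receives shows why a single reducer ends up doing almost all the work:

```python
# Sketch of join-key skew in a common join: all rows for one key are
# hashed to the same reducer, so a hot key overloads a single partition.
from collections import Counter

# Hypothetical key distribution: key 1 is "hot"
join_keys = [1] * 1_000_000 + [2] * 10 + [3] * 10

num_reducers = 3
load = Counter(hash(k) % num_reducers for k in join_keys)
print(load)  # one reducer receives ~1,000,000 records; the others almost none
```

A map join sidesteps this entirely (no partitioning by key), which is one more reason it is preferred whenever the small table fits in memory.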