MapReduce入门
1.MapReduce简介
MapReduce是一种分布式计算框架,用于大规模数据集的并行计算。
2.MapReduce框架
- MapReduce框架中主要包括两个函数: Map和Reduce,这两个函数也是程序猿需要进行编程的函数。
- MapReduce采用 “分而治之” 思想,将一个分布式文件系统中的大规模数据集切分成许多独立的切片,然后这些切片可以由多个map、reduce任务进行处理
- MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,原因是, 移动数据需要大量的网络传输开销
3.MapReduce编程模型
MapReduce由两个阶段组成:Map和Reduce
- map()函数以key/value键值对作为输入,经过map函数处理后产生另外一系列key/value键值对作为中间输出存储写入到磁盘中。
- reduce()函数以key及对应的value列表作为输入,经合并相同的key的value值后,产生另外一系列key/value对作为最终输出写入HDFS中
- 在map()函数输出后到reduce()函数输入前,这期间的过程叫做shuffle阶段,这个过程是MapReduce计算框架主要优化的地方。在下文会详细介绍。
- 指定三个组件分别是 InputFormat、Partitioner 和 OutputFormat, 它们均需要用户根据自己的应用需求配置
1.InputFormat指定输入文件格式。将输入数据切分成若干个 split,且将每个 split 中的数据解析成一个个 map() 函数 要求的 key/value 对。
2.Partitioner确定 map() 函数产生的每个 key/value 对发给哪个 Reduce 任务函数处理。 有几个reduce任务就会有几个Partition
3.OutputFormat指定输出文件格式,即每个 key/value 对以何种形式保存到输出文件中。
4.MapReduce计算流程
输入分片(input split)
在进行map计算之前,MapReduce会根据输入文件计算输入分片(input split),每个输入分片(input split)由一个map任务处理,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组,输入分片(input split)往往和hdfs的block(块)关系很密切。
举例: 假如我们设定hdfs的DataNode的大小是128mb,如果我们输入有三个文件,大小分别是3mb、130mb和200mb,那么MapReduce会把3mb文件分为一个输入分片(input split),130mb则是两个输入分片(input split)而200mb也是两个输入分片(input split),换句话说我们如果不在map计算前做输入分片调整,例如合并小文件,那么就会有5个map任务将执行,而且每个map执行的数据大小不均,这个也是MapReduce优化计算的一个关键点。
map阶段
map函数处理逻辑由程序猿编写;一般map操作都是本地化操作,也就是在数据存储节点上进行。
combiner阶段
combiner阶段是程序员可以选择的,combiner其实也是一种reduce操作,因此我们看见WordCount类里是用reduce进行加载的。Combiner是一个本地化的reduce操作,它是map运算的后续操作,主要是在map计算出中间文件前做一个简单的合并重复key值的操作,例如下文对文件里的单词频率做统计,map计算时候如果碰到一个单词就会记录为1,但是这篇文章里相同的单词可能会出现n多次,那么map输出文件冗余就会很多,因此在reduce计算前对相同的key做一个合并操作,那么文件会变小,这样就提高了宽带的传输效率,毕竟hadoop计算力宽带资源往往是计算的瓶颈也是最为宝贵的资源,但是combiner操作是有风险的,使用它的原则是combiner的输入不会影响到reduce计算的最终输入,例如:如果计算只是求总数,最大值,最小值可以使用combiner,但是求中位值计算中使用combiner的话,最终的reduce计算结果就会出错。
shuffle阶段(包括Partition, Sort, Spill, Meger, Combiner,Copy, Memery, Disk……)
- MemoryBuffer:内存缓冲区,每个map的结果和partition处理的key/value结果都保存在缓存中。缓冲区默认大小为100M,溢写阈值为100M* 0.8=80M
- Spill:内存缓冲区达到阈值时,溢写spill线程锁住这80M
的缓冲区,开始将数据写出到本地磁盘中,然后释放内存。每次溢写都生成一个数据文件。溢出的数据到磁盘前会对数据进行key排序sort以及合并combiner。发送相同Reduce的key数量,会拼接到一起,减少partition的索引数量。 - Partitioner:决定数据由哪个Reducer处理。可以采用hash法进行分区。
- Sort:缓冲区数据按照key进行排序。
- Disk:将数据写入磁盘中
reduce阶段
和map函数一样也是由程序猿编写的,最终结果是存储在hdfs上的。
5.HDFS block和MapReduce split之间的联系?
Block:HDFS中最小的数据存储单位,默认是128M;Split:MapReduce中最小的计算单元,默认与Block一一对应。
两者的对应关系是任意的,可有用户控制。
6.例子 WordCount(Python编写)
部分数据截图
map函数
import re
data_path = './data/The_man_of_property.txt'
p = re.compile(r'\w+') # 正则表达式,确保切分的为一个正确的单词
with open(data_path, 'r', encoding='utf-8') as f: # 打开文件
for line in f.readlines():
word_list = line.strip().split(' ') # 按照空格切分单词,生成一个列表
for word in word_list: # 对每一个单词进行处理
re_word = p.findall(word) # 利用正则表达式对每一个单词进行处理,去除标点符号等
if len(re_word)==0:
continue
word = re_word[0].lower() # 将单词小写
print("%s,%s" % (word, 1)) # 将每个单词以键值对的形式输出
reduce函数
data_path = './data/reduce_test'
cur_word = None
sum = 0
with open(data_path,'r',encoding='utf-8') as f:
for line in f.readlines():
word , val = line.strip().split(',')
if cur_word == None:
cur_word = word
if cur_word != word:
print("%s,%s" % (word, sum))
sum = 0
cur_word = word
sum = sum + int(val)
print("%s,%s"%(word,sum)) # 将每个单词计数
Python环境运行脚本
HADOOP_CMD="/usr/local/src/hadoop-2.6.1/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"
INPUT_FILE_PATH_1="/data/The_man_of_property.txt"
OUTPUT_PATH="/output/wc"
$HADOOP_CMD fs -rm -r -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map_t.py" \
-reducer "python reducer.py" \
-file ./map_t.py \
-file ./reducer.py
下一篇: hed边缘检测模型之裂缝检测