学习笔记—MapReduce
mapreduce是什么
mapreduce是一种分布式计算编程框架,是hadoop主要组成部分之一,可以让用户专注于编写核心逻辑代码,最后以高可靠、高容错的方式在大型集群上并行处理大量数据。
mapreduce的存储
mapreduce的数据是存储在hdfs上的,hdfs也是hadoop的主要组成部分之一。下边是mapreduce在hdfs上的存储的图解
hdfs主要有namenode和datanode两部分组成,整个集群有一个namenode和多个datanode,通常每一个节点一个datanode,namenode的主要功能是用来管理客户端client对数据文件的操作请求和储存数据文件的地址。datanode主要是用来储存和管理本节点的数据文件。节点内部数据文件被分为一个或多个block块(block默认大小原来是64mb,后来变为128mb),然后这些块储存在一组datanode中。(这里不对hdfs做过多的介绍,后续会写一篇详细的hdfs笔记)
mapreduce的运行流程
1、首先把需要处理的数据文件上传到hdfs上,然后这些数据会被分为好多个小的分片,然后每个分片对应一个map任务,推荐情况下分片的大小等于block块的大小。然后map的计算结果会暂存到一个内存缓冲区内,该缓冲区默认为100m,等缓存的数据达到一个阈值的时候,默认情况下是80%,然后会在磁盘创建一个文件,开始向文件里边写入数据。
2、map任务的输入数据的格式是<key,value>对的形式,我们也可以自定义自己的<key,value>类型。然后map在往内存缓冲区里写入数据的时候会根据key进行排序,同样溢写到磁盘的文件里的数据也是排好序的,最后map任务结束的时候可能会产生多个数据文件,然后把这些数据文件再根据归并排序合并成一个大的文件。
3、然后每个分片都会经过map任务后产生一个排好序的文件,同样文件的格式也是<key,value>对的形式,然后通过对key进行hash的方式把数据分配到不同的reduce里边去,这样对每个分片的数据进行hash,再把每个分片分配过来的数据进行合并,合并过程中也是不断进行排序的。最后数据经过reduce任务的处理就产生了最后的输出。
4、在我们开发中只需要对中间map和reduce的逻辑进行开发就可以了,中间分片,排序,合并,分配都有mapreduce框架帮我完成了。
mapreduce的资源调度系统
最后我们来看一下mapreduce的资源调度系统yarn。
yarn的基本思想是将资源管理和作业调度/监视的功能分解为单独的守护进程。全局唯一的resourcemanager是负责所有应用程序之间的资源的调度和分配,每个程序有一个applicationmaster,applicationmaster实际上是一个特定于框架的库,其任务是协调来自resourcemanager的资源,并与nodemanager一起执行和监视任务。nodemanager是每台机器框架代理,监视其资源使用情况(cpu,内存,磁盘,网络)并将其报告给resourcemanager。
wordconut代码
- python实现
map.py
#!/usr/bin/env python # -*- coding:utf-8 -*- import sys for line in sys.stdin: words = line.strip().split() for word in words: print('%s\t%s' % (word, 1))
reduce.py
#!/usr/bin/env python # -*- coding:utf-8 -*- import sys current_word = none sum = 0 for line in sys.stdin: word, count = line.strip().split(' ') if current_word == none: current_word = word if word != current_word: print('%s\t%s' % (current_word, sum)) current_word = word sum = 0 sum += int(count) print('%s\t%s' % (current_word, sum))
我们先把输入文件上传到hdfs上去
hadoop fs -put /input.txt /
然后在linux下运行,为了方便我们把命令写成了shell文件
hadoop_cmd="/usr/local/src/hadoop-2.6.1/bin/hadoop" stream_jar_path="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar" input_file_path="/input.txt" output_file_path="/output" $hadoop_cmd fs -rmr -skiptrush $output_file_path $hadoop_cmd jar $stream_jar_path \ -input $input_file_path \ -output $output_file_path \ -mapper "python map.py" \ -reducer "python reduce.py" \ -file "./map.py" \ -file "./reduce.py"
- java实现
mymap.java
import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.longwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.mapper; import java.io.ioexception; public class mymap extends mapper<longwritable, text, text, intwritable> { private intwritable one = new intwritable(1); private text text = new text(); @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { string line = value.tostring(); string[] words = line.split(" "); for (string word: words){ text.set(word); context.write(text,one); } } }
myreduce.java
import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.reducer; import java.io.ioexception; public class myreduce extends reducer<text, intwritable, text, intwritable> { private intwritable result = new intwritable(); @override protected void reduce(text key, iterable<intwritable> values, context context) throws ioexception, interruptedexception { int sum = 0; for (intwritable i:values){ sum+=i.get(); } result.set(sum); context.write(key,result); } }
wordcount.java
import org.apache.hadoop.conf.configuration; import org.apache.hadoop.fs.path; import org.apache.hadoop.io.intwritable; import org.apache.hadoop.io.text; import org.apache.hadoop.mapreduce.job; import org.apache.hadoop.mapreduce.lib.input.fileinputformat; import org.apache.hadoop.mapreduce.lib.output.fileoutputformat; public class wordcount { public static void main(string[] args) throws exception { configuration configuration = new configuration(); job job = job.getinstance(configuration, "wordcount"); job.setjarbyclass(wordcount.class); job.setmapperclass(mymap.class); job.setreducerclass(myreduce.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); fileinputformat.addinputpath(job, new path(args[0])); fileoutputformat.setoutputpath(job, new path(args[1])); system.exit(job.waitforcompletion(true) ? 0 : 1); } }
把工程打成jar包,然后把jar包和输入文件上传到hdfs
$ hadoop fs -put /wordcount.jar / $ hadoop fs -put /input.txt /
执行wordcount任务
$ bin/hadoop jar wordcount.jar wordcount /input.txt /user/joe/wordcount/output
欢迎关注公众号:「努力给自己看」