大数据学习(二)-------- MapReduce
前提已经安装好hadoop的hdfs集群,可以查看
**https://www.cnblogs.com/tree1123/p/10683570.html
mapreduce是hadoop的运算框架,可以对hdfs中的数据分开进行计算,先执行很多maptask,在执行reducetask,这个过程中任务的执行需要一个任务调度的平台,就是yarn。
一、安装yarn集群
yarn集群中有两个角色:
主节点:resource manager 1台
从节点:node manager n台
resource manager一般安装在一台专门的机器上
node manager应该与hdfs中的data node重叠在一起
修改配置文件:yarn-site.xml
<property> <name>yarn.resourcemanager.hostname</name> <value>主机名</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>2048</value> </property> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>2</value> </property>
然后scp到所有机器,修改主节点hadoop的slaves文件,列入要启动nodemanager的机器,配好免密
然后,就可以用脚本启动yarn集群:
sbin/start-yarn.sh
停止:
sbin/stop-yarn.sh
页面:http://主节点:8088 看看node manager节点是否识别
开发一个提交job到yarn的客户端类,mapreduce所有jar和自定义类,打成jar包上传到hadoop集群中的任意一台机器上,运行jar包中的(yarn客户端类
hadoop jar ......jobsubmitter
二、开发mapreduce程序
主要需要开发:
map阶段的进、出数据,
reduce阶段的进、出数据,
类型都应该是实现了hadoop序列化框架的类型,如:
string对应text
integer对应intwritable
long对应longwritable
例子wordcount代码:
wordcountmapper
public class wordcountmapper extends mapper<longwritable, text, text, intwritable>{ @override protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception { // 切单词 string line = value.tostring(); string[] words = line.split(" "); for(string word:words){ context.write(new text(word), new intwritable(1)); } } }
wordcountreducer
public class wordcountreducer extends reducer<text, intwritable, text, intwritable>{ @override protected void reduce(text key, iterable<intwritable> values,context context) throws ioexception, interruptedexception { int count = 0; iterator<intwritable> iterator = values.iterator(); while(iterator.hasnext()){ intwritable value = iterator.next(); count += value.get(); } context.write(key, new intwritable(count)); } } public class jobsubmitter { public static void main(string[] args) throws exception { // 在代码中设置jvm系统参数,用于给job对象来获取访问hdfs的用户身份 system.setproperty("hadoop_user_name", "root"); configuration conf = new configuration(); // 1、设置job运行时要访问的默认文件系统 conf.set("fs.defaultfs", "hdfs://hdp-01:9000"); // 2、设置job提交到哪去运行 conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.hostname", "hdp-01"); // 3、如果要从windows系统上运行这个job提交客户端程序,则需要加这个跨平台提交的参数 conf.set("mapreduce.app-submission.cross-platform","true"); job job = job.getinstance(conf); // 1、封装参数:jar包所在的位置 job.setjar("d:/wc.jar"); //job.setjarbyclass(jobsubmitter.class); // 2、封装参数: 本次job所要调用的mapper实现类、reducer实现类 job.setmapperclass(wordcountmapper.class); job.setreducerclass(wordcountreducer.class); // 3、封装参数:本次job的mapper实现类、reducer实现类产生的结果数据的key、value类型 job.setmapoutputkeyclass(text.class); job.setmapoutputvalueclass(intwritable.class); job.setoutputkeyclass(text.class); job.setoutputvalueclass(intwritable.class); path output = new path("/wordcount/output"); filesystem fs = filesystem.get(new uri("hdfs://hdp-01:9000"),conf,"root"); if(fs.exists(output)){ fs.delete(output, true); } // 4、封装参数:本次job要处理的输入数据集所在路径、最终结果的输出路径 fileinputformat.setinputpaths(job, new path("/wordcount/input")); fileoutputformat.setoutputpath(job, output); // 注意:输出路径必须不存在 // 5、封装参数:想要启动的reduce task的数量 job.setnumreducetasks(2); // 6、提交job给yarn boolean res = job.waitforcompletion(true); system.exit(res?0:-1); } }
mr还有一些高级的用法:自定义类型,自定义partitioner,combiner,排序,倒排索引,自定义groupingcomparator
三、mapreduce与yarn的核心机制
yarn是一个分布式程序的运行调度平台
yarn中有两大核心角色:
1、resource manager
接受用户提交的分布式计算程序,并为其划分资源
管理、监控各个node manager上的资源情况,以便于均衡负载
2、node manager
管理它所在机器的运算资源(cpu + 内存)
负责接受resource manager分配的任务,创建容器、回收资源
mapreduce工作机制:
划分输入切片——》 环形缓冲区 ——》 分区排序 ——》combiner 局部聚合——》shuffle ——》groupingcomparator——》输出
下一篇: git 版本库基础知识学习