欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

大数据学习(二)-------- MapReduce

程序员文章站 2022-04-09 17:16:16
前提已经安装好hadoop的hdfs集群,可以查看 https://www.cnblogs.com/tree1123/p/10683570.html Mapreduce是hadoop的运算框架,可以对hdfs中的数据分开进行计算,先执行很多maptask,在执行reducetask,这个过程中任务的 ......

前提已经安装好hadoop的hdfs集群,可以查看

**https://www.cnblogs.com/tree1123/p/10683570.html

mapreduce是hadoop的运算框架,可以对hdfs中的数据分开进行计算,先执行很多maptask,在执行reducetask,这个过程中任务的执行需要一个任务调度的平台,就是yarn。

一、安装yarn集群

yarn集群中有两个角色:

主节点:resource manager  1台

从节点:node manager   n台

 

resource manager一般安装在一台专门的机器上

node manager应该与hdfs中的data node重叠在一起

修改配置文件:yarn-site.xml

<property>
<name>yarn.resourcemanager.hostname</name>
<value>主机名</value>
</property>

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>2048</value>
</property>

<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>2</value>
</property>

然后scp到所有机器,修改主节点hadoop的slaves文件,列入要启动nodemanager的机器,配好免密

然后,就可以用脚本启动yarn集群:

sbin/start-yarn.sh

停止:

sbin/stop-yarn.sh

页面:http://主节点:8088 看看node manager节点是否识别

开发一个提交job到yarn的客户端类,mapreduce所有jar和自定义类,打成jar包上传到hadoop集群中的任意一台机器上,运行jar包中的(yarn客户端类

hadoop jar ......jobsubmitter

二、开发mapreduce程序

主要需要开发:

map阶段的进、出数据,

reduce阶段的进、出数据,

类型都应该是实现了hadoop序列化框架的类型,如:

string对应text

integer对应intwritable

long对应longwritable

例子wordcount代码:

wordcountmapper

public class wordcountmapper extends mapper<longwritable, text, text, intwritable>{
    
    @override
    protected void map(longwritable key, text value, context context)
            throws ioexception, interruptedexception {

        // 切单词
        string line = value.tostring();
        string[] words = line.split(" ");
        for(string word:words){
            context.write(new text(word), new intwritable(1));
            
        }
    }
}

wordcountreducer

public class wordcountreducer extends reducer<text, intwritable, text, intwritable>{
    
    
    @override
    protected void reduce(text key, iterable<intwritable> values,context context) throws ioexception, interruptedexception {
    
        
        int count = 0;
        
        iterator<intwritable> iterator = values.iterator();
        while(iterator.hasnext()){
            
            intwritable value = iterator.next();
            count += value.get();
        }
        
        context.write(key, new intwritable(count));
        
    }

}







public class jobsubmitter {
    
    public static void main(string[] args) throws exception {
        
        // 在代码中设置jvm系统参数,用于给job对象来获取访问hdfs的用户身份
        system.setproperty("hadoop_user_name", "root");
        
        
        configuration conf = new configuration();
        // 1、设置job运行时要访问的默认文件系统
        conf.set("fs.defaultfs", "hdfs://hdp-01:9000");
        // 2、设置job提交到哪去运行
        conf.set("mapreduce.framework.name", "yarn");
        conf.set("yarn.resourcemanager.hostname", "hdp-01");
        // 3、如果要从windows系统上运行这个job提交客户端程序,则需要加这个跨平台提交的参数
        conf.set("mapreduce.app-submission.cross-platform","true");
        
        job job = job.getinstance(conf);
        
        // 1、封装参数:jar包所在的位置
        job.setjar("d:/wc.jar");
        //job.setjarbyclass(jobsubmitter.class);
        
        // 2、封装参数: 本次job所要调用的mapper实现类、reducer实现类
        job.setmapperclass(wordcountmapper.class);
        job.setreducerclass(wordcountreducer.class);
        
        // 3、封装参数:本次job的mapper实现类、reducer实现类产生的结果数据的key、value类型
        job.setmapoutputkeyclass(text.class);
        job.setmapoutputvalueclass(intwritable.class);
        
        job.setoutputkeyclass(text.class);
        job.setoutputvalueclass(intwritable.class);
        
        
        
        path output = new path("/wordcount/output");
        filesystem fs = filesystem.get(new uri("hdfs://hdp-01:9000"),conf,"root");
        if(fs.exists(output)){
            fs.delete(output, true);
        }
        
        // 4、封装参数:本次job要处理的输入数据集所在路径、最终结果的输出路径
        fileinputformat.setinputpaths(job, new path("/wordcount/input"));
        fileoutputformat.setoutputpath(job, output);  // 注意:输出路径必须不存在
        
        
        // 5、封装参数:想要启动的reduce task的数量
        job.setnumreducetasks(2);
        
        // 6、提交job给yarn
        boolean res = job.waitforcompletion(true);
        
        system.exit(res?0:-1);
        
    }
    
    

}

mr还有一些高级的用法:自定义类型,自定义partitioner,combiner,排序,倒排索引,自定义groupingcomparator

三、mapreduce与yarn的核心机制

yarn是一个分布式程序的运行调度平台

yarn中有两大核心角色:

1、resource manager

接受用户提交的分布式计算程序,并为其划分资源

管理、监控各个node manager上的资源情况,以便于均衡负载

 

2、node manager

管理它所在机器的运算资源(cpu + 内存)

负责接受resource manager分配的任务,创建容器、回收资源

mapreduce工作机制:

划分输入切片——》 环形缓冲区 ——》 分区排序 ——》combiner 局部聚合——》shuffle ——》groupingcomparator——》输出