欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Hadoop学习(4)-mapreduce的一些注意事项

程序员文章站 2022-03-23 13:41:51
关于mapreduce的一些注意细节 如果把mapreduce程序打包放到了liux下去运行, 命令java –cp xxx.jar 主类名 如果报错了,说明是缺少相关的依赖jar包 用命令hadoop jar xxx.jar 类名因为在集群机器上用 hadoop jar xx.jar mr.wc. ......

关于mapreduce的一些注意细节

如果把mapreduce程序打包放到了liux下去运行,

命令java  –cp  xxx.jar 主类名

如果报错了,说明是缺少相关的依赖jar包

用命令hadoop jar xxx.jar 类名因为在集群机器上用 hadoop jar xx.jar mr.wc.jobsubmitter 命令来启动客户端main方法时,hadoop jar这个命令会将所在机器上的hadoop安装目录中的jar包和配置文件加入到运行时的classpath中

那么,我们的客户端main方法中的new configuration()语句就会加载classpath中的配置文件,自然就有了

fs.defaultfs 和 mapreduce.framework.name 和 yarn.resourcemanager.hostname 这些参数配置

会把本地hadoop的相关的所有jar包都会引用

mapreduce也有本地的job运行,就是可以不用提交到yarn上,可以以单机的模式跑一边以多个线程模拟也可以。

就是如果不管在linux下还是windows下,提交job都会默认的提交到本地去运行,

如果在linux默认提交到yarn上运行,需要写配置文件hadoop/etc/mapred-site.xml文件

<configuration>

<property>

<name>mapreduce.framework.name</name>

<value>yarn</value>

</property>

</configuration>

 

key,value对,如果是自己的类的话,那么这个类要实现writable,同时要把你想序列化的数据转化成二进制,然后放到重写方法wirte参数的dataoutput里面,另一个readfields重写方法是用来反序列化用的,

注意反序列化的时候,会先拿这个类的无参构造方法构造出一个对象出来,然后再通过readfields方法来复原这个对象。

 

dataoutput也是一种流,只不过是hadoop的在封装,自己用的时候,里面需要加个fileoutputstream对象

dataoutput写字符串的时候要用writeutf(“字符串”),他这样编码的时候,会在字符串的前面先加上字符串的长度,这是考虑到字符编码对其的问题,hadoop解析的时候就会先读前面两个字节,看一看这个字符串有多长,不然如果用write(字符串.getbytes())这样他不知道这个字符串到底有多少个字节。

 

在reduce阶段,如果把一个对象写到hdfs里面,那么会调用字符串的tostring方法,你可以重写这个类的tostring方法 

举例,下面这个类就可以在hadoop里序列化

package mapreduce2;

import java.io.datainput;
import java.io.dataoutput;
import java.io.ioexception;

import org.apache.hadoop.hdfs.client.hdfsclientconfigkeys.write;
import org.apache.hadoop.io.writable;
import org.apache.hadoop.util.waitable;

public class flowbean implements writable {

    private int up;//上行流量
    private int down;//下行流量
    private int sum;//总流量
    private string phone;//电话号
    
    public flowbean(int up, int down, string phone) {
        this.up = up;
        this.down = down;
        this.sum = up + down;
        this.phone = phone;
    }
    public int getup() {
        return up;
    }
    public void setup(int up) {
        this.up = up;
    }
    public int getdown() {
        return down;
    }
    public void setdown(int down) {
        this.down = down;
    }
    public int getsum() {
        return sum;
    }
    public void setsum(int sum) {
        this.sum = sum;
    }
    public string getphone() {
        return phone;
    }
    public void setphone(string phone) {
        this.phone = phone;
    }
    @override
    public void readfields(datainput di) throws ioexception {
        //注意这里读的顺序要和写的顺序是一样的
        this.up = di.readint();
        this.down = di.readint();
        this.sum = this.up + this.down;
        this.phone = di.readutf();
    }
    @override
    public void write(dataoutput do) throws ioexception {
        do.writeint(this.up);
        do.writeint(this.down);
        do.writeint(this.sum);
        do.writeutf(this.phone);
    }
    @override
    public string tostring() {
        return "电话号"+this.phone+" 总流量"+this.sum;
    }
}

 

 

 当所有的reducetask都运行完之后,还会调用一个cleanup方法

应用练习:统计一个页面访问总量为n条的数据

方案一:只用一个reducetask,利用cleanup方法,在reducetask阶段,先不直接放到hdfs里面,而是存到一个treemap里面

再在reducetask结束后,在cleanup里面通过把treemap里面前五输出到hdfs里面;

package cn.edu360.mr.page.topn;

public class pagecount implements comparable<pagecount>{
    
    private string page;
    private int count;
    
    public void set(string page, int count) {
        this.page = page;
        this.count = count;
    }
    
    public string getpage() {
        return page;
    }
    public void setpage(string page) {
        this.page = page;
    }
    public int getcount() {
        return count;
    }
    public void setcount(int count) {
        this.count = count;
    }

    @override
    public int compareto(pagecount o) {
        return o.getcount()-this.count==0?this.page.compareto(o.getpage()):o.getcount()-this.count;
    }
    
    

}

 

map类

import java.io.ioexception;

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;

public class pagetopnmapper extends mapper<longwritable, text, text, intwritable>{
    
    @override
    protected void map(longwritable key, text value, context context)
            throws ioexception, interruptedexception {
        string line = value.tostring();
        string[] split = line.split(" ");
        context.write(new text(split[1]), new intwritable(1));
    }

}

reduce类

package cn.edu360.mr.page.topn;

import java.io.ioexception;
import java.util.map.entry;
import java.util.set;
import java.util.treemap;

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.reducer;

public class pagetopnreducer extends reducer<text, intwritable, text, intwritable>{
    
    treemap<pagecount, object> treemap = new treemap<>();
    
    @override
    protected void reduce(text key, iterable<intwritable> values,
            reducer<text, intwritable, text, intwritable>.context context) throws ioexception, interruptedexception {
        int count = 0;
        for (intwritable value : values) {
            count += value.get();
        }
        pagecount pagecount = new pagecount();
        pagecount.set(key.tostring(), count);
        
        treemap.put(pagecount,null);
        
    }
    @override
    protected void cleanup(context context)
            throws ioexception, interruptedexception {
        configuration conf = context.getconfiguration();
    //可以在cleanup里面拿到configuration,从里面读取要拿前几条数据 int topn = conf.getint("top.n", 5); set<entry<pagecount, object>> entryset = treemap.entryset(); int i= 0; for (entry<pagecount, object> entry : entryset) { context.write(new text(entry.getkey().getpage()), new intwritable(entry.getkey().getcount())); i++; if(i==topn) return; } } }

然后jobsubmit类,注意这个要设定configuration,这里面有几种方法

第一种是加载配置文件

        configuration conf = new configuration();
        conf.addresource("xx-oo.xml");

然后再在xx-oo.xml文件里面写

<configuration>
    <property>
        <name>top.n</name>
        <value>6</value>
    </property>
</configuration>

第二种方式

    //通过直接设定
        conf.setint("top.n", 3);
        //通过对java主程序 直接传进来的参数
        conf.setint("top.n", integer.parseint(args[0]));

第三种方式通过获取配置文件参数

     properties props = new properties();
        props.load(jobsubmitter.class.getclassloader().getresourceasstream("topn.properties"));
        conf.setint("top.n", integer.parseint(props.getproperty("top.n")));

然后再在topn.properties里面配置参数

top.n=5

subsubmit类,默认在本机模拟运行

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;

public class jobsubmitter {

    public static void main(string[] args) throws exception {

        /**
         * 通过加载classpath下的*-site.xml文件解析参数
         */
        configuration conf = new configuration();
        conf.addresource("xx-oo.xml");
        
        /**
         * 通过代码设置参数
         */
        //conf.setint("top.n", 3);
        //conf.setint("top.n", integer.parseint(args[0]));
        
        /**
         * 通过属性配置文件获取参数
         */
        /*properties props = new properties();
        props.load(jobsubmitter.class.getclassloader().getresourceasstream("topn.properties"));
        conf.setint("top.n", integer.parseint(props.getproperty("top.n")));*/
        
        job job = job.getinstance(conf);

        job.setjarbyclass(jobsubmitter.class);

        job.setmapperclass(pagetopnmapper.class);
        job.setreducerclass(pagetopnreducer.class);

        job.setmapoutputkeyclass(text.class);
        job.setmapoutputvalueclass(intwritable.class);
        
        job.setoutputkeyclass(text.class);
        job.setoutputvalueclass(intwritable.class);

        fileinputformat.setinputpaths(job, new path("f:\\mrdata\\url\\input"));
        fileoutputformat.setoutputpath(job, new path("f:\\mrdata\\url\\output"));

        job.waitforcompletion(true);

    }
}

 有时一个任务一个mapreduce是完成不了的,有可能会拆分成两个或多个mapreduce

map阶段会有自己的排序机制,比如一组数据(a,1),(b,1),(a,1),(c,1),他会先处理key为1的一组数据,

这个排序机制我们也可以自己去实现,要对这个类实现comparable接口,然后重写compareto方法。

但要注意这个排序机制只是对于一个reducetask来说的,如果有多个的话,只会得到局部排序。

如果要多个reducetask的话,我们就需要控制数据的分发规则,这样虽然是会生成多个排序后的文件,但这些文件整体上依然是有序的。因为我们控制了每一个reducetask处理数据的范围。

 

 

 

额外java知识点补充

treemap,放进去的东西会自动排序

两种treemap的自定义方法,第一种是传入一个comparator

public class treemaptest {
    
    public static void main(string[] args) {
        
        treemap<flowbean, string> tm1 = new treemap<>(new comparator<flowbean>() {
            @override
            public int compare(flowbean o1, flowbean o2) {
                //如果两个类总流量相同的会比较电话号
                if( o2.getamountflow()-o1.getamountflow()==0){
                    return o1.getphone().compareto(o2.getphone());
                }
                //如果流量不同,就按从小到大的顺序排序
                return o2.getamountflow()-o1.getamountflow();
            }
        });
        flowbean b1 = new flowbean("1367788", 500, 300);
        flowbean b2 = new flowbean("1367766", 400, 200);
        flowbean b3 = new flowbean("1367755", 600, 400);
        flowbean b4 = new flowbean("1367744", 300, 500);
        
        tm1.put(b1, null);
        tm1.put(b2, null);
        tm1.put(b3, null);
        tm1.put(b4, null);
        //treeset的遍历
        set<entry<flowbean,string>> entryset = tm1.entryset();
        for (entry<flowbean,string> entry : entryset) {
            system.out.println(entry.getkey() +"\t"+ entry.getvalue());
        }
    }

}

第二种是在这个类中,实现一个comparable接口

package cn.edu360.mr.page.topn;

public class pagecount implements comparable<pagecount>{
    
    private string page;
    private int count;
    
    public void set(string page, int count) {
        this.page = page;
        this.count = count;
    }
    
    public string getpage() {
        return page;
    }
    public void setpage(string page) {
        this.page = page;
    }
    public int getcount() {
        return count;
    }
    public void setcount(int count) {
        this.count = count;
    }

    @override
    public int compareto(pagecount o) {
        return o.getcount()-this.count==0?this.page.compareto(o.getpage()):o.getcount()-this.count;
    }
    
    

}