
Counting Word Occurrences in a Text File with MapReduce


Uploading the Input File

1. Upload via the shell:
hdfs dfs -mkdir /wordfile    # create the target directory
hdfs dfs -put wordcount.txt /wordfile
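To confirm the upload, list the directory and print the file back (same paths as above):

hdfs dfs -ls /wordfile
hdfs dfs -cat /wordfile/wordcount.txt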
2. Or upload through the Java API. The original snippet shows only the two methods; here they are wrapped in an illustrative JUnit test class so the code compiles:

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

// HdfsUploadTest is an illustrative class name; the original shows only the two methods
public class HdfsUploadTest {

    private FileSystem fileSystem;

    @Before
    public void setUp() throws URISyntaxException, IOException, InterruptedException {
        // Client-side configuration for an HA HDFS cluster with nameservice "qf"
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://qf");
        conf.set("dfs.nameservices", "qf");
        conf.set("dfs.ha.namenodes.qf", "nn1,nn2");
        conf.set("dfs.namenode.rpc-address.qf.nn1", "192.168.1.6:8020"); // nn1: the active NameNode
        conf.set("dfs.namenode.rpc-address.qf.nn2", "192.168.1.5:8020"); // nn2: the standby NameNode
        conf.set("dfs.client.failover.proxy.provider.qf",
                "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
        fileSystem = FileSystem.get(new URI("hdfs://qf"), conf, "root");
    }

    @Test
    public void upload() throws IOException {
        // Create (or overwrite) /a.txt on HDFS and stream the local file into it
        FSDataOutputStream outputStream = fileSystem.create(new Path("/a.txt"), true);
        FileInputStream inputStream = new FileInputStream("d:\\a.txt");
        IOUtils.copy(inputStream, outputStream);
        IOUtils.closeQuietly(inputStream);
        IOUtils.closeQuietly(outputStream);
        fileSystem.close();
    }
}
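Note that upload() closes fileSystem itself, so a second test method in the same class would hit a closed handle. One option (a sketch, not part of the original) is to drop that close() call and release the handle in an @After hook instead:

    @After
    public void tearDown() throws IOException {
        // Close the shared FileSystem once per test (needs import org.junit.After)
        fileSystem.close();
    }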

Writing the Mapper

package com.msunsoft.hadoop.Test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
// Counts how many times each word appears
/**
 * k1: byte offset of the line within the file
 * v1: the text of the line
 * k2: output key, i.e. the word
 * v2: output value, i.e. a count of 1 per occurrence
 */
public class MapReduceMapOne extends Mapper<LongWritable, Text, Text, LongWritable> {
    // map() turns each (k1, v1) pair into (k2, v2) pairs; key is k1, value is v1
    // context is the job context; pairs written to it are sent to the shuffle phase
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        String[] split = value.toString().split(" ");
        for (String word : split) {
            text.set(word);
            longWritable.set(1);
            context.write(text,longWritable);
        }
    }
}
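One caveat: splitting on a single space turns runs of spaces or tabs into extra empty "words". If the input is not strictly single-space-delimited (an assumption about your data, not something the original states), a small variant splits on any whitespace run:

        // Variant: split on any run of whitespace instead of a single space
        String[] split = value.toString().split("\\s+");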

Writing the Reducer

package com.msunsoft.hadoop.Test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MapReduceReducerTwo extends Reducer<Text, LongWritable,Text,LongWritable> {
    // reduce() turns each (k2, [v2...]) group into a (k3, v3) pair and writes it to the context
    // key is k2, values is the collection of v2s; e.g. if the input contains a, a, b, c,
    // a appears twice and b and c once each, grouped as a:[1,1], b:[1], c:[1]
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the counts in the group to get v3
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key,new LongWritable(count));
    }
}
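Because this reduction is just an associative sum, the same class can also be registered as a combiner to pre-aggregate counts on the map side. This is an optional optimization, not part of the original job setup:

        // In the driver, after setReducerClass:
        job.setCombinerClass(MapReduceReducerTwo.class); // pre-sum counts on each mapper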

Writing the Job Driver (main method)

package com.msunsoft.hadoop.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.util.Tool;

public class MapReduceJobMainThree extends Configured implements Tool {

    // run() assembles and submits the job
    @Override
    public int run(String[] strings) throws Exception {
        // Create the job object
        Job job = Job.getInstance(super.getConf(), "wordcount");
        // Configure the job (the classic eight steps)
        // Step 1: input format and input path
        job.setInputFormatClass(TextInputFormat.class);
        // Input path: assumes the /a.txt uploaded earlier; authority taken from the output path below
        TextInputFormat.addInputPath(job, new Path("hdfs://node5:8020/a.txt"));
        // Step 2: mapper class and the types of k2/v2
        job.setMapperClass(MapReduceMapOne.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Steps 3-6 (partition, sort, combine, group) keep the defaults
        // Step 7: reducer class and the types of k3/v3
        job.setReducerClass(MapReduceReducerTwo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Step 8: output format and output path
        // (on Windows a local path such as file:///D:/mapreduce/output also works)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node5:8020/wordcount.out"));

        // Submit and wait for completion
        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    // Entry point: launch the job through ToolRunner
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new MapReduceJobMainThree(),args);
        System.exit(run);
    }
}
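One common pitfall: TextOutputFormat fails immediately if the output directory already exists. A hedged guard you could add inside run() before submitting, using the same paths assumed above (needs org.apache.hadoop.fs.FileSystem and java.net.URI imports):

        // Remove a stale output directory so reruns don't fail
        FileSystem fs = FileSystem.get(new URI("hdfs://node5:8020"), super.getConf());
        Path out = new Path("hdfs://node5:8020/wordcount.out");
        if (fs.exists(out)) {
            fs.delete(out, true); // recursive delete
        }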

Invocation: package the job as a jar, or run the program's main() method directly.

When calling main() directly (e.g. from the IDE), adjust the input and output paths in the program to match your environment.
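A minimal sketch of the jar route, assuming the build produces wordcount.jar (an assumed name):

hadoop jar wordcount.jar com.msunsoft.hadoop.Test.MapReduceJobMainThree
# Inspect the result (one part file per reducer)
hdfs dfs -cat /wordcount.out/part-r-00000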
