Counting word occurrences in a text file with MapReduce
Upload the input file
1. Via the HDFS shell
hdfs dfs -mkdir /wordfile                 # create the target directory
hdfs dfs -put wordcount.txt /wordfile     # upload the local file into it
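To confirm the upload, you can list the directory and print the file back (the paths follow the two commands above):

hdfs dfs -ls /wordfile
hdfs dfs -cat /wordfile/wordcount.txt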
2. Or via the Java API
private FileSystem fileSystem; // initialized in setUp(), used by the tests below

@Before
public void setUp() throws URISyntaxException, IOException, InterruptedException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://qf");
    conf.set("dfs.nameservices", "qf");
    conf.set("dfs.ha.namenodes.qf", "nn1,nn2");
    conf.set("dfs.namenode.rpc-address.qf.nn1", "192.168.1.6:8020"); // nn1: the active NameNode
    conf.set("dfs.namenode.rpc-address.qf.nn2", "192.168.1.5:8020"); // nn2: the standby NameNode
    conf.set("dfs.client.failover.proxy.provider.qf",
            "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    fileSystem = FileSystem.get(new URI("hdfs://qf"), conf, "root");
}

@Test
public void upload() throws IOException {
    // create (or overwrite) /a.txt on HDFS and stream the local file into it
    FSDataOutputStream outputStream = fileSystem.create(new Path("/a.txt"), true);
    FileInputStream inputStream = new FileInputStream("d:\\a.txt");
    IOUtils.copy(inputStream, outputStream);
    IOUtils.closeQuietly(inputStream);
    IOUtils.closeQuietly(outputStream);
    fileSystem.close();
}
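As a quick sanity check, the same FileSystem handle can confirm the file landed. A minimal sketch (exists and listStatus are standard Hadoop FileSystem calls; it additionally needs the org.apache.hadoop.fs.FileStatus import):

@Test
public void verifyUpload() throws IOException {
    // confirm /a.txt exists and list what sits in the HDFS root
    System.out.println("exists: " + fileSystem.exists(new Path("/a.txt")));
    for (FileStatus status : fileSystem.listStatus(new Path("/"))) {
        System.out.println(status.getPath() + "\t" + status.getLen() + " bytes");
    }
}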
Write the Mapper
package com.msunsoft.hadoop.Test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Counts how many times each word appears.
 * k1: byte offset of the line in the input file
 * v1: the text of the line
 * k2: the output key, i.e. the word
 * v2: the output value, a count of 1 per occurrence
 */
public class MapReduceMapOne extends Mapper<LongWritable, Text, Text, LongWritable> {
    // map() turns each (k1, v1) pair into (k2, v2) pairs; key is k1, value is v1
    // context passes the (k2, v2) pairs on to the shuffle phase
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        LongWritable longWritable = new LongWritable();
        String[] split = value.toString().split(" ");
        for (String word : split) {
            text.set(word);
            longWritable.set(1);
            context.write(text, longWritable);
        }
    }
}
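The Mapper can be unit-tested without a cluster. Below is a minimal sketch using MRUnit (an assumption: the org.apache.mrunit:mrunit test dependency is not part of the code above and must be added separately):

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class MapReduceMapOneTest {
    @Test
    public void mapEmitsOnePerWord() throws Exception {
        // one input line should yield one (word, 1) pair per word, in order
        MapDriver.newMapDriver(new MapReduceMapOne())
                .withInput(new LongWritable(0), new Text("a b a"))
                .withOutput(new Text("a"), new LongWritable(1))
                .withOutput(new Text("b"), new LongWritable(1))
                .withOutput(new Text("a"), new LongWritable(1))
                .runTest();
    }
}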
Write the Reducer
package com.msunsoft.hadoop.Test;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MapReduceReducerTwo extends Reducer<Text, LongWritable, Text, LongWritable> {
    // reduce() turns the grouped (k2, v2) pairs into (k3, v3) and writes them to the context.
    // key is k2 and values is the collection of v2s: for a line containing "a a b c",
    // the groups arriving here are a:[1,1], b:[1], c:[1]
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        // sum the counts in the group to get v3
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        context.write(key, new LongWritable(count));
    }
}
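Because this reduce function only sums, it is associative and commutative, so the same class can also serve as a combiner to pre-aggregate counts on the map side and shrink the shuffled data. One extra line in the driver below would enable it (not part of the original job setup):

job.setCombinerClass(MapReduceReducerTwo.class);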
Write the Job driver and its main method
package com.msunsoft.hadoop.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MapReduceJobMainThree extends Configured implements Tool {
    // run() defines and submits one job
    @Override
    public int run(String[] strings) throws Exception {
        // create the job object
        Job job = Job.getInstance(super.getConf(), "wordcount");
        // configure the job (eight steps)
        // step 1: specify the input format and path
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path("hdfs://node5:8020/a.txt"));
        // step 2: specify the map class and its output types (k2, v2)
        job.setMapperClass(MapReduceMapOne.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // steps 3-6 (partition, sort, combine, group) use the defaults
        // step 7: specify the reduce class and its output types
        job.setReducerClass(MapReduceReducerTwo.class);
        job.setOutputKeyClass(Text.class);           // type of k3
        job.setOutputValueClass(LongWritable.class); // type of v3
        // step 8: specify the output format and path
        // (for a local run on Windows this could be file:///D:/mapreduce/output instead)
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path("hdfs://node5:8020/wordcount.out"));
        // submit and wait for the job to finish
        boolean bl = job.waitForCompletion(true);
        return bl ? 0 : 1;
    }

    // entry point: hand the job to ToolRunner
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        int run = ToolRunner.run(configuration, new MapReduceJobMainThree(), args);
        System.exit(run);
    }
}
To run the job, either package it as a jar and submit it to the cluster, or call the main method directly from the IDE; the latter requires adjusting the file paths hard-coded in the program.
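A sketch of the jar route (the jar name is a placeholder; the class and paths follow the driver above):

hadoop jar wordcount.jar com.msunsoft.hadoop.Test.MapReduceJobMainThree
hdfs dfs -cat /wordcount.out/part-r-00000    # each output line is: word<TAB>count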