Sorting Word-Count Output by Frequency with MapReduce
Count first, then sort

The task is split into two MapReduce jobs: the first is a standard word count, and the second sorts its output by the counts. Step 1 is the counting job.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    Text k = new Text();
    IntWritable v = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Emit each word with a count of 1
        for (String word : words) {
            k.set(word);
            context.write(k, v);
        }
    }
}
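For example, given the input line "hello world hello", this mapper emits (hello, 1), (world, 1), (hello, 1); the shuffle phase then groups these pairs by word, so the reducer below sees hello with the values [1, 1] and world with [1].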
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    IntWritable v = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int sum = 0;
        // Sum the counts for this word
        for (IntWritable value : values) {
            sum += value.get();
        }
        v.set(sum);
        // Emit the word with its total count
        context.write(key, v);
    }
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Hard-coded local paths for testing; remove this line to use the command-line arguments
        args = new String[] {"f:/input1", "f:/output77"};
        Configuration conf = new Configuration();
        // Get the job
        Job job = Job.getInstance(conf);
        // Set the jar
        job.setJarByClass(WordcountDriver.class);
        // Wire up the mapper and reducer
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
        // Map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
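Running this job produces one word-count pair per line, separated by a tab (TextOutputFormat's default key/value separator), which is exactly the format the second job's mapper splits on. For the sample line above, the output file part-r-00000 would contain something like:

hello	2
world	1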
Sort by word frequency

Step 2 reads the first job's output, wraps each count in a custom WritableComparable (tongji) and uses it as the map output key, so the shuffle phase sorts the records by count; the reducer then swaps key and value back to emit word-count pairs in descending order of frequency.
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class tongji implements WritableComparable<tongji> {

    private long number;

    public tongji(long number) {
        this.number = number;
    }

    // Hadoop needs a no-arg constructor for deserialization
    public tongji() {
    }

    public long getNumber() {
        return number;
    }

    public void setNumber(long number) {
        this.number = number;
    }

    @Override
    public String toString() {
        return String.valueOf(number);
    }

    @Override
    public int compareTo(tongji o) {
        int result;
        // Sort by count, descending
        if (number > o.getNumber()) {
            result = -1;
        } else if (number < o.getNumber()) {
            result = 1;
        } else {
            result = 0;
        }
        return result;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(number);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        number = in.readLong();
    }
}
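As a quick sanity check outside Hadoop (the class name TongjiOrderCheck is made up here, not part of the original code), sorting a few tongji instances with Arrays.sort shows the descending order that compareTo produces:

import java.util.Arrays;

public class TongjiOrderCheck {
    public static void main(String[] args) {
        // Three counts in arbitrary order
        tongji[] counts = { new tongji(3), new tongji(10), new tongji(1) };
        Arrays.sort(counts);                          // uses tongji.compareTo
        System.out.println(Arrays.toString(counts));  // prints [10, 3, 1]
    }
}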
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordcountMapper extends Mapper<LongWritable, Text, tongji, Text> {

    tongji k = new tongji();
    Text v = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Read one line of the first job's output
        String line = value.toString();
        // Split into word and count (tab-separated)
        String[] fields = line.split("\t");
        // Wrap the fields: the count becomes the key so the shuffle sorts by it
        String danci = fields[0];
        long number = Long.parseLong(fields[1]);
        k.setNumber(number);
        v.set(danci);
        context.write(k, v);
    }
}
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReducer extends Reducer<tongji, Text, Text, tongji> {

    @Override
    protected void reduce(tongji key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Keys arrive already sorted by count (descending); swap key and value
        // back so each output line reads word, then count. The loop handles
        // words that share the same count.
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // Get the job
        Job job = Job.getInstance(conf);
        // Set the jar
        job.setJarByClass(WordcountDriver.class);
        // Wire up the mapper and reducer
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Map output key/value types
        job.setMapOutputKeyClass(tongji.class);
        job.setMapOutputValueClass(Text.class);
        // Final output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(tongji.class);
        // Input path is the first job's output directory
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for it to finish
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
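The two drivers above are run separately, with the first job's output directory passed as the second job's input. Below is a minimal sketch of a single driver that chains them; the class names CountMapper, CountReducer, SortMapper, and SortReducer are placeholders (both jobs' mappers are called WordcountMapper above and could not coexist in one package, so some renaming is assumed). The sketch also gives the sort job a single reduce task, because with multiple reducers each output file is sorted on its own but the files are not globally ordered.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountPipeline {
    public static void main(String[] args) throws Exception {
        Path input   = new Path(args[0]);  // raw text
        Path counted = new Path(args[1]);  // intermediate word<TAB>count pairs
        Path sorted  = new Path(args[2]);  // final output, ordered by count

        Configuration conf = new Configuration();

        // Job 1: plain word count (the "count first" classes above).
        Job countJob = Job.getInstance(conf, "wordcount");
        countJob.setJarByClass(WordcountPipeline.class);
        countJob.setMapperClass(CountMapper.class);    // placeholder name
        countJob.setReducerClass(CountReducer.class);  // placeholder name
        countJob.setMapOutputKeyClass(Text.class);
        countJob.setMapOutputValueClass(IntWritable.class);
        countJob.setOutputKeyClass(Text.class);
        countJob.setOutputValueClass(IntWritable.class);
        FileInputFormat.setInputPaths(countJob, input);
        FileOutputFormat.setOutputPath(countJob, counted);
        if (!countJob.waitForCompletion(true)) {
            System.exit(1);  // stop if counting failed
        }

        // Job 2: sort by frequency (the tongji-key classes above).
        Job sortJob = Job.getInstance(conf, "sort by frequency");
        sortJob.setJarByClass(WordcountPipeline.class);
        sortJob.setMapperClass(SortMapper.class);      // placeholder name
        sortJob.setReducerClass(SortReducer.class);    // placeholder name
        sortJob.setMapOutputKeyClass(tongji.class);
        sortJob.setMapOutputValueClass(Text.class);
        sortJob.setOutputKeyClass(Text.class);
        sortJob.setOutputValueClass(tongji.class);
        sortJob.setNumReduceTasks(1);  // one reducer => one globally sorted file
        FileInputFormat.setInputPaths(sortJob, counted);
        FileOutputFormat.setOutputPath(sortJob, sorted);
        System.exit(sortJob.waitForCompletion(true) ? 0 : 1);
    }
}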