欢迎您访问程序员文章站。本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce编程实现按词频统计的排序输出

程序员文章站 2022-06-04 19:14:28
...

先计数后排序
计数

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
	
	Text k = new Text();
	IntWritable v = new IntWritable(1);
	@Override
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		System.out.println(key.toString());
		//获取一行
		String line = value.toString();
		//切割单词
		String[] words = line.split(" ");
		//循环翻译
		for (String word: words) {
		k.set(word);
		context.write(k, v);
		}
	}
}
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	IntWritable v = new IntWritable();
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Context context) throws IOException, InterruptedException {
		int sum = 0;
		//累计求和
		for (IntWritable value : values) {
			sum += value.get();
			}
		v.set(sum);
		//输出
		context.write(key, v);
	}

}

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {
	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		args = new String[] {"f:/input1","f:/output77"};
		Configuration conf = new Configuration();
		//获取job
		Job job = Job.getInstance(conf);
		//设置jar包
		job.setJarByClass(WordcountDriver.class);
		//关联mapper和reducer
		job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);
		//map输出的k和v
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
		//最终输出kv
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
		//输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
		//提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
	}

}

按词频排序

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;


public class tongji implements WritableComparable <tongji>{
    private long number;

    public tongji(long number) {
        this.number = number;
    }

    public tongji() {
    }

    public long getNumber() {
        return number;
    }

    public void setNumber(long number) {
        this.number = number;
    }

    @Override
    public String toString() {
        return String.valueOf(number);
    }




    @Override
    public int compareTo(tongji o) {
        int result;

        // 按照总流量大小,倒序排列
        if (number > o.getNumber()) {
            result = -1;
        }else if (number < o.getNumber()) {
            result = 1;
        }else {
            result = 0;
        }

        return result;
    }




    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(number);

    }

    @Override
    public void readFields(DataInput in) throws IOException {
        number =in.readLong();

    }
}

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordcountMapper extends Mapper<LongWritable, Text, tongji, Text>{

    tongji k = new tongji();
    Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value,Context context)
            throws IOException, InterruptedException {
        //获取一行
        String line = value.toString();
        //切割单词
        String[] fields = line.split("\t");
        // 3 封装对象
        String danci = fields[0];
        long number = Long.parseLong(fields[1]);
        k.setNumber(number);
        v.set(danci);
        context.write(k,v);
    }
}

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<tongji,Text,Text, tongji> {

    @Override
    protected void reduce(tongji key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        for (Text value : values) {
            context.write(value,key);
        }

    }
}

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordcountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        //获取job
        Job job = Job.getInstance(conf);
        //设置jar包
        job.setJarByClass(WordcountDriver.class);
        //关联mapper和reducer
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        //map输出的k和v
        job.setMapOutputKeyClass(tongji.class);
        job.setMapOutputValueClass(Text.class);
        //最终输出kv
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(tongji.class);
        //输入和输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        //提交job
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }

}


相关标签: Mapreduce