Big Data: Hadoop MapReduce on YARN

1. Basic Website Metrics

(1) PV (Page Views): the number of page views; it measures how many pages visitors open on the site. Every time a user opens a page it counts once, and repeated opens of the same page keep adding to the total.

(2) UV (Unique Visitors): the number of distinct visitors to the site within one day, identified by cookie; multiple visits by the same visitor in one day count as a single visitor.

(3) IP (Internet Protocol): the number of distinct IP addresses that accessed the site within one day; no matter how many pages one IP opens, it still counts as one IP.

(4) VV (Visit Views): the number of visits, i.e. how many times all visitors visited the site within one day; a visit ends when the visitor finishes browsing and closes all of the site's pages, and the same visitor may produce several visits in one day.
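
To make the PV/UV distinction concrete, here is a small illustrative snippet (not part of the MapReduce job below; the sample records are made up): PV counts every page view, while UV counts distinct visitors by cookie.

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PvUvExample {
	public static void main(String[] args) {
		//each record is {cookie, url}
		List<String[]> views = Arrays.asList(
				new String[]{"c1", "/index"},
				new String[]{"c1", "/index"},    //the same visitor reopens the page: PV +1, UV unchanged
				new String[]{"c2", "/detail"},
				new String[]{"c1", "/detail"});

		int pv = views.size();                   //PV: every page view counts
		Set<String> visitors = new HashSet<String>();
		for (String[] v : views) {
			visitors.add(v[0]);                  //UV: distinct cookies
		}
		int uv = visitors.size();

		System.out.println("PV=" + pv + ", UV=" + uv);   //prints PV=4, UV=2
	}
}

The MapReduce job below computes PV per province id from web-log records: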

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


public class MyPVMapReduce extends Configured implements Tool {
	
	//1. Mapper class
	//Extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: input key, input value, output key, output value
	public static class MyPVMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		Text province_id = new Text();
		IntWritable mr_value = new IntWritable(1);
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {
			//emit (pro_id, 1)
			//value holds one whole input line; key is only the byte offset of that line
			String line = value.toString();
			//split the line into an array of fields
			String [] str = line.split("\t");
			
			/**
			 * In practice many records are incomplete, so the map method has to
			 * clean the data: use if checks to drop records that do not make sense
			 */
			//fewer than 31 fields means the record is missing columns: drop it
			if(str.length <= 30){
				return;
			}
			//field 23 is the province id, field 1 is the url
			String pro_id = str[23];
			String url = str[1];
			if(StringUtils.isBlank(url)){
				return;
			}
			//inspect the real data afterwards to decide whether more filtering is needed
			province_id.set(pro_id);
			context.write(province_id, mr_value);
			
		}
		
		
			
	}
	
	
	//1.5 Combiner (optional local aggregation): same logic as the reducer, it pre-sums the
	//counts on the map side; enable it in run() with job.setCombinerClass(MyCombiner.class)
	public static class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
		private IntWritable partial = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
			int sum = 0;
			for (IntWritable val : values) {
				sum += val.get();
			}
			partial.set(sum);
			context.write(key, partial);
		}
	}
	
	
	
	
	//2. Reducer class
	//The reducer's input types are the mapper's output types: input key, input value, output key, output value
	public static class MyPVReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
		IntWritable total = new IntWritable();
		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
				
			//sum up all the 1s emitted for this province id
			int count = 0;
			for (IntWritable intWritable : values) {
				count += intWritable.get();
			}
			total.set(count);
			context.write(key, total);
		}
		

		
	}
	//3. Driver: the run method; during testing, main() runs it through ToolRunner
	
	/**
	 * 
	 * @param args the arguments received from main, used inside run
	 * @return 0 if the job succeeds, 1 if it fails
	 * @throws Exception
	 */
    public int run(String[] args) throws Exception {  
    	//getConf() returns the Configuration injected from outside (by ToolRunner)
    	Configuration conf = this.getConf();
    	
    	Job job = Job.getInstance(conf,this.getClass().getSimpleName());
    	
    	job.setJarByClass(MyPVMapReduce.class);
    	
    	//input path
    	Path inpath = new Path(args[0]);
    	FileInputFormat.addInputPath(job, inpath);
    	//output path
    	Path outpath = new Path(args[1]);
    	FileOutputFormat.setOutputPath(job, outpath);
    	
    	//before running, delete the output path if it already exists
    	FileSystem fs = outpath.getFileSystem(conf);
    	if(fs.exists(outpath)){
    		fs.delete(outpath,true);
    	}
    	
    	
    	//map-side settings
    	job.setMapOutputKeyClass(Text.class);
    	job.setMapOutputValueClass(IntWritable.class);
    	job.setMapperClass(MyPVMapper.class);
    	
    	//shuffle-phase settings could go here, e.g. job.setCombinerClass(MyCombiner.class);
    	
    	
    	//reduce-side settings
    	job.setOutputKeyClass(Text.class);
    	job.setOutputValueClass(IntWritable.class);
    	job.setReducerClass(MyPVReducer.class);
    	
    	int isSuccess =  job.waitForCompletion(true)?0:1;
    	
    	return isSuccess;
    }  

    
    public static void main(String[] args) {
    	Configuration conf = new Configuration();
    	//hard-coded HDFS paths for testing; they override whatever was passed on the command line
    	args = new String[]{
    			"hdfs://192.168.126.160:8020/2015082818",
    			"hdfs://192.168.126.160:8020/out"
    	};
    	try {
			int isSuccess = ToolRunner.run(conf, new MyPVMapReduce(), args);
			System.out.println("isSuccess: " + isSuccess);
		} catch (Exception e) {
			e.printStackTrace();
		}
		
    }			
}

2. MapReduce Secondary Sort

1. Prepare the data to analyze

year	tmp	date
1999	20	10-21
2000	31	06-10
2001	25	05-01
2000	26	08-21
2001	28	08-11
2001	24	08-12
1999	20	11-24
2000	34	03-10
2001	24	03-01
2000	26	08-29
2001	24	01-19
2001	21	02-13
2003	12	06-15
2006	19	02-13

The format we want: records of the same year go to the same partition, and within each year the temperatures are sorted in ascending order (for the 2000 records above, for example, the desired order is 26, 26, 31, 34).
The default key cannot express this, so we define a custom key with a custom sort order.
Since the key changes, the default partitioning rule has to be replaced as well.

2. Custom key:
1) Data types:
All data types implement the Writable interface so that values of these types can be serialized for network transfer and file storage.
Basic types (all MapReduce data types implement the Writable interface):
BooleanWritable: standard boolean
ByteWritable: single byte
DoubleWritable: double-precision floating point
FloatWritable: single-precision floating point
IntWritable: integer
LongWritable: long integer
Text: text stored in UTF-8
NullWritable: used when the key or value of a <key, value> pair is empty
2) The key must support sorting and serialization (as needed for secondary sort), so it implements the WritableComparable interface.
3) Hadoop also provides a lower-level comparator, RawComparator; WritableComparator implements it.
One advantage of RawComparator is that it can compare serialized objects directly at the byte level; the key, as well as the custom sort and group comparators we write, work with this interface.
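
As an illustration of point 3 (a sketch, not part of the original code; the class name CustomerKeyRawComparator is made up here), a byte-level comparator for the CustomerKey class defined below could look like this, assuming the write() layout used there: a 4-byte int year followed by an 8-byte double temperature. It could be registered with job.setSortComparatorClass if byte-level sorting were wanted.

package com.nike.hadoop.demo02;

import org.apache.hadoop.io.WritableComparator;

public class CustomerKeyRawComparator extends WritableComparator {

	protected CustomerKeyRawComparator() {
		super(CustomerKey.class);
	}

	@Override
	public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
		//first 4 bytes: the year written by out.writeInt(year)
		int year1 = readInt(b1, s1);
		int year2 = readInt(b2, s2);
		if (year1 != year2) {
			return Integer.compare(year1, year2);   //years ascending
		}
		//next 8 bytes: the temperature written by out.writeDouble(temp)
		double temp1 = readDouble(b1, s1 + 4);
		double temp2 = readDouble(b2, s2 + 4);
		return Double.compare(temp1, temp2);        //temperatures ascending
	}
}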

package com.nike.hadoop.demo02;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class CustomerKey implements WritableComparable<CustomerKey>{
	private int year;
	private double temp;
	private String date;

	public int getYear() {
		return year;
	}

	public void setYear(int year) {
		this.year = year;
	}

	public double getTemp() {
		return temp;
	}

	public void setTemp(double temp) {
		this.temp = temp;
	}

	public String getDate() {
		return date;
	}

	public void setDate(String date) {
		this.date = date;
	}
	
	

	public void setAll(int year, double temp, String date) {
		this.year = year;
		this.temp = temp;
		this.date = date;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeInt(year);
		out.writeDouble(temp);
		out.writeUTF(date);
		
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.year = in.readInt();
		this.temp = in.readDouble();
		this.date = in.readUTF();
		
	}

	@Override
	public int compareTo(CustomerKey o) {
		//custom key ordering
		if(this.year == o.year){
			//same year: order by temperature, ascending
			return Double.compare(this.temp, o.temp);
		}
		//different years: order by year, ascending
		return this.year - o.year;
	}

	@Override
	public String toString() {
		//format matches the job output shown below, e.g. "Data: year=1999, temp=20.0, date=11-24"
		return "Data: year=" + year + ", temp=" + temp + ", date=" + date;
	}

}

The custom Map/Reduce job:

package com.nike.hadoop.demo02;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Shell;


public class MySortMapRed {

	//1. Mapper class
	/**
	 * @author 猪猪
	 * Extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
	 * KEYIN: input key
	 * VALUEIN: input value
	 * KEYOUT: output key
	 * VALUEOUT: output value
	 */
	public static class MyMapper extends Mapper<LongWritable, Text, CustomerKey, NullWritable>{
		CustomerKey customerKey = new CustomerKey();
		NullWritable mr_val = NullWritable.get();
		@Override
		protected void map(LongWritable key,Text value,
				Mapper<LongWritable, Text, CustomerKey, NullWritable>.Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			String[] vals = line.split("\t");
			int year = Integer.parseInt(vals[0]);
			double tmp = Double.parseDouble(vals[1]);
			String date = vals[2];
			
			customerKey.setAll(year, tmp, date);
			context.write(customerKey, mr_val);
		}		
	}
	//2. Reducer class
	//The reducer's input types are the mapper's output types
	public static class MyReducer extends Reducer<CustomerKey, NullWritable,CustomerKey, NullWritable>{
		private NullWritable rval = NullWritable.get();
		@Override
		protected void reduce(CustomerKey mKey,Iterable<NullWritable> mVal,
				Reducer<CustomerKey, NullWritable, CustomerKey, NullWritable>.Context context)
				throws IOException, InterruptedException {
				context.write(mKey, rval);
		}
	}
	
	//3. Driver: the run method
	public int run(String[] args) throws Exception{
		Configuration config = new Configuration();
		//build a Job instance from the configuration and give it a name
		Job job = Job.getInstance(config,this.getClass().getSimpleName());
		//this call is required: without it the job runs fine locally but fails when submitted to the cluster
		job.setJarByClass(MySortMapRed.class);
		
		//where the job reads its input from
		//when invoking the job, pass arguments in args: the first one is the input path
		Path inPath = new Path(args[0]);
		FileInputFormat.addInputPath(job, inPath);
		//the second argument is the output path
		Path outPath = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outPath);
		
		//mapper settings
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(CustomerKey.class);
		job.setMapOutputValueClass(NullWritable.class);
		//reducer settings
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(CustomerKey.class);
		job.setOutputValueClass(NullWritable.class);
		//job.setCombinerClass(MyCombiner.class);
		//job.setNumReduceTasks(2);
		
		//submit the job: returns 0 on success, 1 on failure
		boolean isSuccess = job.waitForCompletion(true);
		return isSuccess?0:1;
		
	} 
	
	public static void main(String[] args) {
		//hard-coded HDFS paths for testing; they override the command-line arguments
		args = new String[]{
			"hdfs://192.168.159.122:8020/tmp.txt",	
			"hdfs://192.168.159.122:8020/sort01"	
		};
		MySortMapRed myMapRed = new MySortMapRed();
		try {
			int res = -1;
			res = myMapRed.run(args);
			System.out.println(res);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

The final output is:

Data: year=1999, temp=20.0, date=11-24
Data: year=1999, temp=20.0, date=10-21
Data: year=2000, temp=26.0, date=08-29
Data: year=2000, temp=26.0, date=08-21
Data: year=2000, temp=31.0, date=06-10
Data: year=2000, temp=34.0, date=03-10
Data: year=2001, temp=21.0, date=02-13
Data: year=2001, temp=24.0, date=03-01
Data: year=2001, temp=24.0, date=01-19
Data: year=2001, temp=24.0, date=08-12
Data: year=2001, temp=25.0, date=05-01
Data: year=2001, temp=28.0, date=08-11
Data: year=2003, temp=12.0, date=06-15
Data: year=2006, temp=19.0, date=02-13
Data: year=2006, temp=31.0, date=07-13

Requirement: write each year's records into its own output block.

Implementation: a custom Partitioner.

package com.nike.hadoop.demo02;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class MySortPartition extends Partitioner<CustomerKey, NullWritable>{

	@Override
	public int getPartition(CustomerKey key, NullWritable value,
			int numPartitions) {
		System.out.println("custom partitioning......");
		//spread the years across the configured number of reduce tasks
		return key.getYear() % numPartitions;
	}
}

The run method is then updated to register the custom partitioner and set the number of reduce tasks:

	//3. Driver: the run method
	public int run(String[] args) throws Exception{
		Configuration config = new Configuration();
		//build a Job instance from the configuration and give it a name
		Job job = Job.getInstance(config,this.getClass().getSimpleName());
		//this call is required: without it the job runs fine locally but fails when submitted to the cluster
		job.setJarByClass(MySortMapRed.class);
		
		//where the job reads its input from
		//when invoking the job, pass arguments in args: the first one is the input path
		Path inPath = new Path(args[0]);
		FileInputFormat.addInputPath(job, inPath);
		//the second argument is the output path
		Path outPath = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outPath);
		
		//mapper settings
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(CustomerKey.class);
		job.setMapOutputValueClass(NullWritable.class);
		//partitioner and number of reduce tasks
		job.setPartitionerClass(MySortPartition.class);
		job.setNumReduceTasks(6);
		//reducer settings
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(CustomerKey.class);
		job.setOutputValueClass(NullWritable.class);
		//job.setCombinerClass(MyCombiner.class);
		//job.setNumReduceTasks(2);
		
		//submit the job: returns 0 on success, 1 on failure
		boolean isSuccess = job.waitForCompletion(true);
		return isSuccess?0:1;
		
	} 

Run result: (screenshot)

Implementation: a custom grouping comparator.

Grouping: values whose keys compare as equal are delivered to the same iterator within one reduce() call.

Requirement: records of the same year are grouped together.

As noted above, Hadoop provides the lower-level comparator RawComparator (WritableComparator implements it); its advantage is that it can compare serialized objects directly, which is what the custom sort and group comparators build on.

package com.nike.hadoop.demo02;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MySortGroup extends WritableComparator{
	//pass true so that WritableComparator creates CustomerKey instances for compare();
	//without it the keys stay null and a NullPointerException is thrown
	public MySortGroup() {
		super(CustomerKey.class,true);
	}
	
	@Override
	public int compare(WritableComparable a, WritableComparable b) {
		//group solely by year: keys with the same year go to the same reduce() call
		int ayear = ((CustomerKey)a).getYear();
		int byear = ((CustomerKey)b).getYear();
		
		return ayear-byear;
	}
}
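
Note that the MyReducer above writes its key only once per reduce() call, so with year-based grouping the output below has a single line per year (the first key of each group in sort order, i.e. the record with that year's lowest temperature). If every record of a group were wanted, the reducer could iterate the values instead; here is a sketch (the class name YearDumpReducer is made up, and it relies on the imports already present in MySortMapRed):

	//Sketch: with MySortGroup, one reduce() call receives all of a year's records,
	//already sorted by temperature; the framework refreshes the key object as the iterator advances.
	public static class YearDumpReducer extends Reducer<CustomerKey, NullWritable, CustomerKey, NullWritable>{
		private NullWritable rval = NullWritable.get();
		@Override
		protected void reduce(CustomerKey key, Iterable<NullWritable> values,
				Reducer<CustomerKey, NullWritable, CustomerKey, NullWritable>.Context context)
				throws IOException, InterruptedException {
			for (NullWritable val : values) {
				context.write(key, rval);   //key now holds the current record's year, temp and date
			}
		}
	}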

The run method is updated once more to register the grouping comparator:

	public int run(String[] args) throws Exception{
		Configuration config = new Configuration();
		//build a Job instance from the configuration and give it a name
		Job job = Job.getInstance(config,this.getClass().getSimpleName());
		//this call is required: without it the job runs fine locally but fails when submitted to the cluster
		job.setJarByClass(MySortMapRed.class);
		
		//where the job reads its input from
		//when invoking the job, pass arguments in args: the first one is the input path
		Path inPath = new Path(args[0]);
		FileInputFormat.addInputPath(job, inPath);
		//the second argument is the output path
		Path outPath = new Path(args[1]);
		FileOutputFormat.setOutputPath(job, outPath);
		
		//mapper settings
		job.setMapperClass(MyMapper.class);
		job.setMapOutputKeyClass(CustomerKey.class);
		job.setMapOutputValueClass(NullWritable.class);
		//partitioner and number of reduce tasks
		job.setPartitionerClass(MySortPartition.class);
		job.setNumReduceTasks(8);
		//grouping comparator
		job.setGroupingComparatorClass(MySortGroup.class);
		//reducer settings
		job.setReducerClass(MyReducer.class);
		job.setOutputKeyClass(CustomerKey.class);
		job.setOutputValueClass(NullWritable.class);
		//job.setCombinerClass(MyCombiner.class);
		//job.setNumReduceTasks(2);
		
		//submit the job: returns 0 on success, 1 on failure
		boolean isSuccess = job.waitForCompletion(true);
		return isSuccess?0:1;
		
	}

Computed result:

# hdfs dfs -cat /sort05/part-r-00000
Data: year=2000, temp=20.0, date=06-03
# hdfs dfs -cat /sort05/part-r-00001
Data: year=2001, temp=12.0, date=01-12
# hdfs dfs -cat /sort05/part-r-00002
# hdfs dfs -cat /sort05/part-r-00003
Data: year=2003, temp=12.0, date=06-15
# hdfs dfs -cat /sort05/part-r-00004
# hdfs dfs -cat /sort05/part-r-00005
# hdfs dfs -cat /sort05/part-r-00006
Data: year=2006, temp=19.0, date=02-13
# hdfs dfs -cat /sort05/part-r-00007
Data: year=1999, temp=20.0, date=11-24