Big Data: Hadoop MapReduce on YARN
1. Basic Website Metrics
(1) PV (Page View): the number of page views, i.e. how many pages visitors open. Every page load is counted once, and repeated opens of the same page accumulate.
(2) UV (Unique Visitor): the number of distinct visitors to the site in one day, identified by cookie; multiple visits by the same visitor within one day count as a single visitor.
(3) IP (Internet Protocol): the number of distinct IP addresses that access the site in one day; no matter how many pages one IP opens, it is counted once.
(4) VV (Visit View): the number of visits, i.e. how many times visitors came to the site in one day. A visit ends when the visitor closes all pages of the site, and the same visitor may produce several visits per day.
The MapReduce job below counts PV per province from tab-separated web log lines (the province id is field 23 of each line):
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyPVMapReduce extends Configured implements Tool {
//1. Mapper class
//extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: input key, input value, output key, output value
public static class MyPVMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
Text province_id = new Text();
IntWritable mr_value = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
//emit (pro_id, 1)
//value holds one complete log line (key is only the byte offset of the line)
String line = value.toString();
//split the line into fields on tab
String [] str = line.split("\t");
/**
 * Real-world data is often incomplete, so the map method has to clean it:
 * filter out records that do not make sense before using them.
 */
//fewer than 31 fields after splitting means the record is missing fields, so drop it;
//check the length before indexing into the array to avoid ArrayIndexOutOfBoundsException
if(str.length <= 30){
return;
}
//field 23 is the province id, field 1 is the url
String pro_id = str[23];
String url = str[1];
if(StringUtils.isBlank(url)){
return;
}
//inspect the actual output and add more filtering here if the data requires it
province_id.set(pro_id);
context.write(province_id, mr_value);
}
}
//1.5 Combiner (optional): because this job only sums counts, the reducer itself could be
//reused as the combiner via job.setCombinerClass(MyPVReducer.class); the empty class below
//is only a placeholder and is never registered with the job.
public static class MyCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
}
//2. Reducer class
//the reducer's input types are the mapper's output types: input key, input value, output key, output value
public static class MyPVReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
IntWritable total = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values,
Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int count = 0;
for (IntWritable intWritable : values) {
count += intWritable.get();
}
total.set(count);
context.write(key, total);
}
}
//3. Driver: the run method; for testing, main calls ToolRunner, which in turn invokes run
/**
 *
 * @param args the arguments received from main, used inside run
 * @return 0 if the job succeeds, 1 otherwise
 * @throws Exception
 */
public int run(String[] args) throws Exception {
//obtain the Configuration passed in from outside via this.getConf()
Configuration conf = this.getConf();
Job job = Job.getInstance(conf,this.getClass().getSimpleName());
job.setJarByClass(MyPVMapReduce.class);
//input path
Path inpath = new Path(args[0]);
FileInputFormat.addInputPath(job, inpath);
//output path
Path outpath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outpath);
//before running, check whether the output path exists and delete it if it does
FileSystem fs = outpath.getFileSystem(conf);
if(fs.exists(outpath)){
fs.delete(outpath,true);
}
//mapper settings
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setMapperClass(MyPVMapper.class);
//shuffle settings (combiner, partitioner, sort/group comparators) would go here
//reducer settings
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setReducerClass(MyPVReducer.class);
int isSuccess = job.waitForCompletion(true)?0:1;
return isSuccess;
}
public static void main(String[] args) {
Configuration conf = new Configuration();
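//hard-coded HDFS paths for a local test run; note that this overrides any arguments passed on the command line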
args = new String[]{
"hdfs://192.168.126.160:8020/2015082818",
"hdfs://192.168.126.160:8020/out"
};
try {
int isSucces = ToolRunner.run(conf,new MyPVMapReduce(), args);
System.out.println("isSuccess"+isSucces);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
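The job above covers PV only. As a minimal sketch of how UV (unique visitors, counted by cookie) could be computed from the same tab-separated logs, the mapper/reducer pair below emits (province, cookie) pairs and counts distinct cookies per province. It is a sketch, not part of the original code: the classes would be added as nested classes of a driver like the one above (reusing its imports), the cookie column index (assumed to be 2 here) is hypothetical and depends on the actual log layout, and the in-memory HashSet only scales to modest visitor counts per key.
//Minimal UV sketch (assumption: field 2 holds the visitor cookie id).
public static class MyUVMapper extends Mapper<LongWritable, Text, Text, Text> {
    private final Text provinceId = new Text();
    private final Text cookieId = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] str = value.toString().split("\t");
        //same cleaning rule as the PV mapper: drop records with missing fields
        if (str.length <= 30 || StringUtils.isBlank(str[2])) {
            return;
        }
        provinceId.set(str[23]);
        cookieId.set(str[2]);
        context.write(provinceId, cookieId);
    }
}
public static class MyUVReducer extends Reducer<Text, Text, Text, IntWritable> {
    private final IntWritable uv = new IntWritable();
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        //count distinct cookie ids per province
        java.util.Set<String> cookies = new java.util.HashSet<String>();
        for (Text v : values) {
            cookies.add(v.toString());
        }
        uv.set(cookies.size());
        context.write(key, uv);
    }
}
Wired into such a driver, the job would additionally need job.setMapOutputValueClass(Text.class) and job.setOutputValueClass(IntWritable.class).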
2. MapReduce Secondary Sort
1. Prepare the data to analyze:
year tmp date
1999 20 10-21
2000 31 06-10
2001 25 05-01
2000 26 08-21
2001 28 08-11
2001 24 08-12
1999 20 11-24
2000 34 03-10
2001 24 03-01
2000 26 08-29
2001 24 01-19
2001 21 02-13
2003 12 06-15
2006 19 02-13
The format we want: records for the same year go to the same partition, and within a year the temperatures are sorted in ascending order.
The default key cannot express this, so we define a custom key with its own sort order.
Once the key changes, the default partitioning rule also has to change.
2. Custom key:
1. Data types:
All Hadoop data types implement the Writable interface so that values of these types can be serialized for network transfer and file storage.
Built-in types (every MapReduce type implements Writable):
BooleanWritable: boolean value    ByteWritable: single byte
DoubleWritable: double-precision float    FloatWritable: single-precision float
IntWritable: int    LongWritable: long
Text: text stored in UTF-8
NullWritable: used when the key or value of a <key,value> pair is empty
2. The key must support both sorting and object serialization (exactly what secondary sort needs),
so the key implements the WritableComparable interface.
3. Hadoop also provides a lower-level comparator, RawComparator, of which WritableComparator is a subclass.
One advantage of RawComparator is that it can compare objects directly in their serialized form; the custom key, as well as the custom sort and group comparators written later, build on this interface.
package com.nike.hadoop.demo02;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class CustomerKey implements WritableComparable<CustomerKey>{
private int year;
private double temp;
private String date;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public double getTemp() {
return temp;
}
public void setTemp(double temp) {
this.temp = temp;
}
public String getDate() {
return date;
}
public void setDate(String date) {
this.date = date;
}
public void setAll(int year, double temp, String date) {
this.year = year;
this.temp = temp;
this.date = date;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeDouble(temp);
out.writeUTF(date);
}
@Override
public void readFields(DataInput in) throws IOException {
this.year = in.readInt();
this.temp = in.readDouble();
this.date = in.readUTF();
}
@Override
public int compareTo(CustomerKey o) {
//custom sort order for the key
if(this.year == o.year){
//same year: compare temperatures (ascending); Double.compare also handles equal values correctly
return Double.compare(this.temp, o.temp);
}
//different years: compare the years
return this.year - o.year;
}
}
The custom Map/Reduce job:
package com.nike.hadoop.demo02;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Shell;
public class MySortMapRed {
//1. Mapper class
/**
 * @author 猪猪
 * extends Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
 * KEYIN: the input key
 * VALUEIN: the input value
 * KEYOUT: the output key
 * VALUEOUT: the output value
 */
public static class MyMapper extends Mapper<LongWritable, Text, CustomerKey, NullWritable>{
CustomerKey customerKey = new CustomerKey();
NullWritable mr_val = NullWritable.get();
@Override
protected void map(LongWritable key,Text value,
Mapper<LongWritable, Text, CustomerKey, NullWritable>.Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] vals = line.split("\t");
int year = Integer.parseInt(vals[0]);
double tmp = Double.parseDouble(vals[1]);
String date = vals[2];
customerKey.setAll(year, tmp, date);
context.write(customerKey, mr_val);
}
}
//2. Reducer class
//the reducer's input is the mapper's output
public static class MyReducer extends Reducer<CustomerKey, NullWritable,CustomerKey, NullWritable>{
private NullWritable rval = NullWritable.get();
@Override
protected void reduce(CustomerKey mKey,Iterable<NullWritable> mVal,
Reducer<CustomerKey, NullWritable, CustomerKey, NullWritable>.Context context)
throws IOException, InterruptedException {
context.write(mKey, rval);
}
}
//3. Driver: the run method
public int run(String[] args) throws Exception{
Configuration config = new Configuration();
//build a Job instance from the configuration and give it a name
Job job = Job.getInstance(config,this.getClass().getSimpleName());
//this call is required; without it the job runs locally but fails on the cluster
job.setJarByClass(MySortMapRed.class);
//where the job reads its data from
//the first argument passed to the job is the input path
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
//the second argument is the output path
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
//mapper settings
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(CustomerKey.class);
job.setMapOutputValueClass(NullWritable.class);
//reducer settings
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(CustomerKey.class);
job.setOutputValueClass(NullWritable.class);
//job.setCombinerClass(MyCombiner.class);
//job.setNumReduceTasks(2);
//submit the job: return 0 on success, 1 on failure
boolean isSuccess = job.waitForCompletion(true);
return isSuccess?0:1;
}
public static void main(String[] args) {
args = new String[]{
"hdfs://192.168.159.122:8020/tmp.txt",
"hdfs://192.168.159.122:8020/sort01"
};
MySortMapRed myMapRed = new MySortMapRed();
try {
int res = -1;
res = myMapRed.run(args);
System.out.println(res);
} catch (Exception e) {
e.printStackTrace();
}
}
}
The final output is:
data: year=1999, temp=20.0, date=11-24
data: year=1999, temp=20.0, date=10-21
data: year=2000, temp=26.0, date=08-29
data: year=2000, temp=26.0, date=08-21
data: year=2000, temp=31.0, date=06-10
data: year=2000, temp=34.0, date=03-10
data: year=2001, temp=21.0, date=02-13
data: year=2001, temp=24.0, date=03-01
data: year=2001, temp=24.0, date=01-19
data: year=2001, temp=24.0, date=08-12
data: year=2001, temp=25.0, date=05-01
data: year=2001, temp=28.0, date=08-11
data: year=2003, temp=12.0, date=06-15
data: year=2006, temp=19.0, date=02-13
data: year=2006, temp=31.0, date=07-13
Requirement: route records into different output blocks by year.
Implementation: a custom Partitioner.
package com.nike.hadoop.demo02;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class MySortPartition extends Partitioner<CustomerKey, NullWritable>{
@Override
public int getPartition(CustomerKey key, NullWritable value,
int numPartitions) {
System.out.println("自定义分区。。。。。。。。");
return key.getYear()%6;
}
}
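With eight reducers (as configured in the final run at the end of this section), year % numPartitions sends 1999, 2000, 2001, 2003 and 2006 to partitions 7, 0, 1, 3 and 6 respectively, which matches the per-partition listing shown in the results.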
The run method is updated to register the partitioner and the number of reducers:
//3. Driver: the run method
public int run(String[] args) throws Exception{
Configuration config = new Configuration();
//build a Job instance from the configuration and give it a name
Job job = Job.getInstance(config,this.getClass().getSimpleName());
//this call is required; without it the job runs locally but fails on the cluster
job.setJarByClass(MySortMapRed.class);
//where the job reads its data from
//the first argument passed to the job is the input path
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
//the second argument is the output path
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
//mapper settings
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(CustomerKey.class);
job.setMapOutputValueClass(NullWritable.class);
//partitioner and reducer count
job.setPartitionerClass(MySortPartition.class);
job.setNumReduceTasks(6);
//reducer settings
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(CustomerKey.class);
job.setOutputValueClass(NullWritable.class);
//job.setCombinerClass(MyCombiner.class);
//job.setNumReduceTasks(2);
//submit the job: return 0 on success, 1 on failure
boolean isSuccess = job.waitForCompletion(true);
return isSuccess?0:1;
}
Run result: (not shown)
Implementation: custom grouping
group: values whose keys compare as equal are fed to a single reducer call through one iterator.
Requirement: put all records of the same year into one group.
As noted above, WritableComparator is a subclass of Hadoop's lower-level RawComparator, which can compare objects directly in their serialized form; both the custom sort and the custom group comparator are built on it.
package com.nike.hadoop.demo02;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MySortGroup extends WritableComparator{
//pass createInstances=true so the comparator can create CustomerKey instances;
//without it the deserialized keys are null and a NullPointerException is thrown
public MySortGroup() {
super(CustomerKey.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
//group by year only, so all records of one year reach the reducer in a single call
int ayear = ((CustomerKey)a).getYear();
int byear = ((CustomerKey)b).getYear();
return ayear-byear;
}
}
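The prose above mentions that a custom sort comparator is built the same way as the group comparator. The class below is a sketch of that idea only; it is not part of the original code, the name MySortComparator is hypothetical, and it is optional here because CustomerKey.compareTo() already defines the same order. If used, it would be registered with job.setSortComparatorClass(MySortComparator.class).
package com.nike.hadoop.demo02;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MySortComparator extends WritableComparator {
    public MySortComparator() {
        //createInstances=true so the comparator can deserialize CustomerKey objects
        super(CustomerKey.class, true);
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        CustomerKey k1 = (CustomerKey) a;
        CustomerKey k2 = (CustomerKey) b;
        //same order as CustomerKey.compareTo(): year first, then temperature ascending
        if (k1.getYear() != k2.getYear()) {
            return Integer.compare(k1.getYear(), k2.getYear());
        }
        return Double.compare(k1.getTemp(), k2.getTemp());
    }
}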
The run method now also registers the grouping comparator and uses eight reducers:
public int run(String[] args) throws Exception{
Configuration config = new Configuration();
//build a Job instance from the configuration and give it a name
Job job = Job.getInstance(config,this.getClass().getSimpleName());
//this call is required; without it the job runs locally but fails on the cluster
job.setJarByClass(MySortMapRed.class);
//where the job reads its data from
//the first argument passed to the job is the input path
Path inPath = new Path(args[0]);
FileInputFormat.addInputPath(job, inPath);
//the second argument is the output path
Path outPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outPath);
//mapper settings
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(CustomerKey.class);
job.setMapOutputValueClass(NullWritable.class);
//partitioner and reducer count
job.setPartitionerClass(MySortPartition.class);
job.setNumReduceTasks(8);
//grouping comparator
job.setGroupingComparatorClass(MySortGroup.class);
//reducer settings
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(CustomerKey.class);
job.setOutputValueClass(NullWritable.class);
//job.setCombinerClass(MyCombiner.class);
//job.setNumReduceTasks(2);
//submit the job: return 0 on success, 1 on failure
boolean isSuccess = job.waitForCompletion(true);
return isSuccess?0:1;
}
Results (one output record per year per partition: the grouping comparator merges all records of a year into a single reduce call, and the reducer writes the key only once per call):
$ hdfs dfs -cat /sort05/part-r-00000
data: year=2000, temp=20.0, date=06-03
$ hdfs dfs -cat /sort05/part-r-00001
data: year=2001, temp=12.0, date=01-12
$ hdfs dfs -cat /sort05/part-r-00002
$ hdfs dfs -cat /sort05/part-r-00003
data: year=2003, temp=12.0, date=06-15
$ hdfs dfs -cat /sort05/part-r-00004
$ hdfs dfs -cat /sort05/part-r-00005
$ hdfs dfs -cat /sort05/part-r-00006
data: year=2006, temp=19.0, date=02-13
$ hdfs dfs -cat /sort05/part-r-00007
data: year=1999, temp=20.0, date=11-24