
Hadoop Case Studies, with Code


I. Temperature Analysis Case

1. Requirements

Find the two days with the highest temperatures in each month.

2. Data Preparation

tianqi.txt

1949-10-01 14:21:02	34c
1949-10-01 19:21:02	38c
1949-10-02 14:01:02	36c
1950-01-01 11:21:02	32c
1950-10-01 12:21:02	37c
1951-12-01 12:21:02	23c
1950-10-02 12:21:02	41c
1950-10-03 12:21:02	27c
1951-07-01 12:21:02	45c
1951-07-02 12:21:02	46c
1951-07-03 12:21:03	47c

Expected output:
1949 10 1 38c
1949 10 2 36c
1950 1 1 32c
1950 10 2 41c
1950 10 1 37c
1951 7 3 47c
1951 7 2 46c
1951 12 1 23c

3. Approach

Break the requirement down: per year, per month, find the highest temperatures, output 2 days, and remember that a single day may have multiple readings.

Thinking further:
Group by year and month: a GroupComparator defines the grouping rule so that records with the same year and month land in the same group.
Records enter the group sorted by temperature in descending order, which a SortComparator achieves as a secondary sort.
Iterate over the values, find the highest temperature and record its day,
then find the second-highest value, making sure its day differs from the day of the highest.
Finally, order the results by year, month and day.

The key must contain both the date and the temperature:
define a custom data type TianQi
    holding the date
    holding the temperature
define a custom sort comparator
define a custom grouping comparator
    keys with the same year and month are treated as identical,
    so during the reduce iteration, records with the same year and month may belong to the same day,
    and the reducer has to check whether two records fall on the same day.
Watch out for OOM (out-of-memory) errors:
    the data volume can be large,
    but the full dataset can be split into chunks of at least one month each,
    so this scenario supports multiple reducers,
    implemented via a custom Partitioner.
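
To make the flow concrete, trace the 1949-10 records: the mapper emits the keys (1949, 10, 1, 34), (1949, 10, 1, 38) and (1949, 10, 2, 36); the sort comparator orders them 38, 36, 34 within the month; the grouping comparator merges all three into one reduce group; the reducer writes 38 (day 1), then writes 36 because it falls on a different day (day 2), and breaks out of the loop, so 34 (day 1 again) is never written.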

4. Code Implementation

a)TianQi

package com.bigdata.tianqi;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TianQi implements WritableComparable<TianQi>{
	
	private int year ;
	private int month;
	private int day;
	private int wd;
	public TianQi() {
	}
	public TianQi(int year, int month, int day, int wd) {
		super();
		this.year = year;
		this.month = month;
		this.day = day;
		this.wd = wd;
	}
	public int getYear() {
		return year;
	}
	public void setYear(int year) {
		this.year = year;
	}
	public int getMonth() {
		return month;
	}
	public void setMonth(int month) {
		this.month = month;
	}
	public int getDay() {
		return day;
	}
	public void setDay(int day) {
		this.day = day;
	}
	public int getWd() {
		return wd;
	}
	public void setWd(int wd) {
		this.wd = wd;
	}
	//serialize the fields in a fixed order; readFields must read them back in the same order
	public void write(DataOutput out) throws IOException {
		out.writeInt(year);
		out.writeInt(month);
		out.writeInt(day);
		out.writeInt(wd);
	}
	public void readFields(DataInput in) throws IOException {
		year = in.readInt();
		month = in.readInt();
		day = in.readInt();
		wd = in.readInt();
	}
	//sort ascending by year, then month, then day (the final output order)
	public int compareTo(TianQi o) {
		
		int c1 = Integer.compare(this.getYear(), o.getYear());
		if(c1 == 0){
			int c2 = Integer.compare(this.getMonth(), o.getMonth());
			if(c2 == 0){
				return Integer.compare(this.getDay(), o.getDay());
			}
			return c2;
		}
		return c1;
	}
	
	@Override
	public String toString() {
		return year+"\t"+month+"\t"+day+"\t"+wd+"c";
	}
	
	

}
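
Since bugs in write/readFields only surface deep inside a running job, a quick standalone round-trip check is worthwhile. The sketch below is illustrative and not part of the job; TianQiRoundTripCheck is a hypothetical class using Hadoop's DataOutputBuffer/DataInputBuffer helpers:

package com.bigdata.tianqi;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

public class TianQiRoundTripCheck {
	public static void main(String[] args) throws Exception {
		TianQi original = new TianQi(1949, 10, 1, 38);
		//serialize
		DataOutputBuffer out = new DataOutputBuffer();
		original.write(out);
		//deserialize into a fresh instance
		DataInputBuffer in = new DataInputBuffer();
		in.reset(out.getData(), out.getLength());
		TianQi copy = new TianQi();
		copy.readFields(in);
		System.out.println(copy);//expected: 1949	10	1	38c
	}
}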

b)TianQiMapper

package com.bigdata.tianqi;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class TianQiMapper extends Mapper<LongWritable, Text, TianQi, NullWritable>{
	
	TianQi k = new TianQi();
	SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");//reused across map() calls; parse() ignores the trailing time of day
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		try {
			//split the line into date and temperature
			//1949-10-01 14:21:02	34c
			String line = value.toString();
			String[] split = line.split("\t");//[1949-10-01 14:21:02, 34c]
			Date parse = sdf.parse(split[0]);
			
			Calendar cal = Calendar.getInstance();
			cal.setTime(parse);
			
			k.setYear(cal.get(Calendar.YEAR));
			k.setMonth(cal.get(Calendar.MONTH)+1);//Calendar months are 0-based
			k.setDay(cal.get(Calendar.DAY_OF_MONTH));
			
			//strip the trailing "c" and keep the numeric temperature
			k.setWd(Integer.parseInt(split[1].replace("c", "")));
			context.write(k, NullWritable.get());
		} catch (ParseException e) {
			//skip malformed records
			e.printStackTrace();
		}
	}
}

c)TianQiReduce

package com.bigdata.tianqi;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class TianQiReduce extends Reducer<TianQi, NullWritable, TianQi, NullWritable> {
	@Override
	protected void reduce(TianQi key, Iterable<NullWritable> values,Context context) throws IOException, InterruptedException {
		//the group's keys arrive sorted by temperature descending, e.g. for 1949-10:
		//<1949 10 1 38, null>
		//<1949 10 2 36, null>
		//<1949 10 1 34, null>
		int flag = 0;
		int day = 0;
		//note: Hadoop reuses the key object, updating its fields as the iteration advances
		for (NullWritable nullWritable : values) {
			if(flag == 0){//first record = the month's highest temperature
				context.write(key, NullWritable.get());
				flag++;//mark the highest temperature as written
				day = key.getDay();//remember which day it was
			}
			if(flag != 0 && day != key.getDay()){//next-highest temperature on a different day
				context.write(key, NullWritable.get());
				break;
			}
		}
	}
}

d)TianQiPartitioner

package com.bigdata.grouporderby;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import com.bigdata.tianqi.TianQi;

public class TianQiPartitioner extends Partitioner<TianQi, NullWritable>{

	@Override
	public int getPartition(TianQi key, NullWritable value, int numPartitions) {
		//send all records of a given year to the same reduce task
		return key.getYear() % numPartitions;
	}

}
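
With the three reduce tasks configured in the driver below, 1949 % 3 = 2, 1950 % 3 = 0 and 1951 % 3 = 1, so each year in the sample data gets its own partition (part-r-00002, part-r-00000 and part-r-00001 respectively): all months of a year stay on one reducer while the load is spread across years.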

e)TianQiGroupComparator

package com.bigdata.tianqi;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TianQiGroupComparator extends WritableComparator{
	
	TianQi aa = null;
	TianQi bb = null;
	
	public TianQiGroupComparator() {
		super(TianQi.class, true);//true: create TianQi instances so compare() receives deserialized keys
	}
	
	@Override//grouping rule: keys with the same year and month count as equal, so their records form one group and share a single reduce() call
	public int compare(WritableComparable a, WritableComparable b) {
		aa = (TianQi) a;
		bb = (TianQi) b;
		int c1 = Integer.compare(aa.getYear(), bb.getYear());
		if(c1 == 0){
			int c2 = Integer.compare(aa.getMonth(), bb.getMonth());
			return c2;
		}
		return c1;
	}
	
	
}

f)TianQiSortComparator

package com.bigdata.tianqi;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TianQiSortComparator extends WritableComparator{
	TianQi aa = null;
	TianQi bb = null;
	
	public TianQiSortComparator() {
		super(TianQi.class,true);
	}
	
	@Override//year and month ascending, temperature descending
	public int compare(WritableComparable a, WritableComparable b) {
		
		aa = (TianQi) a;
		bb = (TianQi) b;
		
		int c1 = Integer.compare(aa.getYear(), bb.getYear());
		if(c1 == 0){
			int c2 = Integer.compare(aa.getMonth(), bb.getMonth());
			if(c2 == 0){
				return -Integer.compare(aa.getWd(), bb.getWd());//negate: higher temperature first
			}
			return c2;
		}
		return c1;
	}
}
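
The two comparators can also be exercised off-cluster, since compare() is public. A minimal sketch, assuming it sits in the same package as the classes above (ComparatorCheck is a hypothetical name):

package com.bigdata.tianqi;

public class ComparatorCheck {
	public static void main(String[] args) {
		TianQiSortComparator sort = new TianQiSortComparator();
		TianQiGroupComparator group = new TianQiGroupComparator();
		TianQi a = new TianQi(1949, 10, 1, 38);
		TianQi b = new TianQi(1949, 10, 2, 36);
		System.out.println(sort.compare(a, b));//negative: same month, so the higher temperature sorts first
		System.out.println(group.compare(a, b));//0: same year and month, so both land in one reduce group
	}
}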

g)TianQiDriver

package com.bigdata.tianqi;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.bigdata.grouporderby.TianQiPartitioner;

public class TianQiDriver {

	public static void main(String[] args) throws Exception {
		//1 create the configuration object
		Configuration conf = new Configuration();
		//2 create the job from the configuration
		Job job = Job.getInstance(conf);
		//3 set the jar location for the job
		job.setJarByClass(TianQiDriver.class);
		//4 set the mapper and reducer classes
		job.setMapperClass(TianQiMapper.class);
		job.setReducerClass(TianQiReduce.class);
		//5 set the mapper's output key/value types
		job.setMapOutputKeyClass(TianQi.class);
		job.setMapOutputValueClass(NullWritable.class);
		//6 set the final output key/value types
		job.setOutputKeyClass(TianQi.class);
		job.setOutputValueClass(NullWritable.class);
		
		//7 set the input and output paths; delete the output path if it already exists
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		Path path = new Path(args[1]);
		if(path.getFileSystem(conf).exists(path)){
			path.getFileSystem(conf).delete(path,true);
		}
		
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//wire in the custom grouping, sorting and partitioning
		job.setGroupingComparatorClass(TianQiGroupComparator.class);
		job.setSortComparatorClass(TianQiSortComparator.class);
		job.setPartitionerClass(TianQiPartitioner.class);
		job.setNumReduceTasks(3);
		
		//8 submit the job to the YARN cluster (or the local runner)
		boolean waitForCompletion = job.waitForCompletion(true);
		System.out.println(waitForCompletion);
	}

}
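
Assuming the classes are packaged into a jar named tianqi.jar (the jar name and paths below are placeholders), the job would be submitted with the input file and an output directory as its two arguments:

hadoop jar tianqi.jar com.bigdata.tianqi.TianQiDriver /data/tianqi.txt /out/tianqi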

II. Friend Recommendation Case

1. Requirements

Recommend friends of friends: for example, recommend cat, hello and mr to hadoop.

2. Data Preparation

friends.txt

tom hello hadoop cat
world hadoop hello hive
cat tom hive
mr hive hello
hive cat hadoop world hello mr
hadoop tom hive world
hello tom world hive mr

Friendships are bidirectional: each line lists a user followed by that user's friends, and every relationship also appears on the other user's line.

3. Approach

The recommender and the recommended person must share at least one common friend, so the task reduces to finding common friends, under the constraint that the two people are not already direct friends. Taking the first line as an example, hello can be recommended to hadoop and hadoop to hello, provided the two are not direct friends.
Scan each friend list and emit every pairwise combination among the listed friends. Each such pair is an indirect (friend-of-friend) relation, and the mere fact that the pair was formed proves they have a common friend; for the first line that common friend is tom, and the pairs are hello:hadoop, hello:cat and hadoop:cat.
If a pair are direct friends, however, no recommendation may be made. In line 5, hadoop and world appear as an indirect pair with common friend hive, but line 2 shows that world and hadoop are direct friends, so they must not be recommended to each other. Direct pairs therefore have to be filtered out. Where do they come from?
Pair the first user on each line with each of that user's friends in turn, e.g., tom:hello, tom:hadoop, tom:cat; these pairs are the direct relations.
The full set of emitted pairs thus contains both direct and indirect relations. Discard every pair that also occurs as a direct relation, then count how often each remaining pair occurs: that count is the number of common friends.

MapReduce plan (see the worked example after this list):
map: emit the pairwise relations from each friend list
reduce: sum the counts for each pair, dropping pairs that are direct friends
and write out the resulting report
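
For the first line, tom hello hadoop cat, the mapper emits (getFd puts each pair into lexicographic order, so both orderings of a pair collapse onto one key):

direct pairs, flag 0: hello:tom, hadoop:tom, cat:tom
indirect pairs, flag 1: hadoop:hello, cat:hello, cat:hadoop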

Result:

cat:hadoop	2
cat:hello	2
cat:mr	1
cat:world	1
hadoop:hello	3
hadoop:mr	1
hive:tom	3
mr:tom	1
mr:world	2
tom:world	2

4. Code Implementation

a)RecommendMapper

package com.bigdata.recommend;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//world hadoop hello hive

public class RecommendMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		//tom hello hadoop cat
		String line = value.toString();
		String[] split = line.split(" ");
		for (int i = 1; i <= split.length-1; i++) {
			//direct friendship between the list owner and each friend, flagged 0
			context.write(new Text(getFd(split[0], split[i])), new IntWritable(0));
			for (int j = i+1; j <= split.length-1; j++) {
				//indirect friendship between every two friends in the list, flagged 1
				context.write(new Text(getFd(split[i], split[j])), new IntWritable(1));
			}
		}
	}
	
	//put the two names in lexicographic order so both orderings of a pair map to the same key
	public String getFd(String a,String b){
		return a.compareTo(b)>0? b+":"+a:a+":"+b ;
	}
}

b)RecommendReducer

package com.bigdata.recommend;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

//keyout: the pair of recommender and recommended person
//valueout: the number of common friends that pair shares
public class RecommendReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
	@Override
	protected void reduce(Text key, Iterable<IntWritable> values,
			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
		
		//the incoming values mix direct relations (0) and indirect relations (1), e.g.:
		//<hadoop:hello,1>
		//<hadoop:hello,1>
		//<hadoop:hello,1>
		
		//<hadoop:world,0>
		//<hadoop:world,0>
		//<hadoop:world,1>
		int sum = 0;
		for (IntWritable intWritable : values) {
			if(intWritable.get() == 0){//a 0 means the pair are direct friends, so no recommendation is made
				return;
			}
			sum = sum + intWritable.get();
		}
		//write the pair together with the number of common friends they share
		context.write(key, new IntWritable(sum));
	}
}
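
One consequence of this design: a summing Combiner must not be plugged into this job, because combining map output locally would fold a direct pair's 0 flag into a partial sum, and the reducer could no longer tell that the pair are direct friends.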

c)RecommendDriver

package com.bigdata.recommend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RecommendDriver {

	public static void main(String[] args) throws Exception {
		//1 create the configuration object
		Configuration conf = new Configuration();
		//2 create the job from the configuration
		Job job = Job.getInstance(conf);
		//3 set the jar location for the job
		job.setJarByClass(RecommendDriver.class);
		//4 set the mapper and reducer classes
		job.setMapperClass(RecommendMapper.class);
		job.setReducerClass(RecommendReducer.class);
		//5 set the mapper's output key/value types
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		//6 set the final output key/value types
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		//7 set the input and output paths; delete the output path if it already exists
		FileInputFormat.setInputPaths(job, new Path(args[0]));
		
		Path outPath = new Path(args[1]);
		if(outPath.getFileSystem(conf).exists(outPath)){
			outPath.getFileSystem(conf).delete(outPath,true);
		}
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		//8 submit the job to the YARN cluster (or the local runner)
		boolean waitForCompletion = job.waitForCompletion(true);
		System.out.println(waitForCompletion);

	}

}

Thanks for reading. If you spot any problems, corrections are welcome.
