Hadoop第二部分:MapReudce(三)
MapReudce(三)
本文项目地址:https://github.com/KingBobTitan/hadoop.git
MR的Shuffle详解及Join实现
一、回顾
1、MapReduce的历史监控服务:JobHistoryServer
- 功能:用于监控所有在YARN上运行过的MapReduce程序的信息
- 配置YARN的日志聚集:存储在hdfs上
- 启动:web:19888
2、自定义数据类型:在Hadoop中封装JavaBean
-
封装需要实现序列化
-
实现接口
- Writable:只实现了序列化
- write:序列化
- readFields:反序列化
- WritableComparable:实现了序列化和比较器
- write:序列化
- readFields:反序列化
- compareTo:比较的方法
- 如果自定义的类型作为key会经过shuffle过程,就需要实现WritableComparable接口
- Writable:只实现了序列化
-
排序:自定义数据类型
-
先检查有没有排序器,如果有排序器就使用排序器
job.setSortComaparator(RawComparator<T>) => extends WritableComparator
-
如果没有,就调用该类型自带的comparableTo
-
如果两个都没有,就报错
-
-
排序:默认类型:Text、IntWritable
- 先检查有没有排序器,如果有排序器就使用排序器
- 如果没有排序器,就调用默认类型的比较器
3、基本的Shuffle过程
-
Input
- 功能
- 将所有输入的数据变成KeyValue
- 将任务进行拆分:计算任务:分片:100片
- 功能
-
Map
- 功能:分
- 根据Input的分片启动MapTask,一个 分片对应一个MapTask
- 每个MapTask会对自己分片中的每一条keyvalue调用一次map方法
- 实现逻辑
- map方法自定义
- 功能:分
-
Shuffle
-
功能
- 分区:如果有多个Reduce,决定了当前的keyvalue会被哪个reduce进行处理
- 排序:对key按照排序的规则进行排序
- 分组:相同key的value进行合并,放入一个迭代器中,每一种只有一条
-
实现逻辑
-
分区:默认hash分区
-
自定义:继承Partitioner<key,value>
getPartition(key,value,numReduceTask)
-
-
排序:默认按照key字典升序
- 自定义一:定义一个排序器【优先级最高】
- 自定义二:自定义数据类型,实现WritableComparable
-
分组:默认按照key进行分组
-
-
-
Reduce
- 功能:合
- 将shuffle输出的数据进行合并处理
- 合并逻辑
- reduce方法
- 功能:合
-
Output
- 将Reduce的结果进行输出,保存到对应的文件系统中
- 默认TextOutputFormat:key与value以制表符分隔
二、课程目标
-
Shuffle过程详解【重点】
-
Shuffle中的两个优化【掌握】
-
MapReduce中的Join方案【掌握】
-
读写数据库【了解】
三、Shuffle过程详解
1、功能
- 分区
- 排序
- 分组
2、阶段
-
Input
-
输入:
-
file1
hadoop hive hbase spark spark hadoop hadoop hadoop
-
file2
hue hive hive spark spark hadoop spark spark hbase
-
-
功能:分片、转换keyvalue
-
输出
-
split1
key value 0 hadoop hive hbase spark spark 10 hadoop hadoop hadoop
-
split2
key value 0 hue hive hive spark spark 20 hadoop spark spark hbase
-
-
-
Map
-
功能:根据分片的个数启动MapTask,然后对每个MapTask中的数据调用map方法
arr = value.toString.split(" ") for(word:arr){ this.outputKey(word) this.outputValue(1) context.write(key,value) }
-
MapTask1
hadoop 1 hive 1 hbase 1 spark 1 spark 1 hadoop 1 hadoop 1 hadoop 1
-
MapTask2
hue 1 hive 1 hive 1 spark 1 spark 1 hadoop 1 spark 1 spark 1 hbase 1
-
-
Shuffle:分区、排序、分组
-
Map端的Shuffle:处理Map的结果
-
spill:溢写【将内存中的数据写入磁盘变成文件】
-
每一个MapTask会将自己处理的结果放入一个环形的内存缓冲区【100M】
-
当缓冲区达到80%,触发溢写,所有即将被溢写的数据会进行分区和排序
-
分区:默认按照key的hash取余进行分区,本质打标签
-
MapTask1:filea
hadoop 1 reduce1 hive 1 reduce2 hbase 1 reduce1 spark 1 reduce2
-
MapTask1:fileb
spark 1 reduce2 hadoop 1 reduce1 hadoop 1 reduce1 hadoop 1 reduce1
-
MapTask2:filea
hue 1 reduce1 hive 1 reduce2 hive 1 reduce2 spark 1 reduce2 spark 1 reduce2
-
MapTask2:fileb
hadoop 1 reduce1 spark 1 reduce2 spark 1 reduce2 hbase 1 reduce1
-
-
排序:调用排序器或者compareTo方法:实现的方式:快排
-
不是整个批次数据全局有序,而是相同分区内部有序
-
MapTask1:filea
hadoop 1 reduce1 hbase 1 reduce1 hive 1 reduce2 spark 1 reduce2
-
MapTask1:fileb
spark 1 reduce2 hadoop 1 reduce1 hadoop 1 reduce1 hadoop 1 reduce1
-
MapTask2:filea
hue 1 reduce1 hive 1 reduce2 hive 1 reduce2 spark 1 reduce2 spark 1 reduce2
-
MapTask2:fileb
hadoop 1 reduce1 hbase 1 reduce1 spark 1 reduce2 spark 1 reduce2
-
-
-
-
merge:合并,每个MapTask会将自己生成所有小文件进行合并,保证每个MapTask只有一个大文件
-
合并:并且在合并过程中进行排序【每个分区内部有序】实现的方式:归并排序【内存中只放索引】
-
排序逻辑还是调用排序器或者comparaTo
-
MapTask1
hadoop 1 reduce1 hadoop 1 reduce1 hadoop 1 reduce1 hadoop 1 reduce1 hbase 1 reduce1 hive 1 reduce2 spark 1 reduce2 spark 1 reduce2
-
MapTask2
hadoop 1 reduce1 hbase 1 reduce1 hue 1 reduce1 hive 1 reduce2 hive 1 reduce2 spark 1 reduce2 spark 1 reduce2 spark 1 reduce2 spark 1 reduce2
-
-
-
-
**Reduce端的shuffle:**将自己结果给Reduce
-
merge
-
==拉取:==通过Http协议每个ReduceTask会到每个MapTask的结果中取属于自己的数据
-
reduceTask1
-
MapTask1
hadoop 1 hadoop 1 hadoop 1 hadoop 1 hbase 1
-
MapTask2
hadoop 1 hbase 1 hue 1
-
-
reduceTask2
-
MapTask1
hive 1 spark 1 spark 1
-
MapTask2
hive 1 hive 1 spark 1 spark 1 spark 1 spark 1
-
-
-
==合并:==合并过程中也排序:实现的方式:归并排序【内存中只放索引】
-
排序逻辑还是调用排序器或者comparaTo
-
reduceTask1
hadoop 1 hadoop 1 hadoop 1 hadoop 1 hadoop 1 hbase 1 hbase 1 hue 1
-
reduceTask2
hive 1 hive 1 hive 1 spark 1 spark 1 spark 1 spark 1 spark 1 spark 1
-
-
-
==group:==将相同key的value合并放入迭代器
-
reduceTask1
hadoop 1,1,1,1,1 hbase 1,1 hue 1
-
reduceTask2
hive 1,1,1 spark 1,1,1,1,1,1
-
-
-
-
Reduce
-
功能:对Shuffle的结果进行聚合,对每一条数据调用reduce方法
reduce(key,Iterator<value>:values){ for(value:values){ sum+=value.get() } context.write(key,sum) }
-
ReduceTask1
hadoop 5 hbase 2 hue 1
-
ReduceTask2
hive 3 spark 6
-
-
Output
-
part-r-00000
hadoop 5 hbase 2 hue 1
-
part-r-00001
hive 3 spark 6
-
3、流程图
-
Shuffle
-
过程:分布式内存 => 磁盘【做内存中无法实现的事情】=> 内存
-
map1:1 3 => 3 1
-
map2:4 5 => 5 4
-
map3:1 7 => 7 1
-
需求:全局倒序
- 合并:磁盘:3 1 5 4 7 1 => 7 5 4 3 1 1 => 重新将全局有序的读取到内存中
-
-
四、Shuffle中的两个优化
1、Combiner:Map端的聚合
-
Shuffle过程中的另外一个功能,默认是关闭,需要自己手动设置该功能来启用
-
不是所有的程序都能使用Combiner的
-
程序需要符合一定的条件才可以
(a+b) * c = a * c+b * c
-
-
不做combiner
- 使用combiner
-
主要的目的
-
利用MapTask的并发的个数是远大于Reduce的个数
-
将聚合的逻辑由每个Map完成一部分,最后再由Reduce做最终的聚合,减轻Reduce的负载
-
官方的wordcount
-
- 自己开发的wordcount
-
自己开发的时候也可以启用Combiner
job.setCombinerClass(WordCountReduce.class);//设置Combiner
- Combiner的类一般就是Reducer的类,聚合逻辑是一致的
- 如果分不清逻辑是否可以拆分,可以测试
- 两次的结果是否一致
- Reduce的输入类型与Reduce的输出类型是否一致
- Map输出 => Reduce输入
- Combiner = Reducer
- Combiner => Reduce输入
2、Compress:压缩
-
生活中的压缩类型:zip/rar/7z
-
大数据中的压缩类型:可分割的压缩类型
- 300M => 压缩 => 200M
- block1:128M => split1 => MapTask1【node01】
- block2:72M => split2 => MapTask2【node02】
- 类型:snappy、lz4、lzo
-
常见的压缩选取
- 压缩和解压比较慢,算法非常复杂,但是压缩比高【压缩以后的大小/原先的大小】
- 压缩和解压比较快,算法比较简单,但是压缩比低
-
优点:MapReduce中的压缩
- 减少磁盘以及网络的IO,提高数据传输和存储的效率
- Shuffle Write:1T => 硬盘 1s/GB => 1024s
- 压缩:1T => 压缩=> 700G => 硬盘 1s/GB => 700s + 压缩的时间 = > 750s
- Shuffle Read :硬盘 => 1T => 1024s
- 压缩: => 硬盘 => 700G =>解压 1s/GB => 700s + 解压的时间 = > 750s
- Shuffle Write:1T => 硬盘 1s/GB => 1024s
- 减少磁盘以及网络的IO,提高数据传输和存储的效率
-
以后所学的所有关于存储和计算的框架,都支持压缩
-
MapReduce中可以压缩数据的位置
-
输入:MapReduce可以读取一个压缩的文件作为输入【不用】
- 数据的文件类型由数据生成决定
- MapReduce读压缩文件会将压缩的元数据读取进来
-
Shuffle阶段的压缩:将Map输出的结果进行压缩【主要用的地址】
#启用压缩 mapreduce.map.output.compress=true #指定压缩的类型 mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec org.apache.hadoop.io.compress.Lz4Codec org.apache.hadoop.io.compress.SnappyCodec
-
如何修改配置
-
mapred-site.xml:永久性所有程序都压缩
-
MapReduce程序中:conf.set(key,value)
-
运行提交命令时,可以指定参数:临时
yarn jar sougou.jar cn.itcast.hadoop.mapreduce.compress.HotKeyMR -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec /app/sougou /app/output/sougou1
-
-
-
输出:MapReduce可以输出压缩文件【少用】
mapreduce.output.fileoutputformat.compress=true mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
-
-
检查Hadoop支持哪些压缩;bin/hadoop checknative
hadoop: true /export/servers/hadoop-2.6.0-cdh5.14.0/lib/native/libhadoop.so.1.0.0 zlib: true /lib64/libz.so.1 snappy: true /usr/lib64/libsnappy.so.1 lz4: true revision:10301 bzip2: true /lib64/libbz2.so.1
-
演示
-
需求:统计每个搜索词出现的次数
/** * @ClassName WordCount * @Description TODO 基于搜狗数据实现热门搜索词的统计 * @Date 2020/1/9 16:00 * @Create By Frank */ public class HotKeyMR extends Configured implements Tool { /** * 构建一个MapReduce程序,配置程序,提交程序 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { /** * 第一:构造一个MapReduce Job */ //构造一个job对象 Job job = Job.getInstance(this.getConf(),"mrword"); //设置job运行的类 job.setJarByClass(HotKeyMR.class); /** * 第二:配置job */ //input:设置输入的类以及输入路径 // job.setInputFormatClass(TextInputFormat.class); 这是默认的 Path inputPath = new Path(args[0]);//以程序的第一个参数作为输入路径 TextInputFormat.setInputPaths(job,inputPath); //map job.setMapperClass(WordCountMapper.class);//指定Mapper的类 job.setMapOutputKeyClass(Text.class);//指定map输出的key的类型 job.setMapOutputValueClass(IntWritable.class);//指定map输出的value的类型 //shuffle job.setCombinerClass(WordCountReduce.class);//设置Combiner //reduce job.setReducerClass(WordCountReduce.class);//指定reduce的类 job.setOutputKeyClass(Text.class);//指定reduce输出的 key类型 job.setOutputValueClass(IntWritable.class);//指定reduce输出的value类型 // job.setNumReduceTasks(1);//这是默认的 //output // job.setOutputFormatClass(TextOutputFormat.class);//这是默认的输出类 Path outputPath = new Path(args[1]);//用程序的第二个参数作为输出路径 //如果输出目录已存在,就删除 FileSystem hdfs = FileSystem.get(this.getConf()); if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } //设置输出的地址 TextOutputFormat.setOutputPath(job,outputPath); /** * 第三:提交job */ //提交job运行,并返回boolean值,成功返回true,失败返回false return job.waitForCompletion(true) ? 0 : -1; } /** * 整个程序的入口,负责调用当前类的run方法 * @param args */ public static void main(String[] args) { //构造一个conf对象,用于管理当前程序的所有配置 Configuration conf = new Configuration(); //配置压缩 conf.set("mapreduce.map.output.compress","true"); conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.Lz4Codec"); try { //调用当前类的run方法 int status = ToolRunner.run(conf, new HotKeyMR(), args); //根据程序运行的 结果退出 System.exit(status); } catch (Exception e) { e.printStackTrace(); } } /** * Mapper的类,实现四个泛型,inputkey,inputValue,outputKey,outputValue * 输入的泛型:由输入的类决定:TextInputFormat:Longwritable Text * 输出的泛型:由代码逻辑决定:Text,IntWritable * 重写map方法 */ public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{ //构造用于输出的key和value private Text outputKey = new Text(); private IntWritable outputValue = new IntWritable(1); /** * map方法:Input传递过来的每一个keyvalue会调用一次map方法 * @param key:当前的 key * @param value:当前的value * @param context:上下文,负责将新的keyvalue输出 * @throws IOException * @throws InterruptedException */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将每一行的内容转换为String String line = value.toString(); //对每一行的内容分割 String[] words = line.split("\t"); //取第二个字段,用户的搜索词作为key this.outputKey.set(words[2]); //输出 context.write(this.outputKey,this.outputValue); } } /** * 所有的Reduce都需要实现四个泛型 * 输入的keyvalue:就是Map的 输出的keyvalue类型 * 输出的keyvalue:由代码逻辑决定 * 重写reduce方法 */ public static class WordCountReduce extends Reducer<Text, IntWritable,Text, IntWritable>{ private IntWritable outputValue = new IntWritable(); /** * reduce方法 ,每一个keyvalue,会调用一次reduce方法 * @param key:传进来的key * @param values:迭代器,当前key的所有value * @param context * @throws IOException * @throws InterruptedException */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //取出迭代器的值进行累加 int sum = 0; for (IntWritable value : values) { sum += value.get(); } //封装成输出的value this.outputValue.set(sum); //输出每一个key的结果 context.write(key,outputValue); } } }
-
五、MapReduce中的Join方案
1、Reduce Join
-
join发生在reduce端
-
SQL中的join
inner join:两边都有结果才有 left join:左边有,结果就有 right join:右边有,结果就有 full join:任意一边有,结果就有 select * from a join b = select * from a,b => 笛卡尔积 【大数据表的join,严禁产生笛卡尔积】 join:列的关联 a id name age sex b id phone addr 查询张三的手机号 union:行的关联 a id name age sex:大一的 数据 b id name age sex:大二的数据
-
需求:两份数据
-
订单数据
1001,20150710,p0001,2 1002,20150710,p0002,3 1002,20150710,p0003,3 订单编号,日期,商品id,商品数量
-
商品数据
p0001,直升机,1000,2000 p0002,坦克,1000,3000 p0003,火箭,10000,2000 商品id,商品的名称,价格,库存
-
关联:得到每个订单的商品名称
1001,20150710,p0001,2 直升机 1002,20150710,p0002,3 坦克 1002,20150710,p0003,3 火箭
-
分析:MapReduce 实现
-
第一步:结果
- 除了包含订单的数据,还要包含商品名称
-
第二步:看是否有分组或者排序
- 分组:join的字段
- key:商品id
-
第三步:value
- 对于订单数据来说,除了商品id以外其他字段,就是value
- 对于商品数据来说,除了商品id以外,只需要商品名称
-
第四步:验证
-
Input
1001,20150710,p0001,2 1002,20150710,p0002,3 1002,20150710,p0003,3 p0001,直升机,1000,2000 p0002,坦克,1000,3000 p0003,火箭,10000,2000
-
map
- key:商品id
- value:
- 如果是订单:订单编号,日期,商品数量
- 如果是商品:商品名称
-
shuffle
-
分组
- 相同商品id对应的所有商品和订单数据在一个迭代器中
p0001 直升机,1001,20150710,p0001,2 p0002 坦克,1002,20150710,p0002,3 p0003 火箭,1002,20150710,p0003,3
-
-
-
-
-
实现
/** * @ClassName ReduceJoin * @Description TODO Reduce端实现join的过程 * @Date 2020/1/9 17:25 * @Create By Frank */ public class ReduceJoin extends Configured implements Tool { /** * 具体整个MapReduce job的定义:构建、配置、提交 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { /** * 构建一个job */ //创建一个job的实例 Job job = Job.getInstance(this.getConf(),"mrjob"); //设置job运行的类 job.setJarByClass(ReduceJoin.class); /** * 配置job */ //input:定义输入的方式,输入的路径 Path orderPath = new Path("datas/mrjoin/orders.txt"); Path productPath = new Path("datas/mrjoin/product.txt"); TextInputFormat.setInputPaths(job,orderPath,productPath); //map:定义Map阶段的类及输出类型 job.setMapperClass(MRJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //shuffle:定义shuffle阶段实现的类 //reduce:定义reduce阶段的类及输出类型 job.setReducerClass(MRJoinReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setNumReduceTasks(1);//设置Reduce的个数,就是分区的个数 //output:定义输出的类以及输出的路径 Path outputPath = new Path("datas/output/join/reducejoin"); //如果输出存在,就删除 FileSystem hdfs = FileSystem.get(this.getConf()); if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath); /** * 提交job:并根据job运行的结果返回 */ return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //构建一个Conf对象,用于管理当前程序的所有配置 Configuration conf = new Configuration(); //调用当前类的run方法 int status = ToolRunner.run(conf, new ReduceJoin(), args); //根据job运行的状态,来退出整个程序 System.exit(status); } /** * 定义Mapper的实现类以及Map过程中的处理逻辑 */ public static class MRJoinMapper extends Mapper<LongWritable,Text,Text, Text>{ private Text outputKey = new Text(); private Text outputValue = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //先判断当前的数据是哪个文件的 FileSplit inputSplit = (FileSplit) context.getInputSplit();//先获取这条数据属于哪个文件分片 String fileName = inputSplit.getPath().getName();//获取文件名称 //如果是订单数据,key为第三个字段,value是其他剩余字段 if("orders.txt".equals(fileName)){ String[] items = value.toString().split(","); this.outputKey.set(items[2]);//商品id this.outputValue.set(items[0]+"\t"+items[1]+"\t"+items[3]);//其他字段 context.write(this.outputKey,this.outputValue);//将订单数据输出 }else{ //如果是商品数据,key为第一个字段,value是第二个字段 String[] split = value.toString().split(","); this.outputKey.set(split[0]);//商品id this.outputValue.set(split[1]);//商品名称 context.write(this.outputKey,this.outputValue);//输出商品数据 } } } /** * 定义Reducer的实现类以及Reduce过程中的处理逻辑 */ public static class MRJoinReduce extends Reducer<Text,Text,Text,Text>{ private Text outputValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder stringBuilder = new StringBuilder(); for (Text value : values) { stringBuilder.append(value.toString()+"\t");//将所有的商品的名称和对应的所有订单进行拼接 } this.outputValue.set(stringBuilder.toString());//将商品名称及订单作为value context.write(key,this.outputValue); } } }
-
应用场景
-
大的数据join大的数据
-
比较时的效率非常低
-
商品数据:6万
-
订单数据:9万条
-
2、Map Join
-
商品数据:1万
-
订单数据:2000万条
-
小的数据join大的数据
-
==思想 :==将小数据放入分布式缓存,大数据的每个分片需要用到时,直接从分布式缓存中取,然后直接在Map端完成join,不用经过shuffle
-
实现
public class MapJoin extends Configured implements Tool { /** * 具体整个MapReduce job的定义:构建、配置、提交 * @param args * @return * @throws Exception */ @Override public int run(String[] args) throws Exception { /** * 构建一个job */ //创建一个job的实例 Job job = Job.getInstance(this.getConf(),"mrjob"); //设置job运行的类 job.setJarByClass(MapJoin.class); /** * 配置job */ //input:定义输入的方式,输入的路径 Path orderPath = new Path("datas/mrjoin/orders.txt"); TextInputFormat.setInputPaths(job,orderPath); //将商品的数据放入分布式缓存 Path productPath = new Path("datas/mrjoin/product.txt"); job.addCacheFile(productPath.toUri()); //map:定义Map阶段的类及输出类型 job.setMapperClass(MRJoinMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); //shuffle:定义shuffle阶段实现的类 //reduce:定义reduce阶段的类及输出类型 // job.setReducerClass(MRJoinReduce.class); // job.setOutputKeyClass(Text.class); // job.setOutputValueClass(Text.class); job.setNumReduceTasks(0);//设置Reduce的个数,就是分区的个数 //output:定义输出的类以及输出的路径 Path outputPath = new Path("datas/output/join/mapjoin"); //如果输出存在,就删除 FileSystem hdfs = FileSystem.get(this.getConf()); if(hdfs.exists(outputPath)){ hdfs.delete(outputPath,true); } TextOutputFormat.setOutputPath(job,outputPath); /** * 提交job:并根据job运行的结果返回 */ return job.waitForCompletion(true) ? 0:-1; } /** * 程序的入口 * @param args * @throws Exception */ public static void main(String[] args) throws Exception { //构建一个Conf对象,用于管理当前程序的所有配置 Configuration conf = new Configuration(); //调用当前类的run方法 int status = ToolRunner.run(conf, new MapJoin(), args); //根据job运行的状态,来退出整个程序 System.exit(status); } /** * 定义Mapper的实现类以及Map过程中的处理逻辑 */ public static class MRJoinMapper extends Mapper<LongWritable,Text,Text, Text>{ private Text outputKey = new Text(); private Text outputValue = new Text(); Map<String,String> maps = new HashMap<>(); /** * Map和Reduce的类:三个方法 * 1-setup:会在map或者reduce方法之前执行 * 2-map/reduce:map逻辑或者reduce逻辑 * 3-close:最后执行的方法 * @param context * @throws IOException * @throws InterruptedException */ @Override protected void setup(Context context) throws IOException, InterruptedException { //将分布式缓存的 数据读取进来 URI[] cacheFiles = context.getCacheFiles();//获取所有的缓存数据 //读取文件内容 BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath())); String line = null; while(StringUtils.isNotBlank(line = bufferedReader.readLine())){ //读取到每一行的内容 String pid = line.split(",")[0];//商品id String productName = line.split(",")[1];//商品名称 //将商品id和名称放入map集合 maps.put(pid,productName); } } @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取订单数据 String[] items = value.toString().split(","); String pid = items[2]; //订单中的商品id String productName = maps.get(pid); this.outputKey.set(productName); this.outputValue.set(value.toString()); context.write(this.outputKey,this.outputValue); } } /** * 定义Reducer的实现类以及Reduce过程中的处理逻辑 */ public static class MRJoinReduce extends Reducer<Text,Text,Text,Text>{ private Text outputValue = new Text(); @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder stringBuilder = new StringBuilder(); for (Text value : values) { stringBuilder.append(value.toString()+"\t");//将所有的商品的名称和对应的所有订单进行拼接 } this.outputValue.set(stringBuilder.toString());//将商品名称及订单作为value context.write(key,this.outputValue); } } }
-
应用场景
- 适合于小数据量join大数据量的场景
六、读写数据库
1、Input和Output
- Input:所有的Input都要继承自InputFormat
- 默认:TextInputFormat extends FileInputFormat extend InputFormat
- 文件
- 数据库
- Output:所有的Input都要继承自OutputFormat
- 默认:TextOutputFormat extends FileOutputFormat extends OutputFormat
- 文件
- 数据库
- 数据库:JDBC
2、读MySQL
-
修改输入的类
job.setInputFormatClass(DBInputFormat.class);
-
自定义一个数据类型用于接收MySQL中的数据
-
除了要实现Writable接口以外,还要实现DBWritable【数据库数据的序列化】
public class DBReader implements Writable,DBWritable
-
实现数据库对象的序列化及反序列化
public void write(PreparedStatement statement) throws SQLException { // TODO Auto-generated method stub statement.setString(1, word); statement.setInt(2,number); } public void readFields(ResultSet resultSet) throws SQLException { // TODO Auto-generated method stub this.word = resultSet.getString(1); this.number = resultSet.getInt(2); }
-
-
创建了conf对象以后,要立马配置jdbc的连接参数
Configuration conf = new Configuration(); DBConfiguration.configureDB( conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/test", "root", "123456" );
-
配置读取的数据:表、SQL语句、字段
DBInputFormat.setInput( job, DBReader.class, //存放读取结果的对象 "wcresult", //表明 null, //过滤条件 "number",//排序的字段 fields //读取哪些字段 ); public static void setInput( Job job, Class<? extends DBWritable> inputClass, //存放读取结果的对象 String inputQuery, //SQL语句,SQL语句中返回的字段必须与inputClass属性一致 String inputCountQuery ) { }
2、写MySQL
-
配置输出的类
job.setOutputFormatClass(DBOutputFormat.class);
-
配置输出的参数:表、字段
DBOutputFormat.setOutput(job, "wcresult", fields);
-
程序会将key输出到MySQL的对象必须实现DBWritable接口【序列化方法】
public static class WriteMap extends Mapper<LongWritable, Text, DBReader, NullWritable>{ private DBReader outputKey = new DBReader(); private NullWritable outputValue = NullWritable.get(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // TODO Auto-generated method stub String line = value.toString(); this.outputKey.setWord(line.split("\t")[0]); this.outputKey.setNumber(Integer.valueOf(line.split("\t")[1])); context.write(outputKey, outputValue); } }
上一篇: python的图像识前的准备
推荐阅读
-
Hadoop第二部分:MapReudce(三)
-
JavaScript进阶教程(第三课第二部分)第1/2页_基础知识
-
《Spring Security3》第三章第二部分翻译(退出功能的实现)
-
《Spring Security3》第二章第三部分翻译(下)附前两章doc文档
-
《Spring Security3》第二章第三部分翻译(中)
-
《Spring Security3》第二章第三部分翻译(上)
-
第二部分 Common的实现 第2章 Hadoop配置信息处理 2.1 配置文
-
第六天 - 安装第二、三台CentOS - SSH免密登陆 - hadoop全分布式安装、配置、集群启动
-
第二部分 Common的实现 第2章 Hadoop配置信息处理 2.1 配置文