欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Hadoop第二部分:MapReudce(三)

程序员文章站 2024-03-16 11:02:52
...

MapReudce(三)

本文项目地址:https://github.com/KingBobTitan/hadoop.git

MR的Shuffle详解及Join实现

一、回顾

1、MapReduce的历史监控服务:JobHistoryServer

  • 功能:用于监控所有在YARN上运行过的MapReduce程序的信息
  • 配置YARN的日志聚集:存储在hdfs上
  • 启动:web:19888

2、自定义数据类型:在Hadoop中封装JavaBean

  • 封装需要实现序列化

  • 实现接口

    • Writable:只实现了序列化
      • write:序列化
      • readFields:反序列化
    • WritableComparable:实现了序列化和比较器
      • write:序列化
      • readFields:反序列化
      • compareTo:比较的方法
    • 如果自定义的类型作为key会经过shuffle过程,就需要实现WritableComparable接口
  • 排序:自定义数据类型

    • 先检查有没有排序器,如果有排序器就使用排序器

      job.setSortComaparator(RawComparator<T>) => extends WritableComparator
      
    • 如果没有,就调用该类型自带的comparableTo

    • 如果两个都没有,就报错

  • 排序:默认类型:Text、IntWritable

    • 先检查有没有排序器,如果有排序器就使用排序器
    • 如果没有排序器,就调用默认类型的比较器

3、基本的Shuffle过程

  • Input

    • 功能
      • 将所有输入的数据变成KeyValue
      • 将任务进行拆分:计算任务:分片:100片
  • Map

    • 功能:分
      • 根据Input的分片启动MapTask,一个 分片对应一个MapTask
      • 每个MapTask会对自己分片中的每一条keyvalue调用一次map方法
    • 实现逻辑
      • map方法自定义
  • Shuffle

    • 功能

      • 分区:如果有多个Reduce,决定了当前的keyvalue会被哪个reduce进行处理
      • 排序:对key按照排序的规则进行排序
      • 分组:相同key的value进行合并,放入一个迭代器中,每一种只有一条
    • 实现逻辑

      • 分区:默认hash分区

        • 自定义:继承Partitioner<key,value>

          getPartition(key,value,numReduceTask)
          
      • 排序:默认按照key字典升序

        • 自定义一:定义一个排序器【优先级最高】
        • 自定义二:自定义数据类型,实现WritableComparable
      • 分组:默认按照key进行分组

  • Reduce

    • 功能:合
      • 将shuffle输出的数据进行合并处理
    • 合并逻辑
      • reduce方法
  • Output

    • 将Reduce的结果进行输出,保存到对应的文件系统中
    • 默认TextOutputFormat:key与value以制表符分隔

二、课程目标

  1. Shuffle过程详解【重点】

  2. Shuffle中的两个优化【掌握】

  3. MapReduce中的Join方案【掌握】

  4. 读写数据库【了解】

三、Shuffle过程详解

1、功能

  • 分区
  • 排序
  • 分组

2、阶段

  • Input

    • 输入:

      • file1

        hadoop hive hbase spark spark
        hadoop hadoop hadoop
        
      • file2

        hue hive hive spark spark
        hadoop spark spark hbase
        
    • 功能:分片、转换keyvalue

    • 输出

      • split1

        key							value
        0                         hadoop hive hbase spark spark
        10						  hadoop hadoop hadoop
        
      • split2

        key							value
        0							hue hive hive spark spark
        20							hadoop spark spark hbase
        
  • Map

    • 功能:根据分片的个数启动MapTask,然后对每个MapTask中的数据调用map方法

      arr = value.toString.split(" ")
      for(word:arr){
      	this.outputKey(word)
      	this.outputValue(1)
      	context.write(key,value)
      }
      
    • MapTask1

      hadoop      1			
      hive        1   
      hbase       1     
      spark       1          
      spark       1         
      hadoop      1         
      hadoop      1      
      hadoop      1    
      
    • MapTask2

      hue 	1
      hive 	1
      hive 	1
      spark 	1
      spark	1
      hadoop 	1
      spark 	1
      spark 	1
      hbase	1
      
  • Shuffle:分区、排序、分组

    • Map端的Shuffle:处理Map的结果

      • spill:溢写【将内存中的数据写入磁盘变成文件】

        • 每一个MapTask会将自己处理的结果放入一个环形的内存缓冲区【100M】

        • 当缓冲区达到80%,触发溢写,所有即将被溢写的数据会进行分区和排序

          • 分区:默认按照key的hash取余进行分区,本质打标签

            • MapTask1:filea

              hadoop      1		reduce1
              hive        1   	reduce2
              hbase       1     	reduce1
              spark       1    	reduce2
              
            • MapTask1:fileb

              spark       1       reduce2
              hadoop      1       reduce1
              hadoop      1		reduce1
              hadoop      1    	reduce1
              
            • MapTask2:filea

              hue 	1			reduce1
              hive 	1			reduce2
              hive 	1			reduce2
              spark 	1			reduce2
              spark	1			reduce2
              
            • MapTask2:fileb

              hadoop 	1			reduce1
              spark 	1			reduce2
              spark 	1			reduce2
              hbase	1			reduce1
              
          • 排序:调用排序器或者compareTo方法:实现的方式:快排

            • 不是整个批次数据全局有序,而是相同分区内部有序

            • MapTask1:filea

              hadoop      1		reduce1
              hbase       1     	reduce1
              hive        1   	reduce2
              spark       1    	reduce2
              
            • MapTask1:fileb

              spark       1       reduce2
              hadoop      1       reduce1
              hadoop      1		reduce1
              hadoop      1    	reduce1
              
            • MapTask2:filea

              hue 	1			reduce1
              hive 	1			reduce2
              hive 	1			reduce2
              spark 	1			reduce2
              spark	1			reduce2
              
            • MapTask2:fileb

              hadoop 	1			reduce1
              hbase	1			reduce1
              spark 	1			reduce2
              spark 	1			reduce2
              

      Hadoop第二部分:MapReudce(三)

      • merge:合并,每个MapTask会将自己生成所有小文件进行合并,保证每个MapTask只有一个大文件

        • 合并:并且在合并过程中进行排序【每个分区内部有序】实现的方式:归并排序【内存中只放索引】

          • 排序逻辑还是调用排序器或者comparaTo

          • MapTask1

            hadoop      1		reduce1
            hadoop      1       reduce1
            hadoop      1		reduce1
            hadoop      1    	reduce1
            hbase       1     	reduce1
            hive        1   	reduce2
            spark       1    	reduce2
            spark       1       reduce2
            
          • MapTask2

            hadoop 	1			reduce1
            hbase	1			reduce1
            hue 	1			reduce1
            hive 	1			reduce2
            hive 	1			reduce2
            spark 	1			reduce2
            spark	1			reduce2
            spark 	1			reduce2
            spark 	1			reduce2
            
    • **Reduce端的shuffle:**将自己结果给Reduce

      • merge

        • ==拉取:==通过Http协议每个ReduceTask会到每个MapTask的结果中取属于自己的数据

          • reduceTask1

            • MapTask1

              hadoop      1
              hadoop      1 
              hadoop      1
              hadoop      1
              hbase       1
              
            • MapTask2

              hadoop 	1
              hbase	1
              hue 	1
              
          • reduceTask2

            • MapTask1

              hive        1
              spark       1
              spark       1
              
            • MapTask2

              hive 	1
              hive 	1
              spark 	1
              spark	1
              spark 	1
              spark 	1
              
        • ==合并:==合并过程中也排序:实现的方式:归并排序【内存中只放索引】

          • 排序逻辑还是调用排序器或者comparaTo

          • reduceTask1

            hadoop      1
            hadoop      1 
            hadoop      1
            hadoop      1
            hadoop 		1
            hbase       1
            hbase		1
            hue 		1
            
          • reduceTask2

            hive        1
            hive 		1
            hive 		1
            spark       1
            spark       1
            spark 		1
            spark		1
            spark 		1
            spark 		1
            
      • ==group:==将相同key的value合并放入迭代器

        • reduceTask1

          hadoop      1,1,1,1,1
          hbase       1,1
          hue 		1
          
        • reduceTask2

          hive        1,1,1
          spark       1,1,1,1,1,1
          
  • Reduce

    • 功能:对Shuffle的结果进行聚合,对每一条数据调用reduce方法

      reduce(key,Iterator<value>:values){
      	for(value:values){
      		sum+=value.get()
      	}
      	context.write(key,sum)
      }
      
    • ReduceTask1

      hadoop	5
      hbase	2
      hue		1
      
    • ReduceTask2

      hive	3
      spark	6
      
  • Output

    • part-r-00000

      hadoop	5
      hbase	2
      hue		1
      
    • part-r-00001

      hive	3
      spark	6
      

3、流程图

Hadoop第二部分:MapReudce(三)

  • Shuffle

    • 过程:分布式内存 => 磁盘【做内存中无法实现的事情】=> 内存

      • map1:1 3 => 3 1

      • map2:4 5 => 5 4

      • map3:1 7 => 7 1

      • 需求:全局倒序

        • 合并:磁盘:3 1 5 4 7 1 => 7 5 4 3 1 1 => 重新将全局有序的读取到内存中

四、Shuffle中的两个优化

1、Combiner:Map端的聚合

  • Shuffle过程中的另外一个功能,默认是关闭,需要自己手动设置该功能来启用

    • 不是所有的程序都能使用Combiner的

    • 程序需要符合一定的条件才可以

      (a+b) * c = a * c+b * c

  • 不做combiner

Hadoop第二部分:MapReudce(三)

  • 使用combiner

Hadoop第二部分:MapReudce(三)

  • 主要的目的

    • 利用MapTask的并发的个数是远大于Reduce的个数

    • 将聚合的逻辑由每个Map完成一部分,最后再由Reduce做最终的聚合,减轻Reduce的负载

    • 官方的wordcount

Hadoop第二部分:MapReudce(三)

  • 自己开发的wordcount

Hadoop第二部分:MapReudce(三)

  • 自己开发的时候也可以启用Combiner

    job.setCombinerClass(WordCountReduce.class);//设置Combiner
    
    • Combiner的类一般就是Reducer的类,聚合逻辑是一致的
    • 如果分不清逻辑是否可以拆分,可以测试
      • 两次的结果是否一致
    • Reduce的输入类型与Reduce的输出类型是否一致
      • Map输出 => Reduce输入
      • Combiner = Reducer
      • Combiner => Reduce输入

2、Compress:压缩

  • 生活中的压缩类型:zip/rar/7z

  • 大数据中的压缩类型:可分割的压缩类型

    • 300M => 压缩 => 200M
    • block1:128M => split1 => MapTask1【node01】
    • block2:72M => split2 => MapTask2【node02】
    • 类型:snappy、lz4、lzo
  • 常见的压缩选取

    • 压缩和解压比较慢,算法非常复杂,但是压缩比高【压缩以后的大小/原先的大小】
    • 压缩和解压比较快,算法比较简单,但是压缩比低
  • 优点:MapReduce中的压缩

    • 减少磁盘以及网络的IO,提高数据传输和存储的效率
      • Shuffle Write:1T => 硬盘 1s/GB => 1024s
        • 压缩:1T => 压缩=> 700G => 硬盘 1s/GB => 700s + 压缩的时间 = > 750s
      • Shuffle Read :硬盘 => 1T => 1024s
        • 压缩: => 硬盘 => 700G =>解压 1s/GB => 700s + 解压的时间 = > 750s
  • 以后所学的所有关于存储和计算的框架,都支持压缩

  • MapReduce中可以压缩数据的位置

    • 输入:MapReduce可以读取一个压缩的文件作为输入【不用】

      • 数据的文件类型由数据生成决定
      • MapReduce读压缩文件会将压缩的元数据读取进来
    • Shuffle阶段的压缩:将Map输出的结果进行压缩【主要用的地址】

      #启用压缩
      mapreduce.map.output.compress=true
      #指定压缩的类型
      mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec
      
      org.apache.hadoop.io.compress.Lz4Codec
      org.apache.hadoop.io.compress.SnappyCodec
      
      • 如何修改配置

        • mapred-site.xml:永久性所有程序都压缩

        • MapReduce程序中:conf.set(key,value)

        • 运行提交命令时,可以指定参数:临时

          yarn jar sougou.jar cn.itcast.hadoop.mapreduce.compress.HotKeyMR -Dmapreduce.map.output.compress=true -Dmapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.Lz4Codec /app/sougou /app/output/sougou1
          
    • 输出:MapReduce可以输出压缩文件【少用】

      mapreduce.output.fileoutputformat.compress=true
      mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.DefaultCodec
      
  • 检查Hadoop支持哪些压缩;bin/hadoop checknative

    hadoop:  true /export/servers/hadoop-2.6.0-cdh5.14.0/lib/native/libhadoop.so.1.0.0
    zlib:    true /lib64/libz.so.1
    snappy:  true /usr/lib64/libsnappy.so.1
    lz4:     true revision:10301
    bzip2:   true /lib64/libbz2.so.1
    
  • 演示

    • 需求:统计每个搜索词出现的次数

      /**
       * @ClassName WordCount
       * @Description TODO 基于搜狗数据实现热门搜索词的统计
       * @Date 2020/1/9 16:00
       * @Create By     Frank
       */
      public class HotKeyMR extends Configured implements Tool {
      
          /**
           * 构建一个MapReduce程序,配置程序,提交程序
           * @param args
           * @return
           * @throws Exception
           */
          @Override
          public int run(String[] args) throws Exception {
              /**
               * 第一:构造一个MapReduce Job
               */
              //构造一个job对象
              Job job = Job.getInstance(this.getConf(),"mrword");
              //设置job运行的类
              job.setJarByClass(HotKeyMR.class);
              /**
               * 第二:配置job
               */
              //input:设置输入的类以及输入路径
      //        job.setInputFormatClass(TextInputFormat.class); 这是默认的
              Path inputPath = new Path(args[0]);//以程序的第一个参数作为输入路径
              TextInputFormat.setInputPaths(job,inputPath);
              //map
              job.setMapperClass(WordCountMapper.class);//指定Mapper的类
              job.setMapOutputKeyClass(Text.class);//指定map输出的key的类型
              job.setMapOutputValueClass(IntWritable.class);//指定map输出的value的类型
              //shuffle
              job.setCombinerClass(WordCountReduce.class);//设置Combiner
              //reduce
              job.setReducerClass(WordCountReduce.class);//指定reduce的类
              job.setOutputKeyClass(Text.class);//指定reduce输出的 key类型
              job.setOutputValueClass(IntWritable.class);//指定reduce输出的value类型
      //        job.setNumReduceTasks(1);//这是默认的
              //output
      //        job.setOutputFormatClass(TextOutputFormat.class);//这是默认的输出类
              Path outputPath = new Path(args[1]);//用程序的第二个参数作为输出路径
              //如果输出目录已存在,就删除
              FileSystem hdfs = FileSystem.get(this.getConf());
              if(hdfs.exists(outputPath)){
                  hdfs.delete(outputPath,true);
              }
              //设置输出的地址
              TextOutputFormat.setOutputPath(job,outputPath);
      
              /**
               * 第三:提交job
               */
              //提交job运行,并返回boolean值,成功返回true,失败返回false
              return job.waitForCompletion(true) ? 0 : -1;
          }
      
          /**
           * 整个程序的入口,负责调用当前类的run方法
           * @param args
           */
          public static void main(String[] args) {
              //构造一个conf对象,用于管理当前程序的所有配置
              Configuration conf = new Configuration();
              //配置压缩
              conf.set("mapreduce.map.output.compress","true");
              conf.set("mapreduce.map.output.compress.codec","org.apache.hadoop.io.compress.Lz4Codec");
              try {
                  //调用当前类的run方法
                  int status = ToolRunner.run(conf, new HotKeyMR(), args);
                  //根据程序运行的 结果退出
                  System.exit(status);
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
      
          /**
           * Mapper的类,实现四个泛型,inputkey,inputValue,outputKey,outputValue
           * 输入的泛型:由输入的类决定:TextInputFormat:Longwritable Text
           * 输出的泛型:由代码逻辑决定:Text,IntWritable
           * 重写map方法
           */
          public static class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable>{
              //构造用于输出的key和value
              private Text outputKey = new Text();
              private IntWritable outputValue = new IntWritable(1);
      
              /**
               * map方法:Input传递过来的每一个keyvalue会调用一次map方法
               * @param key:当前的 key
               * @param value:当前的value
               * @param context:上下文,负责将新的keyvalue输出
               * @throws IOException
               * @throws InterruptedException
               */
              @Override
              protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                  //将每一行的内容转换为String
                  String line = value.toString();
                  //对每一行的内容分割
                  String[] words = line.split("\t");
                  //取第二个字段,用户的搜索词作为key
                  this.outputKey.set(words[2]);
                  //输出
                  context.write(this.outputKey,this.outputValue);
              }
          }
      
          /**
           * 所有的Reduce都需要实现四个泛型
           * 输入的keyvalue:就是Map的 输出的keyvalue类型
           * 输出的keyvalue:由代码逻辑决定
           * 重写reduce方法
           */
          public static class WordCountReduce extends Reducer<Text, IntWritable,Text, IntWritable>{
      
              private IntWritable outputValue = new IntWritable();
      
              /**
               * reduce方法 ,每一个keyvalue,会调用一次reduce方法
               * @param key:传进来的key
               * @param values:迭代器,当前key的所有value
               * @param context
               * @throws IOException
               * @throws InterruptedException
               */
              @Override
              protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                  //取出迭代器的值进行累加
                  int sum = 0;
                  for (IntWritable value : values) {
                      sum += value.get();
                  }
                  //封装成输出的value
                  this.outputValue.set(sum);
                  //输出每一个key的结果
                  context.write(key,outputValue);
              }
          }
      
      
      }
      
      

五、MapReduce中的Join方案

1、Reduce Join

  • join发生在reduce端

  • SQL中的join

    inner join:两边都有结果才有
    left join:左边有,结果就有
    right join:右边有,结果就有
    full join:任意一边有,结果就有
    
    
    select * from a join b = select * from a,b  => 笛卡尔积 【大数据表的join,严禁产生笛卡尔积】
    
    
    join:列的关联
    	a		id		name 		age		sex
    	b		id		phone		addr
    	查询张三的手机号
    	
    union:行的关联
    	a		id		name 		age		sex:大一的 数据
        b		id		name 		age		sex:大二的数据
       
    
  • 需求:两份数据

    • 订单数据

      1001,20150710,p0001,2
      1002,20150710,p0002,3
      1002,20150710,p0003,3
      
      订单编号,日期,商品id,商品数量
      
    • 商品数据

      p0001,直升机,1000,2000
      p0002,坦克,1000,3000
      p0003,火箭,10000,2000
      
      商品id,商品的名称,价格,库存
      
    • 关联:得到每个订单的商品名称

      1001,20150710,p0001,2  直升机
      1002,20150710,p0002,3  坦克
      1002,20150710,p0003,3  火箭
      
    • 分析:MapReduce 实现

      • 第一步:结果

        • 除了包含订单的数据,还要包含商品名称
      • 第二步:看是否有分组或者排序

        • 分组:join的字段
        • key:商品id
      • 第三步:value

        • 对于订单数据来说,除了商品id以外其他字段,就是value
        • 对于商品数据来说,除了商品id以外,只需要商品名称
      • 第四步:验证

        • Input

          1001,20150710,p0001,2
          1002,20150710,p0002,3
          1002,20150710,p0003,3
          p0001,直升机,1000,2000
          p0002,坦克,1000,3000
          p0003,火箭,10000,2000
          
        • map

          • key:商品id
          • value:
            • 如果是订单:订单编号,日期,商品数量
            • 如果是商品:商品名称
        • shuffle

          • 分组

            • 相同商品id对应的所有商品和订单数据在一个迭代器中
            p0001			直升机,1001,20150710,p0001,2
            p0002			坦克,1002,20150710,p0002,3
            p0003			火箭,1002,20150710,p0003,3
            
  • 实现

    /**
     * @ClassName ReduceJoin
     * @Description TODO Reduce端实现join的过程
     * @Date 2020/1/9 17:25
     * @Create By     Frank
     */
    public class ReduceJoin extends Configured implements Tool {
    
    
        /**
         * 具体整个MapReduce job的定义:构建、配置、提交
         * @param args
         * @return
         * @throws Exception
         */
        @Override
        public int run(String[] args) throws Exception {
            /**
             * 构建一个job
             */
            //创建一个job的实例
            Job job = Job.getInstance(this.getConf(),"mrjob");
            //设置job运行的类
            job.setJarByClass(ReduceJoin.class);
    
            /**
             * 配置job
             */
            //input:定义输入的方式,输入的路径
            Path orderPath = new Path("datas/mrjoin/orders.txt");
            Path productPath = new Path("datas/mrjoin/product.txt");
            TextInputFormat.setInputPaths(job,orderPath,productPath);
            //map:定义Map阶段的类及输出类型
            job.setMapperClass(MRJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //shuffle:定义shuffle阶段实现的类
            //reduce:定义reduce阶段的类及输出类型
            job.setReducerClass(MRJoinReduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(1);//设置Reduce的个数,就是分区的个数
            //output:定义输出的类以及输出的路径
            Path outputPath = new Path("datas/output/join/reducejoin");
            //如果输出存在,就删除
            FileSystem hdfs = FileSystem.get(this.getConf());
            if(hdfs.exists(outputPath)){
                hdfs.delete(outputPath,true);
            }
            TextOutputFormat.setOutputPath(job,outputPath);
    
            /**
             * 提交job:并根据job运行的结果返回
             */
            return job.waitForCompletion(true) ? 0:-1;
        }
    
    
        /**
         * 程序的入口
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            //构建一个Conf对象,用于管理当前程序的所有配置
            Configuration conf = new Configuration();
            //调用当前类的run方法
            int status = ToolRunner.run(conf, new ReduceJoin(), args);
            //根据job运行的状态,来退出整个程序
            System.exit(status);
        }
    
        /**
         * 定义Mapper的实现类以及Map过程中的处理逻辑
         */
        public static class MRJoinMapper extends Mapper<LongWritable,Text,Text, Text>{
    
            private Text outputKey = new Text();
            private Text outputValue = new Text();
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                //先判断当前的数据是哪个文件的
                FileSplit inputSplit = (FileSplit) context.getInputSplit();//先获取这条数据属于哪个文件分片
                String fileName = inputSplit.getPath().getName();//获取文件名称
                //如果是订单数据,key为第三个字段,value是其他剩余字段
                if("orders.txt".equals(fileName)){
                    String[] items = value.toString().split(",");
                    this.outputKey.set(items[2]);//商品id
                    this.outputValue.set(items[0]+"\t"+items[1]+"\t"+items[3]);//其他字段
                    context.write(this.outputKey,this.outputValue);//将订单数据输出
                }else{
                    //如果是商品数据,key为第一个字段,value是第二个字段
                    String[] split = value.toString().split(",");
                    this.outputKey.set(split[0]);//商品id
                    this.outputValue.set(split[1]);//商品名称
                    context.write(this.outputKey,this.outputValue);//输出商品数据
                }
    
    
            }
        }
    
        /**
         * 定义Reducer的实现类以及Reduce过程中的处理逻辑
         */
        public static class MRJoinReduce extends Reducer<Text,Text,Text,Text>{
    
            private Text outputValue = new Text();
    
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                StringBuilder stringBuilder = new StringBuilder();
                for (Text value : values) {
                    stringBuilder.append(value.toString()+"\t");//将所有的商品的名称和对应的所有订单进行拼接
                }
                this.outputValue.set(stringBuilder.toString());//将商品名称及订单作为value
                context.write(key,this.outputValue);
            }
        }
    
    }
    
    
  • 应用场景

    • 大的数据join大的数据

    • 比较时的效率非常低

    • 商品数据:6万

    • 订单数据:9万条

      Hadoop第二部分:MapReudce(三)

2、Map Join

  • 商品数据:1万

  • 订单数据:2000万条

  • 小的数据join大的数据

  • ==思想 :==将小数据放入分布式缓存,大数据的每个分片需要用到时,直接从分布式缓存中取,然后直接在Map端完成join,不用经过shuffle

Hadoop第二部分:MapReudce(三)

  • 实现

    public class MapJoin extends Configured implements Tool {
        /**
         * 具体整个MapReduce job的定义:构建、配置、提交
         * @param args
         * @return
         * @throws Exception
         */
        @Override
        public int run(String[] args) throws Exception {
            /**
             * 构建一个job
             */
            //创建一个job的实例
            Job job = Job.getInstance(this.getConf(),"mrjob");
            //设置job运行的类
            job.setJarByClass(MapJoin.class);
    
            /**
             * 配置job
             */
            //input:定义输入的方式,输入的路径
            Path orderPath = new Path("datas/mrjoin/orders.txt");
            TextInputFormat.setInputPaths(job,orderPath);
            //将商品的数据放入分布式缓存
            Path productPath = new Path("datas/mrjoin/product.txt");
            job.addCacheFile(productPath.toUri());
            //map:定义Map阶段的类及输出类型
            job.setMapperClass(MRJoinMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            //shuffle:定义shuffle阶段实现的类
            //reduce:定义reduce阶段的类及输出类型
    //        job.setReducerClass(MRJoinReduce.class);
    //        job.setOutputKeyClass(Text.class);
    //        job.setOutputValueClass(Text.class);
            job.setNumReduceTasks(0);//设置Reduce的个数,就是分区的个数
            //output:定义输出的类以及输出的路径
            Path outputPath = new Path("datas/output/join/mapjoin");
            //如果输出存在,就删除
            FileSystem hdfs = FileSystem.get(this.getConf());
            if(hdfs.exists(outputPath)){
                hdfs.delete(outputPath,true);
            }
            TextOutputFormat.setOutputPath(job,outputPath);
    
            /**
             * 提交job:并根据job运行的结果返回
             */
            return job.waitForCompletion(true) ? 0:-1;
        }
    
    
        /**
         * 程序的入口
         * @param args
         * @throws Exception
         */
        public static void main(String[] args) throws Exception {
            //构建一个Conf对象,用于管理当前程序的所有配置
            Configuration conf = new Configuration();
            //调用当前类的run方法
            int status = ToolRunner.run(conf, new MapJoin(), args);
            //根据job运行的状态,来退出整个程序
            System.exit(status);
        }
    
        /**
         * 定义Mapper的实现类以及Map过程中的处理逻辑
         */
        public static class MRJoinMapper extends Mapper<LongWritable,Text,Text, Text>{
    
            private Text outputKey = new Text();
            private Text outputValue = new Text();
            Map<String,String> maps = new HashMap<>();
    
            /**
             * Map和Reduce的类:三个方法
             *      1-setup:会在map或者reduce方法之前执行
             *      2-map/reduce:map逻辑或者reduce逻辑
             *      3-close:最后执行的方法
             * @param context
             * @throws IOException
             * @throws InterruptedException
             */
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                //将分布式缓存的 数据读取进来
                URI[] cacheFiles = context.getCacheFiles();//获取所有的缓存数据
                //读取文件内容
                BufferedReader bufferedReader = new BufferedReader(new FileReader(cacheFiles[0].getPath()));
                String line = null;
                while(StringUtils.isNotBlank(line = bufferedReader.readLine())){
                    //读取到每一行的内容
                    String pid = line.split(",")[0];//商品id
                    String productName = line.split(",")[1];//商品名称
                    //将商品id和名称放入map集合
                    maps.put(pid,productName);
                }
            }
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                //获取订单数据
                String[] items = value.toString().split(",");
                String pid = items[2]; //订单中的商品id
                String productName = maps.get(pid);
                this.outputKey.set(productName);
                this.outputValue.set(value.toString());
                context.write(this.outputKey,this.outputValue);
            }
        }
    
        /**
         * 定义Reducer的实现类以及Reduce过程中的处理逻辑
         */
        public static class MRJoinReduce extends Reducer<Text,Text,Text,Text>{
    
            private Text outputValue = new Text();
    
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                StringBuilder stringBuilder = new StringBuilder();
                for (Text value : values) {
                    stringBuilder.append(value.toString()+"\t");//将所有的商品的名称和对应的所有订单进行拼接
                }
                this.outputValue.set(stringBuilder.toString());//将商品名称及订单作为value
                context.write(key,this.outputValue);
            }
        }
    
    }
    
    
  • 应用场景

    • 适合于小数据量join大数据量的场景

六、读写数据库

1、Input和Output

  • Input:所有的Input都要继承自InputFormat
    • 默认:TextInputFormat extends FileInputFormat extend InputFormat
    • 文件
    • 数据库
  • Output:所有的Input都要继承自OutputFormat
    • 默认:TextOutputFormat extends FileOutputFormat extends OutputFormat
    • 文件
    • 数据库
  • 数据库:JDBC

2、读MySQL

  • 修改输入的类

    job.setInputFormatClass(DBInputFormat.class);
    
  • 自定义一个数据类型用于接收MySQL中的数据

    • 除了要实现Writable接口以外,还要实现DBWritable【数据库数据的序列化】

      public  class DBReader implements Writable,DBWritable
      
    • 实现数据库对象的序列化及反序列化

      	public void write(PreparedStatement statement) throws SQLException {
      		// TODO Auto-generated method stub
      		statement.setString(1, word);
      		statement.setInt(2,number);
      
      	}
      
      	public void readFields(ResultSet resultSet) throws SQLException {
      		// TODO Auto-generated method stub
      		this.word = resultSet.getString(1);
      		this.number = resultSet.getInt(2);
      
      	}
      
  • 创建了conf对象以后,要立马配置jdbc的连接参数

    Configuration conf = new Configuration();
    DBConfiguration.configureDB(
        conf, 
        "com.mysql.jdbc.Driver", 
        "jdbc:mysql://localhost:3306/test",
        "root", 
        "123456"
    );
    
  • 配置读取的数据:表、SQL语句、字段

    DBInputFormat.setInput(
        job, 
        DBReader.class, //存放读取结果的对象
        "wcresult", //表明
        null, //过滤条件
        "number",//排序的字段
        fields  //读取哪些字段
    );
    
    public static void setInput(
    	  Job job,
          Class<? extends DBWritable> inputClass,  //存放读取结果的对象
          String inputQuery,  //SQL语句,SQL语句中返回的字段必须与inputClass属性一致
          String inputCountQuery
          ) {
      }
    

2、写MySQL

  • 配置输出的类

    job.setOutputFormatClass(DBOutputFormat.class);
    
  • 配置输出的参数:表、字段

    DBOutputFormat.setOutput(job, "wcresult", fields);
    
  • 程序会将key输出到MySQL的对象必须实现DBWritable接口【序列化方法】

    public static class WriteMap extends Mapper<LongWritable, Text, DBReader, NullWritable>{
    	
    		private DBReader outputKey = new DBReader();
    		private NullWritable outputValue = NullWritable.get();
    		
    		@Override
    		protected void map(LongWritable key, Text value,
    				Context context)
    						throws IOException, InterruptedException {
    			// TODO Auto-generated method stub
    			String line = value.toString();
    			this.outputKey.setWord(line.split("\t")[0]);
    			this.outputKey.setNumber(Integer.valueOf(line.split("\t")[1]));
    			context.write(outputKey, outputValue);
    		}
    	}
    
相关标签: 大数据 hadoop