欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MapReduce离线计算框架快速入门

程序员文章站 2024-03-08 09:31:52
...

先来聊一聊什么是计算框架

是指实现某项任务或某项工作从开始到结束的计算过程或流的结构

图解:mapReduce的原理分析

  • 1.在HDFS中读取数据输入到MapReduce计算框架中
  • 2.进行数据的切分
  • 3.进行数据的进一步切分
  • 4.进行map拆分
  • 5.进行partitioner计算并对key进行排序
  • 6.进行reduce汇总计算
  • 7.将结果输出到HDFS文件系统
    MapReduce离线计算框架快速入门

什么是并行计算框架

一个大的任务拆分成多个小任务,将多个小任务分发到多个节点中,每个节点同时执行计算

MapReduce离线计算框架快速入门

hadoop为什么比传统数据快

  • 分布式存储
  • 分布式并行计算
  • 节点横向扩展
  • 移动程序到数据段
  • 多个数据副本

MapReduce的核心思想

分而治之,先分后合:将一个大的,复杂的工作或者任务,拆分成多个小任务,并行计算,最终进行合并
MapReduce由Map和Reduce组成
Map:将数据进行拆分
Reduce:对数据进行汇总

MapReduce离线计算框架快速入门

WordCount计算实例代码

需求:计算每个单词出现的次数

WordCount-Map实现

1、实例一个class 继承Mapper<输入的key的数据类型,输入的value的数据类型,输出的key的数据类型,输出的value的数据类型
2、重写map方法 map(LongWritable key, Text value, Context context)

  • key: 行首字母的偏移量
  • value: 一行数据
  • context:上下文对象

3、根据业务需求进行切分,然后逐一输出

package com.czxy.hadoop.mapreduce.demo02;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author aaa@qq.com
 * @version v 1.0
 * @date 2019/11/12
 */
public class WordCountMapper  extends Mapper<LongWritable, Text,Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1.切分
        String[] words = value.toString().split("\t");
        //2.遍历  输出
        for (String word : words) {
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

WordCount-Reduce实现

1、实例一个class 继承Reducer<输入的key的数据类型,输入的value的数据类型,输出的key的数据类型,输出的value的数据类型
2、重写reduce方法 reduce(Text key, Iterable values, Context context)

  • key: 去重后单词
  • values: 标记的1(好多个1,key出现几次就有几个1)
  • context:上下文对象

3、遍历values 进行汇总计算

package com.czxy.hadoop.mapreduce.demo02;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author aaa@qq.com
 * @version v 1.0
 * @date 2019/11/12
 */
public class WordCountRedduce extends Reducer<Text, IntWritable,Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //1.定义sum
        int sum = 0;
        //2.求和
        for (IntWritable value : values) {
            sum += value.get();
        }
        //3.输出
        context.write(key, new IntWritable(sum));
    }
}

WordCount-Driver实现

1、实例一个class 继承Configured 实现Tool
2、重写run方法
3、在run方法中将自己编写的map和reduce添加到集群

package com.czxy.hadoop.mapreduce.demo02;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author aaa@qq.com
 * @version v 1.0
 * @date 2019/11/12
 */
public class WordCountApp extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //1.实例job
        Job job = Job.getInstance(new Configuration(), "wordCount01");
        //2.设置输入的类 与 路径
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.setInputPaths(job, new Path("phone_data .txt"));
        //3.设置map 与 k v
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        //4.设置reduce 与 k v
        job.setReducerClass(WordCountRedduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        //5.设置输出类与路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("output"));
        //6.开启作业
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        ToolRunner.run(new WordCountApp(), args);
    }
}