欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

学习笔记—MapReduce

程序员文章站 2023-12-03 17:24:52
MapReduce是什么 MapReduce是一种分布式计算编程框架,是Hadoop主要组成部分之一,可以让用户专注于编写核心逻辑代码,最后以高可靠、高容错的方式在大型集群上并行处理大量数据。 MapReduce的存储 MapReduce的数据是存储在HDFS上的,HDFS也是Hadoop的主要组成 ......

mapreduce是什么

mapreduce是一种分布式计算编程框架,是hadoop主要组成部分之一,可以让用户专注于编写核心逻辑代码,最后以高可靠、高容错的方式在大型集群上并行处理大量数据。

mapreduce的存储

mapreduce的数据是存储在hdfs上的,hdfs也是hadoop的主要组成部分之一。下边是mapreduce在hdfs上的存储的图解

学习笔记—MapReduce

hdfs主要有namenode和datanode两部分组成,整个集群有一个namenode和多个datanode,通常每一个节点一个datanode,namenode的主要功能是用来管理客户端client对数据文件的操作请求和储存数据文件的地址。datanode主要是用来储存和管理本节点的数据文件。节点内部数据文件被分为一个或多个block块(block默认大小原来是64mb,后来变为128mb),然后这些块储存在一组datanode中。(这里不对hdfs做过多的介绍,后续会写一篇详细的hdfs笔记)

mapreduce的运行流程

学习笔记—MapReduce

学习笔记—MapReduce

1、首先把需要处理的数据文件上传到hdfs上,然后这些数据会被分为好多个小的分片,然后每个分片对应一个map任务,推荐情况下分片的大小等于block块的大小。然后map的计算结果会暂存到一个内存缓冲区内,该缓冲区默认为100m,等缓存的数据达到一个阈值的时候,默认情况下是80%,然后会在磁盘创建一个文件,开始向文件里边写入数据。

2、map任务的输入数据的格式是<key,value>对的形式,我们也可以自定义自己的<key,value>类型。然后map在往内存缓冲区里写入数据的时候会根据key进行排序,同样溢写到磁盘的文件里的数据也是排好序的,最后map任务结束的时候可能会产生多个数据文件,然后把这些数据文件再根据归并排序合并成一个大的文件。

3、然后每个分片都会经过map任务后产生一个排好序的文件,同样文件的格式也是<key,value>对的形式,然后通过对key进行hash的方式把数据分配到不同的reduce里边去,这样对每个分片的数据进行hash,再把每个分片分配过来的数据进行合并,合并过程中也是不断进行排序的。最后数据经过reduce任务的处理就产生了最后的输出。

4、在我们开发中只需要对中间map和reduce的逻辑进行开发就可以了,中间分片,排序,合并,分配都有mapreduce框架帮我完成了。

mapreduce的资源调度系统

最后我们来看一下mapreduce的资源调度系统yarn。

学习笔记—MapReduce

yarn的基本思想是将资源管理和作业调度/监视的功能分解为单独的守护进程。全局唯一的resourcemanager是负责所有应用程序之间的资源的调度和分配,每个程序有一个applicationmaster,applicationmaster实际上是一个特定于框架的库,其任务是协调来自resourcemanager的资源,并与nodemanager一起执行和监视任务。nodemanager是每台机器框架代理,监视其资源使用情况(cpu,内存,磁盘,网络)并将其报告给resourcemanager。

wordconut代码

  • python实现

map.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys

for line in sys.stdin:
    words = line.strip().split()
    for word in words:
        print('%s\t%s' % (word, 1))

reduce.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import sys

current_word = none
sum = 0

for line in sys.stdin:
    word, count = line.strip().split(' ')

    if current_word == none:
        current_word = word

    if word != current_word:
        print('%s\t%s' % (current_word, sum))
        current_word = word
        sum = 0

    sum += int(count)

print('%s\t%s' % (current_word, sum))

我们先把输入文件上传到hdfs上去

hadoop fs -put /input.txt /

​ 然后在linux下运行,为了方便我们把命令写成了shell文件

hadoop_cmd="/usr/local/src/hadoop-2.6.1/bin/hadoop"
stream_jar_path="/usr/local/src/hadoop-2.6.1/share/hadoop/tools/lib/hadoop-streaming-2.6.1.jar"

input_file_path="/input.txt"
output_file_path="/output"

$hadoop_cmd fs -rmr -skiptrush $output_file_path

$hadoop_cmd jar $stream_jar_path \
    -input $input_file_path \
    -output $output_file_path \
    -mapper "python map.py" \
    -reducer "python reduce.py" \
    -file "./map.py" \
    -file "./reduce.py" 
  • java实现

mymap.java

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.mapper;

import java.io.ioexception;

public class mymap extends mapper<longwritable, text, text, intwritable> {

    private intwritable one = new intwritable(1);
    private text text = new text();

    @override
    protected void map(longwritable key, text value, context context) throws ioexception, interruptedexception {
        string line = value.tostring();
        string[] words = line.split(" ");

        for (string word: words){
            text.set(word);
            context.write(text,one);
        }
    }
}

myreduce.java

import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.reducer;

import java.io.ioexception;

public class myreduce extends reducer<text, intwritable, text, intwritable> {
    private intwritable result = new intwritable();
    @override
    protected void reduce(text key, iterable<intwritable> values, context context) throws ioexception, interruptedexception {
        int sum = 0;
        for (intwritable i:values){
            sum+=i.get();
        }
        result.set(sum);
        context.write(key,result);
    }
}

wordcount.java

import org.apache.hadoop.conf.configuration;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.intwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;


public class wordcount {
    public static void main(string[] args) throws exception {
        configuration configuration = new configuration();
        job job = job.getinstance(configuration, "wordcount");
        job.setjarbyclass(wordcount.class);
        job.setmapperclass(mymap.class);
        job.setreducerclass(myreduce.class);
        job.setoutputkeyclass(text.class);
        job.setoutputvalueclass(intwritable.class);
        fileinputformat.addinputpath(job, new path(args[0]));
        fileoutputformat.setoutputpath(job, new path(args[1]));
        system.exit(job.waitforcompletion(true) ? 0 : 1);
    }
}

把工程打成jar包,然后把jar包和输入文件上传到hdfs

$ hadoop fs -put /wordcount.jar /
$ hadoop fs -put /input.txt /

执行wordcount任务

$ bin/hadoop jar wordcount.jar wordcount /input.txt /user/joe/wordcount/output

欢迎关注公众号:「努力给自己看」

学习笔记—MapReduce