欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用python编写hadoop的mapper和reducer的实例详解

程序员文章站 2022-03-08 22:30:10
hadoop streaming 原理 hadoop 本身是用 java 开发的,程序也需要用 java 编写,但是通过 hadoop streaming,我们可以使用任意语言来编写程序,让 had...

hadoop streaming 原理

hadoop 本身是用 java 开发的,程序也需要用 java 编写,但是通过 hadoop streaming,我们可以使用任意语言来编写程序,让 hadoop 运行。

hadoop streaming 就是通过将其他语言编写的 mapper 和 reducer 通过参数传给一个事先写好的 java 程序(hadoop 自带的 *-streaming.jar),这个 java 程序会负责创建 mr 作业,另开一个进程来运行 mapper,将得到的输入通过 stdin 传给它,再将 mapper 处理后输出到 stdout 的数据交给 hadoop,经过 partition 和 sort 之后,再另开进程运行 reducer,同样通过 stdin/stdout 得到最终结果。

python的mapreduce代码

因此,使用python编写mapreduce代码的技巧就在于我们使用了 hadoopstreaming 来帮助我们在map 和 reduce间传递数据通过stdin (标准输入)和stdout (标准输出).我们仅仅使用python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为hadoopstreaming会帮我们办好其他事。


创建文件,上传文件

当前路径下,创建一本,包含英文单词(后面mapper 和reduce 统计单词频次需要使用)

hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -mkdir -p /input
hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -put ./book.txt /input
编写mapper.py 文件

将下列的代码保存在/home/hadoop/example/mapper.py中,他将从stdin读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:

注意:要确保这个脚本有足够权限(chmod +x mapper.py)

#!/usr/bin/env python
import sys
# input comes from stdin (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to stdout (standard output);
        # what we output here will be the input for the
        # reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print '%s\t%s' % (word, 1)
编写reducer 文件

将代码存储在/home/hadoop/example/reducer.py 中,这个脚本的作用是从mapper.py 的stdout中读取结果,然后计算每个单词出现次数的总和,并输出结果到stdout。

同样,要注意脚本权限:chmod +x reducer.py

#!/usr/bin/env python
from operator import itemgetter
import sys

current_word = none
current_count = 0
word = none

# input comes from stdin
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except valueerror:
        # count was not a number, so silently
        # ignore/discard this line
        continue

    # this if-switch only works because hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to stdout
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print '%s\t%s' % (current_word, current_count)
自测

在运行mapreduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果。这里有一些建议,关于如何测试你的map和reduce的功能:

hadoop@derekubun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py
foo      1
foo      1
quux     1
labs     1
foo      1
bar      1
quux     1
hadoop@derekubun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" |./mapper.py | sort |./reducer.py
bar     1
foo     3
labs    1
quux    2
hadoop 运行

一切准备就绪,我们将在运行python mapreduce job 在hadoop集群上。像我上面所说的,我们使用的是hadoopstreaming 帮助我们传递数据在map和reduce间并通过stdin和stdout,进行标准化输入输出。

hadoop@derekubun:/usr/local/hadoop$ hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.6.jar  \
-mapper 'python mapper.py' \
-file /home/hadoop/example/mapper.py \
-reducer 'python reducer.py' \
-file /home/hadoop/example/reducer.py 
-input hdfs:/input/book.txt \
-output output

第一行是告诉 hadoop 运行 streaming 的 java 程序,接下来的是参数:

这里的mapper 后面跟的其实是一个命令。也就是说,-mapper 和 -reducer 后面跟的文件名不需要带上路径。而 -file 后的参数需要带上路径,为了让 hadoop 将程序分发给其他机器,需要-file 参数指明要分发的程序放在哪里。

注意:如果你在 mapper 后的命令用了引号,加上路径名反而会报错说找不到这个程序。(因为 -file 选项会将对应的本地参数文件上传至 hadoop streaming 的工作路径下,所以再执行 -mapper 对应的参数命令能直接找到对应的文件。

-input 和 -output 后面跟的是 hdfs 上的路径名,这里的 input/book.txt 指的是input 文件夹下刚才上传的文本文件,注意 -output 后面跟着的需要是一个不存在于 hdfs 上的路径,在产生输出的时候 hadoop 会帮你创建这个文件夹,如果已经存在的话就会产生冲突。因此每次执行 hadoop streaming 前可以通过脚本命令 hadoop fs -rmr 清除输出路径。

由于 mapper 和 reducer 参数跟的实际上是命令,所以如果每台机器上 python 的环境配置不一样的话,会用每台机器自己的配置去执行 python 程序。

结果获取

如果运行中遇到问题,注意看报错,然后进行调整。

运行结束之后,结果存储在hdfs上 output目录下。

查看结果:hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -ls output

从hdfs拷贝至本地:hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -get output/* ./

注:如果结果中包含_success 则说明本次运行成功。