使用python编写hadoop的mapper和reducer的实例详解
hadoop streaming 原理
hadoop 本身是用 java 开发的,程序也需要用 java 编写,但是通过 hadoop streaming,我们可以使用任意语言来编写程序,让 hadoop 运行。
hadoop streaming 就是通过将其他语言编写的 mapper 和 reducer 通过参数传给一个事先写好的 java 程序(hadoop 自带的 *-streaming.jar),这个 java 程序会负责创建 mr 作业,另开一个进程来运行 mapper,将得到的输入通过 stdin 传给它,再将 mapper 处理后输出到 stdout 的数据交给 hadoop,经过 partition 和 sort 之后,再另开进程运行 reducer,同样通过 stdin/stdout 得到最终结果。
python的mapreduce代码
因此,使用python编写mapreduce代码的技巧就在于我们使用了 hadoopstreaming 来帮助我们在map 和 reduce间传递数据通过stdin (标准输入)和stdout (标准输出).我们仅仅使用python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为hadoopstreaming会帮我们办好其他事。
创建文件,上传文件
当前路径下,创建一本,包含英文单词(后面mapper 和reduce 统计单词频次需要使用)
hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -mkdir -p /input hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -put ./book.txt /input
编写mapper.py 文件
将下列的代码保存在/home/hadoop/example/mapper.py中,他将从stdin读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:
注意:要确保这个脚本有足够权限(chmod +x mapper.py)
#!/usr/bin/env python import sys # input comes from stdin (standard input) for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words words = line.split() # increase counters for word in words: # write the results to stdout (standard output); # what we output here will be the input for the # reduce step, i.e. the input for reducer.py # # tab-delimited; the trivial word count is 1 print '%s\t%s' % (word, 1)
编写reducer 文件
将代码存储在/home/hadoop/example/reducer.py 中,这个脚本的作用是从mapper.py 的stdout中读取结果,然后计算每个单词出现次数的总和,并输出结果到stdout。
同样,要注意脚本权限:chmod +x reducer.py
#!/usr/bin/env python from operator import itemgetter import sys current_word = none current_count = 0 word = none # input comes from stdin for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # parse the input we got from mapper.py word, count = line.split('\t', 1) # convert count (currently a string) to int try: count = int(count) except valueerror: # count was not a number, so silently # ignore/discard this line continue # this if-switch only works because hadoop sorts map output # by key (here: word) before it is passed to the reducer if current_word == word: current_count += count else: if current_word: # write result to stdout print '%s\t%s' % (current_word, current_count) current_count = count current_word = word # do not forget to output the last word if needed! if current_word == word: print '%s\t%s' % (current_word, current_count)
自测
在运行mapreduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果。这里有一些建议,关于如何测试你的map和reduce的功能:
hadoop@derekubun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" | ./mapper.py foo 1 foo 1 quux 1 labs 1 foo 1 bar 1 quux 1 hadoop@derekubun:/usr/local/hadoop$ echo "foo foo quux labs foo bar quux" |./mapper.py | sort |./reducer.py bar 1 foo 3 labs 1 quux 2
hadoop 运行
一切准备就绪,我们将在运行python mapreduce job 在hadoop集群上。像我上面所说的,我们使用的是hadoopstreaming 帮助我们传递数据在map和reduce间并通过stdin和stdout,进行标准化输入输出。
hadoop@derekubun:/usr/local/hadoop$ hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.6.jar \ -mapper 'python mapper.py' \ -file /home/hadoop/example/mapper.py \ -reducer 'python reducer.py' \ -file /home/hadoop/example/reducer.py -input hdfs:/input/book.txt \ -output output
第一行是告诉 hadoop 运行 streaming 的 java 程序,接下来的是参数:
这里的mapper 后面跟的其实是一个命令。也就是说,-mapper 和 -reducer 后面跟的文件名不需要带上路径。而 -file 后的参数需要带上路径,为了让 hadoop 将程序分发给其他机器,需要-file 参数指明要分发的程序放在哪里。
注意:如果你在 mapper 后的命令用了引号,加上路径名反而会报错说找不到这个程序。(因为 -file 选项会将对应的本地参数文件上传至 hadoop streaming 的工作路径下,所以再执行 -mapper 对应的参数命令能直接找到对应的文件。
-input 和 -output 后面跟的是 hdfs 上的路径名,这里的 input/book.txt 指的是input 文件夹下刚才上传的文本文件,注意 -output 后面跟着的需要是一个不存在于 hdfs 上的路径,在产生输出的时候 hadoop 会帮你创建这个文件夹,如果已经存在的话就会产生冲突。因此每次执行 hadoop streaming 前可以通过脚本命令 hadoop fs -rmr 清除输出路径。
由于 mapper 和 reducer 参数跟的实际上是命令,所以如果每台机器上 python 的环境配置不一样的话,会用每台机器自己的配置去执行 python 程序。
结果获取
如果运行中遇到问题,注意看报错,然后进行调整。
运行结束之后,结果存储在hdfs上 output目录下。
查看结果:hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -ls output
从hdfs拷贝至本地:hadoop@derekubun:/usr/local/hadoop$ bin/hdfs dfs -get output/* ./
注:如果结果中包含_success 则说明本次运行成功。