hadoop streaming编程和golang实现mr demo
hadoop streaming是什么?为什么要用hadoop streaming?hadoop streaming怎么用?接下来我们就来解决这些问题。
1、首先,hadoop streaming是一种编程工具,它是由hadoop提供的。
2、为什么要用hadoop streaming呢?
hadoop框架是用java语言写的,也就是说,hadoop框架中运行的所有应用程序都要用java语言来写才能正常地在hadoop集群中运行。那么问题来了,如果有些开发者就是不会java语言,但是又想使用mapreduce这个并行计算模型的话,那该怎么办?
1)就是基于这样的考虑,所以hadoop提供了hadoop streaming这个编程工具,它支持用任何编程语言来编写mapreduce的map函数和reduce函数。
2)但是,map/reduce函数的数据流必须遵循相应编程语言的标准输入输出(stdin、stdout),即你用什么编程语言实现业务逻辑,就必须要通过该语言的标准输入stdin读取数据,通过该语言的标准输出stdout输出数据。
3) 比如对于unix/linux,cat、awk、grep等这些都可以做为标准输入获取数据,而诸如重定向符>、>>、管道符(|)等都可以作为标准输出。
而对于c++,标准输入时cin,标准输出是cout
......
hadoop streaming的数据流:
hadoop streaming是如何处理数据的?
hadoop streaming通过用户编写的map函数中标准输入读取数据(一行一行地读取),按照mapd函数的处理逻辑处理后,将处理后的数据由标准输出进行输出到下一个阶段,reduce函数也是按行读取数据,按照函数的处理逻辑处理完数据后将它们通过标准输出写到hdfs的指定目录中。
ps:不管使用的是何种编程语言,在map函数中,原始数据会被处理成<key,value>的形式,但是key与value之间必须通过\t分隔符分隔,分隔符左边的是key,分隔符右边的是value,如果没有使用\t分隔符,那么整行都会被当作key处理。
3、理解了前两个问题后,我们对hadoop streaming已经有了基本的了解,接下来就应该了解该如何使用hadoop streaming这个编程工具了。
先通过linux shell查看一下它的用法:
可以看到它的用法是:$HADOOP_HOME/bin jar hadoop-streaming.jar [options]
options是可选项
例子1:
以shell命令作为mapper类以及 reducer类的实现
hadoop jar ../share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-input /hadoop-streaming/* \
-output /hadoop-output \
-mapper /bin/cat \
-reducer /bin/wc
ps1:每行最后的\符号的作用是转义,将换行符转以为普通字符,因为我们不想在一行中输完全部内容,所以用转义字符转义换行符,以在下行行中输入未输完的内容
ps2:输入文件必须在hdfs上,输出也是输出到hdfs上
ps3:为了避免发生函数脚本找不到的问题,最好使用-file参数,将脚本文件提交到集群中
例子2:
以shell脚本作为mapper和reducer的实现:
mapper.sh、reducer.sh
mapper.sh
#!/bin/bash
cat
reducer.sh
#!/bin/bash
wc
运行:
hadoop jar ../share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-input /hadoop-streaming/* \
-output /hadoop-output \
-mapper mapper.sh \
--reducer reducer.sh \
-file mapper.sh \
-file reducer.sh
例子3:
mapper和reducer以shell脚本处理较复杂的逻辑,如单词统计,每行都有多个单词
mapper.sh
#!/bin/bash
while read line
do
for word in $line
do
echo $word 1
done
done
reducer.sh
#!/bin/bash
count=0
read word1
reduce-word=`echo $word1 | awk $1`
while read word2
do
map-word=`echo $-word | awk $1`
if [ $reduce-word = $map-word ]
count=count+1
else
echo $reduce-word $count
count=0
reduce-word=map-word
count=count+1
fi
done
运行:
hadoop jar ../share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-input /hadoop-streaming/* \
-output /hadoop-output \
-mapper mapper.sh \
-reducer reducer.sh \
-file mapper.sh \
-file reducer.sh
在Golang中编写Map / Reduce任务
代码:
package main
import (
"github.com/vistarmedia/gossamr"
"log"
"strings"
)
type WordCount struct{}
func (wc *WordCount) Map(p int64, line string, c gossamr.Collector) error {
for _, word := range strings.Fields(line) {
c.Collect(strings.ToLower(word), int64(1))
}
return nil
}
func (wc *WordCount) Reduce(word string, counts chan int64, c gossamr.Collector) error {
var sum int64
for v := range counts {
sum += v
}
c.Collect(sum, word)
return nil
}
func main() {
wordcount := gossamr.NewTask(&WordCount{})
err := gossamr.Run(wordcount)
if err != nil {
log.Fatal(err)
}
}
运行:
./bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.2.1.jar \
-input /mytext.txt \
-output /output.15 \
-mapper "gossamr -task 0 -phase map" \
-reducer "gossamr -task 0 -phase reduce" \
-io typedbytes \
-file ./wordcount
-numReduceTasks 6
原文链接:https://www.jianshu.com/p/c3fc0400406d