欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

hadoop streaming编程和golang实现mr demo

程序员文章站 2022-04-28 14:49:27
...

hadoop streaming是什么?为什么要用hadoop streaming?hadoop streaming怎么用?接下来我们就来解决这些问题。

1、首先,hadoop streaming是一种编程工具,它是由hadoop提供的。
2、为什么要用hadoop streaming呢?
hadoop框架是用java语言写的,也就是说,hadoop框架中运行的所有应用程序都要用java语言来写才能正常地在hadoop集群中运行。那么问题来了,如果有些开发者就是不会java语言,但是又想使用mapreduce这个并行计算模型的话,那该怎么办?
1)就是基于这样的考虑,所以hadoop提供了hadoop streaming这个编程工具,它支持用任何编程语言来编写mapreduce的map函数和reduce函数。
2)但是,map/reduce函数的数据流必须遵循相应编程语言的标准输入输出(stdin、stdout),即你用什么编程语言实现业务逻辑,就必须要通过该语言的标准输入stdin读取数据,通过该语言的标准输出stdout输出数据。
3) 比如对于unix/linux,cat、awk、grep等这些都可以做为标准输入获取数据,而诸如重定向符>、>>、管道符(|)等都可以作为标准输出。
而对于c++,标准输入时cin,标准输出是cout
......
hadoop streaming的数据流:
hadoop streaming是如何处理数据的?
hadoop streaming通过用户编写的map函数中标准输入读取数据(一行一行地读取),按照mapd函数的处理逻辑处理后,将处理后的数据由标准输出进行输出到下一个阶段,reduce函数也是按行读取数据,按照函数的处理逻辑处理完数据后将它们通过标准输出写到hdfs的指定目录中。

ps:不管使用的是何种编程语言,在map函数中,原始数据会被处理成<key,value>的形式,但是key与value之间必须通过\t分隔符分隔,分隔符左边的是key,分隔符右边的是value,如果没有使用\t分隔符,那么整行都会被当作key处理。

3、理解了前两个问题后,我们对hadoop streaming已经有了基本的了解,接下来就应该了解该如何使用hadoop streaming这个编程工具了。
先通过linux shell查看一下它的用法:

hadoop streaming编程和golang实现mr demo

可以看到它的用法是:$HADOOP_HOME/bin jar hadoop-streaming.jar [options]
options是可选项
例子1:
以shell命令作为mapper类以及 reducer类的实现

hadoop jar ../share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-input /hadoop-streaming/* \
-output /hadoop-output \
-mapper /bin/cat \
-reducer /bin/wc

ps1:每行最后的\符号的作用是转义,将换行符转以为普通字符,因为我们不想在一行中输完全部内容,所以用转义字符转义换行符,以在下行行中输入未输完的内容

ps2:输入文件必须在hdfs上,输出也是输出到hdfs上

ps3:为了避免发生函数脚本找不到的问题,最好使用-file参数,将脚本文件提交到集群中

例子2:
以shell脚本作为mapper和reducer的实现:
mapper.sh、reducer.sh

mapper.sh
#!/bin/bash
cat

reducer.sh
#!/bin/bash
wc

运行:

hadoop jar ../share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-input /hadoop-streaming/* \
-output /hadoop-output \
-mapper mapper.sh \
--reducer reducer.sh \
-file mapper.sh \
-file reducer.sh

例子3:
mapper和reducer以shell脚本处理较复杂的逻辑,如单词统计,每行都有多个单词
mapper.sh

#!/bin/bash
while read line
do
    for word in $line
    do
        echo $word  1
    done
done

reducer.sh

#!/bin/bash
count=0
read word1
reduce-word=`echo $word1 | awk  $1`
while read word2
do
    map-word=`echo $-word | awk  $1`
    if [ $reduce-word = $map-word ]
        count=count+1
    else
         echo $reduce-word  $count
         count=0
          reduce-word=map-word
          count=count+1
     fi
done

运行:

hadoop jar ../share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-input /hadoop-streaming/* \
-output /hadoop-output \
-mapper mapper.sh \
-reducer reducer.sh \
-file mapper.sh \
-file reducer.sh

在Golang中编写Map / Reduce任务

代码:

package main

import (
	"github.com/vistarmedia/gossamr"
	"log"
	"strings"
)

type WordCount struct{}

func (wc *WordCount) Map(p int64, line string, c gossamr.Collector) error {
	for _, word := range strings.Fields(line) {
		c.Collect(strings.ToLower(word), int64(1))
	}
	return nil
}

func (wc *WordCount) Reduce(word string, counts chan int64, c gossamr.Collector) error {
	var sum int64
	for v := range counts {
		sum += v
	}
	c.Collect(sum, word)
	return nil
}

func main() {
	wordcount := gossamr.NewTask(&WordCount{})
	err := gossamr.Run(wordcount)
	if err != nil {
		log.Fatal(err)
	}
}

运行:

	
./bin/hadoop jar ./contrib/streaming/hadoop-streaming-1.2.1.jar \
	
  -input /mytext.txt \
	
  -output /output.15 \
	
  -mapper "gossamr -task 0 -phase map" \
	
  -reducer "gossamr -task 0 -phase reduce" \
	
  -io typedbytes \
	
  -file ./wordcount
	
  -numReduceTasks 6


原文链接:https://www.jianshu.com/p/c3fc0400406d

原文链接:https://ask.csdn.net/questions/1011739

相关标签: 大数据