MapReduce作为Hadoop的编程框架,是工程师最常接触的部分,也是除去网络环境和集群配置之外对整个job执行效率影响很大的部分,所以很有必要深入了解整个过程。元旦放假的第一天,在家没事干,用Go实现了一个单进程版本的MapReduce,用于统计大文件中出现频率最高的10个单词。因为功能比较简单,所以设计上没有做解耦。
1. MapReduce大体架构
- 用户程序:用户程序主要负责对输入数据进行分割,并编写mapper、reducer、combiner的代码。
- master:中控系统。控制mapper、reducer的个数并分发任务,比如生成m个进程处理mapper,n个进程处理reducer。其实对master来说,mapper和reducer都属于worker,只不过跑的程序不一样,mapper跑用户输入的map代码,reducer跑用户输入的reduce代码。master还作为管道负责传递中间文件路径,比如将mapper生成的中间文件路径传递给reducer,将reducer生成的结果文件返回,或者传递给combiner(如果有需要的话)。由于master是单点,既容易成为性能瓶颈又存在单点故障,所以可以做集群:主备模式或者分布式模式。可以用ZooKeeper进行选主,用一些消息中间件进行数据同步。master还可以进行一些策略处理:比如某个worker执行时间特别长,很有可能卡住了,就把分配给该worker的数据重新分配给别的worker执行,当然需要对多份返回的结果做去重处理。
- mapper:负责将输入数据切成key-value格式。mapper处理完后,将中间文件的路径告知master,master获悉后传递给reducer进行后续处理。如果mapper在未处理完时发生故障,或者已经处理完但reducer还未读完其中间输出文件时发生故障(中间文件存放在该mapper所在机器上,会随之丢失),分配给该mapper的输入将被重新分配给别的mapper执行。
- reducer: 接收master发送的mapper输出文件的消息,通过RPC读取文件并处理,并输出结果文件。n个reducer将产生n个输出文件。
- combiner: 做最后的归并处理,通常不需要。(注意:本文的combiner指对多个reducer的输出做最终合并,与Hadoop中在map端对输出做本地预聚合的Combiner含义不同。)
2. 实现代码介绍
- 功能:统计给定文件中出现的最高频的10个单词
- 输入:大文件
- 输出:最高频的10个单词
- 实现:5个mapper协程、2个reducer协程、1个combiner协程。
.
├── readme.md
├── bin
│   └── file-store
│       └── big_input_file.txt
└── src
    ├── caller
    │   └── main.go
    ├── generate
    │   └── main.go
    └── master
        ├── combiner.go
        ├── mapper.go
        ├── master.go
        └── reducer.go

6 directories, 8 files
2.1 caller
package main

import (
	"bufio"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"

	"master"

	"github.com/vinllen/go-logger/logger"
)

const (
	limit int = 10000 // the maximum number of lines in every piece file
)

// main reads file-store/big_input_file.txt located next to the executable,
// splits it into piece files of at most limit lines each, hands the piece
// paths to master.Handle and logs the result it returns.
func main() {
	curDir, err := filepath.Abs(filepath.Dir(os.Args[0]))
	if err != nil {
		logger.Error("read path error: ", err.Error())
		return
	}
	fileDir := filepath.Join(curDir, "file-store")
	// The error is deliberately ignored: the directory normally exists
	// already, and a real problem surfaces when the input is opened below.
	_ = os.Mkdir(fileDir, os.ModePerm)

	// 1. read file
	fileName := "big_input_file.txt"
	inputFile, err := os.Open(filepath.Join(fileDir, fileName))
	if err != nil {
		logger.Error("read inputfile error: ", err.Error())
		return
	}
	defer inputFile.Close()

	// 2. split inputfile into pieces that hold at most limit lines each
	pieces, err := splitFile(inputFile, fileDir, limit)
	if err != nil {
		logger.Error("split inputfile error: ", err.Error())
		return
	}
	if len(pieces) == 0 {
		// Nothing to count; don't start the pipeline with zero jobs.
		logger.Warn("inputfile is empty")
		return
	}

	// 3. pass to master
	res := master.Handle(pieces, fileDir)
	logger.Warn(res)
}

// splitFile copies the lines of in into files named input_piece_1,
// input_piece_2, ... inside dir, at most maxLines lines per file, and returns
// their paths in order. Empty input yields no pieces, and no trailing empty
// piece is created when the line count is a multiple of maxLines.
func splitFile(in io.Reader, dir string, maxLines int) ([]string, error) {
	if maxLines < 1 {
		maxLines = 1 // a non-positive limit would otherwise never make progress
	}
	var pieces []string
	scanner := bufio.NewScanner(in)
	more := scanner.Scan()
	for piece := 1; more; piece++ {
		name := filepath.Join(dir, "input_piece_"+strconv.Itoa(piece))
		var err error
		more, err = writePiece(name, scanner, maxLines)
		if err != nil {
			return nil, err
		}
		pieces = append(pieces, name)
	}
	// Scan also returns false on read errors (e.g. a line longer than the
	// scanner's buffer); without this check the input would be silently
	// truncated.
	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("reading input: %v", err)
	}
	return pieces, nil
}

// writePiece creates path and writes the scanner's current line plus up to
// maxLines-1 following lines into it. The scanner must already be positioned
// on a line that has not been written yet. It reports whether the scanner
// stopped on another unwritten line.
func writePiece(path string, scanner *bufio.Scanner, maxLines int) (more bool, err error) {
	f, err := os.Create(path)
	if err != nil {
		return false, fmt.Errorf("create %s: %v", path, err)
	}
	// Close each piece as soon as it is written (instead of deferring every
	// Close to the end of main) and report the Close error, which can carry a
	// delayed write failure.
	defer func() {
		if cerr := f.Close(); err == nil && cerr != nil {
			err = fmt.Errorf("close %s: %v", path, cerr)
		}
	}()

	w := bufio.NewWriter(f)
	more = true
	for n := 0; n < maxLines && more; n++ {
		if _, err = w.WriteString(scanner.Text() + "\n"); err != nil {
			return false, fmt.Errorf("write %s: %v", path, err)
		}
		more = scanner.Scan()
	}
	if err = w.Flush(); err != nil {
		return false, fmt.Errorf("write %s: %v", path, err)
	}
	return more, nil
}
2.2 master
package master import ( "github.com/vinllen/go-logger/logger" ) var ( mapchanin chan mapinput // channel produced by master while consumed by mapper mapchanout chan string // channel produced by mapper while consumed by master reducechanin chan string // channel produced by master while consumed by reducer reducechanout chan string // channel produced by reducer while consumed by master combinechanin chan string // channel produced by master while consumed by combiner combinechanout chan []item // channel produced by combiner while consumed by master ) func handle(inputarr []string, filedir string) []item { logger.info("handle called") const( mappernumber int = 5 reducernumber int = 2 ) mapchanin = make(chan mapinput) mapchanout = make(chan string) reducechanin = make(chan string) reducechanout = make(chan string) combinechanin = make(chan string) combinechanout = make(chan []item) reducejobnum := len(inputarr) combinejobnum := reducernumber // start combiner go combiner() // start reducer for i := 1; i <= reducernumber; i++ { go reducer(i, filedir) } // start mapper for i := 1; i <= mappernumber; i++ { go mapper(i, filedir) } go func() { for i, v := range(inputarr) { mapchanin <- mapinput{ filename: v, nr: i + 1, } // pass job to mapper } close(mapchanin) // close map input channel when no more job }() var res []item outter: for { select { case v := <- mapchanout: go func() { reducechanin <- v reducejobnum-- if reducejobnum <= 0 { close(reducechanin) } }() case v := <- reducechanout: go func() { combinechanin <- v combinejobnum-- if combinejobnum <= 0 { close(combinechanin) } }() case v := <- combinechanout: res = v break outter } } close(mapchanout) close(reducechanout) close(combinechanout) return res }
2.3 mapper
package master

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strconv"

	"github.com/vinllen/go-logger/logger"
)

// mapinput describes one map job: the input file to read and its job number.
type mapinput struct {
	filename string
	nr       int
}

// mapper is worker number nr. It takes jobs from mapchanin until that channel
// is closed. For each job it counts the whitespace-separated words of the
// input file and writes one "word count" line per distinct word to
// filedir/mapper-output-<job nr>. It then sends exactly one message per job on
// mapchanout: the output path, or "" if the job failed.
func mapper(nr int, filedir string) {
	for job := range mapchanin {
		counts, err := countWords(job.filename)
		if err != nil {
			logger.Error(fmt.Sprintf("read file(%s) error in mapper(%d): %v", job.filename, nr, err))
			mapchanout <- ""
			continue
		}

		outputfilename := filepath.Join(filedir, "mapper-output-"+strconv.Itoa(job.nr))
		if err := writeMapOutput(outputfilename, counts); err != nil {
			logger.Error(fmt.Sprintf("write file(%s) error in mapper(%d): %v", outputfilename, nr, err))
			// Report failure instead of the path: a stale file with the same
			// name from an earlier run would otherwise be reduced.
			mapchanout <- ""
			continue
		}
		mapchanout <- outputfilename
	}
}

// countWords returns how often each whitespace-separated word occurs in the
// file at path.
func countWords(path string) (map[string]int, error) {
	file, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	counts := make(map[string]int)
	scanner := bufio.NewScanner(file)
	scanner.Split(bufio.ScanWords)
	for scanner.Scan() {
		counts[scanner.Text()]++
	}
	if err := scanner.Err(); err != nil {
		return nil, err
	}
	return counts, nil
}

// writeMapOutput writes one "word count" line per entry of counts to path.
func writeMapOutput(path string, counts map[string]int) (err error) {
	file, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := file.Close(); err == nil {
			err = cerr
		}
	}()

	w := bufio.NewWriter(file)
	for k, v := range counts {
		if _, err = fmt.Fprintf(w, "%s %d\n", k, v); err != nil {
			return err
		}
	}
	return w.Flush()
}
2.4 reducer
package master

import (
	"bufio"
	"fmt"
	"os"
	"path/filepath"
	"strconv"
	"strings"

	"github.com/vinllen/go-logger/logger"
)

// reducer is worker number nr. It sums the "word count" lines of every file
// path received on reducechanin until that channel is closed. It then writes
// the totals to filedir/reduce-output-<nr> and sends exactly one message on
// reducechanout: that path, or "" if writing failed.
func reducer(nr int, filedir string) {
	mp := make(map[string]int) // word -> total frequency over all inputs

	for val := range reducechanin {
		logger.Debug("reducer called: ", nr)
		if val == "" {
			// A mapper reports a failed job with an empty path (and has
			// already logged the reason); there is nothing to read.
			continue
		}
		reduceFile(val, mp)
	}

	outputfilename := filepath.Join(filedir, "reduce-output-"+strconv.Itoa(nr))
	if err := writeReduceOutput(outputfilename, mp); err != nil {
		logger.Error(fmt.Sprintf("write file(%s) error in reducer(%d): %v", outputfilename, nr, err))
		// Don't hand the combiner a missing, partial or stale file.
		outputfilename = ""
	}
	reducechanout <- outputfilename
}

// reduceFile adds the counts from the "word count" lines of the file at path
// into mp. Malformed lines are logged and skipped; an unreadable file is
// logged and ignored.
func reduceFile(path string, mp map[string]int) {
	file, err := os.Open(path)
	if err != nil {
		logger.Error(fmt.Sprintf("read file(%s) error in reducer", path))
		return
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		str := scanner.Text()
		arr := strings.Split(str, " ")
		if len(arr) != 2 {
			logger.Warn(fmt.Sprintf("read file(%s) error that len of line(%s) != 2(%d) in reducer", path, str, len(arr)))
			continue
		}
		v, err := strconv.Atoi(arr[1])
		if err != nil {
			logger.Warn(fmt.Sprintf("read file(%s) error that line(%s) parse error in reducer", path, str))
			continue
		}
		mp[arr[0]] += v
	}
	if err := scanner.Err(); err != nil {
		logger.Error(fmt.Sprintf("read file(%s) error in reducer: %v", path, err))
	}
}

// writeReduceOutput writes one "word count" line per entry of counts to path.
func writeReduceOutput(path string, counts map[string]int) (err error) {
	file, err := os.Create(path)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := file.Close(); err == nil {
			err = cerr
		}
	}()

	w := bufio.NewWriter(file)
	for k, v := range counts {
		if _, err = fmt.Fprintf(w, "%s %d\n", k, v); err != nil {
			return err
		}
	}
	return w.Flush()
}
2.5 combiner
package master

import (
	"bufio"
	"container/heap"
	"fmt"
	"os"
	"strconv"
	"strings"

	"github.com/vinllen/go-logger/logger"
)

// topWordCount is the number of most frequent words the combiner reports.
const topWordCount = 10

// item is a word together with its frequency.
type item struct {
	key string
	val int
}

// priorityqueue implements container/heap's heap.Interface as a max-heap.
// Items are ordered by val descending; ties are broken by key ascending so
// the result does not depend on map iteration order.
type priorityqueue []*item

func (pq priorityqueue) Len() int { return len(pq) }

func (pq priorityqueue) Less(i, j int) bool {
	if pq[i].val != pq[j].val {
		return pq[i].val > pq[j].val
	}
	return pq[i].key < pq[j].key
}

func (pq priorityqueue) Swap(i, j int) { pq[i], pq[j] = pq[j], pq[i] }

func (pq *priorityqueue) Push(x interface{}) {
	*pq = append(*pq, x.(*item))
}

func (pq *priorityqueue) Pop() interface{} {
	old := *pq
	n := len(old)
	it := old[n-1]
	old[n-1] = nil // don't keep the popped item reachable from the backing array
	*pq = old[:n-1]
	return it
}

// combiner merges the "word count" files received on combinechanin until that
// channel is closed. It then sends the topWordCount most frequent words,
// highest count first, on combinechanout.
func combiner() {
	mp := make(map[string]int) // word -> total frequency over all inputs

	for val := range combinechanin {
		logger.Debug("combiner called")
		if val == "" {
			// Empty path marks an upstream failure that was already logged.
			continue
		}
		combineFile(val, mp)
	}

	// Build the whole slice and heapify once (O(n)) instead of n pushes.
	pq := make(priorityqueue, 0, len(mp))
	for k, v := range mp {
		pq = append(pq, &item{key: k, val: v})
	}
	heap.Init(&pq)

	res := []item{}
	for len(res) < topWordCount && pq.Len() > 0 {
		res = append(res, *heap.Pop(&pq).(*item))
	}
	combinechanout <- res
}

// combineFile adds the counts from the "word count" lines of the file at path
// into mp. Malformed lines are logged and skipped; an unreadable file is
// logged and ignored.
func combineFile(path string, mp map[string]int) {
	file, err := os.Open(path)
	if err != nil {
		logger.Error(fmt.Sprintf("read file(%s) error in combiner", path))
		return
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		str := scanner.Text()
		arr := strings.Split(str, " ")
		if len(arr) != 2 {
			logger.Warn(fmt.Sprintf("read file(%s) error that len of line != 2(%s) in combiner", path, str))
			continue
		}
		v, err := strconv.Atoi(arr[1])
		if err != nil {
			logger.Warn(fmt.Sprintf("read file(%s) error that line(%s) parse error in combiner", path, str))
			continue
		}
		mp[arr[0]] += v
	}
	if err := scanner.Err(); err != nil {
		logger.Error(fmt.Sprintf("read file(%s) error in combiner: %v", path, err))
	}
}
3. 总结
- 各模块间耦合性高
- master存在单点故障,未做集群扩展
- 未采用多进程实现(即各worker作为独立进程、进程间通过RPC通信),目前mapper、reducer、combiner都是同一进程内的协程
- 未实现当单个worker执行时间过长时另起worker重新执行该任务的逻辑。