欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

golang如何实现mapreduce单进程版本详解

程序员文章站 2022-08-30 10:49:58
前言   mapreduce作为hadoop的编程框架,是工程师最常接触的部分,也是除去了网络环境和集群配 置之外对整个job执行效率影响很大的部...

前言

  mapreduce作为hadoop的编程框架,是工程师最常接触的部分,也是除去了网络环境和集群配 置之外对整个job执行效率影响很大的部分,所以很有必要深入了解整个过程。元旦放假的第一天,在家没事干,用golang实现了一下mapreduce的单进程版本,。处理对大文件统计最高频的10个单词,因为功能比较简单,所以设计没有解耦合。

  本文先对mapreduce大体概念进行介绍,然后结合代码介绍一下,如果接下来几天有空,我会实现一下分布式高可用的mapreduce版本。下面话不多说了,来一起看看详细的介绍吧。

1. mapreduce大体架构

golang如何实现mapreduce单进程版本详解

  上图是论文中mapreduce的大体架构。总的来说mapreduce的思想就是分治思想:对数据进行分片,然后用mapper进行处理,以key-value形式输出中间文件;然后用reducer进行对mapper输出的中间文件进行合并:将key一致的合到一块,并输出结果文件;如果有需要,采用combiner进行最后的合并。

  归纳来说主要分为5部分:用户程序、master、mapper、reducer、combiner(上图未给出)。

  • 用户程序。用户程序主要对输入数据进行分割,制定mapper、reducer、combiner的代码。
  • master:中控系统。控制分发mapper、reduer的个数,比如生成m个进程处理mapper,n个进程处理reducer。其实对master来说,mapper和reduer都属于worker,只不过跑的程序不一样,mapper跑用户输入的map代码,reduer跑用户输入的reduce代码。master还作为管道负责中间路径传递,比如将mapper生成的中间文件传递给reduer,将reduer生成的结果文件返回,或者传递给combiner(如果有需要的话)。由于master是单点,性能瓶颈,所以可以做集群:主备模式或者分布式模式。可以用zookeeper进行选主,用一些消息中间件进行数据同步。master还可以进行一些策略处理:比如某个worker执行时间特别长,很有可能卡住了,对分配给该worker的数据重新分配给别的worker执行,当然需要对多份数据返回去重处理。
  • mapper:负责将输入数据切成key-value格式。mapper处理完后,将中间文件的路径告知master,master获悉后传递给reduer进行后续处理。如果mapper未处理完,或者已经处理完但是reduer未读完其中间输出文件,分配给该mapper的输入将重新被别的mapper执行。
  • reducer: 接受master发送的mapper输出文件的消息,rpc读取文件并处理,并输出结果文件。n个reduer将产生n个输出文件。
  • combiner: 做最后的归并处理,通常不需要。

  总的来说,架构不复杂。组件间通信用啥都可以,比如rpc、http或者私有协议等。

2. 实现代码介绍

  该版本代码实现了单机单进程版本,mapper、reducer和combiner的实现用协程goroutine实现,通信采用channel。代码写的比较随意,没有解耦合。

  • 功能:统计给定文件中出现的最高频的10个单词
  • 输入:大文件
  • 输出:最高频的10个单词
  • 实现:5个mapper协程、2个reducer、1个combiner。

  为了方便起见,combiner对最高频的10个单词进行堆排序处理,按规范来说应该放在用户程序处理。

  文件目录如下,其中bin文件夹下的big_input_file.txt为输入文件,可以调用generate下的main文件生成,caller文件为入口的用户程序,master目录下分别存放master、mapper、reducer、combiner代码:

.
├── readme.md
├── bin
│ └── file-store
│  └── big_input_file.txt
└── src
 ├── caller
 │ └── main.go
 ├── generate
 │ └── main.go
 └── master
  ├── combiner.go
  ├── mapper.go
  ├── master.go
  └── reducer.go

6 directories, 8 files 

2.1 caller

  用户程序,读入文件并按固定行数进行划分;然后调用master.handle进行处理。

package main
import ( 
 "os"
 "path"
 "path/filepath"
 "bufio"
 "strconv"
 "master"
 "github.com/vinllen/go-logger/logger"
)
const ( 
 limit int = 10000 // the limit line of every file
)
func main() { 
 curdir, err := filepath.abs(filepath.dir(os.args[0]))
 if err != nil {
  logger.error("read path error: ", err.error())
  return
 }
 filedir := path.join(curdir, "file-store")
 _ = os.mkdir(filedir, os.modeperm)
 // 1. read file
 filename := "big_input_file.txt"
 inputfile, err := os.open(path.join(filedir, filename))
 if err != nil {
  logger.error("read inputfile error: ", err.error())
  return
 }
 defer inputfile.close()
 // 2. split inputfile into several pieces that every piece hold 100,000 lines
 filepiecearr := []string{}
 scanner := bufio.newscanner(inputfile)
 piece := 1
outter: 
 for {
  outputfilename := "input_piece_" + strconv.itoa(piece)
  outputfilepos := path.join(filedir, outputfilename)
  filepiecearr = append(filepiecearr, outputfilepos)
  outputfile, err := os.create(outputfilepos)
  if err != nil {
   logger.error("split inputfile error: ", err.error())
   continue
  }
  defer outputfile.close()
  for cnt := 0; cnt < limit; cnt++ {
   if !scanner.scan() {
    break outter
   }
   _, err := outputfile.writestring(scanner.text() + "\n")
   if err != nil {
    logger.error("split inputfile writting error: ", err.error())
    return
   }
  }
  piece++
 }
 // 3. pass to master
 res := master.handle(filepiecearr, filedir)
 logger.warn(res)
}

2.2 master

  master程序,依次生成combiner、reducer、mapper,处理消息中转,输出最后结果。

package master
import (
 "github.com/vinllen/go-logger/logger"
)
var ( 
 mapchanin chan mapinput // channel produced by master while consumed by mapper
 mapchanout chan string // channel produced by mapper while consumed by master
 reducechanin chan string // channel produced by master while consumed by reducer
 reducechanout chan string // channel produced by reducer while consumed by master
 combinechanin chan string // channel produced by master while consumed by combiner
 combinechanout chan []item // channel produced by combiner while consumed by master
)
func handle(inputarr []string, filedir string) []item { 
 logger.info("handle called")
 const(
  mappernumber int = 5
  reducernumber int = 2
 )
 mapchanin = make(chan mapinput)
 mapchanout = make(chan string)
 reducechanin = make(chan string)
 reducechanout = make(chan string)
 combinechanin = make(chan string)
 combinechanout = make(chan []item)
 reducejobnum := len(inputarr)
 combinejobnum := reducernumber
 // start combiner
 go combiner()
 // start reducer
 for i := 1; i <= reducernumber; i++ {
  go reducer(i, filedir)
 }
 // start mapper
 for i := 1; i <= mappernumber; i++ {
  go mapper(i, filedir)
 }
 go func() {
  for i, v := range(inputarr) {
   mapchanin <- mapinput{
    filename: v,
    nr: i + 1,
   } // pass job to mapper
  }
  close(mapchanin) // close map input channel when no more job
 }()
 var res []item
outter: 
 for {
  select {
   case v := <- mapchanout:
    go func() {
     reducechanin <- v
     reducejobnum--
     if reducejobnum <= 0 {
      close(reducechanin)
     }
    }()
   case v := <- reducechanout:
    go func() {
     combinechanin <- v
     combinejobnum--
     if combinejobnum <= 0 {
      close(combinechanin)
     }
    }()
   case v := <- combinechanout:
    res = v
    break outter
  }
 }
 close(mapchanout)
 close(reducechanout)
 close(combinechanout)
 return res
}

2.3 mapper

  mapper程序,读入并按key-value格式生成中间文件,告知master。

package master
import ( 
 "fmt"
 "path"
 "os"
 "bufio"
 "strconv"

 "github.com/vinllen/go-logger/logger"
)
type mapinput struct { 
 filename string
 nr int
}
func mapper(nr int, filedir string) { 
 for {
  val, ok := <- mapchanin // val: filename
  if !ok { // channel close
   break
  }
  inputfilename := val.filename
  nr := val.nr
  file, err := os.open(inputfilename)
  if err != nil {
   errmsg := fmt.sprintf("read file(%s) error in mapper(%d)", inputfilename, nr)
   logger.error(errmsg)
   mapchanout <- ""
   continue
  }
  mp := make(map[string]int)
  scanner := bufio.newscanner(file)
  scanner.split(bufio.scanwords)
  for scanner.scan() {
   str := scanner.text()
   //logger.info(str)
   mp[str]++
  }
  outputfilename := path.join(filedir, "mapper-output-" + strconv.itoa(nr))
  outputfilehandler, err := os.create(outputfilename)
  if err != nil {
   errmsg := fmt.sprintf("write file(%s) error in mapper(%d)", outputfilename, nr)
   logger.error(errmsg)
  } else {
   for k, v := range mp {
    str := fmt.sprintf("%s %d\n", k, v)
    outputfilehandler.writestring(str)
   }
   outputfilehandler.close()
  }
  mapchanout <- outputfilename
 }
}

2.4 reducer

  reducer程序,读入master传递过来的中间文件并归并。

package master
import ( 
 "fmt"
 "bufio"
 "os"
 "strconv"
 "path"
 "strings"
 "github.com/vinllen/go-logger/logger"
)
func reducer(nr int, filedir string) { 
 mp := make(map[string]int) // store the frequence of words
 // read file and do reduce
 for {
  val, ok := <- reducechanin
  if !ok {
   break
  }
  logger.debug("reducer called: ", nr)
  file, err := os.open(val)
  if err != nil {
   errmsg := fmt.sprintf("read file(%s) error in reducer", val)
   logger.error(errmsg)
   continue
  }
  scanner := bufio.newscanner(file)
  for scanner.scan() {
   str := scanner.text()
   arr := strings.split(str, " ")
   if len(arr) != 2 {
    errmsg := fmt.sprintf("read file(%s) error that len of line(%s) != 2(%d) in reducer", val, str, len(arr))
    logger.warn(errmsg)
    continue
   }
   v, err := strconv.atoi(arr[1])
   if err != nil {
    errmsg := fmt.sprintf("read file(%s) error that line(%s) parse error in reduer", val, str)
    logger.warn(errmsg)
    continue
   }
   mp[arr[0]] += v
  }
  if err := scanner.err(); err != nil {
   logger.error("reducer: reading standard input:", err)
  }
  file.close()
 }
 outputfilename := path.join(filedir, "reduce-output-" + strconv.itoa(nr))
 outputfilehandler, err := os.create(outputfilename)
 if err != nil {
  errmsg := fmt.sprintf("write file(%s) error in reducer(%d)", outputfilename, nr)
  logger.error(errmsg)
 } else {
  for k, v := range mp {
   str := fmt.sprintf("%s %d\n", k, v)
   outputfilehandler.writestring(str)
  }
  outputfilehandler.close()
 }
 reducechanout <- outputfilename
}

2.5 combiner

  combiner程序,读入master传递过来的reducer结果文件并归并成一个,然后堆排序输出最高频的10个词语。

package master
import ( 
 "fmt"
 "strings"
 "bufio"
 "os"
 "container/heap"
 "strconv"

 "github.com/vinllen/go-logger/logger"
)
type item struct { 
 key string
 val int
}
type priorityqueue []*item
func (pq priorityqueue) len() int { 
 return len(pq)
}
func (pq priorityqueue) less(i, j int) bool { 
 return pq[i].val > pq[j].val
}
func (pq priorityqueue) swap(i, j int) { 
 pq[i], pq[j] = pq[j], pq[i]
}
func (pq *priorityqueue) push(x interface{}) { 
 item := x.(*item)
 *pq = append(*pq, item)
}
func (pq *priorityqueue) pop() interface{} { 
 old := *pq
 n := len(old)
 item := old[n - 1]
 *pq = old[0 : n - 1]
 return item
}
func combiner() { 
 mp := make(map[string]int) // store the frequence of words
 // read file and do combine
 for {
  val, ok := <- combinechanin
  if !ok {
   break
  }
  logger.debug("combiner called")
  file, err := os.open(val)
  if err != nil {
   errmsg := fmt.sprintf("read file(%s) error in combiner", val)
   logger.error(errmsg)
   continue
  }
  scanner := bufio.newscanner(file)
  for scanner.scan() {
   str := scanner.text()
   arr := strings.split(str, " ")
   if len(arr) != 2 {
    errmsg := fmt.sprintf("read file(%s) error that len of line != 2(%s) in combiner", val, str)
    logger.warn(errmsg)
    continue
   }
   v, err := strconv.atoi(arr[1])
   if err != nil {
    errmsg := fmt.sprintf("read file(%s) error that line(%s) parse error in combiner", val, str)
    logger.warn(errmsg)
    continue
   }
   mp[arr[0]] += v
  }
  file.close()
 }
 // heap sort
 // pq := make(priorityqueue, len(mp))
 pq := make(priorityqueue, 0)
 heap.init(&pq)
 for k, v := range mp {
  node := &item {
   key: k,
   val: v,
  }
  // logger.debug(k, v)
  heap.push(&pq, node)
 }
 res := []item{}
 for i := 0; i < 10 && pq.len() > 0; i++ {
  node := heap.pop(&pq).(*item)
  res = append(res, *node)
 }
 combinechanout <- res
}

3. 总结

  不足以及未实现之处:

  • 各模块间耦合性高
  • master单点故障未扩展
  • 未采用多进程实现,进程间采用rpc通信
  • 未实现单个workder时间过长,另起worker执行任务的代码。

  接下来要是有空,我会实现分布式高可用的代码,模块间采用rpc通讯。

好了,以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对的支持。