MIT 6.824 Lab 1: MapReduce
Preface

I came across MIT 6.824 while studying distributed systems and decided to follow along with the course. Courses abroad really are demanding: in a single week you are expected to pick up the Go language, read a paper, and then complete the MapReduce lab.

A real MR (MapReduce) framework is built on top of a DFS (Distributed File System), so this lab simulates the distributed environment with multiple threads instead. That lowers the difficulty considerably, but the lab still gives a fairly deep understanding of the core ideas behind MR.

Before starting, read the classic MapReduce paper. I recommend the English original, but if you are short on time, Chinese translations are easy to find online.

When I first started I was completely lost and had no idea where to begin. Then I noticed that the project ships with an automated test file (test_test.go), and each part of the lab is checked by functions in that file. Working backwards from those test functions and filling in the missing code is all it takes.
Part I: Map/Reduce input and output
Part I asks you to implement a sequential version of MR, which gives a general picture of the MR workflow, and to fill in the two functions doMap() and doReduce().

It comes with two test functions, TestSequentialSingle() and TestSequentialMany().

TestSequentialSingle()

Each map worker processes one file, so the number of map workers equals the number of input files. This test runs a single map worker and a single reduce worker.
```go
func TestSequentialSingle(t *testing.T) {
	mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}
```
TestSequentialMany()

This test runs multiple map workers and multiple reduce workers; otherwise its logic is the same as TestSequentialSingle().
```go
func TestSequentialMany(t *testing.T) {
	mr := Sequential("test", makeInputs(5), 3, MapFunc, ReduceFunc)
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}
```
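Both tests hand the framework the MapFunc and ReduceFunc defined in test_test.go. Roughly, they look like the sketch below (reproduced from memory of the lab skeleton, so treat the details as an assumption): the map function splits each input file into whitespace-separated words and emits them as keys with empty values, and the reduce function returns an empty string, since the test only checks that the keys come out complete and in sorted order.

```go
// Rough sketch of the test helpers in test_test.go (details are an assumption).
// MapFunc splits the input into words and emits each word as a key.
func MapFunc(file string, value string) (res []KeyValue) {
	words := strings.Fields(value)
	for _, w := range words {
		res = append(res, KeyValue{w, ""})
	}
	return
}

// ReduceFunc does nothing interesting: the test only looks at the keys.
func ReduceFunc(key string, values []string) string {
	return ""
}
```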
Sequential()

The test functions pass five arguments to Sequential(): the job name, the input files, the number of reduce tasks, and the user-defined map and reduce functions.
```go
// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before running the next.
func Sequential(jobName string, files []string, nreduce int,
	mapF func(string, string) []KeyValue,
	reduceF func(string, []string) string,
) (mr *Master) {
	mr = newMaster("master")
	go mr.run(jobName, files, nreduce, func(phase jobPhase) {
		switch phase {
		case mapPhase:
			for i, f := range mr.files {
				doMap(mr.jobName, i, f, mr.nReduce, mapF)
			}
		case reducePhase:
			for i := 0; i < mr.nReduce; i++ {
				doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
			}
		}
	}, func() {
		mr.stats = []int{len(files) + nreduce}
	})
	return
}
```
Sequential() first obtains a pointer to a Master object and then runs Master.run(), passing the per-phase scheduling logic and the finishing logic in as closures.
Master.run()
```go
// run executes a mapreduce job on the given number of mappers and reducers.
//
// First, it divides up the input file among the given number of mappers, and
// schedules each task on workers as they become available. Each map task bins
// its output in a number of bins equal to the given number of reduce tasks.
// Once all the mappers have finished, workers are assigned reduce tasks.
//
// When all tasks have been completed, the reducer outputs are merged,
// statistics are collected, and the master is shut down.
//
// Note that this implementation assumes a shared file system.
func (mr *Master) run(jobName string, files []string, nreduce int,
	schedule func(phase jobPhase),
	finish func(),
) {
	mr.jobName = jobName
	mr.files = files
	mr.nReduce = nreduce

	fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

	schedule(mapPhase)
	schedule(reducePhase)
	finish()
	mr.merge()

	fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

	mr.doneChannel <- true
}
```
doMap()
doMap() and doReduce() are the functions we have to implement. doMap() reads the input file, runs the user-defined map function over its contents, and then uses a hash of each key to partition the resulting key/value pairs into nReduce buckets, each of which is written to its own intermediate file.
```go
func doMap(
	jobName string, // the name of the MapReduce job
	mapTaskNumber int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
	mapF func(file string, contents string) []KeyValue,
) {
	// read the contents of inFile
	dat, err := ioutil.ReadFile(inFile)
	if err != nil {
		log.Fatal("doMap: readFile ", err)
	}

	// turn the contents into a slice of key/value pairs using mapF()
	kvSlice := mapF(inFile, string(dat))

	// partition kvSlice into nReduce buckets using ihash() on the key
	reduceKv := make([][]KeyValue, nReduce)
	for _, kv := range kvSlice {
		hash := ihash(kv.Key) % nReduce
		reduceKv[hash] = append(reduceKv[hash], kv)
	}

	// write each bucket to its own JSON-encoded intermediate file
	for i := 0; i < nReduce; i++ {
		file, err := os.Create(reduceName(jobName, mapTaskNumber, i))
		if err != nil {
			log.Fatal("doMap: create ", err)
		}
		enc := json.NewEncoder(file)
		for _, kv := range reduceKv[i] {
			if err := enc.Encode(&kv); err != nil {
				log.Fatal("doMap: json encode ", err)
			}
		}
		file.Close()
	}
}
```
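doMap() names its intermediate files with the reduceName() helper, and Sequential() above passes mergeName() as the reduce output file name. Roughly, these helpers in common.go look like the sketch below (copied from memory, so treat the exact format as an assumption); the important point is that map task m and reduce task r agree on a deterministic file name.

```go
// reduceName constructs the name of the intermediate file that map task
// mapTask produces for reduce task reduceTask. (Sketch from memory.)
func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

// mergeName constructs the name of the output file of reduce task reduceTask.
func mergeName(jobName string, reduceTask int) string {
	return "mrtmp." + jobName + "-res-" + strconv.Itoa(reduceTask)
}
```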
doReduce()
doReduce() reads the intermediate key/value pairs for its partition, sorts them by key, and then groups the values that share a key and hands each group to the user-defined reduce function. Each key yields a new key/value pair whose key is unchanged and whose value is the reduce function's return value; the new pairs are finally written out to a file.
```go
// ByKey implements sort.Interface so the intermediate pairs can be sorted by key.
type ByKey []KeyValue

func (a ByKey) Len() int           { return len(a) }
func (a ByKey) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
func (a ByKey) Less(i, j int) bool { return a[i].Key < a[j].Key }

func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTaskNumber int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	// read the key/value pairs from the nMap intermediate JSON files
	var kvSlice []KeyValue
	for i := 0; i < nMap; i++ {
		file, err := os.Open(reduceName(jobName, i, reduceTaskNumber))
		if err != nil {
			log.Fatal("doReduce: open ", err)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break // io.EOF (or a decode error): stop reading this file
			}
			kvSlice = append(kvSlice, kv)
		}
		// close explicitly instead of with defer: deferring inside the loop
		// would keep every file open until doReduce returns, which can
		// exhaust file descriptors and crash the program
		file.Close()
	}

	// sort the intermediate key/value pairs by key
	sort.Sort(ByKey(kvSlice))

	// group values that share a key and run reduceF() on each group
	var outputKv []KeyValue
	if len(kvSlice) > 0 {
		var reduceFValue []string
		preKey := kvSlice[0].Key
		for _, kv := range kvSlice {
			if kv.Key != preKey {
				outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
				reduceFValue = make([]string, 0)
				preKey = kv.Key
			}
			reduceFValue = append(reduceFValue, kv.Value)
		}
		// don't forget the last group
		outputKv = append(outputKv, KeyValue{preKey, reduceF(preKey, reduceFValue)})
	}

	// write the reduce output as JSON encoded key/value pairs to outFile
	file, err := os.Create(outFile)
	if err != nil {
		log.Fatal("doReduce: create ", err)
	}
	defer file.Close()
	enc := json.NewEncoder(file)
	for _, kv := range outputKv {
		if err := enc.Encode(&kv); err != nil {
			log.Fatal("doReduce: json encode ", err)
		}
	}
}
```
Part II: Single-worker word count
Part II asks you to implement mapF() and reduceF() so that the sequential MR framework counts word frequencies.

This part is fairly straightforward, so here is the code.
```go
func mapF(filename string, contents string) []mapreduce.KeyValue {
	// split the contents into words: every non-letter rune is a separator
	f := func(c rune) bool { return !unicode.IsLetter(c) }
	strSlice := strings.FieldsFunc(contents, f)

	// emit one {word, "1"} pair per occurrence
	var kvSlice []mapreduce.KeyValue
	for _, str := range strSlice {
		kvSlice = append(kvSlice, mapreduce.KeyValue{str, "1"})
	}
	return kvSlice
}

func reduceF(key string, values []string) string {
	// sum up the "1"s emitted by mapF for this word
	var cnt int64
	for _, str := range values {
		temp, err := strconv.ParseInt(str, 10, 64)
		if err != nil {
			fmt.Println("wc: parseint ", err)
		}
		cnt += temp
	}
	return strconv.FormatInt(cnt, 10)
}
```
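As a quick sanity check, these two functions can be driven through the Sequential() entry point shown earlier. The snippet below is only a sketch: runWordCountSequentially() is a hypothetical helper, and the job name and input file name are placeholders; in the lab, the provided wc.go already wires this up from the command line.

```go
// Minimal sketch (not part of the lab code): drive the word-count functions
// through the Sequential() driver shown earlier. The job name and the input
// file name are placeholders.
func runWordCountSequentially() {
	mr := mapreduce.Sequential("wcseq", []string{"pg-being_ernest.txt"}, 3, mapF, reduceF)
	mr.Wait() // blocks until the map phase, reduce phase and merge have finished
}
```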
Part III: Distributing MapReduce tasks && Part IV: Handling worker failures
Parts III and IV can be done together. The main job is to implement schedule(), turning the framework into one that runs map and reduce workers concurrently on separate threads. The framework uses RPC to simulate distributed computation, and it must also tolerate worker failures.
TestBasic()
The test function starts two threads, each running RunWorker().
```go
func TestBasic(t *testing.T) {
	mr := setup()
	for i := 0; i < 2; i++ {
		go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
			MapFunc, ReduceFunc, -1)
	}
	mr.Wait()
	check(t, mr.files)
	checkWorker(t, mr.stats)
	cleanup(mr)
}
```
setup() && Distributed()
```go
func setup() *Master {
	files := makeInputs(nMap)
	master := port("master")
	mr := Distributed("test", files, nReduce, master)
	return mr
}
```
Distributed() starts the master's RPC server with mr.startRPCServer() and then schedules the workers through mr.run().
```go
// Distributed schedules map and reduce tasks on workers that register with the
// master over RPC.
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
	mr = newMaster(master)
	mr.startRPCServer()
	go mr.run(jobName, files, nreduce,
		func(phase jobPhase) {
			ch := make(chan string)
			go mr.forwardRegistrations(ch)
			schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
		},
		func() {
			mr.stats = mr.killWorkers()
			mr.stopRPCServer()
		})
	return
}
```
Master.forwardRegistrations()
This function uses the length of the worker list to detect newly started workers; as soon as a new worker appears, it notifies schedule() through the channel ch.

Understanding this function is essential for implementing schedule() later on.
```go
// helper function that sends information about all existing
// and newly registered workers to channel ch. schedule()
// reads ch to learn about workers.
func (mr *Master) forwardRegistrations(ch chan string) {
	i := 0
	for {
		mr.Lock()
		if len(mr.workers) > i {
			// there's a worker that we haven't told schedule() about.
			w := mr.workers[i]
			go func() { ch <- w }() // send without holding the lock.
			i = i + 1
		} else {
			// wait for Register() to add an entry to workers[]
			// in response to an RPC from a new worker.
			mr.newCond.Wait()
		}
		mr.Unlock()
	}
}
```
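The other half of this handshake is the master's Register() RPC handler, which every worker calls when it starts. Roughly (a sketch from memory of the lab skeleton, so treat the field names as an assumption) it appends the worker's address to mr.workers and signals mr.newCond, which is exactly what wakes up the newCond.Wait() above.

```go
// Register is the RPC method workers call after starting up to report their
// address to the master. (Sketch from memory; treat the details as an assumption.)
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
	mr.Lock()
	defer mr.Unlock()
	mr.workers = append(mr.workers, args.Worker)
	// tell forwardRegistrations() that there is a new workers[] entry
	mr.newCond.Broadcast()
	return nil
}
```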
schedule()
schedule() is not long, but it is still somewhat tricky to get right. A sync.WaitGroup tracks whether every task in the phase has completed. registerChan is watched for newly started workers; whenever one arrives, a new goroutine is spawned to drive that worker, which fits the spirit of a distributed MR nicely.

A call() issued to a worker that has died returns false.

Each task that completes successfully calls waitGroup.Done(); a task that fails (call() returns false) is pushed back onto the task channel so it can be handed to another worker. waitGroup.Wait() returns only once every task has finished.
```go
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var n_other int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mapFiles)
		n_other = nReduce
	case reducePhase:
		ntasks = nReduce
		n_other = len(mapFiles)
	}
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

	// All ntasks tasks have to be scheduled on workers, and only once all of
	// them have been completed successfully should the function return.
	// Remember that workers may fail, and that any given worker may finish
	// multiple tasks.
	waitGroup := sync.WaitGroup{}
	waitGroup.Add(ntasks)

	// fill the task channel with every task number of this phase
	taskChan := make(chan int, ntasks)
	for i := 0; i < ntasks; i++ {
		taskChan <- i
	}

	go func() {
		for {
			// a newly registered worker is available
			ch := <-registerChan
			go func(c string) {
				for {
					// grab a task and hand it to the worker over RPC
					i := <-taskChan
					if call(c, "Worker.DoTask",
						&DoTaskArgs{jobName, mapFiles[i], phase, i, n_other},
						new(struct{})) {
						waitGroup.Done()
					} else {
						// the worker failed: put the task back so another
						// worker can pick it up
						taskChan <- i
					}
				}
			}(ch)
		}
	}()

	waitGroup.Wait()
	fmt.Printf("Schedule: %v phase done\n", phase)
}
```
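The positional fields in the DoTaskArgs literal above correspond to the RPC argument struct defined in common_rpc.go, which looks roughly like the sketch below (from memory, so treat the exact names as an assumption). Note that the File field only matters in the map phase; a reduce task locates its inputs by the deterministic intermediate file names instead.

```go
// DoTaskArgs holds the arguments the master sends with a Worker.DoTask RPC.
// (Field names sketched from memory; treat them as an assumption.)
type DoTaskArgs struct {
	JobName    string
	File       string   // the input file: only meaningful for map tasks
	Phase      jobPhase // mapPhase or reducePhase
	TaskNumber int      // this task's index within the current phase

	// NumOtherPhase is the total number of tasks in the other phase:
	// nReduce for a map task, the number of map tasks for a reduce task.
	NumOtherPhase int
}
```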
RunWorker()
Workers are added by calling RunWorker(). nRPC controls a worker's lifetime: it is decremented each time the worker accepts an RPC request, and an initial value of -1 means the worker lives forever.
```go
// RunWorker sets up a connection with the master, registers its address, and
// waits for tasks to be scheduled.
func RunWorker(MasterAddress string, me string,
	MapFunc func(string, string) []KeyValue,
	ReduceFunc func(string, []string) string,
	nRPC int,
) {
	debug("RunWorker %s\n", me)
	wk := new(Worker)
	wk.name = me
	wk.Map = MapFunc
	wk.Reduce = ReduceFunc
	wk.nRPC = nRPC
	rpcs := rpc.NewServer()
	rpcs.Register(wk)
	os.Remove(me) // only needed for "unix"
	l, e := net.Listen("unix", me)
	if e != nil {
		log.Fatal("RunWorker: worker ", me, " error: ", e)
	}
	wk.l = l
	wk.register(MasterAddress)

	// DON'T MODIFY CODE BELOW
	for {
		wk.Lock()
		if wk.nRPC == 0 {
			wk.Unlock()
			break
		}
		wk.Unlock()
		conn, err := wk.l.Accept()
		if err == nil {
			wk.Lock()
			wk.nRPC--
			wk.Unlock()
			go rpcs.ServeConn(conn)
		} else {
			break
		}
	}
	wk.l.Close()
	debug("RunWorker %s exit\n", me)
}
```
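On the receiving end of schedule()'s RPC is Worker.DoTask(), which dispatches to the doMap() and doReduce() functions from Part I. The sketch below reflects my reading of the lab skeleton (it omits bookkeeping such as locking and task counters), so treat the details as an assumption.

```go
// DoTask is called by the master via RPC when a new task is scheduled on this
// worker. (Sketch of the lab skeleton; treat the details as an assumption.)
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
	switch arg.Phase {
	case mapPhase:
		doMap(arg.JobName, arg.TaskNumber, arg.File, arg.NumOtherPhase, wk.Map)
	case reducePhase:
		doReduce(arg.JobName, arg.TaskNumber,
			mergeName(arg.JobName, arg.TaskNumber), arg.NumOtherPhase, wk.Reduce)
	}
	return nil
}
```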
Part V: Inverted index generation
Part V builds an inverted index. Here that simply means that, alongside each key in the output, we also list the names of the files in which that key appears.

The feature is again implemented by completing mapF() and reduceF().

mapF() stores the name of the file a key came from as the value of each key/value pair.
```go
func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// split the document into words: every non-letter rune is a separator
	f := func(c rune) bool { return !unicode.IsLetter(c) }
	strSlice := strings.FieldsFunc(value, f)

	var kvSlice []mapreduce.KeyValue
	for _, str := range strSlice {
		// the value of each pair is the name of the document the word came from
		kvSlice = append(kvSlice, mapreduce.KeyValue{str, document})
	}
	return kvSlice
}
```
reduceF() collects all the values for a key, deduplicates and sorts the document names, and returns their count followed by the comma-separated list of documents.
```go
func reduceF(key string, values []string) string {
	// deduplicate the document names with a set
	set := make(map[string]bool)
	for _, str := range values {
		set[str] = true
	}

	// collect and sort the unique document names
	var keys []string
	for key := range set {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	// build "count doc1,doc2,..." as the reduce output
	var cnt int64
	var documents string
	for _, key := range keys {
		cnt++
		if cnt >= 2 {
			documents += ","
		}
		documents += key
	}
	return strconv.FormatInt(cnt, 10) + " " + documents
}
```
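If I recall the master's merge step correctly, each line of the final output then has the form "key: count doc1,doc2,...". For example (hypothetical words and file names, purely to illustrate the format):

```
hello: 3 pg-a.txt,pg-b.txt,pg-c.txt
world: 2 pg-a.txt,pg-c.txt
```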
Afterword

From having no idea where to start to passing every test in Lab 1, the MR lab is now completely done, and it feels like a real achievement.

Beyond a deeper understanding of MR, I also got a feel for what makes an excellent system so appealing: powerful functionality built on a simple structure.

Along the way I learned a new language, Go, which was designed for highly concurrent systems and is very pleasant to use.

Still, this is only the first lab of the distributed systems course; there is a lot left to learn, so keep going.