MIT-6.824 lab1-MapReduce
概述
本lab将用go完成一个mapreduce框架,完成后将大大加深对mapreduce的理解。
part i: map/reduce input and output
这部分需要我们实现common_map.go中的domap()和common_reduce.go中的doreduce()两个函数。
可以先从测试用例下手:
func testsequentialsingle(t *testing.t) { mr := sequential("test", makeinputs(1), 1, mapfunc, reducefunc) mr.wait() check(t, mr.files) checkworker(t, mr.stats) cleanup(mr) }
从sequential()开始调用链如下:
现在要做的是完成domap()和doreduce()。
domap():
func domap( jobname string, // the name of the mapreduce job maptask int, // which map task this is infile string, nreduce int, // the number of reduce task that will be run ("r" in the paper) mapf func(filename string, contents string) []keyvalue, ) { //打开infile文件,读取全部内容 //调用mapf,将内容转换为键值对 //根据reducename()返回的文件名,打开nreduce个中间文件,然后将键值对以json的格式保存到中间文件 inputcontent, err := ioutil.readfile(infile) if err != nil { panic(err) } keyvalues := mapf(infile, string(inputcontent)) var intermediatefileencoders []*json.encoder for reducetasknumber := 0; reducetasknumber < nreduce; reducetasknumber++ { intermediatefile, err := os.create(reducename(jobname, maptask, reducetasknumber)) if err != nil { panic(err) } defer intermediatefile.close() enc := json.newencoder(intermediatefile) intermediatefileencoders = append(intermediatefileencoders, enc) } for _, kv := range keyvalues { err := intermediatefileencoders[ihash(kv.key) % nreduce].encode(kv) if err != nil { panic(err) } } }
总结来说就是:
- 读取输入文件内容
- 将内容交个用户定义的map函数执行,生成键值对
- 保存键值对
doreduce:
func doreduce( jobname string, // the name of the whole mapreduce job reducetask int, // which reduce task this is outfile string, // write the output here nmap int, // the number of map tasks that were run ("m" in the paper) reducef func(key string, values []string) string, ) { //读取当前reducetasknumber对应的中间文件中的键值对,将相同的key的value进行并合 //调用reducef //将reducef的结果以json形式保存到mergename()返回的文件中 kvs := make(map[string][]string) for maptasknumber := 0; maptasknumber < nmap; maptasknumber++ { middatafilename := reducename(jobname, maptasknumber, reducetask) file, err := os.open(middatafilename) if err != nil { panic(err) } defer file.close() dec := json.newdecoder(file) for { var kv keyvalue err = dec.decode(&kv) if err != nil { break } values, ok := kvs[kv.key] if ok { kvs[kv.key] = append(values, kv.value) } else { kvs[kv.key] = []string{kv.value} } } } outputfile, err := os.create(outfile) if err != nil { panic(err) } defer outputfile.close() enc := json.newencoder(outputfile) for key, values := range kvs { enc.encode(keyvalue{key, reducef(key, values)}) } }
总结:
- 读取中间数据
- 执行reducef
- 保存结果
文件转换的过程大致如下:
part ii: single-worker word count
这部分将用一个简单的实例展示如何使用mr框架。需要我们实现main/wc.go中的mapf()和reducef()来统计单词的词频。
mapf:
func mapf(filename string, contents string) []mapreduce.keyvalue { // your code here (part ii). words := strings.fieldsfunc(contents, func(r rune) bool { return !unicode.isletter(r) }) var kvs []mapreduce.keyvalue for _, word := range words { kvs = append(kvs, mapreduce.keyvalue{word, "1"}) } return kvs }
将文本内容分割成单词,每个单词对应一个<word, "1">键值对。
reducef:
func reducef(key string, values []string) string { // your code here (part ii). return strconv.itoa(len(values)) }
value中有多少个"1",就说明这个word出现了几次。
part iii: distributing mapreduce tasks
目前实现的版本都是执行完一个map然后在执行下一个map,也就是说没有并行,这恰恰是mapreduce最大的买点。这部分需要实现schedule(),该函数将任务分配给worker去执行。当然这里并没有真正的多机部署,而是使用多线程进行模拟。
master和worker的关系大致如下:
在创建worker对象的时候会调用register() rpc,master收到rpc后,将该worker的id保存在数组中,执行shedule()是可以根据该id,通过dotask() rpc调用该worker的dotask()执行map或reduce任务。
schedule.go
func schedule(jobname string, mapfiles []string, nreduce int, phase jobphase, registerchan chan string) { var ntasks int var n_other int // number of inputs (for reduce) or outputs (for map) switch phase { case mapphase: ntasks = len(mapfiles) n_other = nreduce case reducephase: ntasks = nreduce n_other = len(mapfiles) } fmt.printf("schedule: %v %v tasks (%d i/os)\n", ntasks, phase, n_other) //总共有ntasks个任务,registerchan中保存着空闲的workers taskchan := make(chan int) var wg sync.waitgroup go func() { for tasknumber := 0; tasknumber < ntasks; tasknumber++ { taskchan <- tasknumber fmt.printf("taskchan <- %d in %s\n", tasknumber, phase) wg.add(1) } wg.wait() //ntasks个任务执行完毕后才能通过 close(taskchan) }() for task := range taskchan { //所有任务都处理完后跳出循环 worker := <- registerchan //消费worker fmt.printf("given task %d to %s in %s\n", task, worker, phase) var arg dotaskargs arg.jobname = jobname arg.phase = phase arg.tasknumber = task arg.numotherphase = n_other if phase == mapphase { arg.file = mapfiles[task] } go func(worker string, arg dotaskargs) { if call(worker, "worker.dotask", arg, nil) { //执行成功后,worker需要执行其它任务 //注意:需要先掉wg.done(),然后调register<-worker,否则会出现死锁 //fmt.printf("worker %s run task %d success in phase %s\n", worker, task, phase) wg.done() registerchan <- worker //回收worker } else { //如果失败了,该任务需要被重新执行 //注意:这里不能用taskchan <- task,因为task这个变量在别的地方可能会被修改。比如task 0执行失败了,我们这里希望 //将task 0重新加入到taskchan中,但是因为执行for循环的那个goroutine,可能已经修改task这个变量为1了,我们错误地 //把task 1重新执行了一遍,并且task 0没有得到执行。 taskchan <- arg.tasknumber } }(worker, arg) } fmt.printf("schedule: %v done\n", phase) }
这里用到了两个channel,分别是registerchan和taskchan。
registerchan中保存了可用的worker id。
生产:
- worker调用register()进行注册,往里添加
- worker成功执行dotask()后,该worker需要重新加入registerchan
消费:
- schedule()拿到一个任务后,消费registerchan
taskchan中保存了任务号。任务执行失败需要重新加入taskchan。
part iv: handling worker failures
之前的代码已经体现了,对于失败的任务重新执行。
part v: inverted index generation
这是mapreduce的一个应用,生成倒排索引,比如想查某个单词出现在哪些文本中,就可以建立倒排索引来解决。
func mapf(document string, value string) (res []mapreduce.keyvalue) { // your code here (part v). words := strings.fieldsfunc(value, func(r rune) bool { return !unicode.isletter(r) }) var kvs []mapreduce.keyvalue for _, word := range words { kvs = append(kvs, mapreduce.keyvalue{word, document}) } return kvs } func reducef(key string, values []string) string { // your code here (part v). values = removeduplicationandsort(values) return strconv.itoa(len(values)) + " " + strings.join(values, ",") } func removeduplicationandsort(values []string) []string { kvs := make(map[string]struct{}) for _, value := range values { _, ok := kvs[value] if !ok { kvs[value] = struct{}{} } } var ret []string for k := range kvs { ret = append(ret, k) } sort.strings(ret) return ret }
mapf()生成<word, document>的键值对,reducef()处理word对应的所有document,去重并且排序,然后拼接到一起。
具体代码在:
如有错误,欢迎指正:
15313676365