欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

MIT-6.824 lab1-MapReduce

程序员文章站 2022-07-07 21:51:40
概述 本lab将用go完成一个MapReduce框架,完成后将大大加深对MapReduce的理解。 Part I: Map/Reduce input and output 这部分需要我们实现common_map.go中的doMap()和common_reduce.go中的doReduce()两个函数 ......

概述

本lab将用go完成一个mapreduce框架,完成后将大大加深对mapreduce的理解。

part i: map/reduce input and output

这部分需要我们实现common_map.go中的domap()和common_reduce.go中的doreduce()两个函数。
可以先从测试用例下手:

func testsequentialsingle(t *testing.t) {
    mr := sequential("test", makeinputs(1), 1, mapfunc, reducefunc)
    mr.wait()
    check(t, mr.files)
    checkworker(t, mr.stats)
    cleanup(mr)
}

从sequential()开始调用链如下:
MIT-6.824 lab1-MapReduce
现在要做的是完成domap()和doreduce()。

domap():

func domap(
    jobname string, // the name of the mapreduce job
    maptask int, // which map task this is
    infile string,
    nreduce int, // the number of reduce task that will be run ("r" in the paper)
    mapf func(filename string, contents string) []keyvalue,
) {
    //打开infile文件,读取全部内容
    //调用mapf,将内容转换为键值对
    //根据reducename()返回的文件名,打开nreduce个中间文件,然后将键值对以json的格式保存到中间文件

    inputcontent, err := ioutil.readfile(infile)
    if err != nil {
        panic(err)
    }

    keyvalues := mapf(infile, string(inputcontent))

    var intermediatefileencoders []*json.encoder
    for reducetasknumber := 0; reducetasknumber < nreduce; reducetasknumber++ {
        intermediatefile, err := os.create(reducename(jobname, maptask, reducetasknumber))
        if err != nil {
            panic(err)
        }
        defer intermediatefile.close()
        enc := json.newencoder(intermediatefile)
        intermediatefileencoders = append(intermediatefileencoders, enc)
    }
    for _, kv := range keyvalues {
        err := intermediatefileencoders[ihash(kv.key) % nreduce].encode(kv)
        if err != nil {
            panic(err)
        }
    }
}

总结来说就是:

  1. 读取输入文件内容
  2. 将内容交个用户定义的map函数执行,生成键值对
  3. 保存键值对

doreduce:

func doreduce(
    jobname string, // the name of the whole mapreduce job
    reducetask int, // which reduce task this is
    outfile string, // write the output here
    nmap int, // the number of map tasks that were run ("m" in the paper)
    reducef func(key string, values []string) string,
) {
    //读取当前reducetasknumber对应的中间文件中的键值对,将相同的key的value进行并合
    //调用reducef
    //将reducef的结果以json形式保存到mergename()返回的文件中

    kvs := make(map[string][]string)
    for maptasknumber := 0; maptasknumber < nmap; maptasknumber++ {
        middatafilename := reducename(jobname, maptasknumber, reducetask)
        file, err := os.open(middatafilename)
        if err != nil {
            panic(err)
        }
        defer file.close()

        dec := json.newdecoder(file)
        for {
            var kv keyvalue
            err = dec.decode(&kv)
            if err != nil {
                break
            }
            values, ok := kvs[kv.key]
            if ok {
                kvs[kv.key] = append(values, kv.value)
            } else {
                kvs[kv.key] = []string{kv.value}
            }
        }
    }

    outputfile, err := os.create(outfile)
    if err != nil {
        panic(err)
    }
    defer outputfile.close()
    enc := json.newencoder(outputfile)
    for key, values := range kvs {
        enc.encode(keyvalue{key, reducef(key, values)})
    }
}

总结:

  1. 读取中间数据
  2. 执行reducef
  3. 保存结果

文件转换的过程大致如下:
MIT-6.824 lab1-MapReduce

part ii: single-worker word count

这部分将用一个简单的实例展示如何使用mr框架。需要我们实现main/wc.go中的mapf()和reducef()来统计单词的词频。

mapf:

func mapf(filename string, contents string) []mapreduce.keyvalue {
    // your code here (part ii).
    words := strings.fieldsfunc(contents, func(r rune) bool {
        return !unicode.isletter(r)
    })
    var kvs []mapreduce.keyvalue
    for _, word := range words {
        kvs = append(kvs, mapreduce.keyvalue{word, "1"})
    }
    return kvs
}

将文本内容分割成单词,每个单词对应一个<word, "1">键值对。

reducef:

func reducef(key string, values []string) string {
    // your code here (part ii).
    return strconv.itoa(len(values))
}

value中有多少个"1",就说明这个word出现了几次。

part iii: distributing mapreduce tasks

目前实现的版本都是执行完一个map然后在执行下一个map,也就是说没有并行,这恰恰是mapreduce最大的买点。这部分需要实现schedule(),该函数将任务分配给worker去执行。当然这里并没有真正的多机部署,而是使用多线程进行模拟。
master和worker的关系大致如下:
MIT-6.824 lab1-MapReduce
在创建worker对象的时候会调用register() rpc,master收到rpc后,将该worker的id保存在数组中,执行shedule()是可以根据该id,通过dotask() rpc调用该worker的dotask()执行map或reduce任务。

schedule.go

func schedule(jobname string, mapfiles []string, nreduce int, phase jobphase, registerchan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapphase:
        ntasks = len(mapfiles)
        n_other = nreduce
    case reducephase:
        ntasks = nreduce
        n_other = len(mapfiles)
    }

    fmt.printf("schedule: %v %v tasks (%d i/os)\n", ntasks, phase, n_other)

    //总共有ntasks个任务,registerchan中保存着空闲的workers
    taskchan := make(chan int)
    var wg sync.waitgroup
    go func() {
        for tasknumber := 0; tasknumber < ntasks; tasknumber++ {
            taskchan <- tasknumber
            fmt.printf("taskchan <- %d in %s\n", tasknumber, phase)
            wg.add(1)

        }

        wg.wait()                           //ntasks个任务执行完毕后才能通过
        close(taskchan)
    }()


    for task := range taskchan {            //所有任务都处理完后跳出循环
        worker := <- registerchan         //消费worker
        fmt.printf("given task %d to %s in %s\n", task, worker, phase)

        var arg dotaskargs
        arg.jobname = jobname
        arg.phase = phase
        arg.tasknumber = task
        arg.numotherphase = n_other

        if phase == mapphase {
            arg.file = mapfiles[task]
        }

        go func(worker string, arg dotaskargs) {
            if call(worker, "worker.dotask", arg, nil) {
                //执行成功后,worker需要执行其它任务
                //注意:需要先掉wg.done(),然后调register<-worker,否则会出现死锁
                //fmt.printf("worker %s run task %d success in phase %s\n", worker, task, phase)
                wg.done()
                registerchan <- worker  //回收worker
            } else {
                //如果失败了,该任务需要被重新执行
                //注意:这里不能用taskchan <- task,因为task这个变量在别的地方可能会被修改。比如task 0执行失败了,我们这里希望
                //将task 0重新加入到taskchan中,但是因为执行for循环的那个goroutine,可能已经修改task这个变量为1了,我们错误地
                //把task 1重新执行了一遍,并且task 0没有得到执行。
                taskchan <- arg.tasknumber
            }
        }(worker, arg)

    }
    fmt.printf("schedule: %v done\n", phase)

}

这里用到了两个channel,分别是registerchan和taskchan。
registerchan中保存了可用的worker id。
生产:

  1. worker调用register()进行注册,往里添加
  2. worker成功执行dotask()后,该worker需要重新加入registerchan

消费:

  1. schedule()拿到一个任务后,消费registerchan

taskchan中保存了任务号。任务执行失败需要重新加入taskchan。

part iv: handling worker failures

之前的代码已经体现了,对于失败的任务重新执行。

part v: inverted index generation

这是mapreduce的一个应用,生成倒排索引,比如想查某个单词出现在哪些文本中,就可以建立倒排索引来解决。

func mapf(document string, value string) (res []mapreduce.keyvalue) {
    // your code here (part v).
    words := strings.fieldsfunc(value, func(r rune) bool {
        return !unicode.isletter(r)
    })
    var kvs []mapreduce.keyvalue
    for _, word := range words {
        kvs = append(kvs, mapreduce.keyvalue{word, document})
    }
    return kvs
}

func reducef(key string, values []string) string {
    // your code here (part v).
    values = removeduplicationandsort(values)
    return strconv.itoa(len(values)) + " " + strings.join(values, ",")
}

func removeduplicationandsort(values []string) []string {
    kvs := make(map[string]struct{})
    for _, value := range values {
        _, ok := kvs[value]
        if !ok {
            kvs[value] = struct{}{}
        }
    }
    var ret []string
    for k := range kvs {
        ret = append(ret, k)
    }
    sort.strings(ret)
    return ret
}

mapf()生成<word, document>的键值对,reducef()处理word对应的所有document,去重并且排序,然后拼接到一起。

具体代码在:
如有错误,欢迎指正:
15313676365