6.824 lab1
一、实验说明
https://pdos.csail.mit.edu/6.824/labs/lab-1.html
二、梳理
在 lab1 中需要编写填充MapReduce 部分代码,使用 Go 建立一个容错的分布式系统。
mapreduce 包提供了一个简单的 Map/Reduce 库,调用 master.go/Distributed()
来开始任务
mapreduce在运行大致为5部分
1. input:获取输入数据进行分片作为map的输入
2. map:将输入记录解析成一条或多条记录
3. shuffle:对中间数据的处理,作为reduce的输入
4. reduce:对相同key的数据进行合并
5. output:按照格式输出到指定目录
代码主要流程如下:
- Master 读取输入文件,然后调用
common_map.go中的doMap()
。doMap()
运行 mapF (),将文件转化为 key/value (KeyValue)结果写入nReduce
个中间文件之中。在 map 结束后总共会生成nMap * nReduce
个文件。 - 之后调用
common_reduce.go/doReduce()
,doReduce()
将按照 reduce task 编号(reduceTask)来汇总,生成nReduce
个结果文件。
Part I: Map/Reduce input and output
描述
doMap()
- 读取数据
- 调用mapF 把文件进行解析成KeyValue
- 生成 nReduce 个子文件
- 利用 KeyValue中的 Key 值做哈希,将得到的值对 nReduce 取余,根据结果将KeyValue存入对应文件之中
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
fileContent, err := ioutil.ReadFile(inFile)
if err != nil {
log.Fatal(err)
}
// use JSON, but as the output of the reduce tasks *must* be JSON
encoders := make([]*json.Encoder, nReduce)
// nReduce the number of reduce task that will be run
for i := 0; i < nReduce; i++ {
var err error
filename := reduceName(jobName, mapTask, i)
outFile, err := os.Create(filename)
if err != nil {
log.Fatal(err)
}
defer outFile.Close()
encoders[i] = json.NewEncoder(outFile)
}
mapRes := mapF(inFile, string(fileContent))
for _, kv := range mapRes {
index := int(ihash(kv.Key)) % nReduce
encoders[index].Encode(&kv)
}
}
doReduce()
- 读取同一个reduce task下所有文件
- 解析文件中的KeyValue
- 对KeyValue进行排序
- 生成一个文件,并根据排序将KeyValue写入其中
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
keyValues := make(map[string][]string)
for i := 0; i < nMap; i++ {
// reduceName(jobName, m, reduceTask) yields the file
filename := reduceName(jobName, i, reduceTask)
file, err := os.Open(filename)
if err != nil {
log.Fatal(err)
}
enc := json.NewDecoder(file)
for {
var kv KeyValue
err := enc.Decode(&kv)
if err != nil {
break
}
_, ok := keyValues[kv.Key]
if !ok {
keyValues[kv.Key] = make([]string, 0)
}
keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
}
}
var keys []string
for k := range keyValues {
keys = append(keys, k)
}
// doReduce manages one reduce task: it should read the intermediate
// files for the task, sort the intermediate key/value pairs by key,
sort.Strings(keys)
mergeFile := mergeName(jobName, reduceTask)
file, err := os.Create(mergeFile)
if err != nil {
fmt.Printf("create reduce merge file:%s error.", mergeFile)
return
}
defer file.Close()
enc := json.NewEncoder(file)
for _, k := range keys {
enc.Encode(KeyValue{k, reduceF(k, keyValues[k])})
}
}
Part II: Single-worker word count
描述
实现字数统计—一个简单的Map / Reduce示例。通过修改main / wc.go中的mapF()和reduceF()函数,以便 wc.go报告输入中每个单词的出现次数。单词是任何连续的字母序列,由unicode.IsLetter确定
mapF
- 读取文件,根据unicode.IsLetter判断是否为字母,进行split
- 将split后的数据组装成KeyValue
reduceF
- 返回values的length(即为key的出现次数)
func mapF(filename string, contents string) []mapreduce.KeyValue {
// Your code here (Part II).
var res []mapreduce.KeyValue
words := strings.FieldsFunc(contents, func(x rune) bool {
return unicode.IsLetter(x) == false
})
for _, w := range words {
kv := mapreduce.KeyValue{w, "1"}
res = append(res, kv)
}
return res
}
func reduceF(key string, values []string) string {
// Your code here (Part II).
return strconv.Itoa(len(values))
}
Part III: Distributing MapReduce tasks
描述
目前为止我们都是串行地执行任务,而Map / Reduce最大的优势是可以自动并行化执行普通代码,而无需开发人员进行任何额外的工作。在本部分的实验中,您将完成一个MapReduce版本,该版本将工作划分为一组在多个内核上并行运行的工作线程。虽然不像实际的Map / Reduce部署那样分布在多台计算机上,但是您的实现将使用RPC来模拟分布式计算。
mapreduce / master.go中 的代码完成了管理MapReduce作业的大部分工作。我们还与辅助线程的完整代码,在为您提供的MapReduce / worker.go,以及一些代码来处理RPC中的MapReduce / common_rpc.go。
您的工作是 在mapreduce / schedule.go中实现schedule()。Master在MapReduce作业期间两次调用schedule(),一次在Map阶段,一次在Reduce阶段。schedule()的工作是将任务分发给可用的worker。通常会有比worker thread更多的任务,因此schedule()必须给每个worker thread一个Task序列。 schedule()应等待所有任务完成,然后返回。
schedule()通过读取registerChan参数来Workers信息,得到一个包含 Worker 的 RPC 地址的 string。有些Worker可能在调用schedule()之前就已经存在,而有些Worker可能在schedule()运行时启动;全部将出现在 registerChan上。schedule()应该使用所有Worker,包括启动后出现的Worker。
schedule()通过将Worker.DoTask RPC 发送给Worker来通知Worker执行任务。该RPC的参数由DoTaskArgs在mapreduce / common_rpc.go中定义。schedule()可以在mapFiles中找到这些文件名。
使用mapreduce / common_rpc.go中的call()函数 将RPC发送给工作程序。第一个参数是工作程序的地址,从registerChan读取。第二个参数应为“ Worker.DoTask”。第三个参数应为DoTaskArgs结构,最后一个参数应为nil。
schedule
- sync.WaitGroup确保所有task执行完成才结束
- 通过读取registerChan参数来Workers信息,得到一个包含 Worker 的 RPC 地址的 string
- 将Worker.DoTask RPC 发送给Worker来通知Worker执行任务
需要注意的问题
chan死锁
registerChan为无缓冲信道ch := make(chan string)
无缓冲信道本身不存储信息,它只负责转手,有人传给它,它就必须要传给别人,如果只有进或者只有出的操作,都会造成阻塞
registerChan <- workAddr后,这一句会阻塞等待其它goroutine从out读取,如果没有就会持续堵塞goroutine
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
var wGroup sync.WaitGroup
wGroup.Add(ntasks)
for i := 0; i < ntasks; i++ {
go func(i int) {
defer wGroup.Done()
filename := ""
if i <= len(mapFiles) {
filename = mapFiles[i]
}
taskArgs := DoTaskArgs{
JobName: jobName,
File: filename,
Phase: phase,
TaskNumber: i,
NumOtherPhase: n_other,
}
taskFinished := false
for taskFinished == false {
workAddr := <-registerChan
taskFinished = call(workAddr, "Worker.DoTask", taskArgs, nil)
go func() {
registerChan <- workAddr
}()
}
}(i)
}
wGroup.Wait()
fmt.Printf("Schedule: %v done\n", phase)
}
Part IV: Handling worker failures
描述
在这一部分中,您将使Master处理failed Worker。MapReduce使此操作相对容易,因为Worker没有持久状态。如果Worker在处理来自Master的RPC时失败,则由于超时,主服务器的call()最终将返回false。在这种情况下,Master 需要把任务交给另一个 Worker。
RPC失败并不一定意味着该Worker未执行任务;Worker可能已经执行了它,但是Reply丢失了,或者Worker仍在执行但Master的RPC超时了。因此,可能会发生两个Worker接收同一任务,对其进行计算并生成输出的情况。对于给定的输入,需要两次调用map或reduce函数生成相同的输出,因此,如果后续处理有时读取一个输出,有时又读取另一个输出,则不会出现不一致的情况。此外,MapReduce框架可确保map和reduce函数的输出原子显示:输出文件将不存在,或者包含map或reduce函数的一次执行的全部输出
taskFinished := false
for taskFinished == false {
workAddr := <-registerChan
taskFinished = call(workAddr, "Worker.DoTask", taskArgs, nil)
go func() {
registerChan <- workAddr
}()
}
Part V: Inverted index generation
描述
倒排索引在计算机科学中被广泛使用,并且在文档搜索中特别有用。广义而言,倒排索引是从有关基础数据的有趣事实到该数据的原始位置的映射。例如,在搜索的上下文中,它可能是从关键字到包含这些单词的文档的映射。
我们在main / ii.go 中创建了第二个二进制文件,该文件与您之前构建的wc.go非常相似。您应该在main / ii.go中修改mapF和 reduceF,以便它们一起产生一个反向索引。
- 通过map去重
- sort.Strings对values排序,然后按照格式次数 排序后的文件名返回结果
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Your code here (Part V).
words := strings.FieldsFunc(value, func(x rune) bool {
return unicode.IsLetter(x) == false
})
kvMap := make(map[string]string)
for _, w := range words {
kvMap[w] = document
}
for k, v := range kvMap {
res = append(res, mapreduce.KeyValue{k, v})
}
return res
}
func reduceF(key string, values []string) string {
// Your code here (Part V).
vLen := len(values)
sort.Strings(values)
res := strconv.Itoa(vLen) + " " + strings.Join(values, ",")
return res
}
上一篇: ucore lab2
下一篇: 探索——redis实现分布式锁
推荐阅读
-
MIT-6.828 Lab1实验报告
-
MIT-6.824 Lab 3: Fault-tolerant Key/Value Service
-
【操作系统作业—lab1】linux shell脚本 遍历目标文件夹和所有文件 | 包括特殊字符文件名的处理
-
MIT6.824 远程过程调用RPC使用
-
[软件构造] 01 lab1的心得体会
-
6.824 Lab 1: MapReduce
-
6.824 Lab1: MapReduce
-
MIT 6.824 Lab1 MapReduce
-
《Distributed Systems》(6.824)LAB1(mapreduce)
-
MIT6.824 Lab1 MapReduce