MIT 6.824 Lab1 MapReduce
Overview
This article walks through a basic implementation of Lab 1; see MIT-Lab1 for the detailed requirements, and the lab repository for the full code.
Basic Requirements
- A single Coordinator manages multiple Workers, communicating with them via RPC
- Workers request tasks from the Coordinator, and the Coordinator assigns tasks to Workers
- The Coordinator can handle Worker crashes
Core Data Structures
Coordinator
type Coordinator struct {
	// Your definitions here.
	nReduce     int       // number of reduce tasks
	nMap        int       // number of map tasks
	workerLists sync.Map  // workerID -> *workerRecord
	startReduce chan bool // signals that the map phase is done
	// MapTask
	muMapTask       sync.Mutex
	mapTaskNeedExec int // map tasks not yet completed
	mapTaskLists    []*MapTask
	mapTaskQueue    chan *MapTask // buffered; workers pull tasks from it
	// ReduceTask
	muReduceTask       sync.Mutex
	reduceTaskNeedExec int // reduce tasks not yet completed
	reduceTaskLists    []*ReduceTask
	reduceTaskQueue    chan *ReduceTask // buffered; workers pull tasks from it
}
workerLists tracks the state of every Worker. mapTaskQueue and reduceTaskQueue are concurrent queues (buffered channels) from which Workers pull tasks concurrently, while mapTaskLists and reduceTaskLists hold all of the tasks themselves.
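The task structs themselves are not shown in this post; the following is a minimal sketch consistent with how they are used below. The field names are inferred from the later snippets, so treat them as assumptions rather than the repository's exact definitions.
// Sketch of the task records; fields are inferred from their use in
// DispatchTask below and may differ from the repository's definitions.
type MapTask struct {
	id       int    // index into mapTaskLists
	fileName string // input file for this map task
	content  string // file content passed to mapf
}

type ReduceTask struct {
	id      int        // index into reduceTaskLists
	content []KeyValue // intermediate pairs for this reduce partition
}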
Worker
type worker struct {
	id       string    // worker ID, derived from the process ID
	nReduce  int       // number of reduce partitions, received at registration
	needExit chan bool // signals that all tasks are done and the worker may exit
}
needExit is used to decide whether the current Worker can exit, i.e., whether all tasks have completed.
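The RPC argument and reply types used below are also omitted from the snippets; this sketch is reconstructed from the fields the later code reads and writes, and is an assumption about their exact shape.
// Sketch of the RPC message types, reconstructed from the fields the
// snippets below access; the exact definitions are in the repository.
type RegisterArgs struct{ WorkerID string }
type RegisterReply struct{ ReduceNum int }

type RequestTaskArgs struct{ WorkerID string }
type RequestTaskReply struct {
	TaskType int         // one of the constants Map, Reduce, NoTask
	TaskId   int
	Content  interface{} // KeyValue for map tasks, []KeyValue for reduce tasks
}

type CompleteReportArgs struct {
	TaskId    int
	WorkerID  string
	TaskType  int
	TempFiles []string // temp files for the Coordinator to rename
}
type CompleteReportReply struct{ TaskFinished bool }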
Implementation Details
Worker Registration
Whenever a new Worker joins the cluster, it registers with the Coordinator. On receiving the registration request, the Coordinator validates it and, if it is legitimate, adds the Worker to workerLists.
// worker.go
func (w *worker) register() {
	// Use the process ID as a unique worker ID.
	w.id = strconv.Itoa(os.Getpid())
	reply := RegisterReply{}
	args := RegisterArgs{WorkerID: w.id}
	call("Coordinator.Register", &args, &reply)
	w.nReduce = reply.ReduceNum
}
// coordinator.go
func (c *Coordinator) Register(args *RegisterArgs, reply *RegisterReply) error {
	workerID := args.WorkerID
	_, exist := c.workerLists.Load(workerID)
	if exist {
		return errors.New(ErrDuplicateWorker)
	}
	reply.ReduceNum = c.nReduce
	// A new worker starts out idle: currTaskId == -1 means "no task".
	worker := workerRecord{currTaskId: -1}
	c.workerLists.Store(workerID, &worker)
	return nil
}
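The workerRecord stored in workerLists is likewise not shown in this post; its fields can be inferred from DispatchTask and checkAlive below. Again, this is a sketch rather than the repository's exact definition.
// Sketch of the per-worker record, with fields inferred from how
// DispatchTask and checkAlive use it.
type workerRecord struct {
	muTask        sync.Mutex // guards currTaskId
	currTaskId    int        // id of the in-flight task; -1 when idle
	muTime        sync.Mutex // guards taskStartTime
	taskStartTime time.Time  // when the current task was handed out
}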
Task Request and Execution
Each Worker runs a loop in its own goroutine that repeatedly requests tasks from the Coordinator, invokes the corresponding Map or Reduce function, and reports back to the Coordinator once the task finishes. If the Coordinator replies that all tasks are done, the Worker process can exit.
// worker.go
func (w *worker) requestAndReportTask(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		reply := RequestTaskReply{}
		call("Coordinator.DispatchTask", &RequestTaskArgs{w.id}, &reply)
		taskType := reply.TaskType
		taskId := reply.TaskId
		reportArgs := CompleteReportArgs{TaskId: taskId, WorkerID: w.id, TaskType: taskType}
		reportReply := CompleteReportReply{}
		switch taskType {
		case Map:
			kvs := mapf(reply.Content.(KeyValue).Key, reply.Content.(KeyValue).Value)
			// Partition the intermediate pairs into nReduce temp files by
			// ihash(key) % nReduce. writeTempFiles stands in for the
			// file-writing code elided in this post; see the repository.
			reportArgs.TempFiles = writeTempFiles(w.id, taskId, w.nReduce, kvs)
			call("Coordinator.CompleteReport", &reportArgs, &reportReply)
		case Reduce:
			kv := KeyValueSlice(reply.Content.([]KeyValue))
			sort.Sort(kv)
			result := []KeyValue{}
			for i := 0; i < len(kv); {
				// Find the run [i, j) of pairs sharing the same key.
				j := i + 1
				for j < len(kv) && kv[j].Key == kv[i].Key {
					j++
				}
				values := []string{}
				for k := i; k < j; k++ {
					values = append(values, kv[k].Value)
				}
				output := reducef(kv[i].Key, values)
				result = append(result, KeyValue{Key: kv[i].Key, Value: output})
				i = j
			}
			// Write the reduce output to a temp file and report it;
			// writeReduceOutput is another helper elided in this post.
			reportArgs.TempFiles = []string{writeReduceOutput(taskId, result)}
			call("Coordinator.CompleteReport", &reportArgs, &reportReply)
			if reportReply.TaskFinished {
				// All tasks are done: signal the main goroutine and stop.
				w.needExit <- true
				return
			}
		case NoTask:
			// Nothing to do right now; back off before asking again.
			time.Sleep(time.Second)
		}
	}
}
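The loop above runs in its own goroutine, while the Worker entry point blocks on needExit. The post does not show that entry point, so the following wiring is a sketch under that assumption.
// Sketch of the Worker entry point; the exact wiring in the repository
// may differ.
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	w := worker{needExit: make(chan bool, 1)}
	w.register()
	go w.requestAndReportTask(mapf, reducef)
	<-w.needExit // returns once all tasks are finished
}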
When dispatching a task, the Coordinator pulls a Task from one of the concurrent queues and sends it to the requesting Worker.
// coordinator.go
func (c *Coordinator) DispatchTask(args *RequestTaskArgs, reply *RequestTaskReply) error {
	workerId := args.WorkerID
	record, exist := c.workerLists.Load(workerId)
	if !exist {
		return errors.New(ErrNoWorker)
	}
	rec := record.(*workerRecord)
	rec.muTime.Lock()
	rec.taskStartTime = time.Now()
	rec.muTime.Unlock()
	select {
	case mTask := <-c.mapTaskQueue:
		reply.Content = KeyValue{Key: mTask.fileName, Value: mTask.content}
		reply.TaskType = Map
		reply.TaskId = mTask.id
		rec.muTask.Lock()
		rec.currTaskId = mTask.id
		rec.muTask.Unlock()
	case rTask := <-c.reduceTaskQueue:
		reply.Content = rTask.content
		reply.TaskType = Reduce
		reply.TaskId = rTask.id
		rec.muTask.Lock()
		rec.currTaskId = rTask.id
		rec.muTask.Unlock()
	default:
		// Both queues are empty; tell the worker to wait and retry.
		reply.TaskType = NoTask
	}
	return nil
}
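Note that reduce tasks only enter reduceTaskQueue after the map phase completes, which is what the startReduce channel in the Coordinator struct is for. A sketch of that handover is below; the exact wiring lives in the full repository, so treat this as an assumption.
// Sketch of the map-to-reduce handover, assuming CompleteReport signals
// startReduce when mapTaskNeedExec drops to zero. Run as a goroutine
// from MakeCoordinator.
func (c *Coordinator) feedReduceTasks() {
	<-c.startReduce // unblocks once the last map task is reported done
	for _, rTask := range c.reduceTaskLists {
		c.reduceTaskQueue <- rTask
	}
}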
Crash Handling
If a Worker's task has not finished within 10s, we treat that Worker as crashed and hand its in-flight task to another idle Worker. The Coordinator runs a goroutine that checks every Worker's task status once per second; if a Worker is deemed crashed, its task is put back into the buffered queue.
// coordinator.go
func (c *Coordinator) checkAlive() {
	for {
		time.Sleep(time.Second)
		c.workerLists.Range(func(k, v interface{}) bool {
			rec := v.(*workerRecord)
			rec.muTask.Lock()
			defer rec.muTask.Unlock()
			if rec.currTaskId == -1 {
				// This worker is idle; nothing to check.
				return true
			}
			rec.muTime.Lock()
			duration := time.Since(rec.taskStartTime)
			rec.muTime.Unlock()
			if duration > time.Second*AliceCheckDuration {
				// The task has run for more than 10s: treat the worker as
				// crashed and requeue its in-flight task.
				c.workerLists.Delete(k)
				c.muMapTask.Lock()
				mapPhaseDone := c.mapTaskNeedExec == 0
				if !mapPhaseDone {
					// Reduce tasks are only dispatched after the map phase,
					// so the in-flight task must be a map task.
					c.mapTaskQueue <- c.mapTaskLists[rec.currTaskId]
				}
				c.muMapTask.Unlock()
				if mapPhaseDone {
					c.muReduceTask.Lock()
					if c.reduceTaskNeedExec != 0 {
						c.reduceTaskQueue <- c.reduceTaskLists[rec.currTaskId]
					}
					c.muReduceTask.Unlock()
				}
			}
			return true
		})
	}
}
One problem that arises in crash handling is that a task may be executed more than once. For example, Worker a may take more than 10s on Task a because of CPU scheduling; we then reassign Task a to Worker b, while Worker a's copy of Task a may still be running. To deal with this we adopt the temp-file scheme from the MapReduce paper: each Worker writes its results to a temporary file, and the Coordinator renames it to its final name. Because the rename operation is atomic, no write conflict can occur.
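Here is a minimal, self-contained sketch of the idea; the file names are illustrative, not the lab's actual naming scheme.
// Sketch of the temp-file-plus-rename trick. os.Rename is atomic on a
// POSIX file system, so even if two duplicate executions of the same
// task both finish, readers only ever see one complete output file,
// never a torn mix of the two. File names here are illustrative.
package main

import (
	"fmt"
	"log"
	"os"
)

func main() {
	// Worker side: write results to a uniquely named temp file.
	tmp, err := os.CreateTemp(".", "mr-tmp-*")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Fprintln(tmp, "hello 1")
	tmp.Close()

	// Coordinator side: publish the result atomically on the first
	// completion report for this task.
	if err := os.Rename(tmp.Name(), "mr-out-0"); err != nil {
		log.Fatal(err)
	}
}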
Summary
This article only shows the most important parts of the code; see the MIT-6.824 lab repository for the complete code and data structures.