欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

MIT 6.824-lab1

程序员文章站 2022-03-04 23:51:58
...

最近正在学习6.824的课程,准备一边学习一边跟着做课设。新版的课设都是使用go来完成的,所以花了一周多的时间学习了一下go语言的使用。相对于C++,用go来做课设确实方便不少,有各种现成rpc库,序列化与反序列化的库,实现lab时能够让从繁琐的网络通信和IO中解脱出来,更关注分布式交互本身的逻辑。直接切入正题

1. mapreduce

lab1是让设计一个简单的mapreduce框架。做之前需要先看一看mapreduce的论文。
mapreduce中文版
看完论文会对mapreduce有个大概的了解,即这是一个用于任务并行执行的框架。其内部有一个master节点和多个worker节点,master节点负责分发任务与执行任务所需的资源,worker接受master的调度,负责执行具体的任务,任务具体分为map和reduce两种,其实际工作的内容由用户指定。

2. 实验要求

动手做之前先认真看一看官方文档,里面讲了实验要求,也给了很多提示和实现细节。
lab1官网文档

实验要求:实现一个mapreduce框架,并通过测试样例。

下面来看一下会用到的几个文件:
MIT 6.824-lab1
mrmaster.go:是用来启动master节点进程的,启动之后会调用mr/master.go文件。
mrworker.go:是用来创建worker进程的,启动之后会调用mr/worker.go。
mrapps:文件夹中的程序代表一个实际的执行任务,即具体的map任务和reduce任务,如wc.go表示单词计数任务,indexer.go表示生成倒排索引任务。这些文件会被编译成为动态链接库,由mrworker.go进行装载,获取具体的执行任务
mr:该文件夹下面的才是我们具体需要实现的,master.go是作为master节点需要具备的功能,worker.go是worker节点需要执行的任务,rpc.go中需要实现master节点worker节点进行远程调用所需的数据结构

3.分析与实现

3.1 整体流程

首先捋一捋大概的执行流程。执行mrmaster.go,读入输入文件(这些文件作为map任务),启动master.go;执行mrworker.go(可能是多个),读入对用的so文件(比如wc.so),获取具体的执行任务,启动worker.go。

  1. master节点将任务需要执行map任务的文件分配给多个worker节点,然后等待任务完成。
  2. 多个worker节点完成map任务后,按照要求生成中间文件,并将其信息告知master节点。
  3. master节点在所有的map任务完成后,将中间文件作为reduce任务的输入分发给多个worker节点。
  4. 所有worker节点完成任务后告知master节点,master完成mapreduce任务。

3.2 RPC结构体

在本lab中,master是一个Server,为了简化处理,我们让master节点不主动推送信息,所以worker节点获取任务的办法是通过rpc想master节点进行拉取。所以master节点至少需要提供两种rpc服务:

  1. worker获取任务
  2. worker上报任务完成
//获取任务的请求结构体,本身不用携带信息
type GetTaskRequest struct {
	X int//暂时没用
}
//master对于任务请求rpc的回复
type GetTaskResponse struct {
	MFileName string//如果是map任务,则记录map文件名字
	TaskName string//该任务的名字(一个全局唯一的编号)
	RFileName []string//如果是reduce任务,则记录reduce文件名字
	TaskType int//任务类别,0:map,1:reduce,2:sleep
	ReduceNumber int//需要将中间文件分组的数量
}
//worker上报任务完成状态
type ReportStatusRequest struct {
	FilesName []string//如果是map任务,需要告知master中间文件的信息
	TaskName string//该任务的名字

}
//master对于上报任务的回复,不需要携带信息
type ReportStatusResponse struct {
	X int
}

3.3 master节点

作为master节点,需要记录的信息起码包括,需要执行的任务,正在执行的任务。我们的实现如下:

//Tast结构体,表示当前正在执行的任务
type Task struct {
	Name string//任务名字
	Type int//任务类别
	Status int//任务状态,正常或者超时
	mFileName string//如果是map任务,则记录分配给该任务的文件名字
	rFileName int//如果是reduce任务,则记录分配给该任务的文件组编号
}
//一个全局递增变量,作为每个Task的名字,用来区分不同的Task
var taskNumber int = 0
//master结构体
type Master struct {
	mrecord map[string]int//记录需要map的文件,0表示未执行,1表示正在执行,2表示已经完成
	rrecord map[int]int//记录需要reduce的文件,0表示未执行,1表示正在执行,2表示已经完成
	reducefile map[int][]string//记录中间文件
	taskmap map[string]*Task //任务池,记录当前正在执行的任务
	mcount int//记录已经完成map的任务数量
	rcount int//记录已经完成的reduce的任务数量
	mapFinished bool//标志map任务是否已经完成
	reduceNumber int//需要执行的reduce的数量
	mutex sync.Mutex//锁
}

3.4 Master功能实现

上面已经说了,Master节点起码需要提供两种RPC服务,一种是任务获取的服务,代码如下:

func (m *Master) GetTask(args *GetTaskRequest,reply* GetTaskResponse) error{
	m.mutex.Lock()//首先需要加锁,因为Master是为多个Worker服务的
	defer m.mutex.Unlock()
	reply.RFileName=make([]string,0)
	reply.ReduceNumber=m.reduceNumber
	reply.MFileName=""
	reply.TaskName=strconv.Itoa(taskNumber)
	taskNumber+=1
	if m.mapFinished{//如果map任务已经完成了,则该分配reduce任务
		for v:=range m.rrecord{//遍历reduce任务
			flag:=m.rrecord[v]
			if flag==Processing||flag==Finished{//如果这个任务正在执行或者已经结束,找下一个任务
				continue
			}else{
				m.rrecord[v]=Processing//将该任务状态修改
				for _,filename:=range m.reducefile[v]{
					reply.RFileName = append(reply.RFileName, filename)//将这一组文件名添加到回复中,作为reduce任务的输入
				}
				reply.TaskType=Reduce
				t:=&Task{reply.TaskName,reply.TaskType,Working,"",v}
				m.taskmap[reply.TaskName]=t//记录下任务
				go m.HandleTimeout(reply.TaskName)//超时处理
				return nil
			}
		}
		reply.TaskType=Sleep//如果所有没有待执行的任务,让Worker休眠
		return nil
	}else{
		//分配map任务
		for v,_:=range m.mrecord{
			flag:=m.mrecord[v]
			if flag==Processing||flag==Finished{//如果这个任务正在执行或者已经结束,找下一个任务
				continue
			}else{
				m.mrecord[v]=Processing//修改文件状态
				reply.MFileName=v
				reply.TaskType=Map
				//记录当前任务				
				t:=&Task{reply.TaskName,reply.TaskType,Working,reply.MFileName,-1}
				m.taskmap[reply.TaskName]=t
				//超时处理
				go m.HandleTimeout(reply.TaskName)
				return nil
			}
		}
		//如果没有待执行的map任务,让worker休眠
		reply.TaskType=Sleep
		return nil
	}
	return nil
}

对于超过十秒未响应的worker节点我们认为其为失效节点,已经分配给他的任务需要重新分配给其他节点,所以在分配任务是启动一个新协程进行超时处理,函数如下:

func (m *Master) HandleTimeout(taskName string){

	time.Sleep(time.Second*10)//睡眠十秒
	m.mutex.Lock()
	defer m.mutex.Unlock()
	if t,ok:=m.taskmap[taskName];ok{//睡眠十秒这个任务还在任务池中,则意味着任务超时,需要处理
		t.Status=Timeout//任务设置为超时状态
		if t.Type==Map{
			f:=t.mFileName
			if m.mrecord[f]==Processing{//修改文件状态,还在执行中修改为未完成,方便分配给其他的worker
				m.mrecord[f]=NotStarted//将任务设置为未开始状态,以便下一次分配给其他worker进行执行
			}
		}else if t.Type==Reduce{
			f:=t.rFileName
			if m.rrecord[f]==Processing{
				m.rrecord[f]=NotStarted
			}
		}
	}
}

另一个RPC服务为上报服务,代码如下:

func (m *Master) Report(args *ReportStatusRequest,reply* ReportStatusResponse)error{
	reply.X=1
	m.mutex.Lock()
	defer m.mutex.Unlock()
	if t,ok:=m.taskmap[args.TaskName];ok{
	//如果还在任务池中
		flag:=t.Status
		if flag==Timeout{//如果任务已经超时了,对于超时回复的任务直接忽略,因为这个任务可能已经被分配给其他的worker节点了
			delete(m.taskmap, args.TaskName)//删除该任务
			return nil
		}
		
		ttype:=t.Type
		if ttype==Map{//如果是map任务
			f:=t.mFileName
			m.mrecord[f]=Finished
			m.mcount+=1//增加完成的map任务计数
			if m.mcount== len(m.mrecord){//如果完成全部map任务
				m.mapFinished=true
			}
			//记录下map任务回复中的所有中间文件信息
			for _,v:=range args.FilesName{
				index:=strings.LastIndex(v,"_")
				num,err:=strconv.Atoi(v[index+1:])
				if err!=nil{
					log.Fatal(err)
				}
				m.reducefile[num] = append(m.reducefile[num], v)
			}
			delete(m.taskmap, t.Name)//任务池中删除已完成任务
			return nil
		}else if ttype==Reduce{//如果是reduce任务
			rf:=t.rFileName
			m.rrecord[rf]=Finished
			m.rcount+=1//增加已完成任务计数
			delete(m.taskmap, t.Name)//删除已完成任务
			return nil
		}else{
			log.Fatal("task type is not map and reduce")
		}
	}
	log.Println("%s task is not in Master record",args.TaskName)
	return nil
}

3.5 Worker节点功能实现

worker节点启动后就执行一个循环,从master获取任务,执行,然后上报。其主循环代码如下:

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for{//无限循环
		args:=GetTaskRequest{}
		args.X=0
		rep:=GetTaskResponse{}
		call("Master.GetTask",&args,&rep)//获取任务
		//log.Println("name",rep.TaskName,"type",rep.TaskType)
		if rep.TaskType==Map{//如果获取到了map任务
			//处理map任务
		filenames:=HandleMap(mapf,rep.MFileName,rep.ReduceNumber,rep.TaskName)
			rargs:=ReportStatusRequest{}
			rargs.TaskName=rep.TaskName
			rargs.FilesName=filenames
			rreply:=ReportStatusResponse{}
			rreply.X=0
			//上报任务完成
			call("Master.Report",&rargs,&rreply)
		}else if rep.TaskType==Reduce{//如果获取到了reduce任务
		//处理reduce任务
			HandleReduce(reducef,rep.RFileName)
			rargs:=ReportStatusRequest{}
			rargs.TaskName=rep.TaskName
			rargs.FilesName=make([]string,0)
			rreply:=ReportStatusResponse{}
			rreply.X=0
			call("Master.Report",&rargs,&rreply)
		}else if rep.TaskType==Sleep{//master暂无可执行任务,休眠一段时间
			time.Sleep(time.Millisecond*10)
			//log.Println("Sleep Task")
		}else{
			log.Fatal("get task is not map sleep and reduce")
		}

	}
	// Your worker implementation here.

	// uncomment to send the Example RPC to the master.
	// CallExample()

}

其中HandleMap与HandleReduce的具体实现就比较简单了。HandleMap就读入输入文件,然后将读到的数据处理,并将输出写入到中间文件中。HandleReduce的就读取中间文件处理并将数据输出到对应的文件中即可。

func HandleMap(mapf func(string, string) []KeyValue,filename string,filenum int,tasknum string) []string{
	intermediate := []KeyValue{}
	file,err:=os.Open(filename)
	if err != nil {
		log.Fatalf("cannot open %v", filename)
	}
	content, err := ioutil.ReadAll(file)//读取输入文件
	//log.Println(content)
	if err != nil {
		log.Fatalf("cannot read %v", filename)
	}
	file.Close()
	kva := mapf(filename, string(content))//调用用户设置的map函数
	intermediate = append(intermediate, kva...)
	filenames:=make([]string,filenum)
	files:=make([]*os.File,filenum)

	for i:=0;i<filenum;i++{
		oname := "mr"
		oname=oname+"_"+tasknum+"_"+strconv.Itoa(i)
		//log.Println("create ",oname)
		ofile,_:=os.Create(oname)
		files[i]=ofile
		filenames[i]=oname
	}
	for _,kv:=range intermediate{//将数据写入到对应的文件中。为了方便reduce读取,所以选择以json格式写入
		index:=ihash(kv.Key)%filenum
		enc:=json.NewEncoder(files[index])
		enc.Encode(&kv)
	}
	return filenames
}

func HandleReduce(reducef func(string, []string) string,filenames []string)string{
	files:=make([]*os.File,len(filenames))
	intermediate := []KeyValue{}
	for i:=0;i<len(filenames);i++{//读取所有的文件信息
		files[i],_=os.Open(filenames[i])
		kv:=KeyValue{}
		dec:=json.NewDecoder(files[i])
		for{
			if err:=dec.Decode(&kv);err!=nil{
				break
			}
			intermediate = append(intermediate, kv)
		}

	}
	sort.Sort(ByKey(intermediate))//将读到的所有键值对排序
	//log.Println("intermediate",len(intermediate))
	oname := "mr-out-"

	index:=filenames[0][strings.LastIndex(filenames[0],"_")+1:]
	oname=oname+index
	ofile, _ := os.Create(oname)//创建特定名字的输出文件

	//
	// call Reduce on each distinct key in intermediate[],
	// and print the result to mr-out-0.
	//
	i := 0
	//将同一个键的数据合并并交由用户设置的reduce函数处理,并将结果写入到对应文件中
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)

		// this is the correct format for each line of Reduce output.
		fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)

		i = j
	}
	return oname
}

查看完整代码点这里