MIT 6.824-lab1
最近正在学习6.824的课程,准备一边学习一边跟着做课设。新版的课设都是使用go来完成的,所以花了一周多的时间学习了一下go语言的使用。相对于C++,用go来做课设确实方便不少,有各种现成rpc库,序列化与反序列化的库,实现lab时能够让从繁琐的网络通信和IO中解脱出来,更关注分布式交互本身的逻辑。直接切入正题
1. mapreduce
lab1是让设计一个简单的mapreduce框架。做之前需要先看一看mapreduce的论文。
mapreduce中文版
看完论文会对mapreduce有个大概的了解,即这是一个用于任务并行执行的框架。其内部有一个master节点和多个worker节点,master节点负责分发任务与执行任务所需的资源,worker接受master的调度,负责执行具体的任务,任务具体分为map和reduce两种,其实际工作的内容由用户指定。
2. 实验要求
动手做之前先认真看一看官方文档,里面讲了实验要求,也给了很多提示和实现细节。
lab1官网文档
实验要求:实现一个mapreduce框架,并通过测试样例。
下面来看一下会用到的几个文件:
mrmaster.go:是用来启动master节点进程的,启动之后会调用mr/master.go文件。
mrworker.go:是用来创建worker进程的,启动之后会调用mr/worker.go。
mrapps:文件夹中的程序代表一个实际的执行任务,即具体的map任务和reduce任务,如wc.go表示单词计数任务,indexer.go表示生成倒排索引任务。这些文件会被编译成为动态链接库,由mrworker.go进行装载,获取具体的执行任务
mr:该文件夹下面的才是我们具体需要实现的,master.go是作为master节点需要具备的功能,worker.go是worker节点需要执行的任务,rpc.go中需要实现master节点worker节点进行远程调用所需的数据结构
3.分析与实现
3.1 整体流程
首先捋一捋大概的执行流程。执行mrmaster.go,读入输入文件(这些文件作为map任务),启动master.go;执行mrworker.go(可能是多个),读入对用的so文件(比如wc.so),获取具体的执行任务,启动worker.go。
- master节点将任务需要执行map任务的文件分配给多个worker节点,然后等待任务完成。
- 多个worker节点完成map任务后,按照要求生成中间文件,并将其信息告知master节点。
- master节点在所有的map任务完成后,将中间文件作为reduce任务的输入分发给多个worker节点。
- 所有worker节点完成任务后告知master节点,master完成mapreduce任务。
3.2 RPC结构体
在本lab中,master是一个Server,为了简化处理,我们让master节点不主动推送信息,所以worker节点获取任务的办法是通过rpc想master节点进行拉取。所以master节点至少需要提供两种rpc服务:
- worker获取任务
- worker上报任务完成
//获取任务的请求结构体,本身不用携带信息
type GetTaskRequest struct {
X int//暂时没用
}
//master对于任务请求rpc的回复
type GetTaskResponse struct {
MFileName string//如果是map任务,则记录map文件名字
TaskName string//该任务的名字(一个全局唯一的编号)
RFileName []string//如果是reduce任务,则记录reduce文件名字
TaskType int//任务类别,0:map,1:reduce,2:sleep
ReduceNumber int//需要将中间文件分组的数量
}
//worker上报任务完成状态
type ReportStatusRequest struct {
FilesName []string//如果是map任务,需要告知master中间文件的信息
TaskName string//该任务的名字
}
//master对于上报任务的回复,不需要携带信息
type ReportStatusResponse struct {
X int
}
3.3 master节点
作为master节点,需要记录的信息起码包括,需要执行的任务,正在执行的任务。我们的实现如下:
//Tast结构体,表示当前正在执行的任务
type Task struct {
Name string//任务名字
Type int//任务类别
Status int//任务状态,正常或者超时
mFileName string//如果是map任务,则记录分配给该任务的文件名字
rFileName int//如果是reduce任务,则记录分配给该任务的文件组编号
}
//一个全局递增变量,作为每个Task的名字,用来区分不同的Task
var taskNumber int = 0
//master结构体
type Master struct {
mrecord map[string]int//记录需要map的文件,0表示未执行,1表示正在执行,2表示已经完成
rrecord map[int]int//记录需要reduce的文件,0表示未执行,1表示正在执行,2表示已经完成
reducefile map[int][]string//记录中间文件
taskmap map[string]*Task //任务池,记录当前正在执行的任务
mcount int//记录已经完成map的任务数量
rcount int//记录已经完成的reduce的任务数量
mapFinished bool//标志map任务是否已经完成
reduceNumber int//需要执行的reduce的数量
mutex sync.Mutex//锁
}
3.4 Master功能实现
上面已经说了,Master节点起码需要提供两种RPC服务,一种是任务获取的服务,代码如下:
func (m *Master) GetTask(args *GetTaskRequest,reply* GetTaskResponse) error{
m.mutex.Lock()//首先需要加锁,因为Master是为多个Worker服务的
defer m.mutex.Unlock()
reply.RFileName=make([]string,0)
reply.ReduceNumber=m.reduceNumber
reply.MFileName=""
reply.TaskName=strconv.Itoa(taskNumber)
taskNumber+=1
if m.mapFinished{//如果map任务已经完成了,则该分配reduce任务
for v:=range m.rrecord{//遍历reduce任务
flag:=m.rrecord[v]
if flag==Processing||flag==Finished{//如果这个任务正在执行或者已经结束,找下一个任务
continue
}else{
m.rrecord[v]=Processing//将该任务状态修改
for _,filename:=range m.reducefile[v]{
reply.RFileName = append(reply.RFileName, filename)//将这一组文件名添加到回复中,作为reduce任务的输入
}
reply.TaskType=Reduce
t:=&Task{reply.TaskName,reply.TaskType,Working,"",v}
m.taskmap[reply.TaskName]=t//记录下任务
go m.HandleTimeout(reply.TaskName)//超时处理
return nil
}
}
reply.TaskType=Sleep//如果所有没有待执行的任务,让Worker休眠
return nil
}else{
//分配map任务
for v,_:=range m.mrecord{
flag:=m.mrecord[v]
if flag==Processing||flag==Finished{//如果这个任务正在执行或者已经结束,找下一个任务
continue
}else{
m.mrecord[v]=Processing//修改文件状态
reply.MFileName=v
reply.TaskType=Map
//记录当前任务
t:=&Task{reply.TaskName,reply.TaskType,Working,reply.MFileName,-1}
m.taskmap[reply.TaskName]=t
//超时处理
go m.HandleTimeout(reply.TaskName)
return nil
}
}
//如果没有待执行的map任务,让worker休眠
reply.TaskType=Sleep
return nil
}
return nil
}
对于超过十秒未响应的worker节点我们认为其为失效节点,已经分配给他的任务需要重新分配给其他节点,所以在分配任务是启动一个新协程进行超时处理,函数如下:
func (m *Master) HandleTimeout(taskName string){
time.Sleep(time.Second*10)//睡眠十秒
m.mutex.Lock()
defer m.mutex.Unlock()
if t,ok:=m.taskmap[taskName];ok{//睡眠十秒这个任务还在任务池中,则意味着任务超时,需要处理
t.Status=Timeout//任务设置为超时状态
if t.Type==Map{
f:=t.mFileName
if m.mrecord[f]==Processing{//修改文件状态,还在执行中修改为未完成,方便分配给其他的worker
m.mrecord[f]=NotStarted//将任务设置为未开始状态,以便下一次分配给其他worker进行执行
}
}else if t.Type==Reduce{
f:=t.rFileName
if m.rrecord[f]==Processing{
m.rrecord[f]=NotStarted
}
}
}
}
另一个RPC服务为上报服务,代码如下:
func (m *Master) Report(args *ReportStatusRequest,reply* ReportStatusResponse)error{
reply.X=1
m.mutex.Lock()
defer m.mutex.Unlock()
if t,ok:=m.taskmap[args.TaskName];ok{
//如果还在任务池中
flag:=t.Status
if flag==Timeout{//如果任务已经超时了,对于超时回复的任务直接忽略,因为这个任务可能已经被分配给其他的worker节点了
delete(m.taskmap, args.TaskName)//删除该任务
return nil
}
ttype:=t.Type
if ttype==Map{//如果是map任务
f:=t.mFileName
m.mrecord[f]=Finished
m.mcount+=1//增加完成的map任务计数
if m.mcount== len(m.mrecord){//如果完成全部map任务
m.mapFinished=true
}
//记录下map任务回复中的所有中间文件信息
for _,v:=range args.FilesName{
index:=strings.LastIndex(v,"_")
num,err:=strconv.Atoi(v[index+1:])
if err!=nil{
log.Fatal(err)
}
m.reducefile[num] = append(m.reducefile[num], v)
}
delete(m.taskmap, t.Name)//任务池中删除已完成任务
return nil
}else if ttype==Reduce{//如果是reduce任务
rf:=t.rFileName
m.rrecord[rf]=Finished
m.rcount+=1//增加已完成任务计数
delete(m.taskmap, t.Name)//删除已完成任务
return nil
}else{
log.Fatal("task type is not map and reduce")
}
}
log.Println("%s task is not in Master record",args.TaskName)
return nil
}
3.5 Worker节点功能实现
worker节点启动后就执行一个循环,从master获取任务,执行,然后上报。其主循环代码如下:
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for{//无限循环
args:=GetTaskRequest{}
args.X=0
rep:=GetTaskResponse{}
call("Master.GetTask",&args,&rep)//获取任务
//log.Println("name",rep.TaskName,"type",rep.TaskType)
if rep.TaskType==Map{//如果获取到了map任务
//处理map任务
filenames:=HandleMap(mapf,rep.MFileName,rep.ReduceNumber,rep.TaskName)
rargs:=ReportStatusRequest{}
rargs.TaskName=rep.TaskName
rargs.FilesName=filenames
rreply:=ReportStatusResponse{}
rreply.X=0
//上报任务完成
call("Master.Report",&rargs,&rreply)
}else if rep.TaskType==Reduce{//如果获取到了reduce任务
//处理reduce任务
HandleReduce(reducef,rep.RFileName)
rargs:=ReportStatusRequest{}
rargs.TaskName=rep.TaskName
rargs.FilesName=make([]string,0)
rreply:=ReportStatusResponse{}
rreply.X=0
call("Master.Report",&rargs,&rreply)
}else if rep.TaskType==Sleep{//master暂无可执行任务,休眠一段时间
time.Sleep(time.Millisecond*10)
//log.Println("Sleep Task")
}else{
log.Fatal("get task is not map sleep and reduce")
}
}
// Your worker implementation here.
// uncomment to send the Example RPC to the master.
// CallExample()
}
其中HandleMap与HandleReduce的具体实现就比较简单了。HandleMap就读入输入文件,然后将读到的数据处理,并将输出写入到中间文件中。HandleReduce的就读取中间文件处理并将数据输出到对应的文件中即可。
func HandleMap(mapf func(string, string) []KeyValue,filename string,filenum int,tasknum string) []string{
intermediate := []KeyValue{}
file,err:=os.Open(filename)
if err != nil {
log.Fatalf("cannot open %v", filename)
}
content, err := ioutil.ReadAll(file)//读取输入文件
//log.Println(content)
if err != nil {
log.Fatalf("cannot read %v", filename)
}
file.Close()
kva := mapf(filename, string(content))//调用用户设置的map函数
intermediate = append(intermediate, kva...)
filenames:=make([]string,filenum)
files:=make([]*os.File,filenum)
for i:=0;i<filenum;i++{
oname := "mr"
oname=oname+"_"+tasknum+"_"+strconv.Itoa(i)
//log.Println("create ",oname)
ofile,_:=os.Create(oname)
files[i]=ofile
filenames[i]=oname
}
for _,kv:=range intermediate{//将数据写入到对应的文件中。为了方便reduce读取,所以选择以json格式写入
index:=ihash(kv.Key)%filenum
enc:=json.NewEncoder(files[index])
enc.Encode(&kv)
}
return filenames
}
func HandleReduce(reducef func(string, []string) string,filenames []string)string{
files:=make([]*os.File,len(filenames))
intermediate := []KeyValue{}
for i:=0;i<len(filenames);i++{//读取所有的文件信息
files[i],_=os.Open(filenames[i])
kv:=KeyValue{}
dec:=json.NewDecoder(files[i])
for{
if err:=dec.Decode(&kv);err!=nil{
break
}
intermediate = append(intermediate, kv)
}
}
sort.Sort(ByKey(intermediate))//将读到的所有键值对排序
//log.Println("intermediate",len(intermediate))
oname := "mr-out-"
index:=filenames[0][strings.LastIndex(filenames[0],"_")+1:]
oname=oname+index
ofile, _ := os.Create(oname)//创建特定名字的输出文件
//
// call Reduce on each distinct key in intermediate[],
// and print the result to mr-out-0.
//
i := 0
//将同一个键的数据合并并交由用户设置的reduce函数处理,并将结果写入到对应文件中
for i < len(intermediate) {
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
output := reducef(intermediate[i].Key, values)
// this is the correct format for each line of Reduce output.
fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output)
i = j
}
return oname
}
上一篇: ElasticSearch—冷热(hot&warm)架构部署
下一篇: Pygame学习01_初相见