欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

2020 6.824 的 MapReduce

程序员文章站 2022-03-15 17:06:02
...


前言

做2020的6.824,刚刚开始了第一个实验,是MapReduce,这个实验难度(过测试)还行,前提是需要把要求理清楚。过程也是参考了网上好几个博客的,下面是我的solution


一、MapReduce

主要就是实现paper中提到的MapReduce
paper 地址

流程大概是这样的
2020 6.824 的 MapReduce


二、Master

1. 数据结构

Master 这边只有两个主要的数据结构:Master 和 Task
Task包含了Map的task跟Reduce的task。对于Map task,会有FileIndex;对于Reduce task,结构体里定义了ReduceIndex,不过下文的代码(handleReduceTask、doReduce)实际是把reduce的编号存放在FileIndex里

// Task describes one unit of work handed from the master to a worker, and is
// also what the worker sends back when it reports completion.
type Task struct {
	// StartTime is set to time.Now() by the master when the task is handed out.
	StartTime   time.Time
	// TaskType selects the worker action: Map, Reduce or Close.
	TaskType    TaskType
	// FileName is the input file of a map task; the map-side bookkeeping on
	// the master is keyed by it.
	FileName    string
	// FileIndex is the input file's index for a map task (used in the
	// intermediate file name). For reduce tasks the visible code stores the
	// reduce partition number here as well.
	FileIndex 	int
	// ReduceIndex is not read or written by any of the code shown here.
	// NOTE(review): presumably meant for the reduce partition number — confirm.
	ReduceIndex int
	// State is Assigned when handed out; the worker sets WorkerFinish before
	// reporting back.
	State       TaskState
	// ReduceNum is the master's nReduce; doMap uses it as the number of
	// intermediate buckets.
	ReduceNum   int
}

// FileMap maps a string key to an int.
// NOTE(review): presumably input file name -> file index, since Master.files
// has this type and map tasks look their index up by file name — confirm.
type FileMap map[string]int

// Master holds the scheduling state: the pending and in-flight task lists for
// both phases, plus the current phase.
type Master struct {
	// Your definitions here.

	// files has type FileMap (string -> int).
	// NOTE(review): presumably input file name -> index — confirm.
	files              FileMap
	// UnstartedMapTask holds map tasks that have not been handed out yet.
	UnstartedMapTask   ConcurrentList
	// MapRunningTasks holds map tasks that were handed out; an entry is
	// removed when a worker reports WorkerFinish for that file name.
	MapRunningTasks    ConcurrentList
	// ReduceTasks holds reduce tasks that have not been handed out yet.
	ReduceTasks        ConcurrentList
	// ReduceRunningTasks holds handed-out reduce tasks, keyed by the decimal
	// string of the task's FileIndex.
	ReduceRunningTasks ConcurrentList
	// nReduce is copied into Task.ReduceNum for every task handed out.
	nReduce            int
	// state is the current phase: MapTask, ReduceTask or Finish.
	state              MasterState
}

// Phases of the master, in the order it moves through them.
const (
	// MapTask (0): map tasks are still being handed out or awaited.
	MapTask MasterState = iota
	// ReduceTask (1): all map work is done; reduce tasks are being handed out.
	ReduceTask
	// Finish (2): every reduce task has completed.
	Finish
)

// Lifecycle states of a Task. The zero value is deliberately left unused.
const (
	// Assigned (1): the master has handed the task to a worker.
	Assigned TaskState = iota + 1
	// WorkerFinish (2): the worker reports that the task is done.
	WorkerFinish
)

// Kinds of work a Task can carry. The zero value is deliberately left unused,
// so an untouched reply matches none of them.
const (
	// Map (1): run the map function over one input file.
	Map TaskType = iota + 1
	// Reduce (2): run the reduce function over one partition.
	Reduce
	// Close (3): tells the worker to exit its loop.
	Close
)

我用的是List结构保存需要分配map task以及reduce task, List 可以选择泛型 interface{},但是考虑到只给这个实验使用,就限定了类型MapReduceItem,里面组合了name, Task, Master

// MapReduceItem is the element type stored in a ConcurrentList.
type MapReduceItem struct {
	// name is the lookup key: ConcurrentList.Remove matches on it.
	name   string
	// task points at the Task this entry tracks.
	task   *Task
	// master is not set by any of the code shown here.
	// NOTE(review): presumably a back-reference to the owning Master — confirm.
	master *Master
}

// ConcurrentList is a slice of MapReduceItem guarded by a mutex. Add, GetLast
// and Remove all hold mu while touching items. Because it contains a
// sync.Mutex it must not be copied after first use.
type ConcurrentList struct {
	// mu guards items.
	mu    sync.Mutex
	// items is the backing slice; new entries are appended at the end.
	items []MapReduceItem
}

// Add appends item to the end of the list while holding the list's lock.
func (s *ConcurrentList) Add(item MapReduceItem) {
	s.mu.Lock()
	s.items = append(s.items, item)
	s.mu.Unlock()
}

// GetLast removes the most recently added item from the list and returns it.
// It panics with an index-out-of-range error when the list is empty; the
// deferred unlock still releases the lock in that case.
func (s *ConcurrentList) GetLast() MapReduceItem {
	s.mu.Lock()
	defer s.mu.Unlock()

	last := len(s.items) - 1
	tail := s.items[last]
	s.items = s.items[:last]
	return tail
}

// Remove deletes every item whose name equals name, keeping the relative
// order of the remaining items. It is a no-op when nothing matches.
//
// The list is filtered in place instead of deleting inside an index loop:
// deleting at position i and then advancing i skips the element that just
// shifted into position i, so adjacent matches were left behind.
func (s *ConcurrentList) Remove(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	kept := s.items[:0]
	for _, item := range s.items {
		if item.name != name {
			kept = append(kept, item)
		}
	}
	// Clear the vacated tail so the backing array does not keep the removed
	// items' *Task / *Master pointers alive.
	for i := len(kept); i < len(s.items); i++ {
		s.items[i] = MapReduceItem{}
	}
	s.items = kept
}

2.部分核心函数

Master 的调度函数,就是先分配MapTask,然后分配ReduceTask

// GetTask fills reply with the next task for the calling worker and always
// returns a nil error.
//
//   - While map tasks are unassigned, reply becomes a map task.
//   - When every map task is assigned but some are unfinished, reply is left
//     untouched (TODO in the original design) and only the dead-task check runs.
//   - Once all map tasks are done the master moves to the reduce phase and
//     reduce tasks are handed out.
//
// NOTE(review): m.state is read here without any lock visible in this block;
// if this is served through net/rpc, calls may run concurrently — confirm
// how access to m.state is synchronized.
func (m *Master) GetTask(args *NilArgs, reply *Task) error {
	// Deliberately two separate checks rather than a switch: handleMapTask can
	// flip m.state to ReduceTask, in which case the reduce branch must also
	// run within this same call.
	if m.state == MapTask {
		m.handleMapTask(reply)
	}
	if m.state == ReduceTask {
		m.handleReduceTask(reply)
	}
	return nil
}

handleMapTask

// handleMapTask performs one scheduling step of the map phase and returns the
// same reply pointer it was given.
//
// If an unstarted map task exists, it is popped, written into *reply and
// recorded in MapRunningTasks under its file name. Otherwise, if assigned map
// tasks are still outstanding, the dead-task check runs and reply is left
// untouched. Otherwise, if all map tasks are done, the master switches to the
// reduce phase.
//
// NOTE(review): Size() and Pop() are two separate calls, so two concurrent
// callers could both pass the size check — confirm Pop's behaviour on an
// empty list.
func (m *Master) handleMapTask(reply *Task) *Task {
	switch {
	case m.UnstartedMapTask.Size() > 0:
		next := m.UnstartedMapTask.Pop()
		*reply = Task{
			StartTime: time.Now(),
			TaskType:  Map,
			FileName:  next.name,
			State:     Assigned,
			ReduceNum: m.nReduce,
			FileIndex: m.GetFileIndex(next.name),
		}
		running := MapReduceItem{name: reply.FileName, task: reply}
		m.MapRunningTasks.Add(running)
	case m.isMapTasksAssignedNotDone():
		m.checkDeadTask(Map)
	case m.isMapTasksDone():
		m.state = ReduceTask
	}
	return reply
}

handleReduceTask 类似

// handleReduceTask performs one scheduling step of the reduce phase.
//
// If an unstarted reduce task exists, it is popped, written into *reply and
// recorded in ReduceRunningTasks. Otherwise, if assigned reduce tasks are
// still outstanding, the dead-task check runs and reply is left untouched.
// Otherwise, if all reduce tasks are done, reply becomes a Close task and the
// master enters the Finish state.
func (m *Master) handleReduceTask(reply *Task) {
	switch {
	case m.ReduceTasks.Size() > 0:
		next := m.ReduceTasks.Pop()
		// The reduce partition number travels in FileIndex; doReduce reads it
		// from there.
		*reply = Task{
			StartTime: time.Now(),
			TaskType:  Reduce,
			State:     Assigned,
			ReduceNum: m.nReduce,
			FileIndex: m.GetReduceIndex(next.name),
		}
		key := strconv.Itoa(reply.FileIndex)
		m.ReduceRunningTasks.Add(MapReduceItem{name: key, task: reply})
	case m.isReduceTasksAssignedNotDone():
		m.checkDeadTask(Reduce)
	case m.isReduceTasksDone():
		*reply = Task{TaskType: Close}
		m.state = Finish
	}
}

处理deadTask

// checkDeadTask runs the dead-task check that matches taskType: checkMapTask
// for Map, checkReduceTask for Reduce. Any other task type is ignored.
func (m *Master) checkDeadTask(taskType TaskType) {
	if taskType == Map {
		m.checkMapTask()
		return
	}
	if taskType == Reduce {
		m.checkReduceTask()
	}
}

处理Worker发过来的UpdateTask

// UpdateTask handles a worker's status report and always returns a nil error.
// Only reports whose State is WorkerFinish have an effect: a finished map task
// is removed from MapRunningTasks by file name, and a finished reduce task is
// removed from ReduceRunningTasks by the decimal string of its FileIndex.
func (m *Master) UpdateTask(task *Task, reply *NilReply) error {
	if task.State != WorkerFinish {
		return nil
	}
	switch task.TaskType {
	case Map:
		m.MapRunningTasks.Remove(task.FileName)
	case Reduce:
		m.ReduceRunningTasks.Remove(strconv.Itoa(task.FileIndex))
	}
	return nil
}

三、Worker

部分核心函数

向Master发送task请求

// Worker repeatedly asks the master for a task and runs it: map tasks go to
// doMap with mapf, reduce tasks go to doReduce with reducef. It returns when
// the master hands out a Close task; a task of any other type is ignored and
// the loop simply asks again.
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		task := CallGetTask()
		if task.TaskType == Close {
			return
		}
		switch task.TaskType {
		case Map:
			doMap(mapf, task)
		case Reduce:
			doReduce(reducef, task)
		}
	}
}

doMap, 其中文件读写比较复杂一些,我也是参考(复制粘贴)了网上的做法,当然Lab的文件也有相关提示,好好阅读Lab的要求以及提示

/*
* read input file from task.
* FileName pass it to Map accumulate the intermediate Map output.
 */
func doMap(mapf func(string, string) []KeyValue, task *Task) {
	constent := fileHandler(task.FileName)
	kva := mapf(task.FileName, constent)

	//prepare file for reduce task
	mapGroupName := prefix + strconv.Itoa(task.FileIndex)
	outFiles := make([]*os.File, task.ReduceNum)
	EncodeFile := make([]*json.Encoder, task.ReduceNum)

	for i := 0; i < task.ReduceNum; i++ {
		f, err := ioutil.TempFile("mr-tmp", "tmp")
		if err != nil {
			panic("fail to create temp file")
		}
		outFiles[i] = f
		EncodeFile[i] = json.NewEncoder(f)
	}

	//kva: all kv by map function
	for _, kv := range kva {
		index := ihash(kv.Key) % task.ReduceNum
		enc := EncodeFile[index]
		err := enc.Encode(&kv)
		if err != nil {
			fmt.Println("err is ", err)
			fmt.Println("File encode fail", kv)
			panic("fail encode")
		}
	}

	for i, file := range outFiles {
		outName := mapGroupName + hyphen + strconv.Itoa(i)
		oldpath := filepath.Join(file.Name())
		os.Rename(oldpath, outName)
		file.Close()
	}

	task.State = WorkerFinish
	CallUpdateTask(task)

doReduce,这个跟mrsequential 的做法很像,直接把核心代码搬过来就行,不一样的就是文件的读写:需要把intermediate files读出来,然后根据文件名最后的数字决定生成文件的编号,比如读取mr-X-Y,最后生成的就是mr-out-Y

// doReduce runs one reduce task.
//
// It collects every intermediate file matching "mr-tmp/mr-*-<FileIndex>",
// decodes the JSON key/value pairs, sorts them by key, calls reducef once per
// distinct key with all of that key's values, and writes "key output" lines
// to "mr-out-<FileIndex>". On success the task is marked WorkerFinish and
// reported to the master.
//
// If an input cannot be listed or opened, or the output cannot be created,
// written or closed, the error is printed and the function returns WITHOUT
// reporting the task as finished, so incomplete output is never presented to
// the master as a completed task.
func doReduce(reducef func(string, []string) string, task *Task) {
	pattern := "mr-tmp/mr-*-" + strconv.Itoa(task.FileIndex)
	fileList, err := filepath.Glob(pattern)
	if err != nil {
		fmt.Println("cannot read the dir", err)
		return
	}

	intermediate := []KeyValue{}
	for _, fname := range fileList {
		file, err := os.Open(fname)
		if err != nil {
			fmt.Println("cannot read the dir", fname, err)
			return
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			// Any decode error ends this file; end of input is the normal
			// way out of the loop.
			if err := dec.Decode(&kv); err != nil {
				break
			}
			intermediate = append(intermediate, kv)
		}
		// Read-only handle: nothing to flush, so the close error is not
		// actionable, but the descriptor must be released.
		file.Close()
	}

	sort.Sort(ByKey(intermediate))

	oname := "mr-out-" + strconv.Itoa(task.FileIndex)
	ofile, err := os.Create(oname)
	if err != nil {
		fmt.Println("cannot create output file", oname, err)
		return
	}

	// Walk the sorted pairs one run of equal keys at a time.
	i := 0
	for i < len(intermediate) {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := []string{}
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		if _, err := fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output); err != nil {
			fmt.Println("cannot write output file", oname, err)
			ofile.Close()
			return
		}
		i = j
	}

	if err := ofile.Close(); err != nil {
		fmt.Println("cannot close output file", oname, err)
		return
	}

	task.State = WorkerFinish
	CallUpdateTask(task)
}

总结

自己的solution还是有很多可以改善的地方的,代码有点冗余,其中一个想改进的地方就是通过channel通信换掉自己实现的ConcurrentList,毕竟channel是Go的特点之一。总的来说MapReduce流程清楚了,代码部分不是太难写,Go还是很灵活的语言,但是没有了传统面向对象的特点,有时候方法实现起来还是有点不适应

相关标签: 6.824