2020 6.824 的 MapReduce
前言
做2020的6.824,刚刚开始了第一个实验,是MapReduce,这个实验难度(过测试)还行,前提是需要把要求理清楚。过程也是参考了网上好几个博客的,下面是我的solution
一、MapReduce
主要就是实现paper中提到的MapReduce
paper 地址
流程大概是这样的
二、Master
1. 数据结构
Master只有两个数据结构:Master 和 Task
Task包含了Map的task跟Reduce的task,对于Map task,会有FileIndex,对于Reduce task,会有ReduceIndex(不过下面的代码里,reduce 的编号实际上也是放在 FileIndex 里传给 worker 的)
// Task describes one unit of work. The master fills it in as the reply to
// GetTask, and the worker sends it back through UpdateTask once it is done.
type Task struct {
	StartTime time.Time // set to time.Now() by the master when the task is handed out
	TaskType  TaskType  // Map, Reduce or Close
	FileName  string    // input file name; only filled in for map tasks
	// FileIndex holds the input file's index for map tasks and the reduce
	// number for reduce tasks (the worker derives output names from it).
	// ReduceIndex is not assigned by any of the code shown here.
	FileIndex, ReduceIndex int
	State                  TaskState // Assigned by the master, WorkerFinish by the worker
	ReduceNum              int       // number of reduce partitions (the master's nReduce)
}
// FileMap maps a string key to an int; Master keeps one in its files field.
// NOTE(review): presumably input file name -> file index (handleMapTask looks
// the index up by file name via GetFileIndex) — confirm against that helper.
type FileMap map[string]int
// Master is the coordinator: it tracks which tasks still have to be handed
// out, which ones are currently running, and which phase the job is in.
type Master struct {
	files FileMap // NOTE(review): presumably the input files of the job — confirm

	// Map phase: tasks waiting to be assigned, and tasks handed to a worker
	// that have not been reported finished yet.
	UnstartedMapTask, MapRunningTasks ConcurrentList

	// Reduce phase: same split as above.
	ReduceTasks, ReduceRunningTasks ConcurrentList

	nReduce int         // copied into Task.ReduceNum for every assigned task
	state   MasterState // MapTask, ReduceTask or Finish
}
// Phases of the master, stored in Master.state. GetTask hands out map tasks
// in MapTask and reduce tasks in ReduceTask; Finish is set once the reduce
// tasks are done.
const (
	MapTask    MasterState = iota // 0
	ReduceTask                    // 1
	Finish                        // 2
)
// Task states carried in Task.State. The master marks a task Assigned when it
// hands it out; the worker sets WorkerFinish before calling UpdateTask.
const (
	Assigned     TaskState = iota + 1 // 1
	WorkerFinish                      // 2
)
// Kinds of task carried in Task.TaskType. A worker runs doMap for Map,
// doReduce for Reduce, and exits its loop on Close.
const (
	Map    TaskType = iota + 1 // 1
	Reduce                     // 2
	Close                      // 3
)
我用的是List结构保存需要分配的map task以及reduce task, List 可以选择泛型 interface{},但是考虑到只给这个实验使用,就限定了类型MapReduceItem,里面组合了name, Task, Master
// MapReduceItem is the element type stored in a ConcurrentList.
type MapReduceItem struct {
// name is the key ConcurrentList.Remove matches on. The master uses the input
// file name for map tasks and the decimal reduce number for reduce tasks.
name string
// task points at the Task that was handed to the worker (the RPC reply).
task *Task
// master is not set by any of the code shown here.
master *Master
}
// ConcurrentList is a mutex-guarded slice of MapReduceItem. Its Add, GetLast
// and Remove methods lock mu before touching items.
type ConcurrentList struct {
// mu is locked by every method shown here before items is read or modified.
mu sync.Mutex
// items holds the elements in insertion order (Add appends at the end).
items []MapReduceItem
}
// Add appends item to the end of the list while holding the list's lock.
func (s *ConcurrentList) Add(item MapReduceItem) {
	s.mu.Lock()
	s.items = append(s.items, item)
	s.mu.Unlock()
}
// GetLast removes and returns the most recently added item.
//
// When the list is empty it returns the zero MapReduceItem (empty name, nil
// task) instead of panicking with an index-out-of-range error, so a caller
// that raced with another consumer does not take the whole process down.
func (s *ConcurrentList) GetLast() MapReduceItem {
	s.mu.Lock()
	defer s.mu.Unlock()

	n := len(s.items)
	if n == 0 {
		return MapReduceItem{}
	}
	item := s.items[n-1]
	s.items = s.items[:n-1]
	return item
}
// Remove deletes every item whose name equals name, keeping the relative
// order of the remaining items. It is a no-op when nothing matches.
func (s *ConcurrentList) Remove(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Filter in place. Deleting inside an index loop would skip the element
	// that slides into the freed slot, leaving adjacent duplicates behind.
	kept := s.items[:0]
	for _, it := range s.items {
		if it.name != name {
			kept = append(kept, it)
		}
	}
	// Zero the vacated tail so the backing array does not keep removed
	// items (and the tasks they point to) reachable.
	for i := len(kept); i < len(s.items); i++ {
		s.items[i] = MapReduceItem{}
	}
	s.items = kept
}
2.部分核心函数
Master 的调度函数,就是先分配MapTask,然后分配ReduceTask
// GetTask is the RPC handler a worker calls to ask for work. The task, if
// any, is written into reply; the returned error is always nil.
//
// In the MapTask phase the map handler runs; in the ReduceTask phase the
// reduce handler runs. In any other state reply is left untouched.
func (m *Master) GetTask(args *NilArgs, reply *Task) error {
	// The two phases are deliberately tested one after the other rather than
	// with else: when the map handler switches the state to ReduceTask, the
	// reduce handler gets its turn within the same call.
	if m.state == MapTask {
		// The handler's return value is the pointer passed in, so it is not needed.
		m.handleMapTask(reply)
	}
	if m.state == ReduceTask {
		m.handleReduceTask(reply)
	}
	return nil
}
handleMapTask
// handleMapTask serves a GetTask request during the map phase and returns the
// reply pointer it was given. The first matching case wins:
//
//   - an unstarted map task exists: pop it, describe it in *reply and record
//     it in MapRunningTasks;
//   - tasks are assigned but not all done: run the dead-task check for map
//     tasks and leave *reply untouched;
//   - all map tasks are done: advance the master to the ReduceTask phase.
func (m *Master) handleMapTask(reply *Task) *Task {
	switch {
	case m.UnstartedMapTask.Size() > 0:
		next := m.UnstartedMapTask.Pop()
		*reply = Task{
			StartTime: time.Now(),
			TaskType:  Map,
			FileName:  next.name,
			State:     Assigned,
			ReduceNum: m.nReduce,
			FileIndex: m.GetFileIndex(next.name),
		}
		// Running map tasks are keyed by file name; UpdateTask removes them by it.
		m.MapRunningTasks.Add(MapReduceItem{name: reply.FileName, task: reply})
	case m.isMapTasksAssignedNotDone():
		m.checkDeadTask(Map)
	case m.isMapTasksDone():
		m.state = ReduceTask
	}
	return reply
}
handleReduceTask 类似
// handleReduceTask serves a GetTask request during the reduce phase. The
// first matching case wins:
//
//   - an unassigned reduce task exists: pop it, describe it in *reply (the
//     reduce number travels in FileIndex) and record it in ReduceRunningTasks;
//   - tasks are assigned but not all done: run the dead-task check for reduce
//     tasks and leave *reply untouched;
//   - all reduce tasks are done: reply with a Close task and move the master
//     to the Finish state.
func (m *Master) handleReduceTask(reply *Task) {
	switch {
	case m.ReduceTasks.Size() > 0:
		next := m.ReduceTasks.Pop()
		*reply = Task{
			StartTime: time.Now(),
			TaskType:  Reduce,
			State:     Assigned,
			ReduceNum: m.nReduce,
			FileIndex: m.GetReduceIndex(next.name),
		}
		// Running reduce tasks are keyed by the decimal reduce number;
		// UpdateTask removes them by the same key.
		m.ReduceRunningTasks.Add(MapReduceItem{name: strconv.Itoa(reply.FileIndex), task: reply})
	case m.isReduceTasksAssignedNotDone():
		m.checkDeadTask(Reduce)
	case m.isReduceTasksDone():
		*reply = Task{TaskType: Close}
		m.state = Finish
	}
}
处理deadTask
// checkDeadTask dispatches to the dead-task check that matches taskType:
// checkMapTask for Map, checkReduceTask for Reduce. Any other type is ignored.
func (m *Master) checkDeadTask(taskType TaskType) {
	if taskType == Map {
		m.checkMapTask()
	} else if taskType == Reduce {
		m.checkReduceTask()
	}
}
处理Worker发过来的UpdateTask
// UpdateTask is the RPC handler a worker calls to report on a task. Only
// reports in the WorkerFinish state have an effect: a finished map task is
// removed from MapRunningTasks by file name, a finished reduce task from
// ReduceRunningTasks by its decimal FileIndex. The returned error is always nil.
func (m *Master) UpdateTask(task *Task, reply *NilReply) error {
	if task.State != WorkerFinish {
		return nil
	}
	switch task.TaskType {
	case Map:
		m.MapRunningTasks.Remove(task.FileName)
	case Reduce:
		m.ReduceRunningTasks.Remove(strconv.Itoa(task.FileIndex))
	}
	return nil
}
三、Worker
部分核心函数
向Master发送task请求
// Worker is the worker main loop. It keeps asking the master for a task and
// runs doMap for Map tasks and doReduce for Reduce tasks. It returns when it
// receives a Close task; any other task type is skipped and the loop simply
// asks again.
func Worker(mapf func(string, string) []KeyValue, reducef func(string, []string) string) {
	for {
		switch task := CallGetTask(); task.TaskType {
		case Map:
			doMap(mapf, task)
		case Reduce:
			doReduce(reducef, task)
		case Close:
			return
		}
	}
}
doMap, 其中文件读写比较复杂一些,我也是参考(复制粘贴)了网上的做法,当然Lab的文件也有相关提示,好好阅读Lab的要求以及提示
/*
* read input file from task.
* FileName pass it to Map accumulate the intermediate Map output.
*/
func doMap(mapf func(string, string) []KeyValue, task *Task) {
constent := fileHandler(task.FileName)
kva := mapf(task.FileName, constent)
//prepare file for reduce task
mapGroupName := prefix + strconv.Itoa(task.FileIndex)
outFiles := make([]*os.File, task.ReduceNum)
EncodeFile := make([]*json.Encoder, task.ReduceNum)
for i := 0; i < task.ReduceNum; i++ {
f, err := ioutil.TempFile("mr-tmp", "tmp")
if err != nil {
panic("fail to create temp file")
}
outFiles[i] = f
EncodeFile[i] = json.NewEncoder(f)
}
//kva: all kv by map function
for _, kv := range kva {
index := ihash(kv.Key) % task.ReduceNum
enc := EncodeFile[index]
err := enc.Encode(&kv)
if err != nil {
fmt.Println("err is ", err)
fmt.Println("File encode fail", kv)
panic("fail encode")
}
}
for i, file := range outFiles {
outName := mapGroupName + hyphen + strconv.Itoa(i)
oldpath := filepath.Join(file.Name())
os.Rename(oldpath, outName)
file.Close()
}
task.State = WorkerFinish
CallUpdateTask(task)
doReduce,这个跟mrsequential 的做法就很像了,直接把核心代码搬过来就行,不一样的就是系统的读写,需要把intermediate files读出来然后根据最后的数字决定生成文件的数字编号,比如mr-X-Y 最后生成就是mr-out-Y
// doReduce runs one reduce task.
//
// It loads every intermediate file matching "mr-tmp/mr-*-<FileIndex>",
// decodes the JSON key/value pairs, sorts them by key, calls reducef once per
// distinct key with all of that key's values, and writes one "key value" line
// per key to "mr-out-<FileIndex>". On success the task is marked WorkerFinish
// and reported to the master.
//
// If any file cannot be opened, decoded, created or written, the error is
// printed and the function returns WITHOUT reporting the task, so the master
// never records a reduce task as done when its output is missing or partial.
func doReduce(reducef func(string, []string) string, task *Task) {
	pattern := "mr-tmp/mr-*-" + strconv.Itoa(task.FileIndex)
	fileList, err := filepath.Glob(pattern)
	if err != nil {
		fmt.Println("cannot read the dir")
		return
	}

	var intermediate []KeyValue
	for _, fname := range fileList {
		file, err := os.Open(fname)
		if err != nil {
			fmt.Println("cannot open intermediate file", fname, err)
			return
		}
		dec := json.NewDecoder(file)
		// More reports false at end of input, so an error from Decode here
		// means malformed or truncated data rather than a normal EOF.
		for dec.More() {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				file.Close()
				fmt.Println("cannot decode intermediate file", fname, err)
				return
			}
			intermediate = append(intermediate, kv)
		}
		// Closed explicitly: a defer inside the loop would keep every input
		// file open until the function returns.
		file.Close()
	}

	sort.Sort(ByKey(intermediate))

	oname := "mr-out-" + strconv.Itoa(task.FileIndex)
	ofile, err := os.Create(oname)
	if err != nil {
		fmt.Println("cannot create output file", oname, err)
		return
	}

	// intermediate is sorted, so equal keys form the contiguous run [i, j).
	for i := 0; i < len(intermediate); {
		j := i + 1
		for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
			j++
		}
		values := make([]string, 0, j-i)
		for k := i; k < j; k++ {
			values = append(values, intermediate[k].Value)
		}
		output := reducef(intermediate[i].Key, values)
		if _, err := fmt.Fprintf(ofile, "%v %v\n", intermediate[i].Key, output); err != nil {
			ofile.Close()
			fmt.Println("cannot write output file", oname, err)
			return
		}
		i = j
	}
	if err := ofile.Close(); err != nil {
		fmt.Println("cannot close output file", oname, err)
		return
	}

	task.State = WorkerFinish
	CallUpdateTask(task)
}
总结
自己的solution还是有很多可以改善的地方的,代码有点冗余,其中一个想改进的地方就是通过channel通信换掉自己实现的ConcurrentList,毕竟channel是Go的特点之一。总的来说MapReduce流程清楚了,代码部分不是太难写,Go还是很灵活的语言,但是没有了传统面向对象的特点,有时候方法实现起来还是有点不适应
推荐阅读
-
浙江大专排名2021最新排名:浙江排名前十的大专学校(附2020年分数线)
-
东北三省哪个大学排第一?附2020东北三省最好的大学及分数线(2021年参考)
-
MapReduce的输入文件是两个
-
2020年进步最快的大学:进步最大的大学
-
2021年在四川招生的军校大学名单?附军校在四川录取分数线2020汇总
-
2020年提升百度权重的方法
-
二本师范最低多少分?附2020比较好的二本师范大学排名
-
与iPhone 12一起 搭载A14的5G iPad Pro将于2020年秋季推出
-
2021年在安徽招生的军校有哪些?附安徽军校录取分数线2020年汇总
-
IntelliJ IDEA 2020如何设置背景图片的方法步骤