欢迎您访问程序员文章站,本站旨在为大家提供并分享程序员计算机编程知识!
您现在的位置是: 首页

6.824 Lab 1: MapReduce

程序员文章站 2022-07-12 17:39:02
...

lab源地址

简介

根据MapReduce Paper构造一个MapReduce系统。该系统主要包括master和worker。master主要负责分发任务、处理worker故障;worker主要负责根据mapreduce函数读写文件。

思路

  • 任务分发:master将需要完成的任务放到通道中,让worker从通道中拿取任务,根据任务类型完成相应的操作。
  • 容错:master跟踪每个任务的完成情况,如果一个任务超过一定时间仍未完成,则重新发布该任务。
  • 完成情况判断:master直接判断当前目录的目标文件是否存在来判断一个任务是否完成。例如中间文件mr-X,和reduce操作完成后输出的文件mr-out-X;master开始前要判断和中间文件同名文件是否存在,如存在则删除,避免运行时错误地判断了任务完成。
  • 程序退出:master检查所有任务完成后互斥地设置done = true,这时mrmaster调用Done()方法发现任务完成,master进程就能顺利退出;在master退出后,worker在RPC时联系不上master,就可以判断所有任务已经完成并退出。
  • 避免并发错误:利用ioutil.TempFile创建一个名字独有的临时文件、利用os.Rename原子性地重命名一个文件。

具体实现

以下给出rpc.go、master.go、worker.go三个文件。

rpc.go

rpc.go定义了master和worker通信的数据结构:

package mr

//
// RPC definitions.
//
// remember to capitalize all names.
//

import "os"
import "strconv"

// TaskRequest is the argument a worker passes to Master.DispatchTask when it
// asks for work. It has no fields: the request itself is the whole message.
type TaskRequest struct {
}

// TaskType tells a worker which kind of work a TaskResponse describes.
type TaskType int

// Task kinds carried in TaskResponse.TypeOfTask. The constants are typed so
// that they cannot be mixed up with plain integers, and they start at 1 so
// that the zero value of TaskType matches neither of them.
const (
	MapTask    TaskType = 1
	ReduceTask TaskType = 2
)

// TaskResponse is the reply of Master.DispatchTask: it describes one map or
// reduce task that the worker should execute.
type TaskResponse struct {

	// Filename is the input file of a map task; it is left empty for reduce tasks.
	Filename string

	// TypeOfTask says whether this is a map task or a reduce task.
	TypeOfTask TaskType

	// Serial is the task's index within its phase. A map task writes its
	// output to "mr-<Serial>", a reduce task to "mr-out-<Serial>".
	Serial int

	// NReduce is the number of reduce tasks. A reduce worker keeps the keys
	// for which ihash(key) % NReduce equals its Serial.
	NReduce int

}

// masterSock returns the path of the UNIX-domain socket shared by the master
// and its workers. The path lives in /var/tmp and ends with the numeric user
// id, so different users on one machine get different sockets. The current
// directory is not used because Athena AFS does not support UNIX-domain
// sockets.
func masterSock() string {
	return "/var/tmp/824-mr-" + strconv.Itoa(os.Getuid())
}

master.go

master实现如下:

package mr

import (
	"log"
	"net"
	"net/http"
	"net/rpc"
	"strconv"
	"time"
)
import "os"

// Master hands tasks out to workers over RPC and records when the whole job
// has finished.
type Master struct {
	// TaskChannel is the queue of tasks waiting to be picked up by workers.
	TaskChannel chan TaskResponse
	// done becomes true once every task has been completed.
	done bool
	// sem guards done against concurrent access. It is a one-slot channel used
	// as a lock: receiving the token acquires it, sending the token back
	// releases it.
	sem chan struct{}
}

// TaskTrack is the master's bookkeeping record for one task that has been
// queued but whose output file has not been observed yet.
type TaskTrack struct {
	// taskResp is the task exactly as it was queued; it is re-sent unchanged
	// when the task expires.
	taskResp TaskResponse
	// startTime is the moment the task was most recently put on the queue.
	startTime time.Time
}

// DispatchTask is the RPC handler workers call to obtain their next task.
// It blocks until a task is available on the queue, copies that task into
// response and always returns nil.
func (m *Master) DispatchTask(request *TaskRequest, response *TaskResponse) error {
	// Receiving from the channel parks the calling goroutine while the queue
	// is empty.
	next := <-m.TaskChannel
	*response = next
	return nil
}

//
// Done reports whether the entire job has finished.
// main/mrmaster.go calls it periodically.
//
func (m *Master) Done() bool {
	// Take the token to read m.done exclusively; the deferred send puts the
	// token back after the return value has been read.
	<-m.sem
	defer func() { m.sem <- struct{}{} }()

	return m.done
}


// isExpired reports whether more than ten seconds have passed since the task
// was last queued.
func isExpired(task TaskTrack) bool {
	return time.Since(task.startTime) > 10*time.Second
}

// dispatcher drives the whole job: it clears leftovers of earlier runs, runs
// the map phase to completion, then the reduce phase, and finally marks the
// master as done.
//
// files holds one input file per map task, nReduce is the number of reduce
// tasks, and m is the master whose task queue and done flag are used.
func dispatcher(files []string, nReduce int, m *Master) {
	// Completion is detected by the existence of output files, so files left
	// behind by an earlier run must go first. This covers the final "mr-out-N"
	// files as well as the intermediate "mr-N" files; otherwise a task would be
	// considered finished before any worker has touched it.
	removeStaleFiles("mr-", len(files))
	removeStaleFiles("mr-out-", nReduce)

	//-------------------------------------------- dispatch map task --------------------------------------------
	mapTasks := make([]TaskResponse, 0, len(files))
	for i, file := range files {
		mapTasks = append(mapTasks, TaskResponse{Filename: file, TypeOfTask: MapTask, Serial: i})
	}
	runPhase(m, mapTasks, "mr-")

	//-------------------------------------------- dispatch reduce task --------------------------------------------
	// Reduce tasks are only emitted after every map task has completed.
	reduceTasks := make([]TaskResponse, 0, nReduce)
	for i := 0; i < nReduce; i++ {
		reduceTasks = append(reduceTasks, TaskResponse{TypeOfTask: ReduceTask, Serial: i, NReduce: nReduce})
	}
	runPhase(m, reduceTasks, "mr-out-")

	// exclusively set status to done
	<-m.sem
	m.done = true
	m.sem <- struct{}{}
}

// removeStaleFiles deletes prefix+"0" .. prefix+"<atLeast-1>" and then keeps
// deleting consecutively numbered files until the first one that does not
// exist. Any removal error other than "file does not exist" is fatal.
func removeStaleFiles(prefix string, atLeast int) {
	for i := 0; ; i++ {
		filename := prefix + strconv.Itoa(i)
		err := os.Remove(filename)
		switch {
		case err == nil:
			// removed, keep going
		case os.IsNotExist(err):
			if i >= atLeast {
				return
			}
		default:
			log.Fatalf("error occurs while removing file %v: %v", filename, err)
		}
	}
}

// runPhase queues every task and returns once, for each of them, the file
// outputPrefix+"<Serial>" exists. It polls once per second; a task that has
// expired is queued again, but only while the queue is empty.
func runPhase(m *Master, tasks []TaskResponse, outputPrefix string) {
	pending := make([]TaskTrack, 0, len(tasks))
	for _, task := range tasks {
		m.TaskChannel <- task
		pending = append(pending, TaskTrack{taskResp: task, startTime: time.Now()})
	}

	for len(pending) > 0 {
		// Filter in place: only tasks whose output is still missing survive.
		remaining := pending[:0]
		for _, track := range pending {
			filename := outputPrefix + strconv.Itoa(track.taskResp.Serial)
			if _, err := os.Stat(filename); err == nil {
				// the output file exists, so this task is complete
				continue
			}
			if len(m.TaskChannel) == 0 && isExpired(track) {
				// emit the task again and restart its clock
				m.TaskChannel <- track.taskResp
				track.startTime = time.Now()
			}
			remaining = append(remaining, track)
		}
		pending = remaining
		time.Sleep(time.Second)
	}
}


//
// MakeMaster creates a Master, starts dispatching tasks in the background
// and starts the RPC server.
// main/mrmaster.go calls this function.
// files are the inputs of the map tasks;
// nReduce is the number of reduce tasks to use.
//
func MakeMaster(files []string, nReduce int) *Master {
	m := &Master{
		TaskChannel: make(chan TaskResponse, 100),
		sem:         make(chan struct{}, 1),
	}
	// Put the single token into sem so the first acquirer does not block.
	m.sem <- struct{}{}

	// Tasks are dispatched from their own goroutine.
	go dispatcher(files, nReduce, m)

	// Listen for RPCs coming from worker.go.
	m.server()
	return m
}

//
// server registers the master's RPC methods and serves them over HTTP on
// the UNIX-domain socket named by masterSock, in a separate goroutine.
// Failing to listen on the socket is fatal.
//
func (m *Master) server() {
	rpc.Register(m)
	rpc.HandleHTTP()

	// A socket file left over from an earlier run is removed before listening.
	sock := masterSock()
	os.Remove(sock)

	listener, err := net.Listen("unix", sock)
	if err != nil {
		log.Fatal("listen error:", err)
	}
	go http.Serve(listener, nil)
}

worker.go

worker实现如下:

package mr

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"sort"
	"strconv"
)
import "log"
import "net/rpc"
import "hash/fnv"


//
// KeyValue is one key/value pair; Map functions return a slice of them.
//
type KeyValue struct {
	Key   string
	Value string
}

// ByKey implements sort.Interface to order pairs by ascending Key.
type ByKey []KeyValue

// Len returns the number of pairs.
func (s ByKey) Len() int {
	return len(s)
}

// Swap exchanges the pairs at positions i and j.
func (s ByKey) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// Less reports whether the key at position i sorts before the key at j.
func (s ByKey) Less(i, j int) bool {
	return s[i].Key < s[j].Key
}

//
// ihash returns the 32-bit FNV-1a hash of key with the top bit cleared, so
// the result is never negative.
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
	hasher := fnv.New32a()
	if _, err := hasher.Write([]byte(key)); err != nil {
		log.Fatalf("error occurs while hashing key %v", key)
	}
	return int(hasher.Sum32() & 0x7fffffff)
}


//
// Worker keeps asking the master for tasks over RPC and executes each one
// with the supplied map or reduce function.
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		task, ok := askForTask()
		if !ok {
			// The master could not be reached, which is taken to mean that
			// all tasks are done.
			return
		}

		switch task.TypeOfTask {
		case MapTask:
			doMapTask(task, mapf)
		default:
			doReduceTask(task, reducef)
		}
	}
}

// askForTask requests the next task from the master through the
// Master.DispatchTask RPC. The boolean is the result of call; the returned
// task is only meaningful when it is true.
func askForTask() (TaskResponse, bool) {
	var task TaskResponse
	ok := call("Master.DispatchTask", &TaskRequest{}, &task)
	return task, ok
}


// doMapTask runs mapf over the task's input file and stores the resulting
// key/value pairs, JSON-encoded one per line, in "mr-<Serial>" in the current
// directory. Any failure terminates the process.
//
// The pairs are first written to a temporary file which is then renamed, so
// the final name never refers to a partially written file. The temporary file
// is created in the current directory, next to its final name, because
// renaming from the system temp directory can fail when that directory is on
// a different filesystem.
func doMapTask(task TaskResponse, mapf func(string, string) []KeyValue) {
	filename := task.Filename
	content, err := ioutil.ReadFile(filename)
	if err != nil {
		log.Fatalf("cannot read %v: %v", filename, err)
	}
	kva := mapf(filename, string(content))

	tempFile, err := ioutil.TempFile(".", "mr-tmp-")
	if err != nil {
		log.Fatalf("cannot create temp file: %v", err)
	}
	enc := json.NewEncoder(tempFile)
	for i := range kva {
		if err := enc.Encode(&kva[i]); err != nil {
			log.Fatalf("cannot encode kv %v into file %v: %v", kva[i], tempFile.Name(), err)
		}
	}
	if err := tempFile.Close(); err != nil {
		log.Fatalf("error occurs while closing file %v: %v", tempFile.Name(), err)
	}

	// intermediate kv pairs are saved in mr-X
	target := "mr-" + strconv.Itoa(task.Serial)
	if err := os.Rename(tempFile.Name(), target); err != nil {
		log.Fatalf("err occurs while renaming file %v to %s: %v", tempFile.Name(), target, err)
	}
}

// doReduceTask performs reduce task number task.Serial. It scans the
// intermediate files "mr-0", "mr-1", ... until the first missing one, keeps
// the pairs whose key hashes to this task, sorts them by key, calls reducef
// once per distinct key and writes one "key output" line per key to
// "mr-out-<Serial>". Any failure terminates the process.
//
// The output is written to a temporary file in the current directory and
// then renamed, so the final name never refers to a partially written file
// and the rename does not have to cross filesystems.
func doReduceTask(task TaskResponse, reducef func(string, []string) string) {
	// go to every intermediate file to collect corresponding keys
	var kva []KeyValue
	for i := 0; ; i++ {
		pairs, ok := readReduceInput("mr-"+strconv.Itoa(i), task)
		if !ok {
			// all intermediate files are read
			break
		}
		kva = append(kva, pairs...)
	}
	sort.Sort(ByKey(kva))

	tempFile, err := ioutil.TempFile(".", "mr-tmp-")
	if err != nil {
		log.Fatalf("cannot create temp file: %v", err)
	}

	// kva is sorted, so all pairs sharing a key form one contiguous run [j, k).
	for j := 0; j < len(kva); {
		k := j + 1
		for k < len(kva) && kva[k].Key == kva[j].Key {
			k++
		}
		values := make([]string, 0, k-j)
		for _, kv := range kva[j:k] {
			values = append(values, kv.Value)
		}
		output := reducef(kva[j].Key, values)
		if _, err := fmt.Fprintf(tempFile, "%v %v\n", kva[j].Key, output); err != nil {
			log.Fatalf("error occurs while writing into file %v: %v", tempFile.Name(), err)
		}
		j = k
	}

	if err := tempFile.Close(); err != nil {
		log.Fatalf("error occurs while closing file %v: %v", tempFile.Name(), err)
	}
	target := "mr-out-" + strconv.Itoa(task.Serial)
	if err := os.Rename(tempFile.Name(), target); err != nil {
		log.Fatalf("error occurs while renaming file %v to %s: %v", tempFile.Name(), target, err)
	}
}

// readReduceInput decodes the intermediate file filename and returns the
// pairs that belong to task, i.e. those with ihash(Key) % NReduce == Serial.
// The boolean is false when the file does not exist. Any other open error,
// and any decoding error, terminates the process. The file is closed before
// returning.
func readReduceInput(filename string, task TaskResponse) ([]KeyValue, bool) {
	file, err := os.Open(filename)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, false
		}
		log.Fatalf("error occurs while opening file %v: %v", filename, err)
	}
	defer file.Close()

	var pairs []KeyValue
	dec := json.NewDecoder(file)
	// More reports false at end of input, so a Decode error inside the loop
	// means malformed data rather than a normal end of file.
	for dec.More() {
		var kv KeyValue
		if err := dec.Decode(&kv); err != nil {
			log.Fatalf("cannot decode intermediate file %v: %v", filename, err)
		}
		// select keys that this worker need to reduce
		if ihash(kv.Key)%task.NReduce == task.Serial {
			pairs = append(pairs, kv)
		}
	}
	return pairs, true
}


//
// call sends an RPC request to the master and waits for the response.
// rpcname is the "Type.Method" name of the handler, args is the request and
// reply is a pointer that receives the response.
// usually returns true.
// returns false if something goes wrong: either the master cannot be
// reached or the call itself fails. A dial failure is reported to the
// caller instead of terminating the process, so that a worker can treat an
// unreachable master as the end of the job.
//
func call(rpcname string, args interface{}, reply interface{}) bool {
	// c, err := rpc.DialHTTP("tcp", "127.0.0.1"+":1234")
	c, err := rpc.DialHTTP("unix", masterSock())
	if err != nil {
		log.Printf("dialing: %v", err)
		return false
	}
	defer c.Close()

	if err := c.Call(rpcname, args, reply); err != nil {
		fmt.Println(err)
		return false
	}
	return true
}