MIT 6.824 Lab 1 MapReduce (2018 version), Part I: Map/Reduce input and output
The 2020 lab builds on the 2018 one with some adjustments: it is more open-ended, but also somewhat harder. I recommend doing the 2018 version first and then the 2020 version; your code will be better for it.
Preamble: Getting familiar with the source
The mapreduce package provides a simple Map/Reduce library (in the mapreduce directory). Applications should normally call Distributed() [located in master.go] to start a job, but may instead call Sequential() [also in master.go] to get a sequential execution for debugging.
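For orientation, here is a rough sketch of how an application drives the library, modeled on the wc.go template that ships with the lab. The job name "wcseq", the nReduce value of 3, and the empty mapF/reduceF bodies are illustrative assumptions, not the lab's reference code.

package main

import (
	"os"

	"mapreduce" // the lab's package; the import path depends on your GOPATH layout
)

// The application supplies these two functions; Part II fills in real bodies.
func mapF(filename string, contents string) []mapreduce.KeyValue { return nil }
func reduceF(key string, values []string) string                 { return "" }

func main() {
	// Sequential runs every map task, then every reduce task, in this
	// process -- convenient for debugging doMap and doReduce.
	mr := mapreduce.Sequential("wcseq", os.Args[1:], 3, mapF, reduceF)
	mr.Wait() // block until the job, including the final merge, completes
}

Distributed() takes the same application functions but hands tasks out to workers; Sequential() is the right entry point while working on Part I.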
Part I: Map/Reduce input and output
The Map/Reduce implementation you are given is missing some pieces. Before you can write your first Map/Reduce function pair, you will need to fix the sequential implementation. In particular, the code we give you is missing two crucial pieces: the function that divides up the output of a map task, and the function that gathers all the inputs for a reduce task. These tasks are carried out by the doMap() function in common_map.go, and the doReduce() function in common_reduce.go respectively. The comments in those files should point you in the right direction.
To help you determine if you have correctly implemented doMap() and doReduce(), we have provided you with a Go test suite that checks the correctness of your implementation. These tests are implemented in the file test_test.go. To run the tests for the sequential implementation that you have now fixed, run (from the src/mapreduce directory):

go test -run Sequential
The doc comment in common_map.go summarizes the first half of the job:

doMap manages one map task: it should read one of the input files
(inFile), call the user-defined map function (mapF) for that file's
contents, and partition mapF's output into nReduce intermediate files.

Below are my implementations of doMap and doReduce. The main difficulty is the file I/O and the JSON serialization; most of it comes down to consulting the Go documentation. Both functions live in package mapreduce; make sure common_map.go imports encoding/json, io/ioutil, log, and os, and common_reduce.go imports encoding/json, io, log, os, and sort.
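Before the full implementations, here is a minimal, self-contained sketch of the JSON round-trip both functions rely on: write one JSON object per KeyValue with json.Encoder, then read them back with json.Decoder until io.EOF. The file name demo.json is made up for illustration.

package main

import (
	"encoding/json"
	"io"
	"log"
	"os"
)

// KeyValue mirrors the definition in the lab's common.go.
type KeyValue struct {
	Key   string
	Value string
}

func main() {
	// Encode: append one JSON object per KeyValue, in order.
	f, err := os.Create("demo.json")
	if err != nil {
		log.Fatal(err)
	}
	enc := json.NewEncoder(f)
	for _, kv := range []KeyValue{{"a", "1"}, {"b", "2"}} {
		if err := enc.Encode(&kv); err != nil {
			log.Fatal(err)
		}
	}
	f.Close()

	// Decode: Decode returns io.EOF once the stream is exhausted.
	f, err = os.Open("demo.json")
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()
	dec := json.NewDecoder(f)
	for {
		var kv KeyValue
		if err := dec.Decode(&kv); err == io.EOF {
			break
		} else if err != nil {
			log.Fatal(err)
		}
		log.Printf("%s = %s", kv.Key, kv.Value)
	}
}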
func doMap(
	jobName string, // the name of the MapReduce job
	mapTask int, // which map task this is
	inFile string,
	nReduce int, // the number of reduce tasks that will be run ("R" in the paper)
	mapF func(filename string, contents string) []KeyValue,
) {
	//
	// doMap manages one map task: it should read one of the input files
	// (inFile), call the user-defined map function (mapF) for that file's
	// contents, and partition mapF's output into nReduce intermediate files.
	//
	// There is one intermediate file per reduce task. The file name
	// includes both the map task number and the reduce task number. Use
	// the filename generated by reduceName(jobName, mapTask, r)
	// as the intermediate file for reduce task r. Call ihash() (see
	// below) on each key, mod nReduce, to pick r for a key/value pair.
	//
	// mapF() is the map function provided by the application. The first
	// argument should be the input file name, though the map function
	// typically ignores it. The second argument should be the entire
	// input file contents. mapF() returns a slice containing the
	// key/value pairs for reduce; see common.go for the definition of
	// KeyValue.
	//
	// Look at Go's ioutil and os packages for functions to read
	// and write files.
	//
	// Coming up with a scheme for how to format the key/value pairs on
	// disk can be tricky, especially when taking into account that both
	// keys and values could contain newlines, quotes, and any other
	// character you can think of.
	//
	// One format often used for serializing data to a byte stream that the
	// other end can correctly reconstruct is JSON. You are not required to
	// use JSON, but as the output of the reduce tasks *must* be JSON,
	// familiarizing yourself with it here may prove useful. You can write
	// out a data structure as a JSON string to a file using the commented
	// code below. The corresponding decoding functions can be found in
	// common_reduce.go.
	//
	//   enc := json.NewEncoder(file)
	//   for _, kv := ... {
	//     err := enc.Encode(&kv)
	//
	// Remember to close the file after you have written all the values!
	//
	// Your code here (Part I).
	//
	content, err := ioutil.ReadFile(inFile)
	if err != nil {
		log.Fatalf("error: failed to read file %v when doing map task", inFile)
	}
	keyValues := mapF(inFile, string(content))

	// Bucket the pairs by ihash(key) % nReduce so that every pair bound
	// for reduce task r lands in interResults[r].
	interResults := make([][]KeyValue, nReduce)
	for _, keyValue := range keyValues {
		iReduce := ihash(keyValue.Key) % nReduce
		interResults[iReduce] = append(interResults[iReduce], keyValue)
	}

	for i := 0; i < nReduce; i++ {
		outFile := reduceName(jobName, mapTask, i)
		f, openErr := os.OpenFile(outFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0664)
		if openErr != nil {
			log.Fatal("OpenFile: ", openErr)
		}
		// Serialize every KeyValue in this bucket as one JSON object.
		enc := json.NewEncoder(f)
		for _, kv := range interResults[i] {
			if err := enc.Encode(&kv); err != nil {
				log.Fatal("Encode: ", err)
			}
		}
		// Close here rather than with defer: a defer inside the loop
		// would keep all nReduce files open until doMap returns.
		f.Close()
	}
}
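For reference, the intermediate file names and the hash used above come from helpers in common.go. From memory they look roughly like the sketch below; check your copy of common.go for the authoritative versions (they live inside package mapreduce and need the hash/fnv and strconv imports).

// Roughly what common.go provides (verify against your checkout).
func reduceName(jobName string, mapTask int, reduceTask int) string {
	return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}

func ihash(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff) // clear the sign bit so ihash(key) % nReduce is non-negative
}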
func doReduce(
	jobName string, // the name of the whole MapReduce job
	reduceTask int, // which reduce task this is
	outFile string, // write the output here
	nMap int, // the number of map tasks that were run ("M" in the paper)
	reduceF func(key string, values []string) string,
) {
	//
	// doReduce manages one reduce task: it should read the intermediate
	// files for the task, sort the intermediate key/value pairs by key,
	// call the user-defined reduce function (reduceF) for each key, and
	// write reduceF's output to disk.
	//
	// You'll need to read one intermediate file from each map task;
	// reduceName(jobName, m, reduceTask) yields the file
	// name from map task m.
	//
	// Your doMap() encoded the key/value pairs in the intermediate
	// files, so you will need to decode them. If you used JSON, you can
	// read and decode by creating a decoder and repeatedly calling
	// .Decode(&kv) on it until it returns an error.
	//
	// You may find the first example in the golang sort package
	// documentation useful.
	//
	// reduceF() is the application's reduce function. You should
	// call it once per distinct key, with a slice of all the values
	// for that key. reduceF() returns the reduced value for that key.
	//
	// You should write the reduce output as JSON encoded KeyValue
	// objects to the file named outFile. We require you to use JSON
	// because that is what the merger that combines the output
	// from all the reduce tasks expects. There is nothing special about
	// JSON -- it is just the marshalling format we chose to use. Your
	// output code will look something like this:
	//
	//   enc := json.NewEncoder(file)
	//   for key := ... {
	//     enc.Encode(KeyValue{key, reduceF(...)})
	//   }
	//   file.Close()
	//
	// Your code here (Part I).
	//
	// Gather every value for each key across all nMap intermediate files.
	interMaps := map[string][]string{}
	for i := 0; i < nMap; i++ {
		inFile := reduceName(jobName, i, reduceTask)
		file, err := os.Open(inFile)
		if err != nil {
			log.Fatal("Open intermediate file: ", err)
		}
		// Decode KeyValue objects one at a time until io.EOF.
		decoder := json.NewDecoder(file)
		for {
			var kv KeyValue // each decoded pair lands here
			if err := decoder.Decode(&kv); err != nil {
				if err == io.EOF {
					break
				}
				log.Fatal(err)
			}
			// append on a nil slice allocates it, so the first and the
			// repeated occurrences of a key are handled uniformly.
			interMaps[kv.Key] = append(interMaps[kv.Key], kv.Value)
		}
		file.Close()
	}

	// Map iteration order is randomized in Go, but the task spec asks for
	// output sorted by key, so sort the keys explicitly (see the first
	// example in the sort package documentation; "sort" must be imported).
	keys := make([]string, 0, len(interMaps))
	for key := range interMaps {
		keys = append(keys, key)
	}
	sort.Strings(keys)

	f, openErr := os.OpenFile(outFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0664)
	if openErr != nil {
		log.Fatal("Open result file: ", openErr)
	}
	defer f.Close()
	enc := json.NewEncoder(f)
	for _, key := range keys {
		if err := enc.Encode(KeyValue{key, reduceF(key, interMaps[key])}); err != nil {
			log.Fatal("Encode: ", err)
		}
	}
}
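To see mapF and reduceF in the shape doMap and doReduce expect, here is one plausible word-count pair, in the spirit of what Part II of the lab asks for. The tokenization rule (split on non-letters) is an assumption of this sketch, not the lab's specification; it needs the strings, unicode, and strconv imports.

// A sketch of a word-count mapF/reduceF pair; not the lab's reference solution.
func mapF(filename string, contents string) []KeyValue {
	// Split on anything that is not a letter (an assumed rule).
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: w, Value: "1"})
	}
	return kvs
}

func reduceF(key string, values []string) string {
	// Every value is "1", so the reduced value is just the count.
	return strconv.Itoa(len(values))
}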
Once both functions are in place, run the Part I tests with:

go test -run Sequential