本lab将用go完成一个MapReduce框架,完成后将大大加深对MapReduce的理解。git
这部分须要咱们实现common_map.go中的doMap()和common_reduce.go中的doReduce()两个函数。
能够先从测试用例下手:github
func TestSequentialSingle(t *testing.T) { mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc) mr.Wait() check(t, mr.files) checkWorker(t, mr.stats) cleanup(mr) }
从Sequential()开始调用链以下:
如今要作的是完成doMap()和doReduce()。json
doMap():数组
func doMap( jobName string, // the name of the MapReduce job mapTask int, // which map task this is inFile string, nReduce int, // the number of reduce task that will be run ("R" in the paper) mapF func(filename string, contents string) []KeyValue, ) { //打开inFile文件,读取所有内容 //调用mapF,将内容转换为键值对 //根据reduceName()返回的文件名,打开nReduce个中间文件,而后将键值对以json的格式保存到中间文件 inputContent, err := ioutil.ReadFile(inFile) if err != nil { panic(err) } keyValues := mapF(inFile, string(inputContent)) var intermediateFileEncoders []*json.Encoder for reduceTaskNumber := 0; reduceTaskNumber < nReduce; reduceTaskNumber++ { intermediateFile, err := os.Create(reduceName(jobName, mapTask, reduceTaskNumber)) if err != nil { panic(err) } defer intermediateFile.Close() enc := json.NewEncoder(intermediateFile) intermediateFileEncoders = append(intermediateFileEncoders, enc) } for _, kv := range keyValues { err := intermediateFileEncoders[ihash(kv.Key) % nReduce].Encode(kv) if err != nil { panic(err) } } }
总结来讲就是:多线程
doReduce:app
func doReduce( jobName string, // the name of the whole MapReduce job reduceTask int, // which reduce task this is outFile string, // write the output here nMap int, // the number of map tasks that were run ("M" in the paper) reduceF func(key string, values []string) string, ) { //读取当前reduceTaskNumber对应的中间文件中的键值对,将相同的key的value进行并合 //调用reduceF //将reduceF的结果以json形式保存到mergeName()返回的文件中 kvs := make(map[string][]string) for mapTaskNumber := 0; mapTaskNumber < nMap; mapTaskNumber++ { midDatafileName := reduceName(jobName, mapTaskNumber, reduceTask) file, err := os.Open(midDatafileName) if err != nil { panic(err) } defer file.Close() dec := json.NewDecoder(file) for { var kv KeyValue err = dec.Decode(&kv) if err != nil { break } values, ok := kvs[kv.Key] if ok { kvs[kv.Key] = append(values, kv.Value) } else { kvs[kv.Key] = []string{kv.Value} } } } outputFile, err := os.Create(outFile) if err != nil { panic(err) } defer outputFile.Close() enc := json.NewEncoder(outputFile) for key, values := range kvs { enc.Encode(KeyValue{key, reduceF(key, values)}) } }
总结:框架
文件转换的过程大体以下:
函数
这部分将用一个简单的实例展现如何使用MR框架。须要咱们实现main/wc.go中的mapF()和reduceF()来统计单词的词频。测试
mapF:this
func mapF(filename string, contents string) []mapreduce.KeyValue { // Your code here (Part II). words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) }) var kvs []mapreduce.KeyValue for _, word := range words { kvs = append(kvs, mapreduce.KeyValue{word, "1"}) } return kvs }
将文本内容分割成单词,每一个单词对应一个<word, "1">键值对。
reduceF:
func reduceF(key string, values []string) string { // Your code here (Part II). return strconv.Itoa(len(values)) }
value中有多少个"1",就说明这个word出现了几回。
目前实现的版本都是执行完一个map而后在执行下一个map,也就是说没有并行,这偏偏是MapReduce最大的买点。这部分须要实现schedule(),该函数将任务分配给Worker去执行。固然这里并无真正的多机部署,而是使用多线程进行模拟。
master和worker的关系大体以下:
在建立worker对象的时候会调用Register() RPC,master收到RPC后,将该worker的id保存在数组中,执行shedule()是能够根据该id,经过DoTask() RPC调用该worker的DoTask()执行map或reduce任务。
schedule.go
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) { var ntasks int var n_other int // number of inputs (for reduce) or outputs (for map) switch phase { case mapPhase: ntasks = len(mapFiles) n_other = nReduce case reducePhase: ntasks = nReduce n_other = len(mapFiles) } fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other) //总共有ntasks个任务,registerChan中保存着空闲的workers taskChan := make(chan int) var wg sync.WaitGroup go func() { for taskNumber := 0; taskNumber < ntasks; taskNumber++ { taskChan <- taskNumber fmt.Printf("taskChan <- %d in %s\n", taskNumber, phase) wg.Add(1) } wg.Wait() //ntasks个任务执行完毕后才能经过 close(taskChan) }() for task := range taskChan { //全部任务都处理完后跳出循环 worker := <- registerChan //消费worker fmt.Printf("given task %d to %s in %s\n", task, worker, phase) var arg DoTaskArgs arg.JobName = jobName arg.Phase = phase arg.TaskNumber = task arg.NumOtherPhase = n_other if phase == mapPhase { arg.File = mapFiles[task] } go func(worker string, arg DoTaskArgs) { if call(worker, "Worker.DoTask", arg, nil) { //执行成功后,worker须要执行其它任务 //注意:须要先掉wg.Done(),而后调register<-worker,不然会出现死锁 //fmt.Printf("worker %s run task %d success in phase %s\n", worker, task, phase) wg.Done() registerChan <- worker //回收worker } else { //若是失败了,该任务须要被从新执行 //注意:这里不能用taskChan <- task,由于task这个变量在别的地方可能会被修改。好比task 0执行失败了,咱们这里但愿 //将task 0从新加入到taskChan中,可是由于执行for循环的那个goroutine,可能已经修改task这个变量为1了,咱们错误地 //把task 1从新执行了一遍,而且task 0没有获得执行。 taskChan <- arg.TaskNumber } }(worker, arg) } fmt.Printf("Schedule: %v done\n", phase) }
这里用到了两个channel,分别是registerChan和taskChan。
registerChan中保存了可用的worker id。
生产:
消费:
taskChan中保存了任务号。任务执行失败须要从新加入taskChan。
以前的代码已经体现了,对于失败的任务从新执行。
这是MapReduce的一个应用,生成倒排索引,好比想查某个单词出如今哪些文本中,就能够创建倒排索引来解决。
func mapF(document string, value string) (res []mapreduce.KeyValue) { // Your code here (Part V). words := strings.FieldsFunc(value, func(r rune) bool { return !unicode.IsLetter(r) }) var kvs []mapreduce.KeyValue for _, word := range words { kvs = append(kvs, mapreduce.KeyValue{word, document}) } return kvs } func reduceF(key string, values []string) string { // Your code here (Part V). values = removeDuplicationAndSort(values) return strconv.Itoa(len(values)) + " " + strings.Join(values, ",") } func removeDuplicationAndSort(values []string) []string { kvs := make(map[string]struct{}) for _, value := range values { _, ok := kvs[value] if !ok { kvs[value] = struct{}{} } } var ret []string for k := range kvs { ret = append(ret, k) } sort.Strings(ret) return ret }
mapF()生成<word, document>的键值对,reduceF()处理word对应的全部document,去重而且排序,而后拼接到一块儿。
具体代码在:https://github.com/gatsbyd/mit_6.824_2018 若有错误,欢迎指正: 15313676365