在PartI主要是实现 common_map.go的 doMap()方法(分割map任务输出的函数)以及 common_reduce.go的 doReduce()方法(收集reduce任务的全部输入的函数),此时map和reduce阶段的task仍是串行运行的。json
首先了解一下整个程序的运行流程,执行下列命令l便可运行试验第一部分的代码。数组
go test -run Sequentialbash
执行命令后会运行test_test.go中的TestSequentialSingle函数app
func TestSequentialSingle(t *testing.T) {
mr := Sequential("test", makeInputs(1), 1, MapFunc, ReduceFunc)
mr.Wait()
check(t, mr.files)
checkWorker(t, mr.stats)
cleanup(mr)
}
复制代码
Sequential()函数中会串行地执行map和reduce tasks。其中makeInputs会生成一个输入文件824-mrinput-0.txt,即本次要处理的文件,里面是递增的数字(0~99999),一个数字为一行。函数
func Sequential(jobName string, files []string, nreduce int,
mapF func(string, string) []KeyValue,
reduceF func(string, []string) string,
) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}
复制代码
根据参数,本次只会生成一个文件824-mrinput-0.txt,以及一个reduce task处理。在Sequential函数中首先调用 doMap()方法实现Map功能,生成中间键值对,而后调用doReduce()方法实现Reduce功能。因为只有一个文件和一个reduce task,因此doMap()和doReduce()会依次串行地各执行一次。测试
doMap()函数主要是实现这样的功能:读取一个输入文件(inFile),调用咱们实现的Map功能的函数mapF,并将mapF的输出内容分配到给nReduce的中间文件中。ui
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
复制代码
// 将读取文件中的全部单词分割,返回[]KeyValue,形如[(“0”,””),(“1”,””)...] ps.可能存在重复的KeyValue
func MapFunc(file string, value string) (res []KeyValue) {
debug("Map %v\n", value)
// Fields 以连续的空白字符为分隔符,将 s 切分红多个子串,结果中不包含空白字符自己
// 空白字符有:\t, \n, \v, \f, \r, ' ', U+0085 (NEL), U+00A0 (NBSP)
words := strings.Fields(value)
for _, w := range words {
kv := KeyValue{w, ""}
res = append(res, kv)
}
return
}
复制代码
至于写入中间文件,建议使用json的方式,可使用如下方法this
enc := json.NewEncoder(file)
for _, kv := ... {
err := enc.Encode(&kv)
}
复制代码
综上分析,获得实现doMap()函数的大体思路(本部分 只有一个输入文件和一个reduce task,nReduce=1):
首先根据n个输入文件和设定的m个reduce tasks,生成n*m个中间文件,调用reduceName方法进行命名。因为这部分实验只有一个输入文件和一个reduce task,所以只会生成一个中间文件。 对于每个输入文件fileX,读取文件内容,调用mapF()方法进行处理,最终返回键值对[]KeyValue。此处的mapF()是指上文的MapFunc方法 对上一步生成的[]KeyValue,每个Key调用ihash()方法而后mod nReduce,选择该 KeyValue写入哪一个中间文件中。
处理完[]KeyValue所有内容,关闭文件。
个人实现代码以下:spa
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
//建立中间文件
var interFiles []*os.File
for r:=0;r<nReduce;r++ {
interName := reduceName(jobName, mapTask, r)
interFile,err := os.Create(interName)
if err != nil {
fmt.Println(err)
}
interFiles = append(interFiles,interFile)
defer interFile.Close()
}
//调用mapF,获得map程序处理后的键值对
inBody,err:=ioutil.ReadFile(inFile)//读取文件内容
if err!=nil {
fmt.Println(err)
}
keyValue := mapF(inFile,string(inBody))
//写入中间文件
for _,v := range keyValue {
r:= ihash(v.Key)%nReduce
enc := json.NewEncoder(interFiles[r])
err := enc.Encode(&v)
if err!=nil {
fmt.Println(err)
}
}
}
复制代码
doReduce()主要是实现这样的功能:读取这个reduce task对应的中间文件,按key对中间键/值对进行排序及合并,为每一个key调用用户定义的reduce函数(reduceF),最后将reduceF的输出写入磁盘。
若在doMap()中使用了enc.Encode(&kv)将中间键值对写入中间文件,在doReduce()中可使用Decode(&kv)来读取。最后的输出文件也推荐使用json的方式写入。
同理,reduceF对应的是ReduceFunc,也已经在原代码中实现了。第一个参数是键key,第二个参数是值value的数组,若各个中间文件中某个key有多个value。在ReduceFunc中只是打印了key值,没作什么处理。debug
// Just return key
func ReduceFunc(key string, values []string) string {
for _, e := range values {
debug("Reduce %s %v\n", key, e)
}
return ""
}
复制代码
个人实现以下
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
//读取中间文件
var keyValue []KeyValue
for i:=0;i<nMap;i++ {
interName := reduceName(jobName, i, reduceTask)
interBody,err:=ioutil.ReadFile(interName)
if err != nil {
fmt.Println(err)
}
dec := json.NewDecoder(strings. NewReader(string(interBody)))
for {
var m KeyValue
if err := dec.Decode(&m) ; err == io. EOF {
break
} else if err != nil {
fmt.Println(err)
}
keyValue = append(keyValue,m)
}
}
//排序及合并,处理后应该相似["0":[""],"1":[""]...]
var keyValuesMap map[string][]string
keyValuesMap = make(map[string][]string)
for _,v := range keyValue {
if _,ok:= keyValuesMap[v.Key];ok {//若key值已存在,将value添加到[]string中
keyValuesMap[v.Key] = append(keyValuesMap[v.Key],v.Value)
}else{//若key值不存在,在map中新建key
var values []string
values = append(values,v.Value)
keyValuesMap[v.Key] = values
}
}
//对每一个key调用reduceF,并写入最后的文件
outputFile,err := os.Create(outFile)
if err!=nil {
fmt.Println(err)
}
enc := json.NewEncoder(outputFile)
for k,v := range keyValuesMap {
err := enc.Encode(KeyValue{k, reduceF(k,v)})
if err!=nil {
fmt.Println(err)
}
}
}
复制代码
在6.824\src\mapreduce目录下执行下列命令进行PartI实验的测试
go test -run Sequential
若运行经过会在结果中输出ok,相似
ps.运行go命令须要设置好GOPATH
在common.go中设置 debugEnabled = true,go test时增长-v参数能够得到更多调试信息
env "GOPATH=$PWD/../../" go test -v -run Sequential
=== RUN TestSequentialSingle
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
master: Map/Reduce task completed
--- PASS: TestSequentialSingle (1.34s)
=== RUN TestSequentialMany
master: Starting Map/Reduce task test
Merge: read mrtmp.test-res-0
Merge: read mrtmp.test-res-1
Merge: read mrtmp.test-res-2
master: Map/Reduce task completed
--- PASS: TestSequentialMany (1.33s)
PASS
ok mapreduce 2.672s