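In this part, the master must handle failed workers. If a worker fails while handling an RPC from the master, the master's call() eventually times out and returns false. In that case the master should reassign the failed task to another worker.
An RPC failure does not necessarily mean the worker did not execute the task: the worker may have executed it but the reply was lost, or the worker may still be executing when the master's RPC times out. Reassigning the task can therefore lead to two workers receiving and computing the same task. That is harmless, because the same task produces the same result. All we need to do is reassign failed tasks to another worker.
As before, only schedule.go needs to change.
The only change is a check on the result of call(). On success, decrement the WaitGroup, put the worker's address back into the registerChan channel, and break out of the loop that reads from registerChan. On failure, keep reading from registerChan and run the task on the next available worker.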
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
	var ntasks int
	var n_other int // number of inputs (for reduce) or outputs (for map)
	switch phase {
	case mapPhase:
		ntasks = len(mapFiles)
		n_other = nReduce
	case reducePhase:
		ntasks = nReduce
		n_other = len(mapFiles)
	}
	fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
	var wg sync.WaitGroup
	wg.Add(ntasks)
	for i := 0; i < ntasks; i++ {
		// Run each task in its own goroutine so tasks are dispatched concurrently.
		go func(taskNum int) {
			// Take the next available worker from the channel.
			for w := range registerChan {
				// Build the DoTaskArgs for this task.
				var arg DoTaskArgs
				switch phase {
				case mapPhase:
					arg = DoTaskArgs{JobName: jobName, File: mapFiles[taskNum], Phase: mapPhase, TaskNumber: taskNum, NumOtherPhase: n_other}
				case reducePhase:
					arg = DoTaskArgs{JobName: jobName, File: "", Phase: reducePhase, TaskNumber: taskNum, NumOtherPhase: n_other}
				}
				result := call(w, "Worker.DoTask", arg, nil)
				if result {
					// Success: call Done before the send, because the send
					// blocks if nothing is left to read from registerChan
					// and must not hold up wg.Wait().
					wg.Done()
					registerChan <- w
					break
				}
				// Failure: loop again and retry the task on the next worker.
			}
		}(i)
	}
	wg.Wait()
	return
}
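The retry loop above depends on the lab's call() and worker registration, so it cannot be run on its own. The following is a minimal standalone sketch of the same retry-until-success idea. The names flakyCall and runTasks are hypothetical and not part of the lab code, and the simulated call fails exactly once per task. Unlike schedule(), this simulation also returns a worker to the pool after a failed call, because here the "failure" is artificial and the worker is still usable.

```go
package main

import (
	"fmt"
	"sync"
)

// flakyCall is a hypothetical stand-in for the lab's call(): the first
// attempt at each task reports failure, every later attempt succeeds.
type flakyCall struct {
	mu       sync.Mutex
	attempts map[int]int // task number -> attempts so far
}

func (f *flakyCall) do(worker string, task int) bool {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.attempts[task]++
	return f.attempts[task] > 1
}

// runTasks gives every task to an idle worker and retries it until one
// attempt succeeds. It returns the total number of attempts made.
func runTasks(ntasks int, workers []string, call func(worker string, task int) bool) int {
	// Buffered so that handing a worker back never blocks.
	idle := make(chan string, len(workers))
	for _, w := range workers {
		idle <- w
	}

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		attempts int
	)
	wg.Add(ntasks)
	for i := 0; i < ntasks; i++ {
		go func(task int) {
			defer wg.Done()
			for {
				w := <-idle // wait for an available worker
				ok := call(w, task)
				mu.Lock()
				attempts++
				mu.Unlock()
				idle <- w // simulation only: the worker stays usable
				if ok {
					return
				}
				// Failed: loop and retry on whichever worker is free next.
			}
		}(i)
	}
	wg.Wait()
	return attempts
}

func main() {
	f := &flakyCall{attempts: make(map[int]int)}
	total := runTasks(5, []string{"w1", "w2"}, f.do)
	fmt.Println("attempts:", total)
}
```

Since every task fails once before succeeding, five tasks take ten attempts in total, no matter how the two workers are interleaved.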
Run the following command to test the code:
go test -run Failure
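The previous parts counted the total number of occurrences of each word across a set of documents. This part instead counts the documents in which each word appears: a word that occurs several times in the same document is counted once, and we also record which documents contain the word.
We need to implement the mapF and reduceF functions in main/ii.go. The final output file should have the following format, one word per line.
word: <number of documents containing the word> <documents containing the word, separated by ','>
mapF is much like the earlier implementation, except that the Value in each returned KeyValue is now the name of the input file rather than the word's frequency.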
func mapF(document string, value string) (res []mapreduce.KeyValue) {
	// Replace everything that is not a letter (punctuation such as , . ?,
	// digits, and so on) with a space.
	re := regexp.MustCompile("[^a-zA-Z]")
	value = re.ReplaceAllString(value, " ")
	// Using the word as the map key keeps one entry per distinct word.
	kv := make(map[string]string)
	words := strings.Fields(value)
	for _, w := range words {
		kv[w] = document
	}
	// Convert the map to []mapreduce.KeyValue.
	for k, v := range kv {
		res = append(res, mapreduce.KeyValue{Key: k, Value: v})
	}
	return res
}
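To see what the map step emits for a small input, here is a standalone sketch under a few assumptions: KeyValue is a local stand-in for mapreduce.KeyValue, uniqueWords is a hypothetical name, and the result is sorted by key only to make the output deterministic (the lab's mapF does not need to sort).

```go
package main

import (
	"fmt"
	"regexp"
	"sort"
	"strings"
)

// KeyValue is a local stand-in for mapreduce.KeyValue (assumption).
type KeyValue struct {
	Key   string
	Value string
}

// nonLetters matches runs of characters that are not ASCII letters.
var nonLetters = regexp.MustCompile("[^a-zA-Z]+")

// uniqueWords returns one KeyValue per distinct word in contents, with
// the document name as the value, sorted by word.
func uniqueWords(document, contents string) []KeyValue {
	seen := make(map[string]struct{})
	var res []KeyValue
	for _, w := range strings.Fields(nonLetters.ReplaceAllString(contents, " ")) {
		if _, dup := seen[w]; dup {
			continue // already emitted for this document
		}
		seen[w] = struct{}{}
		res = append(res, KeyValue{Key: w, Value: document})
	}
	sort.Slice(res, func(i, j int) bool { return res[i].Key < res[j].Key })
	return res
}

func main() {
	for _, kv := range uniqueWords("a.txt", "to be, or not to be?") {
		fmt.Printf("%s -> %s\n", kv.Key, kv.Value)
	}
}
```

For this input the six words collapse to four pairs (be, not, or, to), each mapped to a.txt. Words are compared case-sensitively, as in the code above.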
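reduceF is handled in much the same way. Note that values []string may contain duplicate file names, so it should be deduplicated. The value returned for the word should have this format: number of documents containing the word + " " + the documents containing the word, separated by ','.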
func reduceF(key string, values []string) string {
	// Drop duplicate file names from values; tmp ends up holding each
	// file that contains the word exactly once.
	var tmp []string
	set := make(map[string]bool)
	for _, str := range values {
		if !set[str] {
			set[str] = true
			tmp = append(tmp, str)
		}
	}
	sort.Strings(tmp) // sort the file names
	// Build the returned value, which should look like this:
	// 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
	return strconv.Itoa(len(tmp)) + " " + strings.Join(tmp, ",")
}
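As a quick check of the output format, the reduce logic can be exercised on a tiny hand-written input. This is a standalone sketch; formatPostings is a hypothetical name, and the file names are made up for illustration.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// formatPostings deduplicates and sorts the document names, then returns
// "<count> <doc1>,<doc2>,...".
func formatPostings(values []string) string {
	seen := make(map[string]bool)
	var docs []string
	for _, v := range values {
		if !seen[v] {
			seen[v] = true
			docs = append(docs, v)
		}
	}
	sort.Strings(docs)
	return strconv.Itoa(len(docs)) + " " + strings.Join(docs, ",")
}

func main() {
	// "b.txt" appears twice in the input but is counted once.
	fmt.Println(formatPostings([]string{"b.txt", "a.txt", "b.txt"})) // prints "2 a.txt,b.txt"
}
```

Sorting before joining keeps the output stable regardless of the order in which values arrive.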
Run the test script; the test passes if the final output looks like the following.
bash ./test-ii.sh
LC_ALL=C sort -k1,1 mrtmp.iiseq | sort -snk2,2 | grep -v '16' | tail -10
www: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
year: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
years: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
yesterday: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
yet: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
you: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
young: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
your: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
yourself: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt
zip: 8 pg-being_ernest.txt,pg-dorian_gray.txt,pg-frankenstein.txt,pg-grimm.txt,pg-huckleberry_finn.txt,pg-metamorphosis.txt,pg-sherlock_holmes.txt,pg-tom_sawyer.txt