在这部分实验要将以前串行版本的MapReduce tasks改为并发模式,只须要实现 mapreduce/schedule.go中的 schedule()函数,其余文件不作更改。 主机在MapReduce做业期间调用schedule()两次,一次用于Map阶段,一次用于Reduce阶段。schedule()的工做是将tasks分发给可用的 workers。一般tasks会比 workers多,所以schedule()必须为每一个 worker提供一系列task,但每一个worker一次只能执行一个task。 schedule()应该等到全部tasks都完成后再返回。bash
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
// All ntasks tasks have to be scheduled on workers. Once all tasks
// have completed successfully, schedule() should return.
//
// Your code here (Part III, Part IV).
//
fmt.Printf("Schedule: %v done\n", phase)
}
复制代码
schedule()经过读取registerChan参数来获取 workers 信息。该通道为每一个worker生成一个字符串,包含worker的RPC address。有些workers可能在调度schedule()以前已经存在,而某些workers可能在schedule()运行时启动,但全部workers都会出如今 registerChan上。schedule()应该使用全部worker,包括启动后出现的worker。并发
schedule()经过向worker 发送Worker.DoTask RPC来告诉worker执行task,每次只能向一个给定的worker发送一个RPC。该RPC的参数被定义在MapReduce / common_rpc.go中的DoTaskArgs。其中File参数只在Map tasks中使用,表示要读取的输入文件名称。 schedule()能够在mapFiles中找到这些文件名。app
type DoTaskArgs struct {
JobName string
File string // 只在Map tasks中使用,表示要读取的输入文件名称
Phase jobPhase // 标志当前是map仍是reduce阶段
TaskNumber int // 此task在当前阶段的索引
// NumOtherPhase是其余阶段的task总数
// mappers须要这个来计算output bins, reducers 须要这个来知道要收集多少输入文件
NumOtherPhase int
}
复制代码
使用mapreduce / common_rpc.go中的call()函数 将RPC发送给worker。第一个参数是worker的 address,从registerChan读取。第二个参数应该是“Worker.DoTask”,表示经过rpc调用worker的DoTask方法,第三个参数应该是DoTaskArgs结构,最后一个参数应该是nil。函数
func call(srv string, rpcname string,
args interface{}, reply interface{}) bool {
c, errx := rpc.Dial("unix", srv)
if errx != nil {
return false
}
defer c.Close()
err := c.Call(rpcname, args, reply)
if err == nil {
return true
}
fmt.Println(err)
return false
}
复制代码
再来看下Worker的结构和方法,定义在worker.go中测试
type Worker struct {
sync.Mutex
name string
Map func(string, string) []KeyValue
Reduce func(string, []string) string
nRPC int // quit after this many RPCs; protected by mutex
nTasks int // total tasks executed; protected by mutex
concurrent int // number of parallel DoTasks in this worker; mutex
l net.Listener
parallelism *Parallelism
}
复制代码
他有下列几个方法:ui
type Parallelism struct {
mu sync.Mutex
now int32
max int32
}
复制代码
其中max字段记录该worker运行的最大task数量,经过锁机制保证了并发。因为在各个函数间传递的是&Parallelism(地址),因此你们修改的是同一个Parallelism。this
这个实验将执行两个测试,TestParallelBasic(验证运行是否正确)和TestParallelCheck(验证是否并发执行) 。在TestParallelBasic中首先会生成20个824-mrinput-xx.txt的输入文件,接着调用Distributed()函数,启动schedule(),运行map tasks和reduce tasks,而后新开两个协程去启动两个workers。最后检查生成的输出文件是否正确,每一个worker至少执行了一个task。所以map tasks数量为20,reduce tasks数量为10,workers数量为2。spa
func TestParallelBasic(t *testing.T) {
mr := setup()//该函数中会调用Distributed()
for i := 0; i < 2; i++ {
go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
MapFunc, ReduceFunc, -1, nil)
}
mr.Wait()
check(t, mr.files)
checkWorker(t, mr.stats)
cleanup(mr)
}
复制代码
//先运行 map tasks,而后运行reduce tasks
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
mr = newMaster(master)
mr.startRPCServer()
//会先运行 map tasks(mapPhase),而后运行reduce tasks(reducePhase),具体逻辑在Master.run函数中
go mr.run(jobName, files, nreduce,
func(phase jobPhase) {
//建立一个无缓冲的通道
ch := make(chan string)
//将全部现有的和新注册的workers信息发送到ch通道中,schedule经过通道ch获取workers信息
go mr.forwardRegistrations(ch)
//本次要实现内容,将tasks分发给可用的 workers
schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
},
func() {
mr.stats = mr.killWorkers()
mr.stopRPCServer()
})
return
}
复制代码
TestParallelCheck 则是验证所编写的调度程序是否并行地执行task,一样开启了两个worker,检验worker中的parallelism.max(运行的worker数量最大值)是否小于2,若小于则失败。线程
func TestParallelCheck(t *testing.T) {
mr := setup()
parallelism := &Parallelism{}
for i := 0; i < 2; i++ {
go RunWorker(mr.address, port("worker"+strconv.Itoa(i)),
MapFunc, ReduceFunc, -1, parallelism)
}
mr.Wait()
check(t, mr.files)
checkWorker(t, mr.stats)
parallelism.mu.Lock()
if parallelism.max < 2 {
t.Fatalf("workers did not execute in parallel")
}
parallelism.mu.Unlock()
cleanup(mr)
}
复制代码
schedule()的参数中mapFiles是输入文件名称列表,每一个maptask处理一个。nReduce是reduce tasks的数量,registerChan通道传递全部worker的RPC address。局部变量ntasks表示当前阶段的tasks数量,如果map阶段则为输入文件数量,如果reduce阶段则为nReduce参数。
了解了大概的运行流程,schedule()中要作的就是开启多个线程,读取通道registerChan获取worker的address。构造DoTaskArgs参数。调用call()方法向worker 发送Worker.DoTask RPC,告诉worker执行task。在测试程序中开启了两个worker:worker0和worker1,而registerChan是一个无缓冲的通道,每次通道上只有一个worker address,所以能够将处理完task的worker的 address再放回registerChan,供下一个线程读取,以此实现两个worker并发运行。
一旦全部该阶段的tasks成功完成,schedule()就返回,这个功能可使用sync.WaitGroup来实现。 个人实现以下:3d
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
var wg sync.WaitGroup
wg.Add(ntasks)
for i:=0;i<ntasks;i++{
//开启线程并发调用
go func(taskNum int) {
//从chan获取可用的worker
for w := range registerChan {
//构造DoTaskArgs参数
var arg DoTaskArgs
switch phase {
case mapPhase:
arg = DoTaskArgs{JobName:jobName,File:mapFiles[taskNum],Phase:mapPhase,TaskNumber:taskNum,NumOtherPhase:n_other}
case reducePhase:
arg = DoTaskArgs{JobName:jobName,File:"",Phase:reducePhase,TaskNumber:taskNum,NumOtherPhase:n_other}
}
call(w,"Worker.DoTask",arg,nil)
wg.Done()
registerChan <- w//将worker address放回registerChan
break
}
}(i)
}
wg.Wait()
return
}
复制代码
运行下面命令来测试所编写的实验代码。这将依次执行两个测试,TestParallelBasic和TestParallelCheck 。
go test -run TestParallel
获得相似下面结果程序运行成功