最近抽空开始作6.824 Schedule: Spring 2018的相关练习,一个国外的分布式系统的相关课题任务,而后从新看了一下古老的 "mapreduce"相关论文论文 mapreduce Simplified Data Processing on Large Clusters,结合以前一些hadoop的相关知识,这里使用Go实现了mapreduce中的关键部分。我想简单的介绍一下mapreduce,而后解析实现过程,同时结合本身的想法,对mapreduce分布式离线计算框架有一个简单的整理,思考。html
mapreduce最开始是2004(全部说"古老")年被Google提出的,大多数人应该是在hadoop普遍普及才开始接触。git
简单来讲mapreduce就是离线处理大规模数据的分布式计算框架,用户只须要定义一个处理k,v的map函数(该map函数会自动将计算结果保存到临时文件中)和reduce(处理合并临时文件中全部key中记录)函数便可。因此从用户角度来说是很是简单的,使用类mapreduce框架就能够拥有处理海量数据的计算能力,他不须要去关心数据分区分片,任务的多机合理调度,任务失败重试,计算集群资源管理等等分布式系统中会遇到的问题。github
这里先简单梳理一下mapreduce的逻辑。golang
上图能够简单看一下官方的流程图,有几个流程,从左到右的逻辑是,首先是将数据文件切分,而后分布到每一个worker执行(至关于分布式执行map函数),再而后产生中间结果集文件,最后reduce函数聚合全部中间文件的结果集,而后其中须要Master节点去作居中调度,分配map Worker和reduce Worker。apache
下面简要介绍map, reduce, schedule的实现流程编程
编程模型json
对于开发者的编程模型是 map和reduce,处理一组k,v数据,输出一组k,v数据, 类mapreduce框架会提供map和reduce函数接口。bash
具体代码提交放在连接中github-mapreduceapp
Master 任务分配框架
master将任务调度到合适的worker,这里实际上是整个计算系统的核心模块,至关因而整个系统的大脑,在这里其实须要作不少事,有不少细节,也有不少选择,好比首先很是重要的实现一个master-worker的服务发现过程,让worker注册到master, 能让master调度worker, 而后是须要实现worker-master之间的通讯协议,能够经过使用dubbo, grpc, thrift,再而后worker的任务调度,这里能够增长一个像hadoop yarn, apache mesos的这样的资源管理器去合理管理调度资源,将核实的任务放在合适的计算资源上,而后像worker作心跳保活,处理worker的异常调度,最后很是重要的是作master的高可用,这里不管是在google仍是hadoop感受都不是作得特别好,主要缘由应该一方面master源数据没有自动存储,而后也没有像master controller这样的管理工具去管理master容错的软件吧。
这里没有那么复杂,全部的worker和master都在一个节点,经过socket进行通讯,调用,单进程多协程,至关因而伪分布式的场景,而后这里只实现了任务的调度,将每一个doMap, doReduce 任务调度到合适的worker(这里的worker已经注册到了registerChan中),而后这里会有个局部变量allTask知道须要处理的总任务数,而后利用sync.WaitGroup控制携程函数的执行结束,而后将allTask中的任务经过使用Go func启动单独的协程去执行,当出现任务调度失败,将task从新放入到allTask,最后当等待全部的worker func执行完毕后,退出函数。
func schedule(
jobName string,
mapFiles []string,
nReduce int,
phase jobPhase,
registerChan chan string) {
var ntasks int
var n_other int
//判断map reduce函数类型
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
fmt.Printf("Schedule: %v done\n", phase)
var wg sync.WaitGroup
var allTask = make(chan int)
// 加载task, 等待全部task完成
go func() {
for i := 0; i < ntasks; i++ {
wg.Add(1)
allTask <- i
}
wg.Wait()
close(allTask)
}()
// 调度全部的task
for i := range allTask {
// 获取合适的worker
worker := <- registerChan
go func(worker string, i int) {
file := ""
if phase == mapPhase {
file = mapFiles[i]
}
doTaskArgs := DoTaskArgs{
JobName: jobName,
File : file,
Phase : phase,
TaskNumber: i,
NumOtherPhase: n_other,
}
// 执行worker
if call(worker, "Worker.DoTask", &doTaskArgs, nil ) {
wg.Done()
// 执行成功从新放回worker
registerChan <- worker
} else {
// 放回执行失败的task
allTask <- i
}
}(worker, i)
}
fmt.Printf("Schedule: %v done\n", phase)
}
复制代码
doMap实现逻辑其实比较简单,就是使用ioutil.ReadFile将就本worker的数据读出来(会有一个文件切片功能,这里没有介绍,就是将须要计算的数据经过分片的方式传递给worker, 在hadoop和gfs中默认是64M,这和分布式文件系统hdfs, gfs的具体内部实现逻辑有关系),而后执行用户定义的map func函数,将执行后的结果保存到中间文件中tmpFiles中, 这里保存的格式是json, 注意这里的环境假设是,全部worker都一个文件系统中,实际处理状况会在gfs,hdfs中。
func doMap(jobName string,
mapTask int,
inFile string,
nReduce int,
mapF func(filename string, contents string) []KeyValue,
)
// 1 读取合适的文件,执行用户定义的map函数
body, err := ioutil.ReadFile(inFile)
if err != nil {
fmt.Errorf("doMap read file err %s", err)
return
}
// 2 执行用户定义的map函数
resultKv := mapF(inFile ,string(body))
if len(resultKv) == 0 {
fmt.Println("doMap not need to create tmp file")
return
}
tmpFiles := make([]*os.File, nReduce)
tmpFileEcoder := make([]*json.Encoder, nReduce)
// 3 预生成须要的中间结果文件,获取文件指针
for i:= 0; i < nReduce; i++ {
reduceTmpFileName := reduceName(jobName, mapTask, i)
fmt.Println(reduceTmpFileName)
stat, err := os.Stat(reduceTmpFileName)
if err != nil {
if os.IsExist(err) {
os.Remove(reduceTmpFileName)
}
}
if stat != nil {
if stat.IsDir() {
os.RemoveAll(reduceTmpFileName)
}
}
reduceFile, err := os.Create(reduceTmpFileName)
if err != nil {
fmt.Errorf("doMap create tmp %s file err %s ", reduceTmpFileName, err)
return
}
//tmpFiles := append(tmpFiles, reduceFile)
tmpFiles[i] = reduceFile
enc := json.NewEncoder(reduceFile)
//tmpFileEcoder = append(tmpFileEcoder, enc)
tmpFileEcoder[i] = enc
}
// 4 将map函数执行的resultKv,放入到中间文件中
for _, kv := range resultKv {
// 经过hash定位到 k 和对应的v 保存的合适的数据文件中
hashIndex := ihash(kv.Key) % nReduce
if tmpFileEcoder[hashIndex] == nil {
fmt.Errorf("doMap tmpFile ecoder index err")
continue
}
err := tmpFileEcoder[hashIndex].Encode(&kv)
if err != nil {
fmt.Errorf("doMap write tmp file %s err %s", tmpFiles[hashIndex].Name(), err)
}
}
// 5 关闭文件描述符
for _, tmpFile := range tmpFiles {
tmpFile.Close()
}
}
复制代码
doReduce逻辑也不是太难,就是将当全部map执行结束后,执行reduce函数(这里会用map函数所生成的中间文件),至关因而合并排序结果集,而后输出到输出文件。
func doReduce(
jobName string,
MapReduce job,
reduceTask int,
outFile string,
nMap int,
reduceF func(key string, values []string) string,
) {
resultMap := make(map[string][]string)
var keys []string
// 1 读取全部的中间文件
for i:= 0; i < nMap; i++{
reduceTmpFileName := reduceName(jobName, i, reduceTask)
reduceTmpfile, err := os.Open(reduceTmpFileName)
if err != nil {
fmt.Errorf("doReduce read tmp file error %s", reduceTmpFileName)
continue
}
var kv KeyValue
// 2 解析每一个临时文件
decode := json.NewDecoder(reduceTmpfile)
err = decode.Decode(&kv)
for err == nil {
if _, ok := resultMap[kv.Key]; !ok {
keys = append(keys, kv.Key)
}
resultMap[kv.Key] = append(resultMap[kv.Key], kv.Value)
err = decode.Decode(&kv)
}
// 3 排序 key
sort.Strings(keys)
out, err := os.Create(outFile)
if err != nil {
fmt.Errorf("doReduce create output file %s failed", err)
return
}
enc := json.NewEncoder(out)
// 4 输出全部的结果到reduce文件中
for _, key := range keys {
if err = enc.Encode(KeyValue{key, reduceF(key, resultMap[key])}); err != nil {
fmt.Errorf("write key: %s to file %s failed", key, outFile)
}
}
out.Close()
}
}
复制代码
感受其实几年前就进入云计算大数据的时代,而后进入移动互谅网,而后最近两年新冒出来的,物联网,新零售的概念,这意味着终端设备愈来愈多,同时也会产生愈来愈多的数据,咱们在数据中找到有价值的信息会愈来愈难,因此感受数据处理,数据分析的比重会是在互联网中会愈来愈高。
那么对应到技术上,可能之前最开始想到数据处理框架就是mapreduce, 如今可能不少人会以为mapreduce这种框架略显老式, 我其实以为仍是使用场景的问题,像带有离线计算,批处理,离线存储这样需求,hadoop体系仍是能够继续胜任,而后对于实时性要求更高的应用场景,会采用流式计算技术,像storm, yahoo s4等方案,如今开源社区也对于数据处理已经也孵化出了更多程序的框架,spark, flink的程序体系等,对于咱们来说在技术选项上也能有更多方案。
其实只实现了mapreduce比较简单的功能,有一些逻辑没有作介绍,具体能够看看上边链接中github的代码,也能够尝试作作6.824 Schedule: Spring 2018相关练习,会对理解分布式系统有更多的帮助, 有了一个基础的框架结构,能够往上边增长不少功能,实现一个标准的mapreduce库吧。