用go实现"古老的"mapreduce

最近抽空开始作6.824 Schedule: Spring 2018的相关练习,一个国外的分布式系统的相关课题任务,而后从新看了一下古老的 "mapreduce"相关论文论文 mapreduce Simplified Data Processing on Large Clusters,结合以前一些hadoop的相关知识,这里使用Go实现了mapreduce中的关键部分。我想简单的介绍一下mapreduce,而后解析实现过程,同时结合本身的想法,对mapreduce分布式离线计算框架有一个简单的整理,思考。html

简要介绍

mapreduce最开始是2004(全部说"古老")年被Google提出的,大多数人应该是在hadoop普遍普及才开始接触。git

简单来讲mapreduce就是离线处理大规模数据的分布式计算框架,用户只须要定义一个处理k,v的map函数(该map函数会自动将计算结果保存到临时文件中)和reduce(处理合并临时文件中全部key中记录)函数便可。因此从用户角度来说是很是简单的,使用类mapreduce框架就能够拥有处理海量数据的计算能力,他不须要去关心数据分区分片,任务的多机合理调度,任务失败重试,计算集群资源管理等等分布式系统中会遇到的问题。github

这里先简单梳理一下mapreduce的逻辑。golang


上图能够简单看一下官方的流程图,有几个流程,从左到右的逻辑是,首先是将数据文件切分,而后分布到每一个worker执行(至关于分布式执行map函数),再而后产生中间结果集文件,最后reduce函数聚合全部中间文件的结果集,而后其中须要Master节点去作居中调度,分配map Worker和reduce Worker。apache

下面简要介绍map, reduce, schedule的实现流程编程


编程模型json

对于开发者的编程模型是 map和reduce,处理一组k,v数据,输出一组k,v数据, 类mapreduce框架会提供map和reduce函数接口。bash

关键实现逻辑

具体代码提交放在连接中github-mapreduceapp

Master 任务分配框架

master将任务调度到合适的worker,这里实际上是整个计算系统的核心模块,至关因而整个系统的大脑,在这里其实须要作不少事,有不少细节,也有不少选择,好比首先很是重要的实现一个master-worker的服务发现过程,让worker注册到master, 能让master调度worker, 而后是须要实现worker-master之间的通讯协议,能够经过使用dubbo, grpc, thrift,再而后worker的任务调度,这里能够增长一个像hadoop yarn, apache mesos的这样的资源管理器去合理管理调度资源,将核实的任务放在合适的计算资源上,而后像worker作心跳保活,处理worker的异常调度,最后很是重要的是作master的高可用,这里不管是在google仍是hadoop感受都不是作得特别好,主要缘由应该一方面master源数据没有自动存储,而后也没有像master controller这样的管理工具去管理master容错的软件吧。

这里没有那么复杂,全部的worker和master都在一个节点,经过socket进行通讯,调用,单进程多协程,至关因而伪分布式的场景,而后这里只实现了任务的调度,将每一个doMap, doReduce 任务调度到合适的worker(这里的worker已经注册到了registerChan中),而后这里会有个局部变量allTask知道须要处理的总任务数,而后利用sync.WaitGroup控制携程函数的执行结束,而后将allTask中的任务经过使用Go func启动单独的协程去执行,当出现任务调度失败,将task从新放入到allTask,最后当等待全部的worker func执行完毕后,退出函数。

func schedule(
            jobName string, 
            mapFiles []string, 
            nReduce int, 
            phase jobPhase, 
            registerChan chan string) {

    var ntasks int
    var n_other int 

    //判断map reduce函数类型

    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }


    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    fmt.Printf("Schedule: %v done\n", phase)

    var wg sync.WaitGroup
    var allTask = make(chan int)

     // 加载task, 等待全部task完成
    go func() {

        for i := 0; i < ntasks; i++ {
            wg.Add(1)
            allTask <- i
        }

        wg.Wait()
        close(allTask)

    }()

     // 调度全部的task

     for i := range allTask {

        // 获取合适的worker
        worker := <- registerChan

        go func(worker string, i int) {

            file := ""

            if phase == mapPhase {

                file = mapFiles[i]
            }
            doTaskArgs := DoTaskArgs{

                JobName: jobName,
                File :  file,
                Phase : phase,
                TaskNumber:  i,
                NumOtherPhase: n_other,
            }

            // 执行worker
            if call(worker, "Worker.DoTask", &doTaskArgs, nil ) {
                wg.Done()

                // 执行成功从新放回worker

                registerChan <- worker

            } else {

                    // 放回执行失败的task

                allTask <- i
            }

        }(worker, i)

    }


    fmt.Printf("Schedule: %v done\n", phase)

}
复制代码

Domap实现:

doMap实现逻辑其实比较简单,就是使用ioutil.ReadFile将就本worker的数据读出来(会有一个文件切片功能,这里没有介绍,就是将须要计算的数据经过分片的方式传递给worker, 在hadoop和gfs中默认是64M,这和分布式文件系统hdfs, gfs的具体内部实现逻辑有关系),而后执行用户定义的map func函数,将执行后的结果保存到中间文件中tmpFiles中, 这里保存的格式是json, 注意这里的环境假设是,全部worker都一个文件系统中,实际处理状况会在gfs,hdfs中。

func doMap(jobName string,  
    mapTask int, 
    inFile string,
    nReduce int, 
    mapF func(filename string, contents string) []KeyValue,
)

    // 1 读取合适的文件,执行用户定义的map函数

    body, err := ioutil.ReadFile(inFile)
    if err != nil {
        fmt.Errorf("doMap read file err %s", err)
        return
    }

    // 2 执行用户定义的map函数 

    resultKv := mapF(inFile ,string(body))

    if len(resultKv) == 0 {
        fmt.Println("doMap not need to create tmp file")
        return
    }
    tmpFiles := make([]*os.File, nReduce)
    tmpFileEcoder := make([]*json.Encoder, nReduce)

    // 3 预生成须要的中间结果文件,获取文件指针

    for i:= 0; i < nReduce; i++ {
        reduceTmpFileName := reduceName(jobName, mapTask, i)
        fmt.Println(reduceTmpFileName)
        stat, err := os.Stat(reduceTmpFileName)
        if err != nil {
            if os.IsExist(err) {
                os.Remove(reduceTmpFileName)
            }
        }
        if stat != nil {
            if stat.IsDir() {
                os.RemoveAll(reduceTmpFileName)
            }
        }
        reduceFile, err := os.Create(reduceTmpFileName)
        if err != nil {
            fmt.Errorf("doMap create tmp %s file err %s ", reduceTmpFileName, err)
            return
        }
        //tmpFiles := append(tmpFiles, reduceFile)
        tmpFiles[i] = reduceFile
        enc := json.NewEncoder(reduceFile)
        //tmpFileEcoder = append(tmpFileEcoder, enc)
        tmpFileEcoder[i] = enc

    }


    // 4 将map函数执行的resultKv,放入到中间文件中

    for _, kv := range resultKv {


        // 经过hash定位到 k 和对应的v 保存的合适的数据文件中

        hashIndex := ihash(kv.Key) % nReduce
        if tmpFileEcoder[hashIndex] == nil {
            fmt.Errorf("doMap tmpFile ecoder index err")
            continue
        }

        err := tmpFileEcoder[hashIndex].Encode(&kv)
        if err != nil {
            fmt.Errorf("doMap write tmp file %s err %s", tmpFiles[hashIndex].Name(), err)
        }
    }

    // 5 关闭文件描述符

    for _, tmpFile := range tmpFiles {
        tmpFile.Close()
    }
}
复制代码

DoReduce实现:

doReduce逻辑也不是太难,就是将当全部map执行结束后,执行reduce函数(这里会用map函数所生成的中间文件),至关因而合并排序结果集,而后输出到输出文件。

func doReduce(
    jobName string, 
    MapReduce job,
    reduceTask int, 
    outFile string, 
    nMap int, 
    reduceF func(key string, values []string) string,
) {


    resultMap := make(map[string][]string)

    var keys []string

    // 1 读取全部的中间文件

    for i:= 0; i < nMap; i++{

        reduceTmpFileName := reduceName(jobName, i, reduceTask)
        reduceTmpfile, err := os.Open(reduceTmpFileName)
        if err != nil {
            fmt.Errorf("doReduce read tmp file error %s", reduceTmpFileName)
            continue
        }

        var kv KeyValue

        // 2 解析每一个临时文件
        decode := json.NewDecoder(reduceTmpfile)
        err = decode.Decode(&kv)

        for err == nil {
            if _, ok := resultMap[kv.Key]; !ok {
                keys = append(keys, kv.Key)
            }
            resultMap[kv.Key] = append(resultMap[kv.Key], kv.Value)
            err = decode.Decode(&kv)
        }

        // 3 排序 key

        sort.Strings(keys)

        out, err := os.Create(outFile)
        if err != nil {
            fmt.Errorf("doReduce create output file %s failed", err)
            return
        }
        enc := json.NewEncoder(out)

        // 4 输出全部的结果到reduce文件中

        for _, key := range keys {
            if err = enc.Encode(KeyValue{key, reduceF(key, resultMap[key])}); err != nil {
                fmt.Errorf("write key: %s to file %s failed", key, outFile)
            }
        }
        out.Close()
    }


}
复制代码

想法:

感受其实几年前就进入云计算大数据的时代,而后进入移动互谅网,而后最近两年新冒出来的,物联网,新零售的概念,这意味着终端设备愈来愈多,同时也会产生愈来愈多的数据,咱们在数据中找到有价值的信息会愈来愈难,因此感受数据处理,数据分析的比重会是在互联网中会愈来愈高。

那么对应到技术上,可能之前最开始想到数据处理框架就是mapreduce, 如今可能不少人会以为mapreduce这种框架略显老式, 我其实以为仍是使用场景的问题,像带有离线计算,批处理,离线存储这样需求,hadoop体系仍是能够继续胜任,而后对于实时性要求更高的应用场景,会采用流式计算技术,像storm, yahoo s4等方案,如今开源社区也对于数据处理已经也孵化出了更多程序的框架,spark, flink的程序体系等,对于咱们来说在技术选项上也能有更多方案。


最后:

其实只实现了mapreduce比较简单的功能,有一些逻辑没有作介绍,具体能够看看上边链接中github的代码,也能够尝试作作6.824 Schedule: Spring 2018相关练习,会对理解分布式系统有更多的帮助, 有了一个基础的框架结构,能够往上边增长不少功能,实现一个标准的mapreduce库吧。



相关文章
相关标签/搜索