最近有幸拜读 Google 分布式的三大论文,本着好记性不如烂笔头的原则,谈谈楼主对分布式系统开发的一点小小的心得~算法
相信用过 Hadoop 的同窗在等待结果输出的时候会出现相似于这样的 INFO : 2020-01-17 11:44:14,132 Stage-11 map = 0%, reduce = 0%
的日志,它展现了 MapReduce 的执行过程,下面咱们也将就 MapReduce 进行展开,阐述 MapReduce 的执行原理以及根据 Google 的论文实现了 mini 版的 MapReduce。编程
MapReduce is a programming model and an associated implementation for processing and generating large data sets. Users specify a map function that processes a key/value pair to generate a set of intermediate key/value pairs, and a reduce function that merges all intermediate values associated with the same intermediate key.json
就像 Google 的 MapReduce 论文中所说的, MapReduce 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先建立一个 Map 函数处理一个基于 key/value pair
的数据集合,输出中间的基于 key/value pair
的数据集合,而后再建立一个 Reduce 函数用来合并全部的具备相同中间 key 值的中间 value 值。缓存
MapReduce编程模型的原理是:利用一个输入 key/value pair
集合来产生一个输出的 key/value pair
集合。MapReduce 库的用户用两个函数表达这个计算:Map和Reduce。app
Map : 用户自定义的 Map 函数接受一个输入的 key/value pair
值,而后产生一个中间 key/value pair
值的集合。MapReduce 库把全部具备相同中间 key 值的中间 value 值集合在一块儿后传递给 Reduce 函数。框架
Reduce : 用户自定义的 Reduce 函数接受一个中间 key 的值和相关的一个 value 值的集合。Reduce 函数合并这些 value 值,造成一个较小的 value 值的集合。通常的,每次 Reduce 函数调用只产生 0 或 1 个输出 value 值。一般咱们经过一个迭代器把中间 value 值提供给 Reduce 函数,这样咱们就能够处理没法所有放入内存中的大量的 value 值的集合。分布式
master
。副本中其它的程序都是 worker
程序,由 master
分配任务。有 M 个 Map 任务和 R 个 Reduce 任务将被分配,master
将一个 Map 任务或 Reduce 任务分配给一个空闲的 worker
。worker
程序读取相关的输入数据片断,从输入的数据片断中解析出 key/value pair
,而后把 key/value pair
传递给用户自定义的 Map 函数,由 Map 函数生成并输出的中间 key/value pair
,并缓存在内存中。key/value pair
经过分区函数分红 R 个区域,以后周期性的写入到本地磁盘上。缓存的 key/value pair
在本地磁盘上的存储位置将被回传给 master
,由 master
负责把这些存储位置再传送给 Reduce worker
。Reduce worker
程序接收到 master
程序发来的数据存储位置信息后,使用 RPC 从 Map worker
所在主机的磁盘上读取这些缓存数据。当 Reduce worker
读取了全部的中间数据后,经过对 key 进行排序后使得具备相同 key 值的数据聚合在一块儿。因为许多不一样的 key 值会映射到相同的 Reduce 任务上,所以必须进行排序。若是中间数据太大没法在内存中完成排序,那么就要在外部进行排序。Reduce worker
程序遍历排序后的中间数据,对于每个惟一的中间 key 值,Reduce worker
程序将这个key值和它相关的中间 value 值的集合传递给用户自定义的 Reduce 函数。Reduce 函数的输出被追加到所属分区的输出文件。master
唤醒用户程序。在这个时候,在用户程序里的对 MapReduce 调用才返回。在成功完成任务以后,MapReduce 的输出存放在 R 个输出文件中(对应每一个 Reduce 任务产生一个输出文件,文件名由用户指定)。通常状况下,用户不须要将这 R 个输出文件合并成一个文件,咱们常常把这些文件做为另一个 MapReduce 的输入,或者在另一个能够处理多个分割文件的分布式应用中使用。函数
MapReduce 的核心就是实现其 Map 与 Reduce 的逻辑代码,显示楼主将就在上面描述的 Map 与 Reduce 的执行过程完成对 Map 与 Reduce 的实现。oop
1,下面的 doMap 函数管理一项 map 任务:它读取输入文件(inFile),为该文件的内容调用用户定义的 map 函数(mapF),而后将 mapF 的输出分区为 nReduce 中间文件。大数据
2,每一个 reduce 任务对应一个中间文件。文件名包括 map 任务编号和 reduce 任务编号。使用由reduceName 函数生成的文件名做为 reduce 任务的中间文件。在每一个key mod nReduce 上调用 ihash()来选择对应的 reduce 任务。
3,mapF 是应用程序提供的 map 函数。第一个参数应该是输入文件名。第二个参数应该是整个输入文件的内容。 mapF()返回包含用于 reduce 的键/值对的切片。
4,下面程序中使用 json 格式将 mapF 处理好的数据写入文件中,为了数据处理方便,下面程序中处理好的每条数据都采用换行符进行分割。
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
func doMap( jobName string, // MapReduce 的任务名称 mapTask int, // 当前执行的 mapTask inFile string, // 输入的的文件 nReduce int, // reduceTask 的数量 mapF func(filename string, contents string) []KeyValue, // 用户自定义的 map 函数 ) {
f, err := os.Open(inFile)
if err != nil {
debug("open file err %v", err)
}
defer f.Close()
dat, err := ioutil.ReadAll(f)
if err != nil {
debug("open map file err %v", err)
}
res := mapF(inFile, string(dat))
for _, kv := range res {
hash := ihash(kv.Key)
r := hash % nReduce
// mrtmp.xxx-0-0
fd, err := os.OpenFile(reduceName(jobName, mapTask, r), os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644)
if err != nil {
debug("open mrtmp.xxx file err %v", err)
continue
}
enc := json.NewEncoder(fd)
if err := enc.Encode(&kv); err != nil {
debug("encode json err %v", err)
continue
}
fd.Close()
}
}
func ihash(s string) int {
h := fnv.New32a()
h.Write([]byte(s))
return int(h.Sum32() & 0x7fffffff)
}
复制代码
doReduce 管理一个 reduce 任务:它读取任务的中间文件,按 key 对中间文件中的数据对进行排序,为每一个 key 调用用户定义的 reduceF 函数,并将 reduceF 的输出的写入磁盘。
func reduceName(jobName string, mapTask int, reduceTask int) string {
return "mrtmp." + jobName + "-" + strconv.Itoa(mapTask) + "-" + strconv.Itoa(reduceTask)
}
func doReduce( jobName string, // MapReduce 的任务名称 reduceTask int, // 当前运行的 reduce 任务的任务号 outFile string, // 结果输出的文件路径 nMap int, // map 任务的个数 reduceF func(key string, values []string) string, // 用户的自定义 reduce 函数 ) {
kvMap := make(map[string][]string)
for i := 0; i < nMap; i++ {
func() {
inFileName := reduceName(jobName, i, reduceTask)
inFile, err := os.Open(inFileName)
if err != nil {
panic("can't open file:" + inFileName)
}
defer inFile.Close()
// Read and Decoder the file
var kv KeyValue
for decoder := json.NewDecoder(inFile); decoder.Decode(&kv) != io.EOF; {
kvMap[kv.Key] = append(kvMap[kv.Key], kv.Value)
}
}()
}
var keys []string
// sort by key
for k := range kvMap {
keys = append(keys, k)
}
sort.Strings(keys)
// reduce
outfd, err := os.Create(outFile)
if err != nil {
panic("can't create file:" + outFile)
}
defer outfd.Close()
enc := json.NewEncoder(outfd)
for _, k := range keys {
reducedValue := reduceF(k, kvMap[k])
enc.Encode(KeyValue{Key: k, Value: reducedValue})
}
}
复制代码
下面的函数是对 doMap 与 doReduce 进行顺序调用,生成 MapReduce 任务的结果输出到结果文件中。
func Sequential(jobName string, files []string, nreduce int, mapF func(string, string) []KeyValue, reduceF func(string, []string) string, ) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}
复制代码
在上面咱们提到了 MapReduce 在实际应用中的例子,下面咱们将对这两个例子作一下简单的实现。
为了实现词频统计这一功能,咱们使用 MapReduce 框架的思路就是实现自定义的 map 与 reduce 函数: 1,map:读取文档,将文档中的单词逐个提取出来,生成(单词,1)这样的键值对,而后把数据罗盘,写入到中间文件中。 2,reduce:读取中间文件,按照键值对进行排序,将 key 相同的数据聚合到一块儿,统计每一个单词出现的次数,而后将结果写入到文件中落盘。
package main
import (
"6.824/src/mapreduce"
"fmt"
"os"
"strconv"
"strings"
"unicode"
)
func mapF(filename string, contents string) (res []mapreduce.KeyValue) {
// Your code here (Part II).
f := func(c rune) bool {
return !unicode.IsLetter(c)
}
words := strings.FieldsFunc(contents, f)
for _, w := range words {
kv := mapreduce.KeyValue{Key: w, Value: "1"}
res = append(res, kv)
}
return res
}
func reduceF(key string, values []string) string {
// Your code here (Part II).
sum := 0
for _, e := range values {
data, err := strconv.Atoi(e)
if err != nil {
fmt.Printf("Reduce err %s%v\n", key, err)
continue
}
sum += data
}
return strconv.Itoa(sum)
}
func main() {
if len(os.Args) < 4 {
fmt.Printf("%s: see usage comments in file\n", os.Args[0])
} else {
var mr *mapreduce.Master
mr = mapreduce.Sequential("wcseq", os.Args[3:], 3, mapF, reduceF)
mr.Wait()
}
}
复制代码
一样,在理解了倒排索引的基础上设计咱们本身的 map 与 reduce 方法, 1,map:将读取文档,将文档中的单词做为 key,单词所在的文档做为 value,写入到中间文件中。 2,reduce:读取中间文件,按照键值对进行排序,将 key 相同的数据聚合到一块儿,将单词出现的文件名拼接在一块儿,写入到结果文件中。
package main
import (
"bytes"
"os"
"strconv"
"strings"
"unicode"
)
import "fmt"
import "6.824/src/mapreduce"
func mapF(document string, value string) (res []mapreduce.KeyValue) {
// Your code here (Part V).
words := strings.FieldsFunc(value, func(c rune) bool {
return !unicode.IsLetter(c)
})
for _, w := range words {
res = append(res, mapreduce.KeyValue{Key: w, Value: document})
}
return res
}
func reduceF(key string, values []string) string {
// Your code here (Part V).
sum := 0
var buffer bytes.Buffer
if key == "www" {
fmt.Println(values)
}
isExist := make(map[string]string)
for _, e := range values {
if _, ok := isExist[e]; !ok {
buffer.WriteString(e)
buffer.WriteString(",")
sum += 1
isExist[e] = e
}
}
iiRes := strconv.Itoa(sum) + " " + strings.TrimRight(buffer.String(), ",")
return iiRes
}
func main() {
if len(os.Args) < 4 {
fmt.Printf("%s: see usage comments in file\n", os.Args[0])
} else {
var mr *mapreduce.Master
mr = mapreduce.Sequential("iiseq", os.Args[3:], 3, mapF, reduceF)
mr.Wait()
}
}
复制代码
本文参考 Google 的论文,实现了一个单机版的 MapReduce 框架,并实现了两个简单的 MapReduce 实例,文中的代码能够在楼主的 GitHub 下载查看。
MapReduce: Simplified Data Processing on Large Clusters