MapReduce:超大机群上的简单数据处理程序员
摘要web
MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.而后再指定一个reduce函数合并全部的具备相同中间key的中间value.下面将列举许多能够用这个模型来表示的现实世界的工做.数据库
以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通讯.这样就可让那些没有并行分布式处理系统经验的程序员利用大量分布式系统的资源.编程
咱们的MapReduce实现运行在规模能够灵活调整的由普通机器组成的机群上,一个典型的MapReduce计算处理几千台机器上的以TB计算的数据.程序员发现这个系统很是好用:已经实现了数以百计的MapReduce程序,天天在Google的机群上都有1000多个MapReduce程序在执行.设计模式
1.介绍api
在过去的5年里,做者和Google的许多人已经实现了数以百计的为专门目的而写的计算来处理大量的原始数据,好比,爬行的文档,Web请求日志,等等.为了计算各类类型的派生数据,好比,倒排索引,Web文档的图结构的各类表示,每一个主机上爬行的页面数量的概要,天天被请求数量最多的集合,等等.不少这样的计算在概念上很容易理解.然而,输入的数据量很大,而且只有计算被分布在成百上千的机器上才能在能够接受的时间内完成.怎样并行计算,分发数据,处理错误,全部这些问题综合在一块儿,使得本来很简介的计算,由于要大量的复杂代码来处理这些问题,而变得让人难以处理.数组
做为对这个复杂性的回应,咱们设计一个新的抽象模型,它让咱们表示咱们将要执行的简单计算,而隐藏并行化,容错,数据分布,负载均衡的那些杂乱的细节,在一个库里.咱们的抽象模型的灵感来自Lisp和许多其余函数语言的map和reduce的原始表示.咱们认识到咱们的许多计算都包含这样的操做:在咱们输入数据的逻辑记录上应用map操做,来计算出一个中间key/value对集,在全部具备相同key的value上应用reduce操做,来适当的合并派生的数据.功能模型的使用,再结合用户指定的map和reduce操做,让咱们能够很是容易的实现大规模并行化计算,和使用再次执行做为初级机制来实现容错.缓存
这个工做的主要贡献是经过简单有力的接口来实现自动的并行化和大规模分布式计算,结合这个接口的实现来在大量普通的PC机上实现高性能计算.服务器
第二部分描述基本的编程模型,而且给一些例子.第三部分描述符合咱们的基于集群的计算环境的MapReduce的接口的实现.第四部分描述咱们以为编程模型中一些有用的技巧.第五部分对于各类不一样的任务,测量咱们实现的性能.第六部分探究在Google内部使用MapReduce做为基础来重写咱们的索引系统产品.第七部分讨论相关的,和将来的工做.网络
2.编程模型
计算利用一个输入key/value对集,来产生一个输出key/value对集.MapReduce库的用户用两个函数表达这个计算:map和reduce.
用户自定义的map函数,接受一个输入对,而后产生一个中间key/value对集.MapReduce库把全部具备相同中间key I的中间value聚合在一块儿,而后把它们传递给reduce函数.
用户自定义的reduce函数,接受一个中间key I和相关的一个value集.它合并这些value,造成一个比较小的value集.通常的,每次reduce调用只产生0或1个输出value.经过一个迭代器把中间value提供给用户自定义的reduce函数.这样可使咱们根据内存来控制value列表的大小.
2.1 实例
考虑这个问题:计算在一个大的文档集合中每一个词出现的次数.用户将写和下面相似的伪代码:
map(String key,String value):
//key:文档的名字
//value:文档的内容
for each word w in value:
EmitIntermediate(w,"1");
reduce(String key,Iterator values):
//key:一个词
//values:一个计数列表
int result=0;
for each v in values:
result+=ParseInt(v);
Emit(AsString(resut));
map函数产生每一个词和这个词的出现次数(在这个简单的例子里就是1).reduce函数把产生的每个特定的词的计数加在一块儿.
另外,用户用输入输出文件的名字和可选的调节参数来填充一个mapreduce规范对象.用户而后调用MapReduce函数,并把规范对象传递给它.用户的代码和MapReduce库连接在一块儿(用C++实现).附录A包含这个实例的所有文本.
2.2类型
即便前面的伪代码写成了字符串输入和输出的term格式,可是概念上用户写的map和reduce函数有关联的类型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
例如,输入的key,value和输出的key,value的域不一样.此外,中间key,value和输出key,values的域相同.
咱们的C++实现传递字符串来和用户自定义的函数交互,并把它留给用户的代码,来在字符串和适当的类型间进行转换.
2.3更多实例
这里有一些让人感兴趣的简单程序,能够容易的用MapReduce计算来表示.
分布式的Grep(UNIX工具程序, 可作文件内的字符串查找):若是输入行匹配给定的样式,map函数就输出这一行.reduce函数就是把中间数据复制到输出.
计算URL访问频率:map函数处理web页面请求的记录,输出(URL,1).reduce函数把相同URL的value都加起来,产生一个(URL,记录总数)的对.
倒转网络连接图:map函数为每一个连接输出(目标,源)对,一个URL叫作目标,包含这个URL的页面叫作源.reduce函数根据给定的相关目标URLs链接全部的源URLs造成一个列表,产生(目标,源列表)对.
每一个主机的术语向量:一个术语向量用一个(词,频率)列表来概述出如今一个文档或一个文档集中的最重要的一些词.map函数为每个输入文档产生一个(主机名,术语向量)对(主机名来自文档的URL).reduce函数接收给定主机的全部文档的术语向量.它把这些术语向量加在一块儿,丢弃低频的术语,而后产生一个最终的(主机名,术语向量)对.
倒排索引:map函数分析每一个文档,而后产生一个(词,文档号)对的序列.reduce函数接受一个给定词的全部对,排序相应的文档IDs,而且产生一个(词,文档ID列表)对.全部的输出对集造成一个简单的倒排索引.它能够简单的增长跟踪词位置的计算.
分布式排序:map函数从每一个记录提取key,而且产生一个(key,record)对.reduce函数不改变任何的对.这个计算依赖分割工具(在4.1描述)和排序属性(在4.2描述).
3实现
MapReduce接口可能有许多不一样的实现.根据环境进行正确的选择.例如,一个实现对一个共享内存较小的机器是合适的,另外的适合一个大NUMA的多处理器的机器,而有的适合一个更大的网络机器的集合.
这部分描述一个在Google普遍使用的计算环境的实现:用交换机链接的普通PC机的大机群.咱们的环境是:
1.Linux操做系统,双处理器,2-4GB内存的机器.
2.普通的网络硬件,每一个机器的带宽或者是百兆或者千兆,可是平均小于所有带宽的一半.
3.由于一个机群包含成百上千的机器,全部机器会常常出现问题.
4.存储用直接连到每一个机器上的廉价IDE硬盘.一个从内部文件系统发展起来的分布式文件系统被用来管理存储在这些磁盘上的数据.文件系统用复制的方式在不可靠的硬件上来保证可靠性和有效性.
5.用户提交工做给调度系统.每一个工做包含一个任务集,每一个工做被调度者映射到机群中一个可用的机器集上.
3.1执行预览
经过自动分割输入数据成一个有M个split的集,map调用被分布到多台机器上.输入的split可以在不一样的机器上被并行处理.经过用分割函数分割中间key,来造成R个片(例如,hash(key) mod R),reduce调用被分布到多台机器上.分割数量(R)和分割函数由用户来指定.
图1显示了咱们实现的MapReduce操做的所有流程.当用户的程序调用MapReduce的函数的时候,将发生下面的一系列动做(下面的数字和图1中的数字标签相对应):
1.在用户程序里的MapReduce库首先分割输入文件成M个片,每一个片的大小通常从 16到64MB(用户能够经过可选的参数来控制).而后在机群中开始大量的拷贝程序.
2.这些程序拷贝中的一个是master,其余的都是由master分配任务的worker.有M 个map任务和R个reduce任务将被分配.管理者分配一个map任务或reduce任务给一个空闲的worker.
3.一个被分配了map任务的worker读取相关输入split的内容.它从输入数据中分析出key/value对,而后把key/value对传递给用户自定义的map函数.由map函数产生的中间key/value对被缓存在内存中.
4.缓存在内存中的key/value对被周期性的写入到本地磁盘上,经过分割函数把它们写入R个区域.在本地磁盘上的缓存对的位置被传送给master,master负责把这些位置传送给reduce worker.
5.当一个reduce worker获得master的位置通知的时候,它使用远程过程调用来从map worker的磁盘上读取缓存的数据.当reduce worker读取了全部的中间数据后,它经过排序使具备相同key的内容聚合在一块儿.由于许多不一样的key映射到相同的reduce任务,因此排序是必须的.若是中间数据比内存还大,那么还须要一个外部排序.
6.reduce worker迭代排过序的中间数据,对于遇到的每个惟一的中间key,它把key和相关的中间value集传递给用户自定义的reduce函数.reduce函数的输出被添加到这个reduce分割的最终的输出文件中.
7.当全部的map和reduce任务都完成了,管理者唤醒用户程序.在这个时候,在用户程序里的MapReduce调用返回到用户代码.
在成功完成以后,mapreduce执行的输出存放在R个输出文件中(每个reduce任务产生一个由用户指定名字的文件).通常,用户不须要合并这R个输出文件成一个文件--他们常常把这些文件看成一个输入传递给其余的MapReduce调用,或者在能够处理多个分割文件的分布式应用中使用他们.
3.2master数据结构
master保持一些数据结构.它为每个map和reduce任务存储它们的状态(空闲,工做中,完成),和worker机器(非空闲任务的机器)的标识.
master就像一个管道,经过它,中间文件区域的位置从map任务传递到reduce任务.所以,对于每一个完成的map任务,master存储由map任务产生的R个中间文件区域的大小和位置.当map任务完成的时候,位置和大小的更新信息被接受.这些信息被逐步增长的传递给那些正在工做的reduce任务.
3.3容错
由于MapReduce库被设计用来使用成百上千的机器来帮助处理很是大规模的数据,因此这个库必需要能很好的处理机器故障.
worker故障
master周期性的ping每一个worker.若是master在一个肯定的时间段内没有收到worker返回的信息,那么它将把这个worker标记成失效.由于每个由这个失效的worker完成的map任务被从新设置成它初始的空闲状态,因此它能够被安排给其余的worker.一样的,每个在失败的worker上正在运行的map或reduce任务,也被从新设置成空闲状态,而且将被从新调度.
在一个失败机器上已经完成的map任务将被再次执行,由于它的输出存储在它的磁盘上,因此不可访问.已经完成的reduce任务将不会再次执行,由于它的输出存储在全局文件系统中.
当一个map任务首先被worker A执行以后,又被B执行了(由于A失效了),从新执行这个状况被通知给全部执行reduce任务的worker.任何尚未从A读数据的reduce任务将从worker B读取数据.
MapReduce能够处理大规模worker失败的状况.例如,在一个MapReduce操做期间,在正在运行的机群上进行网络维护引发80台机器在几分钟内不可访问了,MapReduce master只是简单的再次执行已经被不可访问的worker完成的工做,继续执行,最终完成这个MapReduce操做.
master失败
能够很容易的让管理者周期的写入上面描述的数据结构的checkpoints.若是这个master任务失效了,能够从上次最后一个checkpoint开始启动另外一个master进程.然而,由于只有一个master,因此它的失败是比较麻烦的,所以咱们如今的实现是,若是master失败,就停止MapReduce计算.客户能够检查这个状态,而且能够根据须要从新执行MapReduce操做.
在错误面前的处理机制
当用户提供的map和reduce操做对它的输出值是肯定的函数时,咱们的分布式实现产生,和所有程序没有错误的顺序执行同样,相同的输出.
咱们依赖对map和reduce任务的输出进行原子提交来完成这个性质.每一个工做中的任务把它的输出写到私有临时文件中.一个reduce任务产生一个这样的文件,而一个map任务产生R个这样的文件(一个reduce任务对应一个文件).当一个map任务完成的时候,worker发送一个消息给master,在这个消息中包含这R个临时文件的名字.若是master从一个已经完成的map任务再次收到一个完成的消息,它将忽略这个消息.不然,它在master的数据结构里记录这R个文件的名字.
当一个reduce任务完成的时候,这个reduce worker原子的把临时文件重命名成最终的输出文件.若是相同的reduce任务在多个机器上执行,多个重命名调用将被执行,并产生相同的输出文件.咱们依赖由底层文件系统提供的原子重命名操做来保证,最终的文件系统状态仅仅包含一个reduce任务产生的数据.
咱们的map和reduce操做大部分都是肯定的,而且咱们的处理机制等价于一个顺序的执行的这个事实,使得程序员能够很容易的理解程序的行为.当map或/和reduce操做是不肯定的时候,咱们提供虽然比较弱可是合理的处理机制.当在一个非肯定操做的前面,一个reduce任务R1的输出等价于一个非肯定顺序程序执行产生的输出.然而,一个不一样的reduce任务R2的输出也许符合一个不一样的非肯定顺序程序执行产生的输出.
考虑map任务M和reduce任务R1,R2的状况.咱们设定e(Ri)为已经提交的Ri的执行(有且仅有一个这样的执行).这个比较弱的语义出现,由于e(R1)也许已经读取了由M的执行产生的输出,而e(R2)也许已经读取了由M的不一样执行产生的输出.
3.4存储位置
在咱们的计算机环境里,网络带宽是一个至关缺少的资源.咱们利用把输入数据(由GFS管理)存储在机器的本地磁盘上来保存网络带宽.GFS把每一个文件分红64MB的一些块,而后每一个块的几个拷贝存储在不一样的机器上(通常是3个拷贝).MapReduce的master考虑输入文件的位置信息,而且努力在一个包含相关输入数据的机器上安排一个map任务.若是这样作失败了,它尝试在那个任务的输入数据的附近安排一个map任务(例如,分配到一个和包含输入数据块在一个switch里的worker机器上执行).当运行巨大的MapReduce操做在一个机群中的一部分机器上的时候,大部分输入数据在本地被读取,从而不消耗网络带宽.
3.5任务粒度
象上面描述的那样,咱们细分map阶段成M个片,reduce阶段成R个片.M和R应当比worker机器的数量大许多.每一个worker执行许多不一样的工做来提升动态负载均衡,也能够加速从一个worker失效中的恢复,这个机器上的许多已经完成的map任务能够被分配到全部其余的worker机器上.
在咱们的实现里,M和R的范围是有大小限制的,由于master必须作O(M+R)次调度,而且保存O(M*R)个状态在内存中.(这个因素使用的内存是不多的,在O(M*R)个状态片里,大约每一个map任务/reduce任务对使用一个字节的数据).
此外,R常常被用户限制,由于每个reduce任务最终都是一个独立的输出文件.实际上,咱们倾向于选择M,以便每个单独的任务大概都是16到64MB的输入数据(以便上面描述的位置优化是最有效的),咱们把R设置成咱们但愿使用的worker机器数量的小倍数.咱们常常执行MapReduce计算,在M=200000,R=5000,使用2000台工做者机器的状况下.
3.6备用任务
一个落后者是延长MapReduce操做时间的缘由之一:一个机器花费一个异乎寻常地的长时间来完成最后的一些map或reduce任务中的一个.有不少缘由可能产生落后者.例如,一个有坏磁盘的机器常常发生能够纠正的错误,这样就使读性能从30MB/s下降到3MB/s.机群调度系统也许已经安排其余的任务在这个机器上,因为计算要使用CPU,内存,本地磁盘,网络带宽的缘由,引发它执行MapReduce代码很慢.咱们最近遇到的一个问题是,一个在机器初始化时的Bug引发处理器缓存的失效:在一个被影响的机器上的计算性能有上百倍的影响.
咱们有一个通常的机制来减轻这个落后者的问题.当一个MapReduce操做将要完成的时候,master调度备用进程来执行那些剩下的还在执行的任务.不管是原来的仍是备用的执行完成了,工做都被标记成完成.咱们已经调整了这个机制,一般只会占用多几个百分点的机器资源.咱们发现这能够显著的减小完成大规模MapReduce操做的时间.做为一个例子,将要在5.3描述的排序程序,在关闭掉备用任务的状况下,要比有备用任务的状况下多花44%的时间.
4技巧
尽管简单的map和reduce函数的功能对于大多数需求是足够的了,可是咱们开发了一些有用的扩充.这些将在这个部分描述.
4.1分割函数
MapReduce用户指定reduce任务和reduce任务须要的输出文件的数量.在中间key上使用分割函数,使数据分割后经过这些任务.一个缺省的分割函数使用hash方法(例如,hash(key) mod R).这个致使很是平衡的分割.而后,有的时候,使用其余的key分割函数来分割数据有很是有用的.例如,有时候,输出的key是URLs,而且咱们但愿每一个主机的全部条目保持在同一个输出文件中.为了支持像这样的状况,MapReduce库的用户能够提供专门的分割函数.例如,使用"hash(Hostname(urlkey)) mod R"做为分割函数,使全部来自同一个主机的URLs保存在同一个输出文件中.
4.2顺序保证
咱们保证在一个给定的分割里面,中间key/value对以key递增的顺序处理.这个顺序保证可使每一个分割产出一个有序的输出文件,当输出文件的格式须要支持有效率的随机访问key的时候,或者对输出数据集再做排序的时候,就很容易.
4.3combiner函数
在某些状况下,容许中间结果key重复会占据至关的比重,而且用户定义的reduce函数
知足结合律和交换律.一个很好的例子就是在2.1部分的词统计程序.由于词频率倾向于一个zipf分布(齐夫分布),每一个map任务将产生成百上千个这样的记录<the,1>.全部的这些计数将经过网络被传输到一个单独的reduce任务,而后由reduce函数加在一块儿产生一个数字.咱们容许用户指定一个可选的combiner函数,先在本地进行合并一下,而后再经过网络发送.
在每个执行map任务的机器上combiner函数被执行.通常的,相同的代码被用在combiner和reduce函数.在combiner和reduce函数之间惟一的区别是MapReduce库怎样控制函数的输出.reduce函数的输出被保存最终输出文件里.combiner函数的输出被写到中间文件里,而后被发送给reduce任务.
部分使用combiner能够显著的提升一些MapReduce操做的速度.附录A包含一个使用combiner函数的例子.
4.4输入输出类型
MapReduce库支持以几种不一样的格式读取输入数据.例如,文本模式输入把每一行看做是一个key/value对.key是文件的偏移量,value是那一行的内容.其余普通的支持格式以key的顺序存储key/value对序列.每个输入类型的实现知道怎样把输入分割成对每一个单独的map任务来讲是有意义的(例如,文本模式的范围分割确保仅仅在每行的边界进行范围分割).虽然许多用户仅仅使用不多的预约意输入类型的一个,可是用户能够经过提供一个简单的reader接口来支持一个新的输入类型.
一个reader没必要要从文件里读数据.例如,咱们能够很容易的定义它从数据库里读记录,或从内存中的数据结构读取.
4.5反作用
有的时候,MapReduce的用户发如今map操做或/和reduce操做时产生辅助文件做为一个附加的输出是很方便的.咱们依靠应用程序写来使这个反作用成为原子的.通常的,应用程序写一个临时文件,而后一旦这个文件所有产生完,就自动的被重命名.
对于单个任务产生的多个输出文件来讲,咱们没有提供其上的两阶段提交的原子操做支持.所以,一个产生须要交叉文件链接的多个输出文件的任务,应该使肯定性的任务.不过这个限制在实际的工做中并非一个问题.
4.6跳过错误记录
有的时候由于用户的代码里有bug,致使在某一个记录上map或reduce函数忽然crash掉.这样的bug使得MapReduce操做不能完成.虽然通常是修复这个bug,可是有时候这是不现实的;也许这个bug是在源代码不可获得的第三方库里.有的时候也能够忽略一些记录,例如,当在一个大的数据集上进行统计分析.咱们提供一个可选的执行模式,在这个模式下,MapReduce库检测那些记录引发的crash,而后跳过那些记录,来继续执行程序.
每一个worker程序安装一个信号处理器来获取内存段异常和总线错误.在调用一个用户自定义的map或reduce操做以前,MapReduce库把记录的序列号存储在一个全局变量里.若是用户代码产生一个信号,那个信号处理器就会发送一个包含序号的"last gasp"UDP包给MapReduce的master.当master不止一次看到同一个记录的时候,它就会指出,当相关的map或reduce任务再次执行的时候,这个记录应当被跳过.
4.7本地执行
调试在map或reduce函数中问题是很困难的,由于实际的计算发生在一个分布式的系统中,常常是有一个master动态的分配工做给几千台机器.为了简化调试和测试,咱们开发了一个可替换的实现,这个实如今本地执行全部的MapReduce操做.用户能够控制执行,这样计算能够限制到特定的map任务上.用户以一个标志调用他们的程序,而后能够容易的使用他们认为好用的任何调试和测试工具(例如,gdb).
4.8状态信息
master运行一个HTTP服务器,而且能够输出一组情况页来供人们使用.状态页显示计算进度,象多少个任务已经完成,多少个还在运行,输入的字节数,中间数据字节数,输出字节数,处理百分比,等等.这个页也包含到标准错误的连接,和由每一个任务产生的标准输出的连接.用户能够根据这些数据预测计算须要花费的时间,和是否须要更多的资源.当计算比预期的要慢不少的时候,这些页面也能够被用来判断是否是这样.
此外,最上面的状态页显示已经有多少个工做者失败了,和当它们失败的时候,那个map和reduce任务正在运行.当试图诊断在用户代码里的bug时,这个信息也是有用的.
4.9计数器
MapReduce库提供一个计数器工具,来计算各类事件的发生次数.例如,用户代码想要计算全部处理的词的个数,或者被索引的德文文档的数量.
为了使用这个工具,用户代码建立一个命名的计数器对象,而后在map或/和reduce函数里适当的增长计数器.例如:
Counter * uppercase;
uppercase=GetCounter("uppercase");
map(String name,String contents):
for each word w in contents:
if(IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w,"1");
来自不一样worker机器上的计数器值被周期性的传送给master(在ping回应里).master把来自成功的map和reduce任务的计数器值加起来,在MapReduce操做完成的时候,把它返回给用户代码.当前计数器的值也被显示在master状态页里,以便人们能够查看实际的计算进度.当计算计数器值的时候消除重复执行的影响,避免数据的累加.(在备用任务的使用,和因为出错的从新执行,能够产生重复执行)
有些计数器值被MapReduce库自动的维护,好比,被处理的输入key/value对的数量,和被产生的输出key/value对的数量.
用户发现计数器工具对于检查MapReduce操做的完整性颇有用.例如,在一些MapReduce操做中,用户代码也许想要确保输出对的数量彻底等于输入对的数量,或者处理过的德文文档的数量是在所有被处理的文档数量中属于合理的范围.
5性能
在本节,咱们用在一个大型集群上运行的两个计算来衡量MapReduce的性能.一个计算用来在一个大概1TB的数据中查找特定的匹配串.另外一个计算排序大概1TB的数据.
这两个程序表明了MapReduce的用户实现的真实的程序的一个大子集.一类是,把数据从一种表示转化到另外一种表示.另外一类是,从一个大的数据集中提取少许的关心的数据.
5.1机群配置
全部的程序在包含大概1800台机器的机群上执行.机器的配置是:2个2G的Intel Xeon超线程处理器,4GB内存,两个160GB IDE磁盘,一个千兆网卡.这些机器部署在一个由两层的,树形交换网络中,在根节点上大概有100到2000G的带宽.全部这些机器都有相同的部署(对等部署),所以任意两点之间的来回时间小于1毫秒.
在4GB的内存里,大概有1-1.5GB被用来运行在机群中其余的任务.这个程序是在周末的下午开始执行的,这个时候CPU,磁盘,网络基本上是空闲的.
5.2Grep
这个Grep程序扫描大概10^10个,每一个100字节的记录,查找比较少的3字符的查找串(这个查找串出如今92337个记录中).输入数据被分割成大概64MB的片(M=15000),所有 的输出存放在一个文件中(R=1).
图2显示计算过程随时间变化的状况.Y轴表示输入数据被扫描的速度.随着更多的机群被分配给这个MapReduce计算,速度在逐步的提升,当有1764个worker的时候这个速度达到最高的30GB/s.当map任务完成的时候,速度开始降低,在计算开始后80秒,输入的速度降到0.这个计算持续的时间大概是150秒.这包括了前面大概一分钟的启动时间.启动时间用来把程序传播到全部的机器上,等待GFS打开1000个输入文件,获得必要的位置优化信息.
5.3排序
这个sort程序排序10^10个记录,每一个记录100个字节(大概1TB的数据).这个程序是模仿TeraSort的.
这个排序程序只包含不到50行的用户代码.其中有3行map函数用来从文本行提取10字节的排序key,而且产生一个由这个key和原始文本行组成的中间key/value对.咱们使用一个内置的Identity函数做为reduce操做.这个函数直接把中间key/value对做为输出的key/value对.最终的排序输出写到一个2路复制的GFS文件中(也就是,程序的输出会写2TB的数据).
象之前同样,输入数据被分割成64MB的片(M=15000).咱们把排序后的输出写到4000个文件中(R=4000).分区函数使用key的原始字节来把数据分区到R个小片中.
咱们以这个基准的分割函数,知道key的分布状况.在通常的排序程序中,咱们会增长一个预处理的MapReduce操做,这个操做用于采样key的状况,而且用这个采样的key的分布状况来计算对最终排序处理的分割点。
图3(a)显示这个排序程序的正常执行状况.左上图显示输入数据的读取速度.这个速度最高到达13GB/s,而且在不到200秒全部map任务完成以后迅速滑落到0.注意到这个输入速度小于Grep.这是由于这个排序map任务花费大概一半的时间和带宽,来把中间数据写到本地硬盘中.而Grep相关的中间数据能够忽略不计.
左中图显示数据经过网络从map任务传输给reduce任务的速度.当第一个map任务完成后,这个排序过程就开始了.图示上的第一个高峰是启动了第一批大概1700个reduce任务(整个MapReduce任务被分配到1700台机器上,每一个机器一次只执行一个reduce任务).大概开始计算后的300秒,第一批reduce任务中的一些完成了,咱们开始执行剩下的reduce任务.所有的排序过程持续了大概600秒的时间.
左下图显示排序后的数据被reduce任务写入最终文件的速度.由于机器忙于排序中间数据,因此在第一个排序阶段的结束和写阶段的开始有一个延迟.写的速度大概是2-4GB/s.大概开始计算后的850秒写过程结束.包括前面的启动过程,所有的计算任务持续的891秒.这个和TeraSort benchmark的最高纪录1057秒差很少.
须要注意的事情是:所以位置优化的缘由,不少数据都是从本地磁盘读取的而没有经过咱们有限带宽的网络,因此输入速度比排序速度和输出速度都要快.排序速度比输出速度快的缘由是输出阶段写两个排序后数据的拷贝(咱们写两个副本的缘由是为了可靠性和可用性).咱们写两份的缘由是由于底层文件系统的可靠性和可用性的要求.若是底层文件系统用相似容错编码(erasure coding)的方式,而不采用复制写的方式,在写盘阶段能够下降网络带宽的要求。
5.4备用任务的影响
在图3(b)中,显示咱们不用备用任务的排序程序的执行状况.除了它有一个很长的几乎没有写动做发生的尾巴外,执行流程和图3(a)类似.在960秒后,只有5个reduce任务没有完成.然而,就是这最后几个落后者知道300秒后才完成.所有的计算任务执行了1283秒,多花了44%的时间.
5.5机器失效
在图3(c)中,显示咱们有意的在排序程序计算过程当中中止1746台worker中的200台机器上的程序的状况.底层机群调度者在这些机器上立刻从新开始新的worker程序(由于仅仅程序被中止,而机器仍然在正常运行).
由于已经完成的map工做丢失了(因为相关的map worker被杀掉了),须要从新再做,因此worker死掉会致使一个负数的输入速率.相关map任务的从新执行很快就从新执行了.整个计算过程在933秒内完成,包括了前边的启动时间(只比正常执行时间多了5%的时间).
6经验
咱们在2003年的2月写了MapReduce库的第一个版本,而且在2003年的8月作了显著的加强,包括位置优化,worker机器间任务执行的动态负载均衡,等等.从那个时候起,咱们惊奇的发现MapReduce函数库普遍用于咱们平常处理的问题.它如今在Google内部各个领域内普遍应用,包括:
大规模机器学习问题
Google News和Froogle产品的机器问题.
提取数据产生一个流行查询的报告(例如,Google Zeitgeist).
为新的试验和产品提取网页的属性(例如,从一个web页的大集合中提取位置信息 用在位置查询).
大规模的图计算.
图4显示了咱们主要的源代码管理系统中,随着时间推移,MapReduce程序的显著增长,从2003年早先时候的0个增加到2004年9月份的差很少900个不一样的程序.MapReduce之因此这样的成功,是由于他可以在不到半小时时间内写出一个简单的可以应用于上千台机器的大规模并发程序,而且极大的提升了开发和原形设计的周期效率.而且,他可让一个彻底没有分布式和/或并行系统经验的程序员,可以很容易的利用大量的资源.
在每个任务结束的时候,MapReduce函数库记录使用的计算资源的统计信息.在图1里,咱们列出了2004年8月份在Google运行的一些MapReduce的工做的统计信息.
6.1大规模索引
到目前为止,最成功的MapReduce的应用就是重写了Google web 搜索服务所使用到的index系统.索引系统处理爬虫系统抓回来的超大量的文档集,这些文档集保存在GFS文件里.这些文档的原始内容的大小,超过了20TB.索引程序是经过一系列的,大概5到10次MapReduce操做来创建索引.经过利用MapReduce(替换掉上一个版本的特别设计的分布处理的索引程序版本)有这样一些好处:
索引的代码简单,量少,容易理解,由于容错,分布式,并行处理都隐藏在MapReduce库中了.例如,当使用MapReduce函数库的时候,计算的代码行数从原来的3800行C++代码一下减小到大概700行代码.
MapReduce的函数库的性能已经很是好,因此咱们能够把概念上不相关的计算步骤分开处理,而不是混在一块儿以期减小在数据上的处理.这使得改变索引过程很容易.例如,咱们对老索引系统的一个小更改可能要好几个月的时间,可是在新系统内,只须要花几天时间就能够了.
索引系统的操做更容易了,这是由于机器的失效,速度慢的机器,以及网络失效都已经由MapReduce本身解决了,而不须要操做人员的交互.另外,咱们能够简单的经过对索引系统增长机器的方式提升处理性能.
7相关工做
不少系统都提供了严格的设计模式,而且经过对编程的严格限制来实现自动的并行计算.例如,一个结合函数能够经过N个元素的数组的前缀在N个处理器上使用并行前缀计算在log N的时间内计算完.MapReduce是基于咱们的大型现实计算的经验,对这些模型的一个简化和精炼.而且,咱们还提供了基于上千台处理器的容错实现.而大部分并发处理系统都只在小规模的尺度上实现,而且机器的容错仍是程序员来控制的.
Bulk Synchronous Programming以及一些MPI primitives提供了更高级别的抽象,能够更容易写出并行处理的程序.这些系统和MapReduce系统的不一样之处在,MapReduce利用严格的编程模式自动实现用户程序的并发处理,而且提供了透明的容错处理.
咱们本地的优化策略是受active disks等技术的启发,在active disks中,计算任务是尽可能推送到靠近本地磁盘的处理单元上,这样就减小了经过I/O子系统或网络的数据量.咱们在少许磁盘直接链接到普通处理机运行,来代替直接链接到磁盘控制器的处理机上,可是通常的步骤是类似的.
咱们的备用任务的机制和在Charlotte系统上的积极调度机制类似.这个简单的积极调度的一个缺陷是,若是一个任务引发了一个重复性的失败,那个整个计算将没法完成.咱们经过在故障状况下跳过故障记录的机制,在某种程度上解决了这个问题.
MapReduce实现依赖一个内置的机群管理系统来在一个大规模共享机器组上分布和运行用户任务.虽然这个不是本论文的重点,可是集群管理系统在理念上和Condor等其余系统是同样的.
在MapReduce库中的排序工具在操做上和NOW-Sort类似.源机器(map worker)分割将要被排序的数据,而后把它发送到R个reduce worker中的一个上.每一个reduce worker来本地排序它的数据(若是可能,就在内存中).固然,NOW-Sort没有用户自定义的map和reduce函数,使得咱们的库能够普遍的应用.
River提供一个编程模型,在这个模型下,处理进程能够靠在分布式的队列上发送数据进行彼此通信.和MapReduce同样,River系统尝试提供对不一样应用有近似平均的性能,即便在不对等的硬件环境下或者在系统颠簸的状况下也能提供近似平均的性.River是经过精心调度硬盘和网络的通信,来平衡任务的完成时间.MapReduce不和它不一样.利用严格编程模型,MapReduce构架来把问题分割成大量的任务.这些任务被自动的在可用的worker上调度,以便速度快的worker能够处理更多的任务.这个严格编程模型也让咱们能够在工做快要结束的时候安排冗余的执行,来在非一致处理的状况减小完成时间(好比,在有慢机或者阻塞的worker的时候).
BAD-FS是一个很MapReduce彻底不一样的编程模型,它的目标是在一个广阔的网络上执行工做.然而,它们有两个基本原理是相同的.(1)这两个系统使用冗余的执行来从由失效引发的数据丢失中恢复.(2)这两个系统使用本地化调度策略,来减小经过拥挤的网络链接发送的数据数量.
TACC是一个被设计用来简化高有效性网络服务结构的系统.和MapReduce同样,它经过再次执行来实现容错.
8结束语
MapReduce编程模型已经在Google成功的用在不一样的目的.咱们把这个成功归于如下几个缘由:第一,这个模型使用简单,甚至对没有并行和分布式经验的程序员也是如此,由于它隐藏了并行化,容错,位置优化和负载均衡的细节.第二,大量不一样的问题能够用MapReduce计算来表达.例如,MapReduce被用来,为Google的产品web搜索服务,排序,数据挖掘,机器学习,和其余许多系统,产生数据.第三,咱们已经在一个好几千台计算机的大型集群上开发实现了这个MapReduce.这个实现使得对于这些机器资源的利用很是简单,所以也适用于解决Google遇到的其余不少须要大量计算的问题.
从这个工做中咱们也学习到了一些东西.首先,严格的编程模型使得并行化和分布式计算简单,而且也易于构造这样的容错计算环境.第二,网络带宽是系统的瓶颈.所以在咱们的系统中大量的优化目标是减小经过网络发送的数据量,本地优化使用咱们从本地磁盘读取数据,而且把中间数据写到本地磁盘,以保留网络带宽.第三,冗余的执行能够用来减小速度慢的机器的影响,和控制机器失效和数据丢失.
感谢
Josh Levenberg校定和扩展了用户级别的MapReduce API,而且结合他的适用经验和其余人的改进建议,增长了不少新的功能.MapReduce从GFS中读取和写入数据.咱们要感谢Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他们在开发GFS中的工做.咱们还感谢Percy Liang Olcan Sercinoglu 在开发用于MapReduce的集群管理系统得工做.Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach为本论文提出了宝贵的意见.OSDI的无名审阅者,以及咱们的审核者Eric Brewer,在论文应当如何改进方面给出了有益的意见.最后,咱们感谢Google的工程部的全部MapReduce的用户,感谢他们提供了有用的反馈,建议,以及错误报告等等.
A单词频率统计
本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每个不一样的单词出现频率.
#include "mapreduce/mapreduce.h"
//用户map函数
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
//跳过前导空格
while ((i < n) && isspace(text[i]))
i++;
// 查找单词的结束位置
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),"1");
}
}
};
REGISTER_MAPPER(WordCounter);
//用户的reduce函数
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
//迭代具备相同key的全部条目,而且累加它们的value
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
//提交这个输入key的综合
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// 把输入文件列表存入"spec"
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format("text");
input->set_filepattern(argv[i]);
input->set_mapper_class("WordCounter");
}
//指定输出文件:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// ...
MapReduceOutput* out = spec.output();
out->set_filebase("/gfs/test/freq");
out->set_num_tasks(100);
out->set_format("text");
out->set_reducer_class("Adder");
// 可选操做:在map任务中作部分累加工做,以便节省带宽
out->set_combiner_class("Adder");
// 调整参数: 使用2000台机器,每一个任务100MB内存
spec.set_machines(2000);
spec.set_map_megabytes(100);
spec.set_reduce_megabytes(100);
// 运行它
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
// 完成: 'result'结构包含计数,花费时间,和使用机器的信息
return 0;
}