Google MapReduce中文版
译者: alex
摘要
MapReduce 是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先建立一个Map函数处理一个基于key/value pair的数据集合,输出中间的基于key/value pair的数据集合;而后再建立一个Reduce函数用来合并全部的具备相同中间key值的中间value值。现实世界中有不少知足上述处理模型的例子, 本论文将详细描述这个模型。
MapReduce架构的程序可以在大量的 普通配置的计算机上实现并行化处理。这个系统在运行时只关心:如何分割输入数据,在大量计算机组成的集群上的调度,集群中计算机的错误处理,管理集群中计 算机之间必要的通讯。采用MapReduce架构可使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源。
我 们的MapReduce实现运行在规模能够灵活调整的由普通机器组成的集群上:一个典型的MapReduce计算每每由几千台机器组成、处理以TB计算的 数据。程序员发现这个系统很是好用:已经实现了数以百计的MapReduce程序,在Google的集群上,天天都有1000多个MapReduce程序 在执行。
一、介绍
在 过去的5年里,包括本文做者在内的Google的不少程序员,为了处理海量的原始数据,已经实现了数以百计的、专用的计算方法。这些计算方法用来处理大量 的原始数据,好比,文档抓取(相似网络爬虫的程序)、Web请求日志等等;也为了计算处理各类类型的衍生数据,好比倒排索引、Web文档的图结构的各类表 示形势、每台主机上网络爬虫抓取的页面数量的汇总、天天被请求的最多的查询的集合等等。大多数这样的数据处理运算在概念上很容易理解。然而因为输入的数据 量巨大,所以要想在可接受的时间内完成运算,只有将这些计算分布在成百上千的主机上。如何处理并行计算、如何分发数据、如何处理错误?全部这些问题综合在 一块儿,须要大量的代码处理,所以也使得本来简单的运算变得难以处理。
为 了解决上述复杂的问题,咱们设计一个新的抽象模型,使用这个抽象模型,咱们只要表述咱们想要执行的简单运算便可,而没必要关心并行计算、容错、数据分布、负 载均衡等复杂的细节,这些问题都被封装在了一个库里面。设计这个抽象模型的灵感来自Lisp和许多其余函数式语言的Map和Reduce的原语。咱们意识 到咱们大多数的运算都包含这样的操做:在输入数据的“逻辑”记录上应用Map操做得出一个中间key/value pair集合,而后在全部具备相同key值的value值上应用Reduce操做,从而达到合并中间的数据,获得一个想要的结果的目的。使用 MapReduce模型,再结合用户实现的Map和Reduce函数,咱们就能够很是容易的实现大规模并行化计算;经过MapReduce模型自带的“再 次执行”(re-execution)功能,也提供了初级的容灾实现方案。
这个工做(实现一个MapReduce框架模型)的主要贡献是经过简单的接口来实现自动的并行化和大规模的分布式计算,经过使用MapReduce模型接口实如今大量普通的PC机上高性能计算。
第 二部分描述基本的编程模型和一些使用案例。第三部分描述了一个通过裁剪的、适合咱们的基于集群的计算环境的MapReduce实现。第四部分描述咱们认为 在MapReduce编程模型中一些实用的技巧。第五部分对于各类不一样的任务,测量咱们MapReduce实现的性能。第六部分揭示了在Google内部 如何使用MapReduce做为基础重写咱们的索引系统产品,包括其它一些使用MapReduce的经验。第七部分讨论相关的和将来的工做。
二、编程模型
MapReduce编程模型的原理是:利用一个输入key/value pair集合来产生一个输出的key/value pair集合。MapReduce库的用户用两个函数表达这个计算:Map和Reduce。
用户自定义的Map函数接受一个输入的key/value pair值,而后产生一个中间key/value pair值的集合。MapReduce库把全部具备相同中间key值I的中间value值集合在一块儿后传递给reduce函数。
用 户自定义的Reduce函数接受一个中间key的值I和相关的一个value值的集合。Reduce函数合并这些value值,造成一个较小的value 值的集合。通常的,每次Reduce函数调用只产生0或1个输出value值。一般咱们经过一个迭代器把中间value值提供给Reduce函数,这样我 们就能够处理没法所有放入内存中的大量的value值的集合。
2.一、例子
例如,计算一个大的文档集合中每一个单词出现的次数,下面是伪代码段:
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, “1″);
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
Map函数输出文档中的每一个词、以及这个词的出现次数(在这个简单的例子里就是1)。Reduce函数把Map函数产生的每个特定的词的计数累加起来。
另 外,用户编写代码,使用输入和输出文件的名字、可选的调节参数来完成一个符合MapReduce模型规范的对象,而后调用MapReduce函数,并把这 个规范对象传递给它。用户的代码和MapReduce库连接在一块儿(用C++实现)。附录A包含了这个实例的所有程序代码。
2.二、类型
尽管在前面例子的伪代码中使用了以字符串表示的输入输出值,可是在概念上,用户定义的Map和Reduce函数都有相关联的类型:
map(k1,v1) ->list(k2,v2)
reduce(k2,list(v2)) ->list(v2)
好比,输入的key和value值与输出的key和value值在类型上推导的域不一样。此外,中间key和value值与输出key和value值在类型上推导的域相同。
(alex注:原文中这个domain的含义不是很清楚,我参考Hadoop、KFS等实现,map和reduce都使用了泛型,所以,我把domain翻译成类型推导的域)。
咱们的C++中使用字符串类型做为用户自定义函数的输入输出,用户在本身的代码中对字符串进行适当的类型转换。html
2.三、更多的例子
这里还有一些有趣的简单例子,能够很容易的使用MapReduce模型来表示:
-
分布式的Grep:Map函数输出匹配某个模式的一行,Reduce函数是一个恒等函数,即把中间数据复制到输出。
-
计算URL访问频率:Map函数处理日志中web页面请求的记录,而后输出(URL,1)。Reduce函数把相同URL的value值都累加起来,产生(URL,记录总数)结果。
-
倒转网络连接图:Map函数在源页面(source)中搜索全部的连接目标(target)并输出为(target,source)。Reduce函数把给定连接目标(target)的连接组合成一个列表,输出(target,list(source))。
-
每 个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出如今文档或文档集中的最重要的一些词。Map函数为每个输入文档输出(主机名,检索词向 量),其中主机名来自文档的URL。Reduce函数接收给定主机的全部文档的检索词向量,并把这些检索词向量加在一块儿,丢弃掉低频的检索词,输出一个最 终的(主机名,检索词向量)。
-
倒排索引:Map函数分析每一个文档输出一个(词,文档号)的列表,Reduce函数的输入是一个给定词的全部(词,文档号),排序全部的文档号,输出(词,list(文档号))。全部的输出集合造成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。
-
分布式排序:Map函数从每一个记录提取key,输出(key,record)。Reduce函数不改变任何的值。这个运算依赖分区机制(在4.1描述)和排序属性(在4.2描述)。
三、实现
MapReduce模型能够有多种不一样的实现方式。如何正确选择取决于具体的环境。例如,一种实现方式适用于小型的共享内存方式的机器,另一种实现方式则适用于大型NUMA架构的多处理器的主机,而有的实现方式更适合大型的网络链接集群。
本章节描述一个适用于Google内部普遍使用的运算环境的实现:用以太网交换机链接、由普通PC机组成的大型集群。在咱们的环境里包括:
1.x86架构、运行Linux操做系统、双处理器、2-4GB内存的机器。
2.普通的网络硬件设备,每一个机器的带宽为百兆或者千兆,可是远小于网络的平均带宽的一半。
(alex注:这里须要网络专家解释一下了)
3.集群中包含成百上千的机器,所以,机器故障是常态。
4.存储为廉价的内置IDE硬盘。一个内部分布式文件系统用来管理存储在这些磁盘上的数据。文件系统经过数据复制来在不可靠的硬件上保证数据的可靠性和有效性。
5.用户提交工做(job)给调度系统。每一个工做(job)都包含一系列的任务(task),调度系统将这些任务调度到集群中多台可用的机器上。
3.一、执行归纳
通 过将Map调用的输入数据自动分割为M个数据片断的集合,Map调用被分布到多台机器上执行。输入的数据片断可以在不一样的机器上并行处理。使用分区函数将 Map调用产生的中间key值分红R个不一样分区(例如,hash(key) mod R),Reduce调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。
图1展现了咱们的MapReduce实现中操做的所有流程。当用户调用MapReduce函数时,将发生下面的一系列动做(下面的序号和图1中的序号一一对应):
1.用户程序首先调用的MapReduce库将输入文件分红M个数据片度,每一个数据片断的大小通常从 16MB到64MB(能够经过可选的参数来控制每一个数据片断的大小)。而后用户程序在机群中建立大量的程序副本。
(alex:copies of the program还真难翻译)
2.这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序,由master分配任务。有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker。
3.被分配了map任务的worker程序读取相关的输入数据片断,从输入的数据片断中解析出key/value pair,而后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并缓存在内存中。
4.缓存中的key/value pair经过分区函数分红R个区域,以后周期性的写入到本地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。
5.当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了全部的中间数据后,经过对key进行排序后使得具备相同key值的数据聚合在一块儿。因为许多不一样的key值会映射到相同的Reduce 任务上,所以必须进行排序。若是中间数据太大没法在内存中完成排序,那么就要在外部进行排序。
6.Reduce worker程序遍历排序后的中间数据,对于每个惟一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。
7.当全部的Map和Reduce任务都完成以后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。
在 成功完成任务以后,MapReduce的输出存放在R个输出文件中(对应每一个Reduce任务产生一个输出文件,文件名由用户指定)。通常状况下,用户不 须要将这R个输出文件合并成一个文件–他们常常把这些文件做为另一个MapReduce的输入,或者在另一个能够处理多个分割文件的分布式应用中使 用。程序员
3.二、Master数据结构
Master持有一些数据结构,它存储每个Map和Reduce任务的状态(空闲、工做中或完成),以及Worker机器(非空闲任务的机器)的标识。
Master 就像一个数据管道,中间文件存储区域的位置信息经过这个管道从Map传递到Reduce。所以,对于每一个已经完成的Map任务,master存储了Map 任务产生的R个中间文件存储区域的大小和位置。当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工做的 Reduce任务。
3.三、容错
由于MapReduce库的设计初衷是使用由成百上千的机器组成的集群来处理超大规模的数据,因此,这个库必需要能很好的处理机器故障。
worker故障
master 周期性的ping每一个worker。若是在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。全部由这 个失效的worker完成的Map任务被重设为初始的空闲状态,以后这些任务就能够被安排给其余的worker。一样的,worker失效时正在运行的 Map或Reduce任务也将被从新置为空闲状态,等待从新调度。
当worker故障时,因为已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,所以必须从新执行。而已经完成的Reduce任务的输出存储在全局文件系统上,所以不须要再次执行。web
当 一个Map任务首先被worker A执行,以后因为worker A失效了又被调度到worker B执行,这个“从新执行”的动做会被通知给全部执行Reduce任务的worker。任何尚未从worker A读取数据的Reduce任务将从worker B读取数据。
MapReduce 能够处理大规模worker失效的状况。好比,在一个MapReduce操做执行期间,在正在运行的集群上进行网络维护引发80台机器在几分钟内不可访问 了,MapReduce master只须要简单的再次执行那些不可访问的worker完成的工做,以后继续执行未完成的任务,直到最终完成这个MapReduce操做。
master失败
一个简单的解决办法是让master周期性的将上面描述的数据结构
(alex注:指3.2节)的 写入磁盘,即检查点(checkpoint)。若是这个master任务失效了,能够从最后一个检查点(checkpoint)开始启动另外一个 master进程。然而,因为只有一个master进程,master失效后再恢复是比较麻烦的,所以咱们如今的实现是若是master失效,就停止 MapReduce运算。客户能够检查到这个状态,而且能够根据须要从新执行MapReduce操做。
在失效方面的处理机制
(alex注:原文为”semantics in the presence of failures”)
当用户提供的Map和Reduce操做是输入肯定性函数(即相同的输入产生相同的输出)时,咱们的分布式实如今任何状况下的输出都和全部程序没有出现任何错误、顺序的执行产生的输出是同样的。
我 们依赖对Map和Reduce任务的输出是原子提交的来完成这个特性。每一个工做中的任务把它的输出写到私有的临时文件中。每一个Reduce任务生成一个这 样的文件,而每一个Map任务则生成R个这样的文件(一个Reduce任务对应一个文件)。当一个Map任务完成的时,worker发送一个包含R个临时文 件名的完成消息给master。若是master从一个已经完成的Map任务再次接收到到一个完成消息,master将忽略这个消息;不然,master 将这R个文件的名字记录在数据结构里。
当Reduce任务完成 时,Reduce worker进程以原子的方式把临时文件重命名为最终的输出文件。若是同一个Reduce任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名 操做执行。咱们依赖底层文件系统提供的重命名操做的原子性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据。
使 用MapReduce模型的程序员能够很容易的理解他们程序的行为,由于咱们绝大多数的Map和Reduce操做是肯定性的,并且存在这样的一个事实:我 们的失效处理机制等价于一个顺序的执行的操做。当Map或/和Reduce操做是不肯定性的时候,咱们提供虽然较弱可是依然合理的处理机制。当使用非肯定 操做的时候,一个Reduce任务R1的输出等价于一个非肯定性程序顺序执行产生时的输出。可是,另外一个Reduce任务R2的输出也许符合一个不一样的非 肯定顺序程序执行产生的R2的输出。算法
考虑Map任务M和Reduce任务R一、R2的状况。咱们设定e(Ri)是Ri已经提交的执行过程(有且仅有一个这样的执行过程)。当e(R1)读取了由M一次执行产生的输出,而e(R2)读取了由M的另外一次执行产生的输出,致使了较弱的失效处理。
3.四、存储位置
在 咱们的计算运行环境中,网络带宽是一个至关匮乏的资源。咱们经过尽可能把输入数据(由GFS管理)存储在集群中机器的本地磁盘上来节省网络带宽。GFS把每 个文件按64MB一个Block分隔,每一个Block保存在多台机器上,环境中就存放了多份拷贝(通常是3个拷贝)。MapReduce的master在 调度Map任务时会考虑输入文件的位置信息,尽可能将一个Map任务调度在包含相关输入数据拷贝的机器上执行;若是上述努力失败了,master将尝试在保 存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)。当在一个足 够大的cluster集群上运行大型MapReduce操做的时候,大部分的输入数据都能从本地机器读取,所以消耗很是少的网络带宽。
3.五、任务粒度
如 前所述,咱们把Map拆分红了M个片断、把Reduce拆分红R个片断执行。理想状况下,M和R应当比集群中worker的机器数量要多得多。在每台 worker机器都执行大量的不一样任务可以提升集群的动态的负载均衡能力,而且可以加快故障恢复的速度:失效机器上执行的大量Map任务均可以分布到全部 其余的worker机器上去执行。
可是实际上,在咱们的具体实现中对M和R的取值都有必定的客观限制,由于master必须执行O(M+R)次调度,而且在内存中保存O(M*R)个状态(对影响内存使用的因素仍是比较小的:O(M*R)块状态,大概每对Map任务/Reduce任务1个字节就能够了)。数据库
更 进一步,R值一般是由用户指定的,由于每一个Reduce任务最终都会生成一个独立的输出文件。实际使用时咱们也倾向于选择合适的M值,以使得每个独立任 务都是处理大约16M到64M的输入数据(这样,上面描写的输入数据本地存储优化策略才最有效),另外,咱们把R值设置为咱们想使用的worker机器数 量的小的倍数。咱们一般会用这样的比例来执行MapReduce:M=200000,R=5000,使用2000台worker机器。
3.六、备用任务
影 响一个MapReduce的总执行时间最一般的因素是“落伍者”:在运算过程当中,若是有一台机器花了很长的时间才完成最后几个Map或Reduce任务, 致使MapReduce操做总的执行时间超过预期。出现“落伍者”的缘由很是多。好比:若是一个机器的硬盘出了问题,在读取的时候要常常的进行读取纠错操 做,致使读取数据的速度从30M/s下降到1M/s。若是cluster的调度系统在这台机器上又调度了其余的任务,因为CPU、内存、本地硬盘和网络带 宽等竞争因素的存在,致使执行MapReduce代码的执行效率更加缓慢。咱们最近遇到的一个问题是因为机器的初始化代码有bug,致使关闭了的处理器的 缓存:在这些机器上执行任务的性能和正常状况相差上百倍。
咱们有一个通 用的机制来减小“落伍者”出现的状况。当一个MapReduce操做接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处 理中状态(in-progress)的任务。不管是最初的执行进程、仍是备用(backup)任务进程完成了任务,咱们都把这个任务标记成为已经完成。我 们调优了这个机制,一般只会占用比正常操做多几个百分点的计算资源。咱们发现采用这样的机制对于减小超大MapReduce操做的总处理时间效果显著。例 如,在5.3节描述的排序任务,在关闭掉备用任务的状况下要多花44%的时间完成排序任务。
四、技巧
虽然简单的Map和Reduce函数提供的基本功能已经可以知足大部分的计算须要,咱们仍是发掘出了一些有价值的扩展功能。本节将描述这些扩展功能。编程
4.一、分区函数
MapReduce 的使用者一般会指定Reduce任务和Reduce任务输出文件的数量(R)。咱们在中间key上使用分区函数来对数据进行分区,以后再输入到后续任务执 行进程。一个缺省的分区函数是使用hash方法(好比,hash(key) mod R)进行分区。hash方法能产生很是平衡的分区。然而,有的时候,其它的一些分区函数对key值进行的分区将很是有用。好比,输出的key值是 URLs,咱们但愿每一个主机的全部条目保持在同一个输出文件中。为了支持相似的状况,MapReduce库的用户须要提供专门的分区函数。例如,使用 “hash(Hostname(urlkey)) mod R”做为分区函数就能够把全部来自同一个主机的URLs保存在同一个输出文件中。
4.二、顺序保证
咱们确保在给定的分区中,中间key/value pair数据的处理顺序是按照key值增量顺序处理的。这样的顺序保证对每一个分红生成一个有序的输出文件,这对于须要对输出文件按key值随机存取的应用很是有意义,对在排序输出的数据集也颇有帮助。
4.三、Combiner函数
在 某些状况下,Map函数产生的中间key值的重复数据会占很大的比重,而且,用户自定义的Reduce函数知足结合律和交换律。在2.1节的词数统计程序 是个很好的例子。因为词频率倾向于一个zipf分布(齐夫分布),每一个Map任务将产生成千上万个这样的记录<the,1>。全部的这些记录 将经过网络被发送到一个单独的Reduce任务,而后由这个Reduce任务把全部这些记录累加起来产生一个数字。咱们容许用户指定一个可选的 combiner函数,combiner函数首先在本地将这些记录进行一次合并,而后将合并的结果再经过网络发送出去。
Combiner 函数在每台执行Map任务的机器上都会被执行一次。通常状况下,Combiner和Reduce函数是同样的。Combiner函数和Reduce函数之 间惟一的区别是MapReduce库怎样控制函数的输出。Reduce函数的输出被保存在最终的输出文件里,而Combiner函数的输出被写到中间文件 里,而后被发送给Reduce任务。
部分的合并中间结果能够显著的提升一些MapReduce操做的速度。附录A包含一个使用combiner函数的例子。api
4.四、输入和输出的类型
MapReduce 库支持几种不一样的格式的输入数据。好比,文本模式的输入数据的每一行被视为是一个key/value pair。key是文件的偏移量,value是那一行的内容。另一种常见的格式是以key进行排序来存储的key/value pair的序列。每种输入类型的实现都必须可以把输入数据分割成数据片断,该数据片断可以由单独的Map任务来进行后续处理(例如,文本模式的范围分割必 须确保仅仅在每行的边界进行范围分割)。虽然大多数MapReduce的使用者仅仅使用不多的预约义输入类型就知足要求了,可是使用者依然能够经过提供一 个简单的Reader接口实现就可以支持一个新的输入类型。
Reader并不是必定要从文件中读取数据,好比,咱们能够很容易的实现一个从数据库里读记录的Reader,或者从内存中的数据结构读取数据的Reader。数组
相似的,咱们提供了一些预约义的输出数据的类型,经过这些预约义类型可以产生不一样格式的数据。用户采用相似添加新的输入数据类型的方式增长新的输出类型。缓存
4.五、反作用
在某些状况下,MapReduce的使用者发现,若是在Map和/或Reduce操做过程当中增长辅助的输出文件会比较省事。咱们依靠程序writer把这种“反作用”变成原子的和幂等的
(alex注:幂等的指一个老是产生相同结果的数学运算)。一般应用程序首先把输出结果写到一个临时文件中,在输出所有数据以后,在使用系统级的原子操做rename从新命名这个临时文件。
若是一个任务产生了多个输出文件,咱们没有提供相似两阶段提交的原子操做支持这种状况。所以,对于会产生多个输出文件、而且对于跨文件有一致性要求的任务,都必须是肯定性的任务。可是在实际应用过程当中,这个限制尚未给咱们带来过麻烦。服务器
4.六、跳过损坏的记录
有 时候,用户程序中的bug致使Map或者Reduce函数在处理某些记录的时候crash掉,MapReduce操做没法顺利完成。惯常的作法是修复 bug后再次执行MapReduce操做,可是,有时候找出这些bug并修复它们不是一件容易的事情;这些bug也许是在第三方库里边,而咱们手头没有这 些库的源代码。并且在不少时候,忽略一些有问题的记录也是能够接受的,好比在一个巨大的数据集上进行统计分析的时候。咱们提供了一种执行模式,在这种模式 下,为了保证保证整个处理能继续进行,MapReduce会检测哪些记录致使肯定性的crash,而且跳过这些记录不处理。
每 个worker进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。在执行Map或者Reduce操做以前,MapReduce库经过全局变量保存记录序号。若是用户程序触发了一个系统信号,消息处理函数将 用“最后一口气”经过UDP包向master发送处理的最后一条记录的序号。当master看到在处理某条特定记录不止失败一次时,master就标志着 条记录须要被跳过,而且在下次从新执行相关的Map或者Reduce任务的时候跳过这条记录。
4.七、本地执行
调 试Map和Reduce函数的bug是很是困难的,由于实际执行操做时不可是分布在系统中执行的,并且一般是在好几千台计算机上执行,具体的执行位置是由 master进行动态调度的,这又大大增长了调试的难度。为了简化调试、profile和小规模测试,咱们开发了一套MapReduce库的本地实现版 本,经过使用本地版本的MapReduce库,MapReduce操做在本地计算机上顺序的执行。用户能够控制MapReduce操做的执行,能够把操做 限制到特定的Map任务上。用户经过设定特别的标志来在本地执行他们的程序,以后就能够很容易的使用本地调试和测试工具(好比gdb)。
4.八、状态信息
master 使用嵌入式的HTTP服务器(如Jetty)显示一组状态信息页面,用户能够监控各类执行状态。状态信息页面显示了包括计算执行的进度,好比已经完成了多 少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等。页面还包含了指向每一个任务的stderr和stdout文 件的连接。用户根据这些数据预测计算须要执行大约多长时间、是否须要增长额外的计算资源。这些页面也能够用来分析何时计算执行的比预期的要慢。
另外,处于最顶层的状态页面显示了哪些worker失效了,以及他们失效的时候正在运行的Map和Reduce任务。这些信息对于调试用户代码中的bug颇有帮助。
4.九、计数器
MapReduce库使用计数器统计不一样事件发生次数。好比,用户可能想统计已经处理了多少个单词、已经索引的多少篇German文档等等。
为了使用这个特性,用户在程序中建立一个命名的计数器对象,在Map和Reduce函数中相应的增长计数器的值。例如:
Counter* uppercase;
uppercase = GetCounter(“uppercase”);
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, “1″);
这些计数器的值周期性的从各个单独的worker机器上传递给master(附加在ping的应答包中传递)。master把执行成功的Map和Reduce任务的计数器值进行累计,当MapReduce操做完成以后,返回给用户代码。
计数器当前的值也会显示在master的状态页面上,这样用户就能够看到当前计算的进度。当累加计数器的值的时候,master要检查重复运行的Map或者Reduce任务,避免重复累加(以前提到的备用任务和失效后从新执行任务这两种状况会致使相同的任务被屡次执行)。
有些计数器的值是由MapReduce库自动维持的,好比已经处理的输入的key/value pair的数量、输出的key/value pair的数量等等。
计数器机制对于MapReduce操做的完整性检查很是有用。好比,在某些MapReduce操做中,用户须要确保输出的key value pair精确的等于输入的key value pair,或者处理的German文档数量在处理的整个文档数量中属于合理范围。
五、性能
本节咱们用在一个大型集群上运行的两个计算来衡量MapReduce的性能。一个计算在大约1TB的数据中进行特定的模式匹配,另外一个计算对大约1TB的数据进行排序。
这两个程序在大量的使用MapReduce的实际应用中是很是典型的 — 一类是对数据格式进行转换,从一种表现形式转换为另一种表现形式;另外一类是从海量数据中抽取少部分的用户感兴趣的数据。
5.一、集群配置
所 有这些程序都运行在一个大约由1800台机器构成的集群上。每台机器配置2个2G主频、支持超线程的Intel Xeon处理器,4GB的物理内存,两个160GB的IDE硬盘和一个千兆以太网卡。这些机器部署在一个两层的树形交换网络中,在root节点大概有 100-200GBPS的传输带宽。全部这些机器都采用相同的部署(对等部署),所以任意两点之间的网络来回时间小于1毫秒。
在4GB内存里,大概有1-1.5G用于运行在集群上的其余任务。测试程序在周末下午开始执行,这时主机的CPU、磁盘和网络基本上处于空闲状态。
5.二、GREP
这个分布式的grep程序须要扫描大概10的10次方个由100个字节组成的记录,查找出现几率较小的3个字符的模式(这个模式在92337个记录中出现)。输入数据被拆分红大约64M的Block(M=15000),整个输出数据存放在一个文件中(R=1)。
图 2显示了这个运算随时间的处理过程。其中Y轴表示输入数据的处理速度。处理速度随着参与MapReduce计算的机器数量的增长而增长,当1764台 worker参与计算的时,处理速度达到了30GB/s。当Map任务结束的时候,即在计算开始后80秒,输入的处理速度降到0。整个计算过程从开始到结 束一共花了大概150秒。这包括了大约一分钟的初始启动阶段。初始启动阶段消耗的时间包括了是把这个程序传送到各个worker机器上的时间、等待GFS 文件系统打开1000个输入文件集合的时间、获取相关的文件本地位置优化信息的时间。
5.三、排序
排序程序处理10的10次方个100个字节组成的记录(大概1TB的数据)。这个程序模仿TeraSort benchmark[10]。
排 序程序由不到50行代码组成。只有三行的Map函数从文本行中解析出10个字节的key值做为排序的key,而且把这个key和原始文本行做为中间的 key/value pair值输出。咱们使用了一个内置的恒等函数做为Reduce操做函数。这个函数把中间的key/value pair值不做任何改变输出。最终排序结果输出到两路复制的GFS文件系统(也就是说,程序输出2TB的数据)。
如前所述,输入数据被分红64MB的Block(M=15000)。咱们把排序后的输出结果分区后存储到4000个文件(R=4000)。分区函数使用key的原始字节来把数据分区到R个片断中。
在这个benchmark测试中,咱们使用的分区函数知道key的分区状况。一般对于排序程序来讲,咱们会增长一个预处理的MapReduce操做用于采样key值的分布状况,经过采样的数据来计算对最终排序处理的分区点。

图 三(a)显示了这个排序程序的正常执行过程。左上的图显示了输入数据读取的速度。数据读取速度峰值会达到13GB/s,而且全部Map任务完成以后,即大 约200秒以后迅速滑落到0。值得注意的是,排序程序输入数据读取速度小于分布式grep程序。这是由于排序程序的Map任务花了大约一半的处理时间和 I/O带宽把中间输出结果写到本地硬盘。相应的分布式grep程序的中间结果输出几乎能够忽略不计。
左 边中间的图显示了中间数据从Map任务发送到Reduce任务的网络速度。这个过程从第一个Map任务完成以后就开始缓慢启动了。图示的第一个高峰是启动 了第一批大概1700个Reduce任务(整个MapReduce分布到大概1700台机器上,每台机器1次最多执行1个Reduce任务)。排序程序运 行大约300秒后,第一批启动的Reduce任务有些完成了,咱们开始执行剩下的Reduce任务。全部的处理在大约600秒后结束。
左 下图表示Reduce任务把排序后的数据写到最终的输出文件的速度。在第一个排序阶段结束和数据开始写入磁盘之间有一个小的延时,这是由于worker机 器正在忙于排序中间数据。磁盘写入速度在2-4GB/s持续一段时间。输出数据写入磁盘大约持续850秒。计入初始启动部分的时间,整个运算消耗了891 秒。这个速度和TeraSort benchmark[18]的最高纪录1057秒相差很少。
还 有一些值得注意的现象:输入数据的读取速度比排序速度和输出数据写入磁盘速度要高很多,这是由于咱们的输入数据本地化优化策略起了做用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽。排序速度比输出数据写入到磁盘的速度快,这是由于输出数据写了两份(咱们使用了2路的GFS文件 系统,写入复制节点的缘由是为了保证数据可靠性和可用性)。咱们把输出数据写入到两个复制节点的缘由是由于这是底层文件系统的保证数据可靠性和可用性的实 现机制。若是底层文件系统使用相似容错编码[14](erasure coding)的方式而不是复制的方式保证数据的可靠性和可用性,那么在输出数据写入磁盘的时候,就能够下降网络带宽的使用。
5.四、高效的backup任务
图 三(b)显示了关闭了备用任务后排序程序执行状况。执行的过程和图3(a)很类似,除了输出数据写磁盘的动做在时间上拖了一个很长的尾巴,并且在这段时间 里,几乎没有什么写入动做。在960秒后,只有5个Reduce任务没有完成。这些拖后腿的任务又执行了300秒才完成。整个计算消耗了1283秒,多了 44%的执行时间。
5.五、失效的机器
在图三(c)中演示的排序程序执行的过程当中,咱们在程序开始后几分钟有意的kill了1746个worker中的200个。集群底层的调度马上在这些机器上从新开始新的worker处理进程(由于只是worker机器上的处理进程被kill了,机器自己还在工做)。
图 三(c)显示出了一个“负”的输入数据读取速度,这是由于一些已经完成的Map任务丢失了(因为相应的执行Map任务的worker进程被kill了), 须要从新执行这些任务。相关Map任务很快就被从新执行了。整个运算在933秒内完成,包括了初始启动时间(只比正常执行多消耗了5%的时间)。
六、经验
我 们在2003年1月完成了第一个版本的MapReduce库,在2003年8月的版本有了显著的加强,这包括了输入数据本地优化、worker机器之间的 动态负载均衡等等。从那之后,咱们惊喜的发现,MapReduce库能普遍应用于咱们平常工做中遇到的各种问题。它如今在Google内部各个领域获得广 泛应用,包括:
-
大规模机器学习问题
-
Google News和Froogle产品的集群问题
-
从公众查询产品(好比Google的Zeitgeist)的报告中抽取数据。
-
从大量的新应用和新产品的网页中提取有用信息(好比,从大量的位置搜索网页中抽取地理位置信息)。
-
大规模的图形计算。
图 四显示了在咱们的源代码管理系统中,随着时间推移,独立的MapReduce程序数量的显著增长。从2003年早些时候的0个增加到2004年9月份的差 很少900个不一样的程序。MapReduce的成功取决于采用MapReduce库可以在不到半个小时时间内写出一个简单的程序,这个简单的程序可以在上 千台机器的组成的集群上作大规模并发处理,这极大的加快了开发和原形设计的周期。另外,采用MapReduce库,可让彻底没有分布式和/或并行系统开 发经验的程序员很容易的利用大量的资源,开发出分布式和/或并行处理的应用。
在每一个任务结束的时候,MapReduce库统计计算资源的使用情况。在表1,咱们列出了2004年8月份MapReduce运行的任务所占用的相关资源。
6.一、大规模索引
到目前为止,MapReduce最成功的应用就是重写了Google网络搜索服务所使用到的index系统。索引系统的输入数据是网络爬虫抓取回来的海量的文档,这些文档数据都保存在GFS文件系统里。这些文档原始内容
(alex注:raw contents,我认为就是网页中的剔除html标记后的内容、pdf和word等有格式文档中提取的文本内容等)的大小超过了20TB。索引程序是经过一系列的MapReduce操做(大约5到10次)来创建索引。使用MapReduce(替换上一个特别设计的、分布式处理的索引程序)带来这些好处:
-
实现索引部分的代码简单、小巧、容易理解,由于对于容错、分布式以及并行计算的处理都是MapReduce库提供的。好比,使用MapReduce库,计算的代码行数从原来的3800行C++代码减小到大概700行代码。
-
MapReduce 库的性能已经足够好了,所以咱们能够把在概念上不相关的计算步骤分开处理,而不是混在一块儿以期减小数据传递的额外消耗。概念上不相关的计算步骤的隔离也使 得咱们能够很容易改变索引处理方式。好比,对以前的索引系统的一个小更改可能要耗费好几个月的时间,可是在使用MapReduce的新系统上,这样的更改 只须要花几天时间就能够了。
-
索引系统的操做管理更容易了。由于由机器失效、机器处理速度缓慢、以及网络的瞬间阻塞等引发的绝大部分问题都已经由MapReduce库解决了,再也不须要操做人员的介入了。另外,咱们能够经过在索引系统集群中增长机器的简单方法提升总体处理性能。
七、相关工做
不少系统都提供了严格的编程模式,而且经过对编程的严格限制来实现并行计算。例如,一个结合函数能够经过把N个元素的数组的前缀在N个处理器上使用并行前缀算法,在log N的时间内计算完[6,9,13]
(alex注:彻底没有明白做者在说啥,具体参考相关六、九、13文档)。MapReduce能够看做是咱们结合在真实环境下处理海量数据的经验,对这些经典模型进行简化和萃取的成果。更加值得骄傲的是,咱们还实现了基于上千台处理器的集群的容错处理。相比而言,大部分并发处理系统都只在小规模的集群上实现,而且把容错处理交给了程序员。
Bulk Synchronous Programming[17]和一些MPI原语[11]提供了更高级别的并行处理抽象,能够更容易写出并行处理的程序。MapReduce和这些系统的 关键不一样之处在于,MapReduce利用限制性编程模式实现了用户程序的自动并发处理,而且提供了透明的容错处理。
咱们数据本地优化策略的灵感来源于active disks[12,15]等技术,在active disks中,计算任务是尽可能推送到数据存储的节点处理
(alex注:即靠近数据源处理),这样就减小了网络和IO子系统的吞吐量。咱们在挂载几个硬盘的普通机器上执行咱们的运算,而不是在磁盘处理器上执行咱们的工做,可是达到的目的同样的。
咱们的备用任务机制和Charlotte System[3]提出的eager调度机制比较相似。Eager调度机制的一个缺点是若是一个任务反复失效,那么整个计算就不能完成。咱们经过忽略引发故障的记录的方式在某种程度上解决了这个问题。
MapReduce的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的集群上分布和运行用户任务。虽然这个不是本论文的重点,可是有必要提一下,这个集群管理系统在理念上和其它系统,如Condor[16]是同样。
MapReduce 库的排序机制和NOW-Sort[1]的操做上很相似。读取输入源的机器(map workers)把待排序的数据进行分区后,发送到R个Reduce worker中的一个进行处理。每一个Reduce worker在本地对数据进行排序(尽量在内存中排序)。固然,NOW-Sort没有给用户自定义的Map和Reduce函数的机会,所以不具有 MapReduce库普遍的实用性。
River[2]提供了一个编程模 型:处理进程经过分布式队列传送数据的方式进行互相通信。和MapReduce相似,River系统尝试在不对等的硬件环境下,或者在系统颠簸的状况下也 能提供近似平均的性能。River是经过精心调度硬盘和网络的通信来平衡任务的完成时间。MapReduce库采用了其它的方法。经过对编程模型进行限 制,MapReduce框架把问题分解成为大量的“小”任务。这些任务在可用的worker集群上动态的调度,这样快速的worker就能够执行更多的任 务。经过对编程模型进行限制,咱们可用在工做接近完成的时候调度备用任务,缩短在硬件配置不均衡的状况下缩小整个操做完成的时间(好比有的机器性能差、或 者机器被某些操做阻塞了)。
BAD-FS[5]采用了和MapReduce彻底不一样的编程模式,它是面向广域网
(alex注:wide-area network)的。不过,这两个系统有两个基础功能很相似。(1)两个系统采用从新执行的方式来防止因为失效致使的数据丢失。(2)两个都使用数据本地化调度策略,减小网络通信的数据量。
TACC[7]是一个用于简化构造高可用性网络服务的系统。和MapReduce同样,它也依靠从新执行机制来实现的容错处理。
八、结束语
MapReduce 编程模型在Google内部成功应用于多个领域。咱们把这种成功归结为几个方面:首先,因为MapReduce封装了并行处理、容错处理、数据本地化优 化、负载均衡等等技术难点的细节,这使得MapReduce库易于使用。即使对于彻底没有并行或者分布式系统开发经验的程序员而言;其次,大量不一样类型的 问题均可以经过MapReduce简单的解决。好比,MapReduce用于生成Google的网络搜索服务所须要的数据、用来排序、用来数据挖掘、用于 机器学习,以及不少其它的系统;第三,咱们实现了一个在数千台计算机组成的大型集群上灵活部署运行的MapReduce。这个实现使得有效利用这些丰富的 计算资源变得很是简单,所以也适合用来解决Google遇到的其余不少须要大量计算的问题。
我 们也从MapReduce开发过程当中学到了很多东西。首先,约束编程模式使得并行和分布式计算很是容易,也易于构造容错的计算环境;其次,网络带宽是稀有 资源。大量的系统优化是针对减小网络传输量为目的的:本地优化策略使大量的数据从本地磁盘读取,中间文件写入本地磁盘、而且只写一份中间文件也节约了网络 带宽;第三,屡次执行相同的任务能够减小性能缓慢的机器带来的负面影响(alex注:即硬件配置的不平衡),同时解决了因为机器失效致使的数据丢失问题。
附录A、单词频率统计
本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每个不一样的单词出现频率。
#include “mapreduce/mapreduce.h”
// User’s map function
class WordCounter : public Mapper {
public:
virtual void Map(const MapInput& input) {
const string& text = input.value();
const int n = text.size();
for (int i = 0; i < n; ) {
// Skip past leading whitespace
while ((i < n) && isspace(text[i]))
i++;
// Find word end
int start = i;
while ((i < n) && !isspace(text[i]))
i++;
if (start < i)
Emit(text.substr(start,i-start),”1″);
}
}
};
REGISTER_MAPPER(WordCounter);
// User’s reduce function
class Adder : public Reducer {
virtual void Reduce(ReduceInput* input) {
// Iterate over all entries with the
// same key and add the values
int64 value = 0;
while (!input->done()) {
value += StringToInt(input->value());
input->NextValue();
}
// Emit sum for input->key()
Emit(IntToString(value));
}
};
REGISTER_REDUCER(Adder);
int main(int argc, char** argv) {
ParseCommandLineFlags(argc, argv);
MapReduceSpecification spec;
// Store list of input files into “spec”
for (int i = 1; i < argc; i++) {
MapReduceInput* input = spec.add_input();
input->set_format(“text”);
input->set_filepattern(argv[i]);
input->set_mapper_class(“WordCounter”);
}
// Specify the output files:
// /gfs/test/freq-00000-of-00100
// /gfs/test/freq-00001-of-00100
// …
MapReduceOutput* out = spec.output();
out->set_filebase(“/gfs/test/freq”);
out->set_num_tasks(100);
out->set_format(“text”);
out->set_reducer_class(“Adder”);
// Optional: do partial sums within map
// tasks to save network bandwidth
out->set_combiner_class(“Adder”);
// Tuning parameters: use at most 2000 // machines and 100 MB of memory per task spec.set_machines(2000); spec.set_map_megabytes(100); spec.set_reduce_megabytes(100); // Now run it MapReduceResult result; if (!MapReduce(spec, &result)) abort(); // Done: ‘result’ structure contains info // about counters, time taken, number of // machines used, etc. return 0; }