MapReduce:Simplified Data Processing on Large Clusters中文版from百度文库

超大集群的简单数据处理程序员

转自百度文库web

Jeffrey Dean Sanjay Ghemawat算法

jeff@google.com , sanjay@google.com数据库

Google , Inc.编程

摘要

MapReduce是一个编程模式,它是与处理/产生海量数据集的实现相关。用户指定一个map函数,经过这个map函数处理key/value(键/值)对,而且产生一系列的中间key/value对,而且使用reduce函数来合并全部的具备相同key值的中间键值对中的值部分。现实生活中的不少任务的实现都是基于这个模式的,正如本文稍后会讲述的那样。api

使用这样的函数形式实现的程序能够自动分布到一个由普通机器组成的超大集群上并发执行。run-time系统会解决输入数据的分布细节,跨越机器集群的程序执行调度,处理机器的失效,而且管理机器之间的通信请求。这样的模式容许程序员能够不须要有什么并发处理或者分布式系统的经验,就能够处理超大的分布式系统得资源。服务器

咱们的MapReduce系统的实现运行在一个由普通机器组成的大型集群上,而且有着很高的扩展性:一个典型的MapReduce计算处理一般分布到上千台机器上来处理上TB的数据。程序员会发现这样的系统很容易使用:已经开发出来了上百个MapReduce程序,而且天天在Google的集群上有上千个MapReduce job正在执行。网络

 

1 介绍

在过去的5年内,Google的创造者和其余人实现了上百个用于特别计算目的的程序来出来海量的原始数据,好比蠕虫文档,web请求log,等等,用于计算出不一样的数据,好比降序索引,不一样的图示展现的web文档,蠕虫采集的每一个host的page数量摘要,给定日期内最经常使用的查询等等。绝大部分计算都是概念上很简洁的。不过,输入的数据一般是很是巨大的,而且为了能在合理时间内执行完毕,其上的计算必须分布到上百个或者上千个计算机上去执行。如何并发计算,如何分布数据,如何处理失败等等相关问题合并在一块儿就会致使本来简单的计算掩埋在为了解决这些问题而引入的很复杂的代码中。数据结构

由于这种复杂度,咱们设计了一种新的东西来让咱们可以方便处理这样的简单计算。这些简单计算本来很简单,可是因为考虑到并发处理细节,容错细节,以及数据分布细节,负载均衡等等细节问题,而致使代码很是复杂。因此咱们抽象这些公共的细节到一个lib中。这种抽象是源自Lisp以及其余不少面向功能的语言的map和reduce概念。咱们认识到大部分操做都和map操做相关,这些map操做都是运算在输入记录的每一个逻辑”record”上,而且map操做为了产生一组中间的key/value键值对,而且接着在全部相同key的中间结果上执行reduce操做,这样就能够合并适当的数据。咱们的函数模式是使用用户定义的map和reduce操做,这样可让咱们并发执行大规模的运算,而且使用从新执行的方式做为容错的优先机制。并发

MapReduce的主要贡献在于提供了一个简单强大的接口,经过这个接口,能够把大尺度的计算自动的并发和分布执行。使用这个接口,能够经过普通PC的巨大集群,来达到极高的性能。

第二节讲述了基本的编程模式,而且给出了一些例子。第三节讲述了一个面向咱们基于集群的计算环境的MapReduce的实现。第四节讲述了一些咱们建议的精巧编程模式。第五节讲述了在不一样任务下咱们的MapReduce实现的性能比较。第六节讲述了在Google中的MapReduce应用以及尝试重写了咱们产品的索引系统。第七节讲述了相关工做和将来的工做。

2 编程模式

咱们的运算处理一组输入的(input)键值对(key/valuepairs),而且产生一组输出的(output)键值对。MapReduce函数库德用户用两个函数来表达这样的计算:Map和Reduce。

Map函数,是用户自定义的的函数,处理输入的键值对,而且产生一组中间的(intermediate)键值对。MapReduce函数库稽核全部相同的中间键值键I的值,而且发送给Reduce函数进行处理。

Reduce函数一样也是用户提供的,它处理中间键值I,以及这个中间键值相关的值集合。这个函数合并这些值,最后造成一个相对较小的值集合。一般一个单次Reduce执行会产生0个或者1个输出值。提供给Reduce函数的中间值是经过一个iterator来提供的。这就让咱们能够处理超过内存容量的值列表。

2.1 例子

 

咱们考虑这样一个例子,在很大的文档集合中统计每个单词出现的次数。咱们写出相似以下的伪代码:

map(String key, String value):

       // key: document name

       // value: document contents

       for each word w in value:

              EmitIntermediate(w, "1");

 

reduce(String key, Iterator values):

       // key: a word

       // values: a list of counts

       int result = 0;

       for each v in values:

              result += ParseInt(v);

       Emit(AsString(result));

 

map函数检查每个单词,而且对每个单词增长1到其对应的计数器(在这个例子里就是’1’).reduce函数把特定单词的全部出现的次数进行合并。

此外,咱们还要写代码来对mapreduce specification对象进行赋值,设定输入和输出的文件名,以及设定一些参数。接着咱们调用MapReduce函数,把这个对象做为参数调用过去。咱们把MapReduce函数库(C++函数库)和咱们的程序连接在一块儿。附件1有完整的这个例子的代码。

 

2.2 类型

 

即便上边的例子是用字符串做为输入和输入出的,从概念上讲,使用者提供的map和reduce函数有着以下相关类型:

map (k1,v1)                à    list(k2,v2)

reduce (k2,list(v2))             à    list(v2)

也就是,输入的键和值和输出的键值是属于不一样的域的。进一步说,中间的键值是和输出的键值属于相同的域的。(好比map的输出,就是做为reduce的输入)。

咱们的C++实现上,把字符串做为用户定义函数的输入和输出,由用户代码来本身识别字符串到合适的类型。

 

2.3 其余例子

 

这里有一些简单有趣的例子,均可以简单的经过MapReduce计算模型来展现:

分布式Grep            若是map函数检查输入行,知足条件的时候,map函数就把本行输出。reduce函数就是一个直通函数,简单的把中间数据输出就能够了。

URL访问频率统计:  map函数处理webpag请求和应答(URL,1)的log。Reduce函数把全部相同的URL的值合并,而且输出一个成对的(URL,总个数)。

逆向Web-Link 图:   map函数输出全部包含指向target URL的source网页,用(target,source)这样的结构对输出。Reduce函数局和全部关联相同target URL的source列表,而且输出一个(target,list(source))这样的结构。

主机关键向量指标(Term-Vector per Hosts):   关键词向量指标简而言之就是在一个文档或者一组文档中的重点次出现的频率,用(word,frequency)表达。map函数计算每个输入文档(主机名字是从文档的URL取出的)的关键词向量,而后输出(hostname,关键词向量(Term-Vector))。reduce函数处理全部相同host的全部文档关键词向量。去掉不经常使用的关键词,而且输出最终的(hostname,关键词向量)对。

逆序索引:                 map函数分析每个文档,而且产生一个序列(word,documentID)组。reduce函数处理指定word的全部的序列组,而且对相关的document ID进行排序,输出一个(word,list(document ID))组。全部的输出组,组成一个简单的逆序索引。经过这种方法能够很容易保持关键词在文档库中的位置。

分布式排序:                    map函数从每条记录中抽取关键字,而且产生(key,record)对。reduce函数原样输出全部的关键字对。这个算法是与4.1节描述的分布式处理相关的,而且排序是在4.2节描述的。

3 实现

MapReduce接口能够有不少种不一样的实现。应当根据不一样的环境选择不一样的实现。好比,一个实现能够适用于小型的共享内存的机器,另外一个实现多是基于大型NUMA多处理器系统,还可能有为大规模计算机集群的实现。

本届描述了Google普遍使用的计算环境:用交换机网络[4]链接的,由普通PC构成的超大集群。在咱们的环境里:

(1)     每一个节点一般是双x86处理器,运行Linux,每台机器2-4GB内存。

(2)     使用的网络设备都是经常使用的。通常在节点上使用的是100M/或者千M网络,通常状况下都用不到一半的网络带宽。

(3)     一个cluster中经常有成百上千台机器,因此,机器故障是屡见不鲜。

(4)     存储时使用的便宜的IDE硬盘,直接放在每个机器上。而且有一个分布式的文件系统来管理这些分布在各个机器上的硬盘。文件系统经过复制的方法来在不可靠的硬件上保证可用性和可靠性。

(5)     用户向调度系统提交请求。每个请求都包含一组任务,映射到这个计算机cluster里的一组机器上执行。

 

3.1 执行概览

 

Map操做经过把输入数据进行分区(partition)(好比分为M块),就能够分布到不一样的机器上执行了。输入块的拆成多块,能够并行在不一样机器上执行。Reduce操做是经过对中间产生的key的分布来进行分布的,中间产生的key能够根据某种分区函数进行分布(好比hash(key) mod R),分布成为R块。分区(R)的数量和分区函数都是由用户指定的。

 

 

 

图1是咱们实现的MapReduce操做的总体数据流。当用户程序调用MapReduce函数,就会引发以下的操做(图一中的数字标示和下表的数字标示相同)。

 

1. 用户程序中的MapReduce函数库首先把输入文件分红M块,每块大概16M到64M(能够经过参数决定)。接着在cluster的机器上执行处理程序。

2. 这些分排的执行程序中有一个程序比较特别,它是主控程序master。剩下的执行程序都是做为master分排工做的worker。总共有M个map任务和R个reduce任务须要分排。master选择空闲的worker而且分配这些map任务或者reduce任务

3. 一个分配了map任务的worker读取并处理相关的输入小块。他处理输入的数据,而且将分析出的key/value对传递给用户定义的map函数。map函数产生的中间结果key/value对暂时缓冲到内存。

4. 这些缓冲到内存的中间结果将被定时刷写到本地硬盘,这些数据经过分区函数分红R个区。这些中间结果在本地硬盘的位置信息将被发送回master,而后这个master负责把这些位置信息传送给reduce的worker。

5. 当master通知reduce的worker关于中间key/value对的位置时,他调用remote procedure来从map worker的本地硬盘上读取缓冲的中间数据。当reduce的worker读到了全部的中间数据,他就使用中间key进行排序,这样可使得相同key的值都在一块儿。由于有许多不一样key的map都对应相同的reduce任务,因此,排序是必须的。若是中间结果集太大了,那么就须要使用外排序。

6. reduce worker根据每个惟一中间key来遍历全部的排序后的中间数据,而且把key和相关的中间结果值集合传递给用户定义的reduce函数。reduce函数的对于本reduce区块的输出到一个最终的输出文件。

7. 当全部的map任务和reduce任务都已经完成了的时候,master激活用户程序。在这时候MapReduce返回用户程序的调用点。

 

当这些成功结束之后,mapreduce的执行数据存放在总计R个输出文件中(每一个都是由reduce任务产生的,这些文件名是用户指定的)。一般,用户不须要合并这R个输出文件到一个文件,他们一般把这些文件做为输入传递到另外一个MapReduce调用,或者用另外一个分布式应用来处理这些文件,而且这些分布式应用把这些文件当作为输入文件因为分区(partition)成为的多个块文件。

3.2 Master的数据结构

 

master须要保存必定的数据结构。对于每个map和reduce任务来讲,都须要保存它的状态(idle,in-progress或者completed),而且识别不一样的worker机器(对于非idel的任务状态)。

master是一个由map任务产生的中间区域文件位置信息到reduce任务的一个管道。所以,对于每个完成得map任务,master保存下来这个map任务产生的R中间区域文件信息的位置和大小。对于这个位置和大小信息是当接收到map任务完成得时候作的。这些信息是增量推送处处于in-progress状态的reduce任务的worker上的。

 

3.3 容错考虑

 

因为MapReduce函数库是设计用于在成百上千台机器上处理海量数据的,因此这个函数库必须考虑到机器故障的容错处理。

 

Worker失效的考虑

master会按期ping每个worker机器。若是在必定时间内没有worker机器的返回,master就认为这个worker失效了。全部这台worker完成的map任务都被设置成为他们的初始idel状态,而且所以能够被其余worker所调度执行。相似的,全部这个机器上正在处理的map 任务或者reduce任务都被设置成为idle状态,能够被其余worker所从新执行。

在失效机器上的已经完成的map任务还须要再次从新执行,这是由于中间结果存放在这个失效的机器上,因此致使中间结果没法访问。已经完成的recude任务无需再次执行,由于他们的结果已经保存在全局的文件系统中了。

当map任务首先由Aworker执行,随后被Bworker执行的时候(由于A失效了),全部执行reduce任务的worker都会被通知。全部尚未来得及从A上读取数据的worker都会从B上读取数据。

MapReduce能够有效地支持到很大尺度的worker失效的状况。好比,在一个MapReduce操做中,在一个网络例行维护中,可能会致使每次大约有80台机器在几分钟以内不能访问。MapReduce的master制式简单的把这些不能访问的worker上的工做再执行一次,而且继续调度进程,最后完成MapReduce的操做。

 

Master失效

 

在master中,按期会设定checkpoint,写出master的数据结构。若是master任务失效了,能够从上次最后一个checkpoint开始启动另外一个master进程。不过,因为只有一个master在运行,因此他若是失效就比较麻烦,所以咱们当前的实现上,是若是master失效了,就终止MapReduce执行。客户端能够检测这种失效而且若是须要就从新尝试MapReduce操做。

 

失效的处理设计

当用户提供的map和reduce函数对于他们的输入来讲是肯定性的函数,咱们的分布式的输出就应当和在一个整个程序没有失败的连续执行相同。

咱们依靠对map和reduce任务的输出进行原子提交来完成这样的可靠性。每个in-progress任务把输出写道一个私有的临时文件中。reduce任务产生一个这样的文件,map任务产生R个这样的任务(每个对应一个reduce任务)。当一个map任务完成的时候,worker发送一个消息给master,而且这个消息中包含了这个R临时文件的名字。若是master又收到一个已经完成的map任务的完成消息,他就忽略这个消息。不然,他就在master数据结构中记录这个R文件。

当一个reduce任务完成的时候,reduce worker自动把临时输出的文件名改成正式的输出文件。若是再多台机器上有相同的reduce任务执行,那么就会有多个针对最终输出文件的改名动做。咱们依靠文件系统提供的原子操做’更名字’,来保证最终的文件系统状态中记录的是其中一个reduce任务的输出。

咱们的绝大部分map和reduce操做都是肯定性的,实际上在语义角度,这个map和reduce并发执行和顺序执行市同样的,这就使得程序员很容易推测程序行为。当map和reduce操做是非肯定性的时候,咱们有稍弱的可是依旧是有道理的错误处理机制。对于非肯定性操做来讲,特定reduce任务R1的输出,与,非肯定性的顺序执行的程序对R1的输出是等价的。另外,另外一个reduce任务R2的输出,是和另外一个顺序执行的非肯定性程序对应的R2输出相关的。

考虑map任务M和reduce任务R1,R2。咱们设定e(Ri)为已经提交的Ri执行(有且仅有一个这样的执行)。当e(R1)处理得是M的一次执行,而e(R2)是处理M的另外一次执行的时候,那么就会致使稍弱的失效处理了。

 

3.4 存储位置

 

在咱们的环境下,网络带宽资源是相对缺少的。咱们用尽可能让输入数据保存在构成集群机器的本地硬盘上(经过GFS管理[8])的方式来减小网络带宽的开销。GFS把文件分红64M一块,而且每一块都有几个拷贝(一般是3个拷贝),分布到不一样的机器上。MapReduce的master有输入文件组的位置信息,而且尝试分派map任务在对应包含了相关输入数据块的设备上执行。若是不能分配map任务到对应其输入数据的机器上执行,他就尝试分配map任务到尽可能靠近这个任务的输入数据库的机器上执行(好比,分配到一个和包含输入数据块在一个switch网段的worker机器上执行)。当在一个足够大的cluster集群上运行大型MapReduce操做的时候,大部分输入数据都是在本地机器读取的,他们消耗比较少的网络带宽。

3.5 任务颗粒度

 

若是上边咱们讲的,咱们把map阶段拆分到M小块,而且reduce阶段拆分到R小块执行。在理想状态下,M和R应当比worker机器数量要多得多。每个worker机器都经过执行大量的任务来提升动态的负载均衡能力,而且可以加快故障恢复的速度:这个失效机器上执行的大量map任务均可以分布到全部其余worker机器上执行。

可是咱们的实现中,实际上对于M和R的取值有必定的限制,由于master必须执行O(M+R)次调度,而且在内存中保存O(M*R)个状态。(对影响内存使用的因素仍是比较小的:O(M*R)块状态,大概每对map任务/reduce任务1个字节就能够了)

进一步来讲,用户一般会指定R的值,由于每个reduce任务最终都是一个独立的输出文件。在实际中,咱们倾向于调整M的值,使得每个独立任务都是处理大约16M到64M的输入数据(这样,上面描写的本地优化策略会最有效),另外,咱们使R比较小,这样使得R占用很少的worker机器。咱们一般会用这样的比例来执行MapReduce: M=200,000,R=5,000,使用2,000台worker机器。

3.6 备用任务

 

一般状况下,一个MapReduce的总执行时间会受到最后的几个”拖后腿”的任务影响:在计算过程当中,会有一个机器过了比正常执行时间长得多的时间尚未执行完map或者reduce任务,致使MapReduce总任务不能按时完成。出现拖后腿的状况有不少缘由。好比:一个机器的硬盘有点问题,常常须要反复读取纠错,而后把读取输入数据的性能从30M/s下降到1M/s。cluster调度系统已经在某台机器上调度了其余的任务,因此由于CPU/内存/本地硬盘/网络带宽等竞争的关系,致使执行MapReduce的代码性能比较慢。咱们最近出现的一个问题是机器的启动代码有问题,致使关闭了cpu的cache:在这些机器上的任务性能有上百倍的影响。

咱们有一个通用的机制来减小拖后腿的状况。当MapReduce操做接近完成的时候,master调度备用进程来执行那些剩下的in-progress状态的任务。不管当最初的任务仍是backup任务执行完成的时候,都把这个任务标记成为已经完成。咱们调优了这个机制,一般只会占用多几个百分点的机器资源。可是咱们发现这样作之后对于减小超大MapReduce操做的总处理时间来讲很是有效。例如,在5.3节描述的排序任务,在关闭掉备用任务的状况下,要比有备用任务的状况下多花44%的时间。

 

4 技巧

虽然简单写map和reduce函数实现基本功能就已经对大部分须要都足够了,咱们仍是开发了一些有用的扩展,这些在本节详细描述。

 

4.1 分区函数

 

MapReduce的使用者经过指定(R)来给出reduce 任务/输出文件的数量。他们处理的数据在这些任务上经过对中间结果key得分区函数来进行分区。缺省的分区函数时使用hash函数(例如hash(key)mod R)。这通常就能够获得分散均匀的分区。不过,在某些状况下,对key用其余的函数进行分区可能更有用。好比,某些状况下key是URL,那么咱们但愿全部对单个host的入口URL都保存在相同的输出文件。为了支持相似的状况,MapReduce函数库可让用户提供一个特定的分区函数。好比使用hash(hostname(urlkey))mod R做为分区函数,这样可让指向同一个hostname的URL分配到相同的输出文件中。

4.2 顺序保证

 

咱们确保在给定的分区中,中间键值对key/value的处理顺序是根据key增量处理的。这样的顺序保证能够很容易生成每个分区有序的输出文件,这对于输出文件格式须要支持客户端的对key的随机存取的时候就颇有用,或者对输出数据集再做排序就很容易。

4.3 combiner函数

 

在某些状况下,容许中间结果key重复会占据至关的比重,而且用户定义的reduce函数知足结合律和交换律。好比2.1节的一个统计单词出现次数的例子。因为word的频率趋势符合Zipf 分布(齐夫分布),每个map任务都回产生成百上千的<the,1>这样格式的记录。全部这些记录都经过网络发送给一个单个的reduce任务,经过reduce函数进行相加,最后产生单个数字。咱们容许用户指定一个可选的组合函数Combiner函数,先在本地进行合并如下,而后再经过网络发送。

Combiner函数在每个map任务的机器上执行。一般这个combiner函数的代码和reduce的代码实现上都是同样的。reduce函数和combiner函数惟一的不一样就是MapReduce对于这两个函数的输出处理上不一样。对于reduce函数的输出是直接写到最终的输出文件。对于combiner函数来讲,输出是写到中间文件,而且会被发送到reduce任务中去。

部分使用combiner函数能够显著提升某些类型的MapReduce操做。附录A有这样的使用combiner的例子。

4.4 输入和输出类型

 

MapReduce函数库提供了读取几种不一样格式的输入的支持。例如,”text”模式下,每行输入都被当作一个key/value对:key是在文件的偏移量,value是行的内容。另外一个宠用格式保存了根据key进行排序key/value对的顺序。每个输入类型的实现都知道如何把输入为了分别得map任务而进行有效分隔(好比,text模式下的分隔就是要确保分隔的边界只能按照行来进行分隔)。用户能够经过简单的提供reader接口来进行新的输入类型的支持。不过大部分用户都只用一小部分预先定义的输入类型。

reader函数不须要提供从文件读取数据。例如,咱们很容易定义一个reader函数从数据库读取数据,或者从保存在内存中的数据结构中读取数据。

相似的,咱们提供了一组用于输出的类型,能够产生不一样格式的数据,而且用户也能够很简单的增长新的输出类型。

4.5 边界效应

 

在某些状况下,MapReduce的使用上,若是再map操做或者reduce操做时,增长辅助的输出文件,会比较有用。咱们依靠程序来提供这样的边界原子操做。一般应用程序写一个临时文件而且用系统的原子操做:更名字操做,来再这个文件写完的时候,一次把这个文件更名改掉。

对于单个任务产生的多个输出文件来讲,咱们没有提供其上的两阶段提交的原子操做支持。所以,对于产生多个输出文件的,对于跨文件有一致性要求的任务,都必须是肯定性的任务。这个限制到如今为止尚未真正在实际中遇到过。

4.6 跳过损坏的记录

 

某些状况下,用户程序的代码会让map或者reduce函数在处理某些记录的时候crash掉。这种状况下MapReduce操做就不能完成。通常的作法是改掉bug而后再执行,可是有时候这种先改掉bug的方式不太可行;也许是由于bug是在第三方的lib里边,它的原代码不存在等等。而且,不少时候,忽略一些记录不处理也是能够接受的,好比,在一个大数据集上进行统计分析的时候,就能够忽略有问题的少许记录。咱们提供了一种执行模式,在这种执行模式下,MapReduce会检测到哪些记录会致使肯定的crash,而且跳过这些记录不处理,使得整个处理能继续进行。

每个worker处理进程都有一个signal handler,能够捕获内存段异常和总线错误。在执行用户map或者reduce操做以前,MapReduce函数库经过全局变量保存记录序号。若是用户代码产生了这个信号,signal handler因而用”最后一口气”经过UDP包向master发送上次处理的最后一条记录的序号。当master看到在这个特定记录上,有不止一个失效的时候,他就标志着条记录须要被跳过,,而且在下次从新执行相关的Map或者Reduce任务的时候跳过这条记录。

4.7 本地执行

 

由于实际执行操做时分布在系统中执行的,一般是在好几千台计算机上执行得,而且是由master机器进行动态调度的任务,因此对map和reduce函数的调试就比较麻烦。为了可以让调试方便,profiling和小规模测试,咱们开发了一套MapReduce的本地实现,也就是说,MapReduce函数库在本地机器上顺序执行全部的MapReduce操做。用户能够控制执行,这样计算能够限制到特定的map任务上。用户能够经过设定特别的标志来执行他们的程序,同时也能够很容易的使用调试和测试工具(好比gdb)等等。

4.8 状态信息

 

master内部有一个HTTP服务器,而且能够输出状态报告。状态页提供了计算的进度报告,好比有多少任务已经完成,有多少任务正在处理,输入的字节数,中间数据的字节数,输出的字节数,处理百分比,等等。这些页面也包括了指向每一个任务输出的标准错误和输出的标准文件的链接。用户能够根据这些数据来预测计算须要大约执行多长时间,是否须要为这个计算增长额外的计算资源。这些页面也能够用来分析为什么计算执行的会比预期的慢。

此外,最上层的状态页面也显示了哪些worker失效了,以及他们失效的时候上面运行的map和reduce任务。这些信息对于调试用户代码中的bug颇有帮助。

4.9 计数器

 

MapReduce函数库提供了用于统计不一样事件发生次数的计数器。好比,用户可能想统计全部已经索引的German文档数量或者已经处理了多少单词的数量,等等。

为了使用这样的特性,用户代码建立一个叫作counter的对象,而且在map和reduce函数中在适当的时候增长counter的值。例如:

 

Counter* uppercase;

uppercase = GetCounter("uppercase");

 

map(String name, String contents):

       for each word w in contents:

              if (IsCapitalized(w)):

                     uppercase->Increment();

              EmitIntermediate(w, "1");

 

 

 

这些counter的值,会定时从各个单独的worker机器上传递给master(经过ping的应答包传递)。master把执行成功的map或者reduce任务的counter值进行累计,而且当MapReduce操做完成以后,返回给用户代码。当前counter值也会显示在master的状态页面,这样人能够看到计算现场的进度。当累计counter的值的时候,master会检查是否有对同一个map或者reduce任务的相同累计,避免累计重复。(backup任务或者机器失效致使的从新执行map任务或者reduce任务或致使这个counter重复执行,因此须要检查,避免master进行重复统计)。

部分计数器的值是由MapReduce函数库进行自动维持的,好比已经处理的输入的key/value对的数量,或者输出的key/value键值对等等。

counter特性对于MapReduce操做的完整性检查很是有用。好比,在某些MapReduce操做中,用户程序须要确保输出的键值对精确的等于处理的输入键值对,或者处理得German文档数量是在处理的整个文档数量中属于合理范围内。

5 性能

在本节,咱们用在一个大型集群上运行的两个计算来衡量MapReduce的性能。一个计算用来在一个大概1TB的数据中查找特定的匹配串。另外一个计算排序大概1TB的数据。

这两个程序表明了大量的用MapReduce实现的真实的程序的主要类型-一类是对数据进行洗牌,另外一类是从海量数据集中抽取少部分的关心的数据。

5.1 集群配置

 

全部这些程序都是运行在一个大约有1800台机器的集群上。每台机器配置2个2G Intel Xeon支持超线程的处理器, 4GB内存,两个160GBIDE硬盘,一个千兆网卡。这些机器部署在一个由两层的,树形交换网络中,在最上层大概有100-200G的聚合贷款。全部这些机器都有相同的部署(对等部署),所以任意两点之间的来回时间小于1毫秒。

在4GB内存里,大概有1-1.5G用于运行在集群上的其余任务。这个程序是在周末下午执行的,这时候的CPU,磁盘和网络基本上属于空闲状态。

 

5.2 GREP

 

grep程序须要扫描大概10的10次方个由100个字节组成的记录,查找比较少见的3个字符的查找串(这个查找串在92,337个记录中存在)。输入的记录被拆分红大约64M一个的块(M=15000),整个输出方在一个文件中(R=1)。

 

 

 

图2表示了这个程序随时间的处理过程。Y轴是输入数据的处理速度。处理速度逐渐随着参与MapReduce计算的机器增长而增长,当1764台worker开始工做的时候,达到了30G/s的速度。当map任务结束的时候,在计算开始后80秒,输入的速度降到0。整个计算过程从开始到结束一共花了大概150秒。这包括了大约一分钟的开头启动部分。开头的部分是用来把这个程序传播到各个worker机器上的时间,而且等待GFS系统打开100个输入文件集合而且得到相关的文件位置优化信息。

 

5.3 SORT排序

 

SORT程序排序10的10次方个100个字节组成的记录(大概1TB的数据)。这个程序是仿制TeraSort benchmark[10]的。

sort程序是由不到50行用户代码组成。三行的map函数从文本行中解出10个字节的排序key,而且把这个key和原始行做为中间结果key/value键值对输出。咱们使用了一个内嵌的identitiy函数做为reduce的操做。这个函数把中间结果key/value键值对不变的做为输出的key/value键值对。最终排序输出写到一个两路复制的GFS文件中(就是说,程序的输出会写2TB的数据)。

就像前边讲的,输入数据分红64MB每块(M=15000)。咱们把排序后的输出分区成为4000个文件(R=4000)。分区函数使用key的原始字节来吧数据分区到R个小块中。

咱们这个benchmark中的分区函数自身知道key的分区状况。一般对于排序程序来讲,咱们会增长一个预处理的MapReduce操做,这个操做用于采样key的状况,而且用这个采样的key的分布状况来计算对最终排序处理得分区点。

 

 

 

图三是这个排序程序的正常执行过程。左上的图表示了输入数据读取的速度。数据读取速度会达到13G/s,而且在不到200秒全部map任务完成以后迅速滑落到0。咱们注意到数据读取速度小于grep粒子。这是由于排序map任务划了大概一半时间和I/O带宽写入中间输出到本地硬盘。相对应的grep中间结果输出几乎能够忽略不计。

左边中间的图是map任务把中间数据发送到reduce任务的网络速度。这个排序过程自从第一个任务完成以后就开始了。图示上的第一个高峰是启动了第一批大概1700个reduce任务(整个MapReduce分布到大概1700台机器上,每台机器一次大概执行1个reduce任务)。大概计算开始300秒之后,这些第一批reduce任务完成了,而且咱们开始执行剩下的reduce任务。全部这些排序任务会在计算开始后大概600秒结束。

左下的图表示reduce任务把排序后的数据写到最终的输出文件的速度。在第一个排序期结束后到写盘开始以前有一个小延时,这是由于机器正在忙于内部排序中间数据。写盘速度持续大概2-4G/s。在计算开始后大概850秒左右写盘完成。包括启动部分,整个计算用了891秒。这个和TeraSort benchmark[18]的最高纪录1057秒差很少。

须要注意的事情是:输入速度要比排序速度和输出速度快,这是由于咱们本地化的优化策略,绝大部分数据都是从本地硬盘读取而上去了咱们相关的网络消耗。排序速度比输出速度快,这是由于输出阶段写了两份排序后的速度(咱们写两份的缘由是为了可靠性可可用性的缘由)。咱们写两份的缘由是由于底层文件系统的可靠性和可用性的要求。若是底层文件系统用相似容错编码[14](erasure coding)的方式,而不采用复制写的方式,在写盘阶段能够下降网络带宽的要求。

5.4 高效的backup任务

 

在图三(b),是咱们在关闭掉backup任务的时候,sort程序的执行状况。执行流和上边讲述的图3(a)很相似,可是这个关闭掉backup任务的时候,执行的尾巴很长,而且执行的尾巴没有什么有效的写盘动做。在960秒之后,除了5个reduce之外,其余reduce任务都已经完成。不过这些拖后腿的任务又执行了300秒才完成。整个计算化了1283秒,多了44%的执行时间。

 

 

5.5 失效的机器

 

在图三(c)中,咱们演示了在sort程序执行过程当中故意暂时杀掉1746个worker中的200个worker进程的执行状况。底层的集群调度马上在这些机器上从新建立了新的worker处理(由于咱们只是把这些机器上的处理进程杀掉,而机器依旧是能够操做的)。

由于已经完成的map work丢失了(因为相关的map worker被杀掉了),须要从新再做,因此worker死掉会致使一个负数的输入速率。相关map任务的从新执行很快就从新执行了。整个计算过程在933秒内完成,包括了前边的启动时间(只比正常执行时间多了5%的时间)。

6 经验

咱们在2003年1月写了第一个版本的MapReduce函数库,而且在2003年8月做了显著的加强,包括了本地优化,worker机器之间的动态负载均衡等等。自那之后,MapReduce函数库就普遍用于咱们平常处理的问题。它如今在Google内部各个领域内普遍应用,包括:

 

。大尺度的计算机学习问题。

。Google News和Froogle产品的集群问题。

。从公众查询产品(好比Google的Zeitgeist)的报告中抽取数据。

。从web网页做新试验和抽取新的产品(例如,从大量的webpage中的本地查找抽取物理位置信息)。

。大尺度的图型计算。

 

 

 

 

 

任务数

平均任务完成时间

使用的机器时间

29423

634秒

79,186天

读取的输入数据

产生的中间数据

写出的输出数据

3,288TB

758TB

193TB

每一个job平均worker机器数

每一个job平均死掉work数

每一个job平均map任务

每一个job平均reduce任务

157

1.2

3,351

55

map惟一实现

reduce的惟一实现

map/reduce的combiner实现

395

296

426

表1:MapReduce2004年8月的执行状况

 

图四显示了咱们的源代码管理系统中,随着时间推移,MapReduce程序的显著增长,从2003年早先时候的0个增加到2004年9月份的差很少900个不一样的程序。MapReduce之因此这样成功是由于他可以在不到半小时时间内写出一个简单的可以应用于上千台机器的大规模并发程序,而且极大的提升了开发和原形设计的周期效率。而且,他可让一个彻底没有分布式和/或并行系统经验的程序员,可以很容易的开发处理海量数据的程序。

在每个任务结束的时候,MapReduce函数库记录使用的计算资源的状态。在表1,咱们列出了2004年8月份MapReduce运行的任务所占用的相关资源。

6.1 大尺度的索引

 

到目前为止,最成功的MapReduce的应用就是重写了Google web 搜索服务所使用到的index系统。索引系统处理蠕虫系统抓回来的超大量的数据,这些数据保存在GFS文件里。普通这些文档的大小是超过了20TB的数据。索引程序是经过一系列的,大概5到10次MapReduce操做来创建索引。经过利用MapReduce(替换掉上一个版本的特别设计的分布处理的索引程序版本)有这样一些好处:

 

l  索引代码很简单,很小,很容易理解。由于对于容错的处理代码,分布以及并行处理代码都经过MapReduce函数库封装了,因此索引代码很简单,很小,很容易理解。例如,当使用MapReduce函数库的时候,计算的代码行数从原来的3800行C++代码一下减小到大概700行代码。

l  MapReduce的函数库的性能已经很是好,因此咱们能够把概念上不相关的计算步骤分开处理,而不是混在一块儿以期减小处理次数。这使得咱们容易改变索引处理方式。好比,咱们对老索引系统的一个小更改可能要好几个月的时间,可是在新系统内,只须要花几天时间就能够了。

l  索引系统的操做更容易了,这是由于机器的失效,速度慢的机器,以及网络风暴都已经由MapReduce本身解决了,而不须要操做人员的交互。此外,咱们能够简单的经过对索引系统增长机器的方式提升处理性能。

 

7 相关工做

 

不少系统都提供了严格的编程模式,而且经过对编程的严格限制来实现自动的并行计算。例如,一个结合函数能够在一个N个元素的全部前缀上进行计算,而且使用并发前缀计算,会在在N个并发节点上会耗费log N的时间[6,9,13]。MapReduce是这些模式下的,一个咱们基于超大系统的现实经验的一个简化和精炼。而且,咱们还提供了基于上千台处理器的容错实现。而大部分并发处理系统都只在小规模的尺度上实现,而且机器的容错仍是程序员来操心的。

Bulk Synchronous Programming[17]以及一些MPI primitives[11]提供了更高级别的抽象,能够更容易写出并行处理的程序。这些系统和MapReduce系统的不一样之处在于,MapReduce是经过限制性编程模式自动实现用户程序的并发处理,而且提供了透明的容错处理。

咱们本地的优化策略是受active disks[12,15]等技术的影响的,在active disks中,计算任务是尽可能推送到数据在本地磁盘的节点处理,这样就减小了网络系统的I/O吞吐。咱们是在直接附带几个硬盘的通机器上执行咱们的计算工做,不是在磁盘处理器上执行咱们的工做,可是总的效果是同样的。

咱们的backup task机制和早先CharlotteSystem[3]的机制比较相似。早先的简单调度的一个缺点是若是一个任务致使反复失效,那么整个计算就不能完成。咱们经过在故障状况下跳过故障记录的方式,在某种程度上解决了这个问题。

MapReduce的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大共享机器组上分布和运行用户任务。虽然这个不是本论文的重点,集群管理系统在理念上和Condor[16]等其余系统同样。

MapReduce函数库的排序部分和NOW-Sort[1]的操做上很相似。源机器(map workers)把待排序的数据进行分区,而且发送到R个reduce worker中的一个进行处理。每个reduce worker做本地排序(尽量在内存排序)。固然NOW-Sort没有刻意用户定义的Map和Reduce函数,而咱们的函数库有,因此咱们的函数库能够有很高的适应性。

River[2]提供了一个编程模式,在这样的编程模式下,处理进程能够经过分布式查询来互相传送数据的方式进行通信。和MapReduce相似,River系统尝试提供对不一样应用有近似平均的性能,即便在不对等的硬件环境下或者在系统颠簸的状况下也能提供近似平均的性能。River是经过精心调度硬盘和网络的通信,来平衡任务的完成时间。MapReduce的框架是经过限制性编程模式,来把问题分解成为大量的任务。每个任务都是动态调度到可用的worker上执行,这样快速的worker能够执行更多的任务。限制性编程模式一样容许咱们在接近计算完成的时候调度backup 任务,在出现处理不均匀的状况下,大量的缩小整个完成的时间(好比在有慢机或者阻塞的worker的时候)。

BAD-FS[5]和MapReduce的编程模式彻底不一样,它不像MapReduce是基于很大的网络计算的。不过,这两个系统有两个基本原理很相似。(1)两个系统都使用重复执行来防止因为失效致使的数据丢失。(2)两个都使用数据本地化调度策略,使得处理尽量在本地数据上进行,减小经过网络通信的数据量。

TACC[7]是一个用于简单构造高可用性网络服务的系统。就像MapReduce,它依靠从新执行机制来实现的容错处理。

8 结束语

MapReduce的编程模式在Google成功应用于许多方面。咱们把这种成功应用归结为几个方面:首先,这个编程模式易于使用,即便程序员没有并行或者分布式系统经验,因为MapReduce封装了并行的细节和容错处理,本地化计算,负载均衡等等,因此,使得编程很是容易。其次,大量不一样的问题均可以简单经过MapReduce来解决。例如,MapReduce用于产生Google的web搜索服务所须要的数据,用来排序,用来数据挖掘,用于机器智能学习,以及不少其余系统。第三,咱们已经在一个好几千台计算机的大型集群上开发实现了这个MapReduce。这个实现使得对于这些机器资源的利用很是简单,而且所以也适用于解决Google遇到的其余不少须要大量计算的问题。

咱们也从MapReduce上学到了很多内容。首先,先执行编程模式使得并行和分布式计算很是容易,而且也易于构造这样的容错计算环境。其次,网络带宽是系统的资源的瓶颈。咱们系统的一系列优化都使所以针对减小网络传输量为目的的:本地优化使得咱们读取数据时,是从本地磁盘读取的,而且写出单个中间数据文件到本地磁盘也节约了网络带宽。第三,冗余执行能够减小慢机器带来的影响,而且解决因为机器失效致使的数据丢失问题。

 

9 感谢

Josh Levenberg校定和扩展了用户级别的MapReduce API,而且结合他的适用经验和其余人的改进建议,增长了不少新的功能。MapReduce使用Google文件系统GFS[8]来做为数据和输出。咱们还感谢Percy Liang Olcan Sercinoglu 在开发用于MapReduce的集群管理系统得工做。Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach 为本论文提出了宝贵的意见。OSDI的无名审阅者,以及咱们的审核者Eric Brewer,在论文应当如何改进方面给出了有益的意见。最后,咱们感谢Google的工程部的全部MapReduce的用户,感谢他们提供了有用的反馈,以及建议,以及错误报告等等。

 

10 参考资料

[1]         Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Patterson.High-performance sorting on networks of workstations.In Proceedings of the 1997 ACM SIGMOD InternationalConference on Management of Data, Tucson,Arizona, May 1997.

[2]         Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River:Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS '99), pages 10.22, Atlanta, Georgia, May 1999.

[3]         Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996. [4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22.28, April 2003.

[5]          John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004.

[6]         Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989.

[7]         Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78. 91, Saint-Malo, France, 1997.

[8]         Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29.43, Lake George, New York, 2003. To appear in OSDI 2004 12

[9]         S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par'96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401.408. Springer-Verlag, 1996.

[10]        Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/.

[11]       William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999.

[12]       L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004.

[13]       Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831.838, 1980. [14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335.348, 1989.

[15]       Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68.74, June 2001.

[16]       Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004.

[17]       L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103.111, 1997.

[18]       Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.

 

A 单词频率统计

本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每个不一样的单词出现频率。

#include "mapreduce/mapreduce.h"

 

// User's map function

class WordCounter : public Mapper {

       public:

              virtual void Map(const MapInput& input) {

                     const string& text = input.value();

                     const int n = text.size();

                     for (int i = 0; i < n; ) {

                            // Skip past leading whitespace

                            while ((i < n) && isspace(text[i]))

                                   i++;

 

                     // Find word end

                     int start = i;

                     while ((i < n) && !isspace(text[i]))

                            i++;

                     if (start < i)

                            Emit(text.substr(start,i-start),"1");

              }

       }

};

 

REGISTER_MAPPER(WordCounter);

 

// User's reduce function

class Adder : public Reducer {

       virtual void Reduce(ReduceInput* input) {

              // Iterate over all entries with the

              // same key and add the values

              int64 value = 0;

              while (!input->done()) {

                     value += StringToInt(input->value());

                     input->NextValue();

              }

 

              // Emit sum for input->key()

              Emit(IntToString(value));

       }

};

 

REGISTER_REDUCER(Adder);

 

int main(int argc, char** argv) {

       ParseCommandLineFlags(argc, argv);

      

       MapReduceSpecification spec;

      

       // Store list of input files into "spec"

       for (int i = 1; i < argc; i++) {

              MapReduceInput* input = spec.add_input();

              input->set_format("text");

              input->set_filepattern(argv[i]);

              input->set_mapper_class("WordCounter");

       }

 

       // Specify the output files:

       // /gfs/test/freq-00000-of-00100

       // /gfs/test/freq-00001-of-00100

       // ...

       MapReduceOutput* out = spec.output();

       out->set_filebase("/gfs/test/freq");

       out->set_num_tasks(100);

       out->set_format("text");

       out->set_reducer_class("Adder");

      

       // Optional: do partial sums within map

       // tasks to save network bandwidth

       out->set_combiner_class("Adder");

 

       // Tuning parameters: use at most 2000

       // machines and 100 MB of memory per task

       spec.set_machines(2000);

       spec.set_map_megabytes(100);

       spec.set_reduce_megabytes(100);

      

       // Now run it

       MapReduceResult result;

       if (!MapReduce(spec, &result)) abort();

      

       // Done: 'result' structure contains info

       // about counters, time taken, number of

       // machines used, etc.

       return 0;

}

 

 

 

 

 

 

 

 

B 译者

 

崮山路上走9遍2005-8-8于大连完稿

BLOG: sharp838.mblogger.cn

EMAIL: sharp838@21cn.comguangweishi@gmail.com

 

全部的版权归于原做者。

 

感谢:朱朱,洋洋,sophia

相关文章
相关标签/搜索