MapReduce:大型集群上的简单数据处理

MapReduce:大型集群上的简单数据处理程序员

摘要web

MapReduce是一个设计模型,也是一个处理和产生海量数据的一个相关实现。用户指定一个用于处理一个键值(key-value)对生成一组key/value对形式的中间结果的map函数,以及一个将中间结果键相同的键值对合并到一块儿的reduce函数。许多现实世界的任务都能知足这个模型,如这篇文章所示。数据库

使用这个功能形式实现的程序可以在大量的普通机器上并行执行。这个运行程序的系统关心下面的这些细节:输入数据的分区、一组机器上调度程序执行、处理机器失败问题,以及管理所需的机器内部的通讯。这使没有任何并行处理和分布式系统经验的程序员可以利用这个大型分布式系统的资源。编程

咱们的MapReduce实现运行在一个由普通机器组成的大规模集群上,具备很高的可扩展性:一个典型的MapReduce计算会在几千台机器上处理许多TB的数据。程序员们发现这个系统很容易使用:目前已经实现了几百个MapReduce程序,在Google的集群上,天天有超过一千个的MapReduce工做在运行。数组

1、        介绍缓存

在过去的5年中,本文做者和许多Google的程序员已经实现了数百个特定用途的计算程序,处理了海量的原始数据,包括抓取到的文档、网页请求日志等,计算各类衍生出来的数据,如反向索引、网页文档的图形结构的各类表示、每一个host下抓取到的页面数量的总计、一个给定日期内的最频繁查询的集合等。大多数这种计算概念明确。然而,输入数据一般都很大,而且计算必须分布到数百或数千台机器上以确保在一个合理的时间内完成。如何并行计算、分布数据、处理错误等问题使这个起初很简单的计算,因为增长了处理这些问题的不少代码而变得十分复杂。网络

为了解决这个复杂问题,咱们设计了一个新的抽象模型,它容许咱们将想要执行的计算简单的表示出来,而隐藏其中并行计算、容错、数据分布和负载均衡等很麻烦的细节。咱们的抽象概念是受最先出如今lisp和其它结构性语言中的map和reduce启发的。咱们认识到,大多数的计算包含对每一个在输入数据中的逻辑记录执行一个map操做以获取一组中间key/value对,而后对含有相同key的全部中间值执行一个reduce操做,以此适当的合并以前的衍生数据。由用户指定map和reduce操做的功能模型容许咱们可以简单的进行并行海量计算,并使用re-execution做为主要的容错机制。数据结构

这项工做的最大贡献是提供了一个简单的、强大的接口,使咱们可以自动的进行并行和分布式的大规模计算,经过在由普通PC组成的大规模集群上实现高性能的接口来进行合并。负载均衡

第二章描述了基本的编程模型,并给出了几个例子。第三章描述了一个为咱们的聚类计算环境定制的MapReduce接口实现。第四章描述了咱们发现对程序模型颇有用的几个优化。第六章探索了MapReduce在Google内部的使用,包括咱们在将它做为生产索引系统重写的基础的一些经验。第七章讨论了相关的和将来的工做。框架

2、        编程模型

这个计算输入一个key/value对集合,产生一组输出key/value对。MapReduce库的用户经过两个函数来标识这个计算:Map和Reduce。

Map,由用户编写,接收一个输入对,产生一组中间key/value对。MapReduce库将具备相同中间key I的聚合到一块儿,而后将它们发送给Reduce函数。

Reduce,也是由用户编写的,接收中间key I和这个key的值的集合,将这些值合并起来,造成一个尽量小的集合。一般,每一个Reduce调用只产生0或1个输出值。这些中间值通过一个迭代器(iterator)提供给用户的reduce函数。这容许咱们能够处理因为数据量过大而没法载入内存的值的链表。

2.1 例子

考虑一个海量文件集中的每一个单词出现次数的问题,用户会写出相似于下面的伪码:

 

Map函数对每一个单词增长一个相应的出现次数(在这个例子中仅仅为“1”)。Reduce函数将一个指定单词全部的计数加到一块儿。

此外,用户使用输入和输出文件的名字、可选的调节参数编写代码,来填充一个mapreduce规格对象,而后调用MapReduce函数,并把这个对象传给它。用户的代码与MapReduce库(C++实现)链接到一块儿。。附录A包含了这个例子的整个程序。

2.2 类型

尽管以前的伪代码中使用了字符串格式的输入和输出,可是在概念上,用户定义的map和reduce函数须要相关联的类型:

map       (k1, v1)                      -->         list(k2, v2)

reduce   (k2, list(v2))                -->          list(v2)

也就是说,输入的键和值和输出的键和值来自不一样的域。此外,中间结果的键和值与输出的键和值有相同的域。

MapReduce的C++实现与用户定义的函数使用字符串类型进行参数传递,将类型转换的工做留给用户的代码来处理。

2.3 更多的例子

这里有几个简单有趣的程序,可以使用MapReduce计算简单的表示出来。

分布式字符串查找(Distributed Grep):map函数将匹配一个模式的行找出来。Reduce函数是一个恒等函数,只是将中间值拷贝到输出上。

URL访问频率计数(Count of URL Access Frequency):map函数处理web页面请求的日志,并输出<URL, 1>。Reduce函数将相同URL的值累加到一块儿,生成一个<URL, total count>对。

翻转网页链接图(Reverse Web-Link Graph):map函数为在一个名为source的页面中指向目标(target)URL的每一个连接输出<target, source>对。Reduce函数将一个给定目标URL相关的全部源(source)URLs链接成一个链表,并生成对:<target, list(source)>。

主机关键向量指标(Term-Vector per Host):一个检索词向量将出如今一个文档或是一组文档中最重要的单词概述为一个<word, frequency>对链表。Map函数为每一个输入文档产生一个<hostname, term vector>(hostname来自文档中的URL)。Reduce函数接收一个给定hostname的全部文档检索词向量,它将这些向量累加到一块儿,将罕见的向量丢掉,而后生成一个最终的<hostname, term vector>对。

倒排索引(Inverted Index):map函数解析每一个文档,并生成一个<word, document ID>序列。Reduce函数接收一个给定单词的全部键值对,全部的输出对造成一个简单的倒排索引。能够经过对计算的修改来保持对单词位置的追踪。

分布式排序(Distributed Sort):map函数将每一个记录的key抽取出来,并生成一个<key, record>对。Reduce函数不会改变任何的键值对。这个计算依赖了在4.1节提到的分区功能和4.2节提到的排序属性。

3、        实现

MapReduce接口有不少不一样的实现,须要根据环境来作出合适的选择。好比,一个实现可能适用于一个小的共享内存机器,而另外一个实现则适合一个大的NUMA多处理器机器,再另外一个可能适合一个更大的网络机器集合。

这一章主要描述了针对在Google内部普遍使用的计算环境的一个实现:经过交换以太网将大量的普通PC链接到一块儿的集群。在咱们的环境中:

(1)    机器一般是双核x86处理器、运行Linux操做系统、有2-4G的内存。

(2)    使用普通的网络硬件—一般是100Mb/s或者是1Gb/s的机器带宽,可是平均值远小于带宽的一半。

(3)    由数百台或者数千台机器组成的集群,所以机器故障是很日常的事

(4)    存储是由直接装在不一样机器上的便宜的IDE磁盘提供。一个内部的分布式文件系统用来管理存储这些磁盘上的数据。文件系统在不可靠的硬件上使用副本机制提供了可用性和可靠性。

(5)    用户将工做提交给一个调度系统,每一个工做由一个任务集组成,经过调度者映射到集群中可用机器的集合上。

3.1 执行概述

经过自动的将输入数据分区成M个分片,Map调用被分配到多台机器上运行。数据的分片可以在不一样的机器上并行处理。使用分区函数(如,hash(key) mod R)将中间结果的key进行分区成R个分片,Reduce调用也被分配到多台机器上运行。分区的数量(R)和分区函数是由用户指定的。

 

图1:执行概述

图1中显示了咱们实现的一个MapReduce操做的整个流程。当用户程序调用MapReduce函数时,下面一系列的行为将会发生(图1中所使用的数字标识将与下面列表中的相对应):

1. 用户程序中的MapReduce库会先将输入文件分割成M个一般为16MB-64MB大小的片(用户能够经过可选参数进行控制)。而后它将在一个集群的机器上启动许多程序的拷贝。

2. 这些程序拷贝中的一个是比较特殊的——master。其它的拷贝都是工做进程,是由master来分配工做的。有M个map任务和R个reduce任务被分配。Master挑选出空闲的工做进程,并把一个map任务或reduce任务分配到这个进程上。

3. 一个分配了map任务的工做进程读取相关输入分片的内容,它将从输入数据中解析出key/value对,并将其传递给用户定义的Map函数。Map函数生成的中间key/value对缓存在内存中。

4. 缓存中的键值对周期性的写入到本地磁盘,并经过分区函数分割为R个区域。将这些缓存在磁盘上的键值对的位置信息传回给master,master负责将这些位置信息传输给reduce工做进程。

5. 当一个reduce工做进程接收到master关于位置信息的通知时,它将使用远程调用函数(RPC)从map工做进程的磁盘上读取缓存的数据。当reduce工做进程读取完全部的中间数据后,它将全部的中间数据按中间key进行排序,以保证相同key的数据聚合在一块儿。这个排序是须要的,由于一般许多不一样的key映射到相同的reduce任务上。若是中间数据的总量太大而没法载入到内存中,则须要进行外部排序。

6. reduce工做进程迭代的访问已排序的中间数据,而且对遇到的每一个不一样的中间key,它会将key和相关的中间values传递给用户的Reduce函数。Reduce函数的输出追加到当前reduce分区一个最终的输出文件上。

7. 当全部的map任务和reduce任务完成后,master会唤醒用户程序。这时候,用户程序中的MapReduce调用会返回到用户代码上。

在成功完成后,MapReduce操做输出到R个输出文件(每一个reduce任务生成一个,文件名是由用户指定的)中的结果是有效的。一般,用户不须要合并这R个输出文件,它们常常会将这些文件做为输入传递给另外一个MapReduce调用,或者在另外一个处理这些输入分区成多个文件的分布式应用中使用。

3.2 Master数据结构

Master保留了几个数据结构。对于每一个Map和Reduce任务,它存储了它们的状态(idle、in-progress或者completed),以及工做进程机器的特性(对于非空闲任务)。

Master是中间文件区域的位置信息从map任务传送到reduce任务的一个通道。所以,对于每一个完成的map任务来讲,master存储了map任务产生的R个中间文件区域的位置信息和大小。在map任务完成时,master会接收到更新这个含有位置信息和大小信息的消息。信息被增量的传输到运行in-progress的reduce任务的工做进程上。

3.3 容错

由于MapReduce库是被设计成运行在数百或数千台机器上帮助处理海量数据的,因此这个库必须可以优雅的处理机器故障。

工做进程故障

Master周期性的ping每一个工做进程,若是在一个特定的时间内没有收到响应,则master会将这个工做进程标记为失效。任何由失效的工做进程完成的map任务都被标记为初始idle状态,所以这个map任务会被从新分配给其它的工做进程。一样的,任何正在处理的map任务或reduce任务也会被置为idle状态,进而能够被从新调度。

在一个失效的节点上完成的map任务会被从新执行,由于它们的输出被存放在失效机器的本地磁盘上,而磁盘不可访问。完成的reduce任务不须要从新执行,由于它们的输出被存储在全局文件系统上。

当一个map任务先被工做进程A执行,而后再被工做进程B执行(由于A失效了),全部执行reduce任务的工做进程都会接收到从新执行的通知,任何没有从工做进程A上读取数据的reduce任务将会从工做进程B上读取数据。

MapReduce对于大规模工做进程失效有足够的弹性。好比,在一个MapReduce操做处理过程当中,网络维护形成了80台机器组成的集群几分钟内不可达。MapReduce的master会从新执行那些在不可达机器上完成的工做,并持续推动,最终完成MapReduce操做。

Master故障

将上面提到的master数据结构周期性的进行写检查点操做(checkpoint)是比较容易的。若是master任务死掉,一个新的拷贝会从最近的检查点状态上启动。然而,假定只有一个单独的master,它的故障是不大可能的。所以,若是master故障,咱们当前的实现是停止MapReduce计算。

当前故障的语义

当用户提供的map和reduce操做是输入肯定性函数,咱们的分布式实现与无端障序列执行整个程序所生成的结果相同。

咱们依靠map和reduce任务输出的原子性提交来实现这个属性。每一个in-progress任务将它们的输出写入到一个私有的临时文件中。一个reduce任务产生一个这样的文件,一个map任务产生R个这样的文件(每一个reduce任务一个)。当一个map任务完成时,它将发送给master一个消息,其中包括R个临时文件的名字。若是master收到一个已经完成的map任务的完成消息,则忽略这个消息。不然,它将这R个文件名记录在master的数据结构中。

当一个reduce任务完成后,reduce的工做进程自动的将临时文件改名为最终的输出文件,若是相同的reduce任务运行在多台机器上,会调用多个重命名操做将这些文件改名为最终的输出文件。

绝大部分的map和reduce操做是肯定性的,事实上,在这种状况下咱们的语义与一个序列化的执行是相同的,这使程序开发者可以简单的推出他们程序的行为。当map和/或reduce操做是不肯定性的时,咱们提供较弱但依然合理的语义。在不肯定性的操做面前,一个特定的reduce任务R1的输出与一个序列执行的不肯定性程序生成的输出相同。然而,一个不一样的reduce任务R2的输出可能与一个不一样的序列执行的不肯定性程序生成的输出可能一致。

考虑map任务M和reduce任务R1和R2。假定e(Ri)是提交的Ri的执行过程(有且仅有这样一个过程)。e(R1)可能从M的一个执行生成的输出中读取数据,e(R2)可能从M的一个不一样执行生成的输出中读取数据,则会产生较弱的语义。

3.4 位置

在咱们的计算环境中,网络带宽是一个相对不足的资源。咱们经过将输入数据存放在组成集群的机器的本地磁盘来节省网络带宽。GFS将每一个文件分割成64MB大小的块,每一个块会在不一样的机器上存储几个拷贝(一般为3个)。MapReduce master会考虑文件的位置信息,并试图将一个map任务分配到包含相关输入数据副本的机器上。若是这样作失败,它会试图将map任务调度到一个包含任务输入数据的临近的机器上(例如,与包含输入数据机器在同一个网络下进行交互的一个工做进程)。当在集群的一个有效部分上运行大规模的MapReduce操做时,大多数输入数据都从本地读取,不消耗任何网络带宽。

3.5 任务粒度

根据上面所提到的,咱们将map阶段细分为M个片,将reduce阶段细分为R个片。理想状况下,M和R应该比工做机器的数量大得多,每一个工做进程执行不少不一样的任务来促使负载均衡,在一个工做进程失效时也可以快速的恢复:许多完成的map任务能够传播到其它全部的工做机器上。

在咱们的实现中,对于取多大的M和R有一个实际的界限,由于如上面提到的那样,master必须进行O(M+R)次调度,在内存中保持O(M*R)个状态。(对内存使用的恒定因素影响较小,然而:对由每一个map任务/reduce任务对占用大约一个字节所组成的O(M*R)片的状态影响较大。)

此外,R常常是由用户约束的,由于每一个reduce任务的输出最终放在一个分开的输出文件中。实际中,咱们倾向选择M值,以使每个独立的任务可以处理大约16MB到64MB的输入数据(可使上面提到的位置优化有更好的效果),把R值设置为咱们想使用的工做机器的一个小的倍数。咱们常用2000个工做机器,设置M=200000和R=5000,来执行MapReduce计算。

3.6 备用任务

影响一个MapReduce操做总体执行时间的一个一般因素是“落后者”:一个使用了异常的时间完成了计算中最后几个map任务或reduce任务中的一个的机器。可能有不少因素致使落后者的出现,例如,一个含有损坏磁盘的机器频繁的处理可校订的错误,使它的读取速度从30MB/s降低到了1MB/s。集群调度者可能将其它的任务分配到这个机器上,因为CPU、内存、磁盘或网络带宽的竞争会致使MapReduce代码执行的更慢。咱们遇到的最近一个问题是机器初始化代码中的一个bug,它会使处理器的缓存不可用:受到这个问题影响的机器会慢上百倍。

咱们使用一个普通的机制来缓解落后者问题。当一个MapReduce操做接近完成时,master调度备用(backup)任务执行剩下的、处于in-process状态的任务。一旦主任务或是备用任务完成,则将这个任务标识为已经完成。咱们优化了这个机制,使它一般可以仅仅增长少许的操做所使用的计算资源。咱们发现这能有效的减小完成大规模MapReduce操做所须要的时间。做为一个例子,5.3节所描述的那种程序在禁用备用任务机制的状况下,会须要多消耗44%的时间。

4、        细化

尽管简单的编写Map和Reduce函数提供的基本功能足够知足大多数须要,可是,咱们发现一些扩展是颇有用的。这会在本章进行描述。

4.1 分区函数

MapReduce的用户指定所但愿的reduce任务/输出文件的数量(R)。使用分区函数在中间键上将数据分区到这些任务上。一个默认的分区函数使用hash方法(如“hash(key) mod R”),它能产生至关平衡的分区。然而,在一些状况下,须要使用其它的在key上的分区函数对数据进行分区。为了支持这种状况,MapReduce库的用户可以提供指定的分区函数。例如,使用“hash(Hostname(urlkey)) mod R”做为分区函数,使全部来自同一个host的URL最终放到同一个输出文件中。

4.2 顺序保证

咱们保证在一个给定的分区内,中间key/value对是根据key值顺序增量处理的。顺序保证可使它易于生成一个有序的输出文件,这对于输出文件须要支持有效的随机访问,或者输出的用户方便的查找排序的数据颇有帮助。

4.3 组合(Combiner)函数

在一些状况下,每一个map任务产生的中间key会有不少重复,而且用户指定的reduce函数知足结合律和交换律。2.1节中提到的单词技术的例子就是一个很好的例子。由于单词频率倾向于zifp分布,每一个map任务都会产生数百或数千个<the, 1>形式的记录。全部这些计数都会经过网络发送给一个单独的reduce任务,而后经过Reduce函数进行累加并产生一个数字。咱们容许用户指定一个可选的Combiner函数,它能在数据经过网络发送前先对这些数据进行局部合并。

Combiner函数在每台执行map任务的机器上执行。一般状况下,combiner函数和reduce函数的代码是相同的,二者惟一不一样的是MapReduce库如何处理函数的输出。Reduce函数的输出被写入到一个最终的输出文件中,而combiner函数会写入到一个将被发送给reduce函数的中间文件中。

局部合并能够有效的对某类MapReduce操做进行加速。附录A包含了一个使用combiner函数的例子。

4.4 输入和输出类型

MapReduce库支持几种不一样格式的输入数据。好比,“text”模式的输入能够讲每一行看出一个key/value对:key是该行在文件中的偏移量,value是该行的内容。另外一中常见的支持格式是根据key进行排序存储一个key/value对的序列。每种输入类型的实现知道如何将本身分割成对map任务处理有意义的区间(例如,text模式区间分割确保区间分割只在行的边界进行)。用户可以经过实现一个简单的读取(reader)接口来增长支持一种新的输入类型,尽管大多数用户仅仅使用了预约义输入类型中的一小部分。

Reader并非必须从文件中读取数据,好比,咱们能够容易的定义一个从数据库中读取记录,或者从内存的数据结构中读取数据的Reader。

相似的,咱们提供一组输出类型来产生不一样格式的数据,用户也能够简单的经过代码增长对新输出类型的支持。

4.5 反作用

在一些状况下,MapReduce的用户发现为它们的map和/或reduce操做的输出生成辅助的文件很方便。咱们依靠应用的writer将这个反作用变成原子的和幂等的。一般,应用会将结果写入到一个临时文件,而后在数据彻底生成后,原子的重命名这个文件。

若是一个单独任务产生的多个输出文件,咱们没有提供两阶段提交的原子操做。所以,产生多个输出文件且对交叉文件有一致性需求的任务应该是肯定性的操做。可是在实际工做中,这个限制并非一个问题。

4.6 跳过损坏的记录

有时,在咱们的代码中会存在一些bug,它们会致使Map或Reduce函数在处理特定的记录上必定会Crash。这样的bug会阻止MapReduce操做顺利完成。一般的作法是解决这个bug,但有时,这是不可行的;多是因为第三方的库中的bug,而咱们没有这个库的源码。有时,忽略一些记录也是能够接受的,例如,当在海量的数据集上作数据统计时。咱们提供了一个可选的运行模式,MapReduce库探测出哪些记录会致使肯定性的Crash,并跳过这些记录以继续执行这个程序。

每一个工做进程都安装了一个信号处理器,它能捕获段错误和总线错误。在调用用户的Map或Reduce操做以前,MapReduce库将记录的序号存储到全局变量中。若是用户代码产生一个信号,这个信号处理器会向MapReudce master发送一个“临死前”的UDP包,其中包含了这个序号。当master看到对于一个特定的记录有多个失败信号时,在相应的Map或Reduce任务下一次从新执行时,master会通知它跳过这个记录。

4.7 本地执行

在Map或Reduce函数中调试问题是很棘手的,由于实际的计算是发生在一个分布式系统上的,一般有几千台机器,而且是由master动态分配的。为了有助于调试、性能分析和小规模测试,咱们开发了一个MapReduce库可供选择的实现,它将在本地机器上序列化的执行一个MapReduce的全部工做。这为用户提供了对MapReduce操做的控制,使计算能被限制在一个特定的map任务上。用户使用标记调用他们的程序,并可以简单的使用它们找到的任何调试或测试工具(如,gdb)。

4.8 状态信息

Master运行了一个内部的HTTP服务,并显示出状态集页面供人们查看,如,有多少任务已经完成、有多少正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理速率等。这些页面也包含了指向每一个任务生成的标准错误和标准输出文件的连接。用户能使用这些数据预测这个计算将要持续多长时间,以及是否应该向这个计算添加更多的资源。这些页面也有助于找出计算比预期执行慢的多的缘由。

此外,顶层的状态页显示了哪些工做进程失效,哪些map和reduce任务在处理时失败。这个信息对试图诊断出用户代码中的bug颇有用。

4.9 计数器

MapReduce库提供了一个计数器,用于统计不一样事件的发生次数。好比,用户代码想要统计已经处理了多少单词,或者已经对多少德国的文档创建了索引等。

用户代码可使用这个计数器建立一个命名的计数器对象,而后在Map和/或Reduce函数中适当的增长这个计数器的计数。例如:

 

独立的工做机器的计数器值周期性的传送到master(附在ping的响应上)master将从成功的map和reduce任务上获取的计数器值进行汇总,当MapReduce操做完成时,将它们返回给用户的代码。当前的计数器值也被显示在了master的状态页面上,令人们可以看到当前计算的进度。当汇总计数器值时,master经过去掉同一个map或reduce任务的屡次执行所形成的影响来防止重复计数。(重复执行可能会在咱们使用备用任务和从新执行失败的任务时出现。)

一些计数器的值是由MapReduce库自动维护的,如已处理的输入key/value对的数量和已生成的输出key/value对的数量。

用户发现计数器对检查MapReduce操做的行为颇有用处。例如,在一些MapReduce操做中,用户代码可能想要确保生成的输出对的数量是否精确的等于已处理的输入对的数量,或者已处理的德国的文档数量在已处理的全部文档数量中是否被容忍。

5、        性能

在这章中,咱们测试两个运行在一个大规模集群上的MapReduce计算的性能。一个计算在大约1TB的数据中进行特定的模式匹配,另外一个计算对大约1TB的数据进行排序。

这两个程序可以表明实际中大量的由用户编写的MapReduce程序,一类程序将数据从一种表示方式转换成另外一种形式;另外一类程序是从海里的数据集中抽取一小部分感兴趣的数据。

5.1 集群配置

全部的程序运行在一个由将近1800台机器组成的集群上。每一个机器有两个2GHz、支持超线程的Intel Xeon处理器、4GB的内存、两个160GB的IDE磁盘和一个1Gbps的以太网链路,这些机器部署在一个两层的树状交换网络中,在根节点处有大约100-200Gbps的带宽。全部的机器都采用相同的部署,所以任意两个机器间的RTT都小于1ms。

在4GB内存里,有接近1-1.5GB用于运行在集群上的其它任务。程序在一个周末的下午开始执行,这时主机的CPU、磁盘和网络基本都是空闲的。

5.2 字符串查找(Grep)

这个grep程序扫描了大概1010个100字节大小的记录,查找出现几率相对较小的3个字符的模式(这个模式出如今92337个记录中)。输入被分割成接近64MB的片(M=15000),整个输出被放到一个文件中(R=1)。

 

图2:数据传输速率

图2显示了计算随时间的进展状况。Y轴显示了输入数据的扫描速率,这个速率会随着MapReduce计算的机器数量的增加而增加,当1764个工做进程参与计算时,总的速率超过30GB/s。随着map任务的完成,速率开始降低,并在计算的大约第80秒变为0,整个计算从开始到结束大约持续了150秒,这包含了大约1分钟的启动时间开销,这个开销是由将程序传播到全部工做机器的时间、等待GFS文件系统打开1000个输入文件集的时间和获取位置优化所需信息的时间形成的。

5.3 排序

排序程序对1010个100字节大小的记录(接近1TB的数据)进行排序,这个程序模仿了TeraSort benchmark。

排序程序由不到50行的用户代码组成,一个三行的Map函数从一个文本行中抽取出一个10字节的key,并将这个key和原始的文本行做为中间的key/value对进行输出。咱们使用内置的Identity函数做为Reduce操做。这个函数将中间key/value对不作任何修改的输出,最终排序结果输出到两路复制的GFS文件中(如,该程序输出了2TB的数据)。

如前所述,输入数据被分割为64MB大小的片(M=15000),将输出结果分红4000个文件(R=4000)。分区函数使用了key的开头字符将数据分隔到R片中的一个。

这个基准测试的分区函数内置了key的分区信息。在一个普通的排序程序中,咱们将增长一个预处理MapReduce操做,它可以对key进行抽样,经过key的抽样分布来计算最终排序处理的分割点。

 

图3:对于排序程序的不一样执行过程随时间的数据传输速率

图3(a)显示了排序程序的正常执行过程。左上方的图显示了输入读取的速率,这个速率峰值大约为13GB/s,由于全部的map任务执行完成,速率也在200秒前降低到了0。注意,这里的输入速率比字符串查找的要小,这是由于排序程序的map任务花费了大约一半的处理时间和I/O带宽将终结结果输出到它们的本地磁盘上,字符串查找相应的中间结果输出几乎能够忽略。

左边中间的图显示了数据经过网络从map任务发往reduce任务的速率。这个缓慢的数据移动在第一个map任务完成时会尽快开始。图中的第一个峰值是启动了第一批大概1700个reduce任务(整个MapReduce被分配到大约1700台机器上,每一个机器每次最多只执行一个reduce任务)。这个计算执行大概300秒后,第一批reduce任务中的一些执行完成,咱们开始执行剩下的reduce任务进行数据处理。全部的处理在计算开始后的大约600秒后完成。

左边下方的图显示了reduce任务就爱那个排序后的数据写到最终的输出文件的速率。在第一个处理周期完成到写入周期开始间有一个延迟,由于机器正在忙于对中间数据进行排序。写入的速率会在2-4GB/s上持续一段时间。全部的写操做会在计算开始后的大约850秒后完成。包括启动的开销,整个计算耗时891秒,这与TeraSort benchmark中的最好记录1057秒类似。

一些事情须要注意:由于咱们的位置优化策略,大多数数据从本地磁盘中读取,绕开了网络带宽的显示,因此输入速率比处理速率和输出速率要高。处理速率要高于输出速率,由于输出过程要将排序后的数据写入到两个拷贝中(为了可靠性和可用性,咱们将数据写入到两个副本中)。咱们将数据写入两个副本,由于咱们的底层文件系统为了可靠性和可用性提供了相应的机制。若是底层文件系统使用容错编码(erasure coding)而不是复制,写数据的网络带宽需求会下降。

5.4 备用任务的做用

在图3(b)中,咱们显示了一个禁用备用任务的排序程序的执行过程。执行的流程与如3(a)中所显示的类似,除了有一个很长的尾巴,在这期间几乎没有写入行为发生。在960秒后,除了5个reduce任务的全部任务都执行完成。然而,这些落后者只到300秒后才执行完成。整个计算任务耗时1283秒,增长了大约44%的时间。

5.5 机器故障

在图3(c)中,咱们显示了一个排序程序的执行过程,在计算过程开始都的几分钟后,咱们故意kill掉了1746个工做进程中的200个。底层的调度者会迅速在这些机器上重启新的工做进程(由于只有进程被杀掉,机器自己运行正常)。

工做进程死掉会出现负的输入速率,由于一些以前已经完成的map工做消失了(由于香港的map工做进程被kill掉了),而且须要从新执行。这个map任务会至关快的从新执行。整个计算过程在933秒后完成,包括了启动开销(仅仅比普通状况多花费了5%的时间)。

6、        经验

咱们在2003年2月完成了MapReduce库的第一个版本,并在2003年8月作了重大的改进,包括位置优化、任务在工做机器上的动态负载均衡执行等。从那时起,咱们惊喜的发现,MapReduce库可以普遍的用于咱们工做中的各类问题。它已经被用于Google内部普遍的领域,包括:

  • 大规模机器学习问题
  • Google新闻和Froogle产品的集群问题
  • 抽取数据用于公众查询的产品报告
  • 从大量新应用和新产品的网页中抽取特性(如,从大量的位置查询页面中抽取地理位置信息)
  • 大规模图形计算

 

图4:随时间变化的MapReduce实例

图4中显示了在咱们的源码管理系统中,随着时间的推移,MapReduce程序的数量有明显的增长,从2003年早期的0增长到2004年9月时的900个独立的实例。MapReduce如此的成功,由于它使利用半个小时编写的一个简单程序可以高效的运行在一千台机器上成为可能,这极大的加快了开发和原型设计的周期。此外,它容许没有分布式和/或并行系统经验的开发者可以利用这些资源开发出分布式应用。

 

表1: 2004年8月运行的MapReduce任务

在每一个工做的最后,MapReduce库统计了工做使用的计算资源。在表1中,咱们看到一些2004年8月在Google内部运行的MapReduce工做的一些统计数据。

6.1 大规模索引

目前为止,MapReduce最重要的应用之一就是完成了对生产索引系统的重写,它生成了用于Google网页搜索服务的数据结构。索引系统的输入数据是经过咱们的爬取系统检索到的海量文档,存储为就一个GFS文件集合。这些文件的原始内容还有超过20TB的数据。索引程序是一个包含了5-10个MapReduce操做的序列。使用MapReduce(代替了以前版本的索引系统中的adhoc分布式处理)有几个优势:

  • 索引程序代码是一个简单、短小、易于理解的代码,由于容错、分布式和并行处理都隐藏在了MapReduce库中。好比,一个计算程序的大小由接近3800行的C++代码减小到使用MapReduce的大约700行的代码。
  • MapReduce库性能很是好,以致于可以将概念上不相关的计算分开,来代替将这些计算混合在一块儿进行,避免额外的数据处理。这会使索引程序易于改变。好比,对以前的索引系统作一个改动大概须要几个月时间,而对新的系统则只须要几天时间。
  • 索引程序变得更易于操做,由于大多数因为机器故障、机器处理速度慢和网络的瞬间阻塞等引发的问题都被MapReduce库自动的处理掉,而无需人为的介入。

7、        相关工做

许多系统都提供了有限的程序模型,而且对自动的并行计算使用了限制。好比,一个结合函数能够在logN时间内在N个处理器上对一个包含N个元素的数组使用并行前缀计算,来获取全部的前缀[6,9,13]。MapReduce被认为是这些模型中基于咱们对大规模工做计算的经验的简化和精华。更为重要的是,咱们提供了一个在数千个处理器上的容错实现。相反的,大多数并行处理系统只在较小规模下实现,并将机器故障的处理细节交给了程序开发者。

Bulk Synchronous Programming和一些MPI源于提供了更高层次的抽象使它更易于让开发者编写并行程序。这些系统和MapReduce的一个关键不一样点是MapReduce开发了一个有限的程序模型来自动的并行执行用户的程序,并提供了透明的容错机制。

咱们的位置优化机制的灵感来自于移动磁盘技术,计算用于处理靠近本地磁盘的数据,减小数据在I/O子系统或网络上传输的次数。咱们的系统运行在挂载几个磁盘的普通机器上,而不是在磁盘处理器上运行,可是通常方法是相似的。

咱们的备用任务机制与Charlotte系统中采用的eager调度机制相似。简单的Eager调度机制有一个缺点,若是一个给定的任务形成反复的失败,整个计算将以失败了结。咱们经过跳过损坏计算路的机制,解决了这个问题的一些状况。

MapReduce实现依赖了内部集群管理系统,它负责在一个大规模的共享机器集合中分发和运行用户的任务。尽管不是本篇文章的焦点,可是集群管理系统在本质上与像Condor的其它系统相似。

排序功能是MapReduce库的一部分,与NOW-Sort中的操做相似。源机器(map工做进程)将将要排序的数据分区,并将其发送给R个Reduce工做进程中的一个。每一个reduce工做进程在本地对这些数据进行排序(若是可能的话就在内存中进行)。固然NOW-Sort没有使MapReduce库可以普遍使用的用户定义的Map和Reduce函数。

River提供了一个编程模型,处理进程经过在分布式队列上发送数据来进行通讯。像MapReduce同样,即便在不均匀的硬件或系统颠簸的状况下,River系统依然试图提供较好的平均性能。River系统经过当心的磁盘和网络传输调度来平衡完成时间。经过限制编程模型,MapReduce框架可以将问题分解成不少细颗粒的任务,这些任务在可用的工做进程上动态的调度,以致于越快的工做进程处理越多的任务。这个受限制的编程模型也容许咱们在工做将要结束时调度冗余的任务进行处理,这样能够减小不均匀状况下的完成时间。

BAD-FS与MapReduce有彻底不一样的编程模型,不像MapReduce,它是用于在广域网下执行工做的。然而,它们有两个基本类似点。(1)两个系统都使用了从新执行的方式来处理因故障而丢失的数据。(2)两个系统都本地有限调度原则来减小网络链路上发送数据的次数。

TASCC是一个用于简化结构的高可用性的网络服务。像MapReduce同样,它依靠从新执行做为一个容错机制。

8、        总结

MapReduce编程模型已经成功的应用在Google内部的许多不一样的产品上。咱们将这个成功归功于几个缘由。第一,模型很易用,即便对那些没有并行计算和分布式系统经验的开发者,由于它隐藏了并行处理、容错、本地优化和负载均衡这些处理过程。第二,各类各样的问题都能用MapReduce计算简单的表示出来,例如,MapReduce被Google网页搜索服务用于生成数据、排序、数据挖掘、机器学习和许多其它系统。第三,咱们已经实现了扩展到由数千台机器组成的大规模集群上使用的MapReduce。这个实现可以有效的利用这些机器自由,所以适合在Google内部遇到的不少海量计算问题。

咱们从这项工做中学到了几样东西。第一,限制程序模型使得并行计算和分布式计算变得容易,也容易实现这样的计算容错。第二,网络带宽是一个稀有的资源,所以咱们系统中的不少优化的目标都是为了减小数据在网络上的传输次数:位置优化容许咱们从本地磁盘读取数据,并将中间数据的一个拷贝写入到本地磁盘,以此来节省网络带宽的使用。第三,冗余执行可以用于减小容许速度慢的机器所形成的影响,而且可以处理机器故障和数据丢失。

相关文章
相关标签/搜索