MapReduce 是谷歌 2004 年(Google 内部是从03年写出第一个版本)发表的论文里提出的一个概念。虽然已通过去15 年了,但如今回顾这个大数据时代始祖级别概念的背景、原理和实现,仍能得到对分布式系统的不少直觉性的启发,所谓温故而知新。node
在Google 的语境里,MapReduce 既是一种编程模型,也是支持该模型的一种分布式系统实现。它的提出,让没有分布式系统背景的开发者,也能较轻松的利用大规模集群以高吞吐量的方式来处理海量数据。其解决问题思路很值得借鉴:找到需求的痛点(如海量索引如何维护,更新和排名),对处理关键流程进行高阶抽象(分片Map,按需Reduce),以进行高效的系统实现(所谓量体裁衣)。这其中,如何找到一个合适的计算抽象,是最难的部分,既要对需求有直觉般的了解,又要具备极高的计算机科学素养。固然,而且可能更为接近现实的是,该抽象是在根据需求不断试错后进化出的海水之上的冰山一角。python
谷歌当时做为互联网的最大入口,维护着世界全网索引,最先触到了数据量的天花板。即,哪怕针对很简单的业务逻辑:如从爬虫数据中生成倒排索引、将图状网页集合用不一样方式组织、计算每一个主机爬取的网页数量、给定日期的高频查询词汇等等,在全球互联网数据的尺度的加成下,也变的异常复杂。c++
这些复杂性包括:输入数据分散在很是多的主机上、计算耗资源太多单机难以完成、输出数据须要跨主机进行从新组织。为此,不得不针对每一个需求重复构造专用系统,并耗费大量代码在分发数据和代码、调度和并行任务、应对机器故障和处理通讯失败等问题上。git
map 和 reduce 的抽象灵感来自于函数式编程语言 Lisp,为何选定这两个概念呢?这来源于谷歌人对其业务的高度提炼:首先输入能够切分红一个个逻辑的记录 (record);而后对其每一个 record 执行某种映射 (map) 操做,生成一些键值对组成的中间结果(为何要分键和值呢?为最后一步作铺垫,容许用户将中间结果以任意指定的方式——键,来进行组织规约);最后在具备相同键的中间结果子集上执行规约(reduce ,包括排序,统计,提取最值等等)操做。github
函数式模型的另外一个特色在于对 map 操做实现的约束,即规定用户应提供一个无反作用的 map 操做(相关概念有纯函数,肯定性,幂等性等等,固然他们的概念并不同,后面小结会详细讨论)。如此限制,好处有二,能够进行大规模并行执行,能够经过换地儿重试来屏蔽主机故障。算法
具体到落地上,map 和 reduce 都是用户自定义函数。map 函数接受一个 Record,不过为了灵活,通常也组织为键值对;而后产生 List[key, value],reduce 函数接受一个 key 和该 key 对应的全部中间结果 List[value]。即:数据库
map (k1,v1) -→ list(k2,v2)
reduce (k2,list(v2)) -→ list(v2)
复制代码
拿由谷歌这篇论文提出,后来成为大数据处理界的 hello world 级别示例程序 Word Count (对一堆文档中的单词计数)来讲,map 和 reduce 的实现长这样:编程
map(String key, String value):
// key: document name
// value: document contents
for each word w in value:
EmitIntermediate(w, "1");
reduce(String key, Iterator values):
// key: a word
// values: a list of counts
int result = 0;
for each v in values:
result += ParseInt(v);
Emit(AsString(result));
复制代码
这里有两个有意思的点:api
抽象定了,那么实现天然能够有不一样,这也是接口和实现分离的意义所在。前者的抽象是一种思想,谷歌已经给你作了;后者的实现,彻底能够根据本身的生产环境进行量体裁衣的来定制实现。谷歌在 paper 中给了一种内部经典版,Hadoop 也提供了一套通用版,固然咱们也能够根据本身的业务需求和场景约束来实现一个合身版。缓存
谷歌发布论文时 实现 MapReduce 所面对的系统环境长这样:
输入由用户指定切分大小后,切分红 M 份,而后分散到不一样机器上(因为 GFS 的存在,也可能该输入 Block 原本就在那台机器上)。每一个机器上会并行的跑用户定义的 map 。map 输出的中间结果,亦由用户指定,按 key 值范围切分为 R 份,对于每一个中间结果,经过 node label = hash(key) mod R 决定其去处。下面是流程概览图:
通常而言,用户无需将最终结果的 R 个 Partition 进行合并,而是将其直接做为下一个 MapReduce 任务的输入。Spark RDD 的partition 就是将这一特色概念化了,而且将每一步 MapReduce 输出也放内存中,不进行落盘,以下降连续 MapReduce 任务的延迟。
计算机科学中经常使用的一个原理,叫作*局部性原理*** (locality reference,这里特指空间局部性),说的是程序在顺序执行时,访问了一块数据,接下来大几率会访问该数据(物理位置上)旁边的一块数据。很朴素的断言,倒是一切 cache 发挥做用的基础,计算机存储所以也造成了由慢到快,由贱到贵,由大到小的存储层次体系(硬盘-> 内存 -> 缓存 -> 寄存器)。在分布式环境中,这个层次体系至少还要再罩上一层——网络IO。
在 MapReduce 系统中,咱们也会充分利用输入数据的 locality。只不过此次,不是将数据加载过来,而是将程序调度过去(Moving Computation is Cheaper than Moving Data)。若是输入存在 GFS 上,表现形式将为一系列的逻辑 Block,每一个 Block 可能会有几个(通常是三个)物理副本。对于输入每一个逻辑 Block,咱们能够在其某个物理副本所在机器上运行 Map Task(若是失败,就再换一个副本),由此来尽可能减少网络数据传输。从而下降了延迟,节约了带宽。
谷歌的 MapReduce 实现是有做业(Job)级别的封装的,每一个 Job 包含一系列任务(Task),即 Map Task 和 Reduce Task。那么,咱们要维护一个正在运行的 Job 的元信息,就势必要保存全部正在执行的 Task 的状态,其所在的机器 ID 等等。并且,Master 事实上充当了 Map Task 输出到 Reduce Task 输入的一个"管道"。每个 Map Task 结束时,会将其输出的中间结果的位置信息通知 Master,Master 再将其转给对应的 Reduce Task,Reduce Task 再去对应位置拉取对应 size 的数据。注意,因为 Map Task 的结束时间不统一,这个***通知->转发-> 拉取*** 的过程是增量的。那么不难推测出,reduce 侧对中间数据排序的应该是一个不断 merge 的过程,不大多是等全部数据就位了再全局排序。
在分布式系统中,一个比较忌讳的问题就是单点。由于是牵一发而动全身,而 Master 就是这样一个单点。固然单个机器的统计平均故障率并不高,可是一旦故障,那么整个集群都将不可用。但同时,有一个 Leader 节点会大大简化分布式系统的的设计;所以采用单点 Master 的系统反而是主流,那势必须要开发一些其余手段来强化 master 的容错能力,好比说记 log + snapshot、好比说主从备份、好比说每次从 worker 心跳进行状态重建、好比说用其余实现了分布式一致性协议的系统来保存元信息等等。
集群中有 Master 和 Worker 两种机器角色。
Worker 因为数量大,有机器故障几率较大。在分布式系统中,Master 获取 Workers 的信息,最多见即是心跳,既能够是 master ping worker,也能够反过来,也能够兼而有之。master 经过心跳发现某些 worker 不可到达后(多是 worker 死掉了,也多是网络出问题了等),就会将该 Worker 打个故障(failed)的标记。
以前已经调度到该故障 Worker 上的任务(Task) 很显然有两种类型: Map Task 和 Reduce Task。对于 Map Task(如下所提的 Task,确定是从属于未结束的 Job) ,无论成功与否,咱们都要进行重试,由于一旦该 Worker 变为不可达,存于其上的中间结果也随之没法被 Reduce Task 获取。固然,咱们能够在 Master 中多记点状态来减小对已完成的 Map Task 进行重试的几率。好比记下某个 Map Task 的输出是否已经都被 Reduce Task 拉取,以决定要不要对正常完成的 Map Task 进行重试,但无疑会提升系统复杂度。*工程每每会对环境作某些假设, 以简化某些实现;*咱们假设 worker 失败率不是那么高,或者重试全部 Map Task 的代价能够忍,那么就能够简化一点实现,以保持系统的简约,减小可能的 bug。对于 Reduce Task,未完成的无疑要进行重试,已经完成的,因为其输出结果咱们假设会写到全局分布式系统文件中(即某些机器挂了也不影响),就不会重试。
具体重试的方法,能够标记须要重试的 Task 的状态为 idle,以告诉调度器,该 Task 能够从新被调度。固然,也能够实现为从一个(工做/完成)队列倒腾到另外一个(就绪)队列,本质上是同样的,都是合理实现一个 Task 的状态机。
至于 master 的故障恢复,上一节稍有提到。若是在实践中 Master 确实不多死掉,而且偶尔死掉形成全部正在运行的任务失败的后果也能够接受,那么就能够粗暴的实现为若是 Master 死掉,就简单通知全部正在运行的任务的用户代码任务失败了(好比返回非 0 值),而后有用户代码自行决定丢弃任务仍是待集群重启后进行重试:
MapReduceResult result;
if (!MapReduce(spec, &result)) abort();
复制代码
若是业务对于宕机时间有要求,或者大面积任务失败不能够忍受,那么就须要加强 Master 的容错性。经常使用的方法上节有提到,这里展开一下:
还值得一提的是,容错也须要用户侧代码作配合。由于框架会对不成功的 map/reduce 用户代码进行重试。这就要求,用户提供的 map/reduce 逻辑符合肯定性(Deterministic):即函数的输出依赖且仅依赖于输入,而不取决任何其余隐形的输入或者状态。固然,这个蕴含了幂等性(Idempotency):屡次执行和一次执行效果同样;可是幂等性并不能推出肯定性;假设有这么一个函数,它第一次执行形成了一些状态改变(好比某些释放资源的 dispose 函数),然后续发现状态已经改变过了就再也不改变该状态,那么它符合幂等性;可是因为其含有隐式状态输入,不是肯定性的。
若是 map/reudce 函数是肯定性的,那么框架就能够放心大胆重试了。某些条件下,幂等性也能够接受,好比保存隐式状态的地方很牢靠。举个栗子,咱们依赖于一个文件锁作判断某个函数执行了一次或屡次,若是该文件锁所依赖的文件系统很稳定,而且提供分布式一致性,那么就彻底能够。若是是用 nfs 的一个文件作锁,来实现的所谓幂等性就值得商榷了。
若是 map/reduce 函数是肯定性的,框架会经过其输出提交的原子性来进行幂等性保证。即,即便重试了屡次,也和只执行了一次同样。具体来讲,对于 Map Task,会产生 R 个临时文件,并在结束时将其位置发送给 Master;Master 在收到屡次同一分片(split) 的位置信息时,若是该分片前面某次结果来源仍可用或者已经被消费,那么就忽略掉该请求后面的全部请求。对于 Reduce Task,其生成的结果也会先写成临时文件,而后依赖于底层文件系统的原子性的更名操做(原子性更名也是一个多进程竞争的经典的操做,由于生成文件过程比较长,不容易作成原子的,可是判断具备某名字的文件是否存在并更名却很容易作成原子的),在处理完成时改变为目的文件名。若是发现已经有一个具备该目的文件名的文件了,就放弃更名操做,从而保证了该 Reduce Task只有一个成功输出的最终文件。
一个 MapReduce Job 中会产生 M+R 个 Task,具体 M 和 R 的值在运行以前能够由人进行配置。不一样的系统实现可能会有发挥出最佳系统性能的不一样配比;可是同时要兼顾业务需求,好比输入大小,输出文件个数等等。
在实际业务中,因为某些主机缘由常会出现长尾效应,即少数几个 Map/Reduce Task 老是会巨慢的拖到最后,甚至拖得时间甚至是其余任务的几倍。形成这些主机拖后腿的缘由能够举出不少,如:某个机器硬盘老化,读写速度几十倍的变慢;又好比调度器调度的有问题,致使某些机器负载过高,具备大量的 CPU、内存、硬盘和网络带宽的争抢;又好比软件 bug 致使某些主机变慢等等。
只要肯定这些问题只发生在少数某些主机上,那么解决方法也很简单。在任务接近尾声的时候(好比统计剩余task的占比小于一个阈值时),对于每一个仍然在跑的任务,分别额外调度一份到其余主机上,那么大几率会让这些任务提早完成,同一任务跑屡次的处理逻辑,和容错重试形成跑屡次是一致的,能够复用。
此外,咱们能够经过实际业务检验来微调该阈值(包括怎么断定任务结尾,启用几个备份任务),在耗费额外资源代价和减小任务总用时以前取得一个平衡。
除了 Mapper 和 Reducer 这两个最基本的源语,该系统还提供了一些其余的后来事实上也成为标配的扩展:Partitioner,Combiner 和 Reader/Writer。
默认来讲,对 Map 输出的中间结果进行划分会使用相似于 hash(key) mod R 这种应用无关的划分算法。可是有时候用户有需求将特定的一些 keys 路由到同一个 Reduce Task,好比说中间结果的 key 是 URL, 用户想按网站 host 进行汇总处理。这时候就须要将系统的这部分路由功能开放给用户,以知足用户的定制需求。
若是该 Job 针对全部中间结果的 reduce 的操做知足结合律,那么指定 Combiner 会很能提升效率。拿的 Word Count 来讲,数值的加法无疑知足结合律,也就是说,同一个单词的频次,在 Map Task 输出后进行加和(在 Map Work 上),仍是在 Reduce Task 中进行加和(在 Reduce Worker上),结果保持一致;而这样一来,因为一些中间结果对进行了 combine,Map Task 到 Reduce Task 间的传输数据量会小不少,从而提升整个 Job 的效率。
也能够看出,combine 函数通常和 reduce 函数是同样的,由于他们本质上是对 value set 执行了同一种操做,只不过执行时,执行的地点不同,结合的顺序不同。目的是为了减小中间结果传输量,加速任务执行过程。
若是不将定制输入输出的能力开放给用户,那么系统显然只能处理有限几种默认约定的格式。所以,reader 和 writer 接口本质上是系统和现实繁杂的业务之间的适配器(Adaptor)。它们让用户能够自行指定数据的来源和去处、按须要理解输入内容和自由定制输出格式。
有了这两个 Adaptor,系统才能适配更多的业务。通常来讲,系统会内置提供一些常见的 Reader 和 Writer 的实现;包括按行读文本文件,读文件中键值,读数据库等等。而后用户能够实现这两个接口,进行更具体的定制。系统常经过相似这种经常使用脚手架+进一步定制能力来提供API,下面的 Counter 也是如此。
有些用户实现的 map/reduce 函数会有一些反作用,好比说在执行任务中间输出一些文件、写一些数据库条目等等。通常来讲这些反作用的原子性和幂等性须要用户本身来处理。由于若是输出介质不归入 MapReduce 系统,系统是没有办法保证这些输出的幂等性和原子性的。不过有的系统就这么干的,提供一些某种类型/介质的状态或者数据存储,归入系统中,而且提供一些容错和幂等的性质。好像 MillWheel 有相似的作法。但这样会大大加剧系统的复杂性。
若是用户代码有 bug 或者某些输入有问题,会致使 Map 或者 Reduce 任务在运行时崩溃。固然这些 bug 或者输入能修则修,可是有些状况因为第三方库或者输入的缘由,不可以进行修复。而在某些类型的任务,好比说训练数据集清洗、大型统计任务,丢几个是能够容忍的。针对这种状况,系统会提供一种模式,在这种模式中会跳过这些 Record 记录的执行。
具体实现上来讲,也比较简单。能够给每一个输入 Record 给个惟一编号(单次任务内惟一就行);若是某个 Record 处理时挂掉了,就将其编号汇报给 Master。若是 Master 收到了某个 Record 超过一次的处理失败信息,就将其跳过。作的再细一点,还能够记下错误类型和信息进行比对,来肯定这是不是一个肯定性(deterministic)的错误,进而决定是否将其跳过。
众所周知,分布式系统很难跟踪、调试;由于一个 Job 可能同时分散在数千台机器上进行执行。所以系统提供了本地运行 Job 的能力。能够针对小数据集输入对代码的正确性进行测试。因为在单机运行,就能够比较方便经过调试工具进行断点追踪。
实现一个本地 mock 系统,通常来讲比较简单。由于不须要考虑网络间状态通讯,代码多节点分发,多机调度等一系列分布式系统的问题。但却能极大方便代码调试,性能测试和小数据集测试。
对于分布式执行的 Job,一个任务进度等信息可视化界面(给系统集成一个 HTTP 服务,实时拉取系统信息进行展现)有时候是相当重要的,它是系统易用性的关键。若是系统用户不可以很方便的实时监控任务的运行进度、执行速度、资源用量、输出位置,出错信息以及其余一些任务的元信息,就不能对任务的执行情况有个感性的把握。尤为是若是写 MapReduce 程序的人和跑这些程序的不是一我的时,会更为依赖这些状态的实时呈现。
所以,对于分布式系统来讲,其易用性有一大半落在一个良好的系统信息呈现上。使用者须要据此来预测任务的完成时间、资源的吃紧程度等等,从而作出相应决策。
此外,对与集群机器状态信息,也须要进行跟踪,由于机器的负载信息、故障信息、网络情况等等对用户任务的执行也有不一样程度的影响。给出这些机器状态信息,有助于对用户代码甚至系统代码进行出错诊断。
系统提供了一种计数服务,以统计某种事件的发生频次。好比用户想统计 Word Count 示例中所处理的全大写单词的总数:
Counter* uppercase;
uppercase = GetCounter("uppercase");
map(String name, String contents):
for each word w in contents:
if (IsCapitalized(w)):
uppercase->Increment();
EmitIntermediate(w, "1");
复制代码
从代码能够大体猜想其实现:定义的时候,须要给 Counter 指定一个 Id。而后在 Map/Reudce 代码中能够经过该 Id 获取该 Counter 而后进行计数。每一个 worker 机器上的计数信息会汇总到 Master 上,而后按 Counter 的 ID 进行加和,而且最终返回给用户。同时,前述展现状态信息页面也会将这些计数器进行可视化(好比说打折线图)。其中有个点须要注意,就是屡次对重试的任务(因为机器死掉或者避免长尾进行的重试)的计次进行去重;能够按照 Map/Reduce ID 来进行去重,即咱们假定同一输入的重试任务共享一个 Task ID(事实上为了知足重试需求和任务管理需求,分布式系统确定会对全部任务进行惟一编号的),针对具备相同 Task ID 内部的 Counter 的计次,Master 只保留第一次成功的那一份;可是若是计数须要在页面上实时显示,可能就须要作适当信息保留,而且在该 Task 重试时进行计数回退之类的操做。
系统会自动维持一些计数器,好比说全部已经处理的键值对的数量和全部已经产生的键值对数量。全局计数操做对于某些应用用处很大,好比说有的应用要求全部输入键值对和输出键值对的数量同样,若是没有全局计数,就无从验证;或者统计一些数据的全局比例等等。
自 Spark 成名以后,shuffle 这个 MapReduce 中的语义获得了不少研究和实践。这是一个多机传输的耗时操做,其实现的高效性对系统的性能有着相当重要的做用,所以单独拿出一节来聊聊。
在 MapReduce 中就是指 Map Task 分片输出到 Reduce Task 按需拉取的这么一个过程。还拿 Word Count 为例,你想统计某个单词在全部文档中的总频次,可是这些单词分布在不一样机器上的不一样的 Map Task 输出里;而只有将全部一样单词的频次对汇集到同一台机器上,才能对其加和。这种将机器和子数据集对应关系按key打乱重组的操做,咱们姑且称之为 shuffle。
在 Spark 中,基本上继承了该语义,而且更通用化了。一个常见的例子是 join,即将两个 Table 间具备相同 key 的记录路由到同一台机器上,从而在全部机器上按 key 分片进行并行 join,从而大幅提升效率。相似于 join 这样的高阶操做,会使得底层的 Partition 不能继续在本机运行而不与其余 Partition 发生联系,所以 shuffle 也是 Spark 中划分 Stage 的一个分水岭。
对于 MapReduce 系统来讲,使用的 shuffle 策略相似于 Spark 中基于排序的 shuffle。Map 首先将中间结果写到内存中,而后按期刷盘,刷盘时进行归并排序。而后 Reducer 端按需拉取,从多个 Mapper 端拉取数据后,再次进行归并排序,而后传给 Reduce 函数。这样的好处在于能够进行大规模数据处理,由于能够作外部排序,也能够作迭代惰性加载。对于 Hadoop 的实现来讲,将包含 shuffle 的整个流程分为了明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。
一些缺点:经过 MapReduce 的系统设计能够看出,它是一个高吞吐,可是也高延迟的批量处理系统。而且不支持迭代。这也是后续 Spark,Flink 这样系统火热的动机。
文件系统: MapReduce 只有和 GFS 这样支持分块、多进程并发写的大文件系统配合才能发挥出更大的优点,优化输入和输出的性能。此外,这种分布式文件系统还会屏蔽底层节点故障。
组织形式: MapReduce 是一个系统,须要部署到集群上,但它同时又是一个库,让用户代码和分布式集群进行交互而不太用关心分布式环境中的问题的一个库。每一个任务须要写任务描述(MapReduceSpecification),而后提交给系统——这是库经常使用的一种提交任务的手段。
代码分发:谷歌的 MapReduce 具体实现不得而知。猜想能够有两种方式:一是将 MapReduce 库代码 + 用户代码总体在不一样机器上 fork ,而后根据角色不一样来执行不一样分支。二是将各个机器上的服务先启动起来,而后执行任务时只会将用户自定义函数序列化后传输到不一样机器。
Intermediate result:map 函数产生的中间结果,以键值对形式组织。
**Map Task **:这个应该都是指的 Worker 机器上,执行 map 函数的工做进程。
Map Worker:Map Task 所运行的 Worker 机器。全部 Worker 应该是没有角色标记的,既能够执行 Map Task,也能够执行 Reduce Task,以充分的利用机器性能。
[1] Jeffrey Dean and Sanjay Ghemawat, MapReduce: Simplified Data Processing on Large Clusters
[2] Alexey Grishchenko, Spark Architecture: Shuffle
[3] JerryLead, Spark internals