本文另外一地址请见 MapReduce 模式、算法和用例 html
本文译自 Mapreduce Patterns, Algorithms, and Use Cases node
在这篇文章里总结了几种网上或者论文中常见的MapReduce模式和算法,并系统化的解释了这些技术的不一样之处。全部描述性的文字和代码都使用了标准hadoop的MapReduce模型,包括Mappers, Reduces, Combiners, Partitioners,和 sorting。以下图所示。 算法
问题陈述: 数组
有许多文档,每一个文档都有一些字段组成。须要计算出每一个字段在全部文档中的出现次数或者这些字段的其余什么统计值。例如,给定一个log文件,其中的每条记录都包含一个响应时间,须要计算出平均响应时间。 缓存
解决方案: 网络
让咱们先从简单的例子入手。在下面的代码片断里,Mapper每遇到指定词就把频次记1,Reducer一个个遍历这些词的集合而后把他们的频次加和。 app
1 class Mapper 2 method Map(docid id, doc d) 3 for all term t in doc d do 4 Emit(term t, count 1 ) 5 6 class Reducer 7 method Reduce(term t, counts [c1, c2,...]) 8 sum = 0 9 for all count c in [c1, c2,...] do 10 sum = sum + c 11 Emit(term t, count sum)
这种方法的缺点显而易见,Mapper提交了太多无心义的计数。它彻底能够经过先对每一个文档中的词进行计数从而减小传递给Reducer的数据量: 框架
1 class Mapper 2 method Map(docid id, doc d) 3 H = new AssociativeArray 4 for all term t in doc d do 5 H{t} = H{t} + 1 6 for all term t in H do 7 Emit(term t, count H{t})
若是要累计计数的的不仅是单个文档中的内容,还包括了一个Mapper节点处理的全部文档,那就要用到Combiner了: 机器学习
1 class Mapper 2 method Map(docid id, doc d) 3 for all term t in doc d do 4 Emit(term t, count 1 ) 5 6 class Combiner 7 method Combine(term t, [c1, c2,...]) 8 sum = 0 9 for all count c in [c1, c2,...] do 10 sum = sum + c 11 Emit(term t, count sum) 12 13 class Reducer 14 method Reduce(term t, counts [c1, c2,...]) 15 sum = 0 16 for all count c in [c1, c2,...] do 17 sum = sum + c 18 Emit(term t, count sum)
问题陈述: 分布式
有一系列条目,每一个条目都有几个属性,要把具备同一属性值的条目都保存在一个文件里,或者把条目按照属性值分组。 最典型的应用是倒排索引。
解决方案:
解决方案很简单。 在 Mapper 中以每一个条目的所需属性值做为 key,其自己做为值传递给 Reducer。 Reducer 取得按照属性值分组的条目,而后能够处理或者保存。若是是在构建倒排索引,那么 每一个条目至关于一个词而属性值就是词所在的文档ID。
问题陈述:
假设有不少条记录,须要从其中找出知足某个条件的全部记录,或者将每条记录传换成另一种形式(转换操做相对于各条记录独立,即对一条记录的操做与其余记录无关)。像文本解析、特定值抽取、格式转换等都属于后一种用例。
解决方案:
很是简单,在Mapper 里逐条进行操做,输出须要的值或转换后的形式。
问题陈述:
大型计算能够分解为多个部分分别进行而后合并各个计算的结果以得到最终结果。
解决方案: 将数据切分红多份做为每一个 Mapper 的输入,每一个Mapper处理一份数据,执行一样的运算,产生结果,Reducer把多个Mapper的结果组合成一个。
像 WiMAX 这样的数字通讯模拟软件经过系统模型来传输大量的随机数据,而后计算传输中的错误概率。 每一个 Mapper 处理样本 1/N 的数据,计算出这部分数据的错误率,而后在 Reducer 里计算平均错误率。
问题陈述:
有许多条记录,须要按照某种规则将全部记录排序或是按照顺序来处理记录。
解决方案: 简单排序很好办 – Mappers 将待排序的属性值为键,整条记录为值输出。 不过实际应用中的排序要更加巧妙一点, 这就是它之因此被称为MapReduce 核心的缘由(“核心”是说排序?由于证实Hadoop计算能力的实验是大数据排序?仍是说Hadoop的处理过程当中对key排序的环节?)。在实践中,经常使用组合键来实现二次排序和分组。
MapReduce 最初只可以对键排序, 可是也有技术利用能够利用Hadoop 的特性来实现按值排序。想了解的话能够看 这篇博客。
按照BigTable的概念,使用 MapReduce来对最初数据而非中间数据排序,也即保持数据的有序状态更有好处,必须注意这一点。换句话说,在数据插入时排序一次要比在每次查询数据的时候排序更高效。
问题陈述:
假设一个实体网络,实体之间存在着关系。 须要按照与它比邻的其余实体的属性计算出一个状态。这个状态能够表现为它和其它节点之间的距离, 存在特定属性的邻接点的迹象, 邻域密度特征等等。
解决方案:
网络存储为系列节点的结合,每一个节点包含有其全部邻接点ID的列表。按照这个概念,MapReduce 迭代进行,每次迭代中每一个节点都发消息给它的邻接点。邻接点根据接收到的信息更新本身的状态。当知足了某些条件的时候迭代中止,如达到了最大迭代次数(网络半径)或两次连续的迭代几乎没有状态改变。从技术上来看,Mapper 以每一个邻接点的ID为键发出信息,全部的信息都会按照接受节点分组,reducer 就可以重算各节点的状态而后更新那些状态改变了的节点。下面展现了这个算法:
1 class Mapper 2 method Map(id n, object N) 3 Emit(id n, object N) 4 for all id m in N.OutgoingRelations do 5 Emit(id m, message getMessage(N)) 6 7 class Reducer 8 method Reduce(id m, [s1, s2,...]) 9 M = null 10 messages = [] 11 for all s in [s1, s2,...] do 12 if IsObject(s) then 13 M = s 14 else // s is a message 15 messages.add(s) 16 M.State = calculateState(messages) 17 Emit(id m, item M)
一个节点的状态能够迅速的沿着网络传遍全网,那些被感染了的节点又去感染它们的邻居,整个过程就像下面的图示同样:
问题陈述:
这个问题来自于真实的电子商务应用。将各类货物分类,这些类别能够组成一个树形结构,比较大的分类(像男人、女人、儿童)能够再分出小分类(像男裤或女装),直到不能再分为止(像男式蓝色牛仔裤)。这些不能再分的基层类别能够是有效(这个类别包含有货品)或者已无效的(没有属于这个分类的货品)。若是一个分类至少含有一个有效的子分类那么认为这个分类也是有效的。咱们须要在已知一些基层分类有效的状况下找出分类树上全部有效的分类。
解决方案:
这个问题能够用上一节提到的框架来解决。咱们咋下面定义了名为 getMessage和 calculateState 的方法:
1 class N 2 State in {True = 2 , False = 1 , null = 0 }, 3 initialized 1 or 2 for end - of - line categories, 0 otherwise 4 method getMessage( object N) 5 return N.State 6 method calculateState(state s, data [d1, d2,...]) 7 return max( [d1, d2,...] )
问题陈述:须要计算出一个图结构中某一个节点到其它全部节点的距离。
解决方案: Source源节点给全部邻接点发出值为0的信号,邻接点把收到的信号再转发给本身的邻接点,每转发一次就对信号值加1:
1 class N 2 State is distance, 3 initialized 0 for source node, INFINITY for all other nodes 4 method getMessage(N) 5 return N.State + 1 6 method calculateState(state s, data [d1, d2,...]) 7 min( [d1, d2,...] )
这个算法由Google提出,使用权威的PageRank算法,经过链接到一个网页的其余网页来计算网页的相关性。真实算法是至关复杂的,可是核心思想是权重能够传播,也即经过一个节点的各联接节点的权重的均值来计算节点自身的权重。
1 class N 2 State is PageRank 3 method getMessage( object N) 4 return N.State / N.OutgoingRelations.size() 5 method calculateState(state s, data [d1, d2,...]) 6 return ( sum([d1, d2,...]) )
要指出的是上面用一个数值来做为评分其实是一种简化,在实际状况下,咱们须要在Mapper端来进行聚合计算得出这个值。下面的代码片断展现了这个改变后的逻辑 (针对于 PageRank 算法):
1 class Mapper 2 method Initialize 3 H = new AssociativeArray 4 method Map(id n, object N) 5 p = N.PageRank / N.OutgoingRelations.size() 6 Emit(id n, object N) 7 for all id m in N.OutgoingRelations do 8 H{m} = H{m} + p 9 method Close 10 for all id n in H do 11 Emit(id n, value H{n}) 12 13 class Reducer 14 method Reduce(id m, [s1, s2,...]) 15 M = null 16 p = 0 17 for all s in [s1, s2,...] do 18 if IsObject(s) then 19 M = s 20 else 21 p = p + s 22 M.PageRank = p 23 Emit(id m, item M)
问题陈述: 记录包含值域F和值域 G,要分别统计相同G值的记录中不一样的F值的数目 (至关于按照 G分组).
这个问题能够推而广之应用于分面搜索(某些电子商务网站称之为Narrow Search)
Record 1: F=1, G={a, b} Record 2: F=2, G={a, d, e} Record 3: F=1, G={b} Record 4: F=3, G={a, b} Result: a -> 3 // F=1, F=2, F=3 b -> 2 // F=1, F=3 d -> 1 // F=2 e -> 1 // F=2
解决方案 I:
第一种方法是分两个阶段来解决这个问题。第一阶段在Mapper中使用F和G组成一个复合值对,而后在Reducer中输出每一个值对,目的是为了保证F值的惟一性。在第二阶段,再将值对按照G值来分组计算每组中的条目数。
第一阶段:
1 class Mapper 2 method Map( null , record [value f, categories [g1, g2,...]]) 3 for all category g in [g1, g2,...] 4 Emit(record [g, f], count 1 ) 5 6 class Reducer 7 method Reduce(record [g, f], counts [n1, n2, ...]) 8 Emit(record [g, f], null )
第二阶段:
1 class Mapper 2 method Map(record [f, g], null ) 3 Emit(value g, count 1 ) 4 5 class Reducer 6 method Reduce(value g, counts [n1, n2,...]) 7 Emit(value g, sum( [n1, n2,...] ) )
解决方案 II:
第二种方法只须要一次MapReduce 便可实现,但扩展性不强。算法很简单-Mapper 输出值和分类,在Reducer里为每一个值对应的分类去重而后给每一个所属的分类计数加1,最后再在Reducer结束后将全部计数加和。这种方法适用于只有有限个分类,并且拥有相同F值的记录不是不少的状况。例如网络日志处理和用户分类,用户的总数不少,可是每一个用户的事件是有限的,以此分类获得的类别也是有限的。值得一提的是在这种模式下能够在数据传输到Reducer以前使用Combiner来去除分类的重复值。
1 class Mapper 2 method Map( null , record [value f, categories [g1, g2,...] ) 3 for all category g in [g1, g2,...] 4 Emit(value f, category g) 5 6 class Reducer 7 method Initialize 8 H = new AssociativeArray : category -> count 9 method Reduce(value f, categories [g1, g2,...]) 10 [g1 ' , g2 ' ,..] = ExcludeDuplicates( [g1, g2,..] ) 11 for all category g in [g1 ' , g2 ' ,...] 12 H{g} = H{g} + 1 13 method Close 14 for all category g in H do 15 Emit(category g, count H{g})
问题陈述:有多个各由若干项构成的组,计算项两两共同出现于一个组中的次数。假如项数是N,那么应该计算N*N。
这种状况常见于文本分析(条目是单词而元组是句子),市场分析(购买了此物的客户还可能购买什么)。若是N*N小到能够容纳于一台机器的内存,实现起来就比较简单了。
配对法
第一种方法是在Mapper中给全部条目配对,而后在Reducer中将同一条目对的计数加和。但这种作法也有缺点:
1 class Mapper 2 method Map( null , items [i1, i2,...] ) 3 for all item i in [i1, i2,...] 4 for all item j in [i1, i2,...] 5 Emit(pair [i j], count 1 ) 6 7 class Reducer 8 method Reduce(pair [i j], counts [c1, c2,...]) 9 s = sum([c1, c2,...]) 10 Emit(pair[i j], count s)
Stripes Approach(条方法?不知道这个名字怎么理解)
第二种方法是将数据按照pair中的第一项来分组,并维护一个关联数组,数组中存储的是全部关联项的计数。The second approach is to group data by the first item in pair and maintain an associative array (“stripe”) where counters for all adjacent items are accumulated. Reducer receives all stripes for leading item i, merges them, and emits the same result as in the Pairs approach.
1 class Mapper 2 method Map( null , items [i1, i2,...] ) 3 for all item i in [i1, i2,...] 4 H = new AssociativeArray : item -> counter 5 for all item j in [i1, i2,...] 6 H{j} = H{j} + 1 7 Emit(item i, stripe H) 8 9 class Reducer 10 method Reduce(item i, stripes [H1, H2,...]) 11 H = new AssociativeArray : item -> counter 12 H = merge - sum( [H1, H2,...] ) 13 for all item j in H.keys() 14 Emit(pair [i j], H{j})
在这部分咱们会讨论一下怎么使用MapReduce来进行主要的关系操做。
1 class Mapper 2 method Map(rowkey key, tuple t) 3 if t satisfies the predicate 4 Emit(tuple t, null )
投影只比筛选稍微复杂一点,在这种状况下咱们能够用Reducer来消除可能的重复值。
1 class Mapper 2 method Map(rowkey key, tuple t) 3 tuple g = project(t) // extract required fields to tuple g 4 Emit(tuple g, null ) 5 6 class Reducer 7 method Reduce(tuple t, array n) // n is an array of nulls 8 Emit(tuple t, null )
两个数据集中的全部记录都送入Mapper,在Reducer里消重。
1 class Mapper 2 method Map(rowkey key, tuple t) 3 Emit(tuple t, null ) 4 5 class Reducer 6 method Reduce(tuple t, array n) // n is an array of one or two nulls 7 Emit(tuple t, null )
将两个数据集中须要作交叉的记录输入Mapper,Reducer 输出出现了两次的记录。由于每条记录都有一个主键,在每一个数据集中只会出现一次,因此这样作是可行的。
1 class Mapper 2 method Map(rowkey key, tuple t) 3 Emit(tuple t, null ) 4 5 class Reducer 6 method Reduce(tuple t, array n) // n is an array of one or two nulls 7 if n.size() = 2 8 Emit(tuple t, null )
假设有两个数据集R和S,咱们要找出R与S的差别。Mapper将全部的元组作上标记,代表他们来自于R仍是S,Reducer只输出那些存在于R中而不在S中的记录。
1 class Mapper 2 method Map(rowkey key, tuple t) 3 Emit(tuple t, string t.SetName) // t.SetName is either 'R' or 'S' 4 5 class Reducer 6 method Reduce(tuple t, array n) // array n can be ['R'], ['S'], ['R' 'S'], or ['S', 'R'] 7 if n.size() = 1 and n[ 1 ] = ' R ' 8 Emit(tuple t, null )
分组聚合能够在以下的一个MapReduce中完成。Mapper抽取数据并将之分组聚合,Reducer 中对收到的数据再次聚合。典型的聚合应用好比求和与最值能够以流的方式进行计算,于是不须要同时保有全部的值。可是另一些情景就必需要两阶段MapReduce,前面提到过的唯一值模式就是一个这种类型的例子。
1 class Mapper 2 method Map( null , tuple [value GroupBy, value AggregateBy, value ...]) 3 Emit(value GroupBy, value AggregateBy) 4 5 class Reducer 6 method Reduce(value GroupBy, [v1, v2,...]) 7 Emit(value GroupBy, aggregate( [v1, v2,...] ) ) 8 // aggregate() : sum(), max(),...
MapperReduce框架能够很好地处理链接,不过在面对不一样的数据量和处理效率要求的时候仍是有一些技巧。在这部分咱们会介绍一些基本方法,在后面的参考文档中还列出了一些关于这方面的专题文章。
这个算法按照键K来链接数据集R和L。Mapper 遍历R和L中的全部元组,以K为键输出每个标记了来自于R仍是L的元组,Reducer把同一个K的数据分装入两个容器(R和L),而后嵌套循环遍历两个容器中的数据以获得交集,最后输出的每一条结果都包含了R中的数据、L中的数据和K。这种方法有如下缺点:
尽管如此,再分配链接方式仍然是最通用的方法,特别是其余优化技术都不适用的时候。
1 class Mapper 2 method Map( null , tuple [join_key k, value v1, value v2,...]) 3 Emit(join_key k, tagged_tuple [set_name tag, values [v1, v2, ...] ] ) 4 5 class Reducer 6 method Reduce(join_key k, tagged_tuples [t1, t2,...]) 7 H = new AssociativeArray : set_name -> values 8 for all tagged_tuple t in [t1, t2,...] // separate values into 2 arrays 9 H{t.tag}.add(t.values) 10 for all values r in H{ ' R ' } // produce a cross-join of the two arrays 11 for all values l in H{ ' L ' } 12 Emit( null , [k r l] )
在实际应用中,将一个小数据集和一个大数据集链接是很常见的(如用户与日志记录)。假定要链接两个集合R和L,其中R相对较小,这样,能够把R分发给全部的Mapper,每一个Mapper均可以载入它并以链接键来索引其中的数据,最经常使用和有效的索引技术就是哈希表。以后,Mapper遍历L,并将其与存储在哈希表中的R中的相应记录链接,。这种方法很是高效,由于不须要对L中的数据排序,也不须要经过网络传送L中的数据,可是R必须足够小到可以分发给全部的Mapper。
1 class Mapper 2 method Initialize 3 H = new AssociativeArray : join_key -> tuple from R 4 R = loadR() 5 for all [ join_key k, tuple [r1, r2,...] ] in R 6 H{k} = H{k}.append( [r1, r2,...] ) 7 8 method Map(join_key k, tuple l) 9 for all tuple r in H{k} 10 Emit( null , tuple [k r l] )