from: http://www.ibm.com/developerworks/cn/analytics/library/ba-1507-mapreducefiveframes/index.htmlhtml
目前开源社区有许多并行计算模型和框架可供选择,按照实现方式、运行机制、依附的产品生态圈等能够被划分为几个类型,每一个类型各有优缺点,若是可以对各种型的并行计算框架都进行深刻研究及适当的缺点修复,就能够为不一样硬件环境下的海量数据分析需求提供不一样的软件层面的解决方案。java
并行计算或称平行计算是相对于串行计算来讲的。它是一种一次可执行多个指令的算法,目的是提升计算速度,以及经过扩大问题求解规模,解决大型而复杂的计算问题。所谓并行计算可分为时间上的并行和空间上的并行。时间上的并行就是指流水线技术,而空间上的并行则是指用多个处理器并发的执行计算。并行计算(Parallel Computing)是指同时使用多种计算资源解决计算问题的过程,是提升计算机系统计算速度和处理能力的一种有效手段。它的基本思想是用多个处理器来协同求解同一问题,即将被求解的问题分解成若干个部分,各部分均由一个独立的处理机来并行计算。并行计算系统既能够是专门设计的、含有多个处理器的超级计算机,也能够是以某种方式互连的若干台的独立计算机构成的集群。经过并行计算集群完成数据的处理,再将处理的结果返回给用户。算法
欧美发达国家对于并行计算技术的研究要远远早于我国,从最初的并行计算逐渐过渡到网格计算,随着 Internet 网络资源的迅速膨胀,因特网容纳了海量的各类类型的数据和信息。海量数据的处理对服务器 CPU、IO 的吞吐都是严峻的考验,不管是处理速度、存储空间、容错性,仍是在访问速度等方面,传统的技术架构和仅靠单台计算机基于串行的方式愈来愈不适应当前海量数据处理的要求。国内外学者提出不少海量数据处理方法,以改善海量数据处理存在的诸多问题。apache
目前已有的海量数据处理方法在概念上较容易理解,然而因为数据量巨大,要在可接受的时间内完成相应的处理,只有将这些计算进行并行化处理,经过提取出处理过程当中存在的可并行工做的份量,用分布式模型来实现这些并行份量的并行执行过程。随着技术的发展,单机的性能有了日新月异的发展变化,尤为是内存和处理器等硬件技术,可是硬件技术的发展在理论上老是有限度的,若是说硬件的发展在纵向上提升了系统的性能,那么并行技术的发展就是从横向上拓展了处理的方式。编程
2003 年美国 Google 公司对外发布了 MapReduce、GFS、BigData 三篇论文,至此正式将并行计算框架落地为 MapReduce 框架。数组
我国的并行和分布式计算技术研究起源于 60 年代末,按照国防科技大学周兴铭院士提出的观点,到目前为止已经三个阶段了。第一阶段,自 60 年代末至 70 年代末,主要从事大型机内的并行处理技术研究;第二阶段,自 70 年代末至 90 年代初,主要从事向量机和并行多处理器系统研究;第三阶段,自 80 年代末至今,主要从事 MPP(Massively Parallel Processor) 系统研究。缓存
尽管我国在并行计算方面开展的研究和应用较早,目前也拥有不少的并行计算资源,但研究和应用的成效相对美国还存在较大的差距,有待进一步的提升和发展。服务器
回页首网络
MapReduce 是由谷歌推出的一个编程模型,是一个能处理和生成超大数据集的算法模型,该架构可以在大量普通配置的计算机上实现并行化处理。MapReduce 编程模型结合用户实现的 Map 和 Reduce 函数。用户自定义的 Map 函数处理一个输入的基于 key/value pair 的集合,输出中间基于 key/value pair 的集合,MapReduce 库把中间全部具备相同 key 值的 value 值集合在一块儿后传递给 Reduce 函数,用户自定义的 Reduce 函数合并全部具备相同 key 值的 value 值,造成一个较小 value 值的集合。通常地,一个典型的 MapReduce 程序的执行流程如图 1 所示。多线程
MapReduce 执行过程主要包括:
任务成功完成后,MapReduce 的输出存放在 R 个输出文件中,通常状况下,这 R 个输出文件不须要合并成一个文件,而是做为另一个 MapReduce 的输入,或者在另外一个可处理多个分割文件的分布式应用中使用。
受 Google MapReduce 启发,许多研究者在不一样的实验平台上实现了 MapReduce 框架,本文将对 Apache Hadoop MapReduce、Apache、Spark、斯坦福大学的 Phoenix,Nokia 研发的 Disco,以及香港科技大学的 Mars 等 5 个 MapReduce 实现框架进行逐一介绍和各方面对比。
Hadoop 的设计思路来源于 Google 的 GFS 和 MapReduce。它是一个开源软件框架,经过在集群计算机中使用简单的编程模型,可编写和运行分布式应用程序处理大规模数据。Hadoop 包含三个子项目:Hadoop Common、Hadoop Distributed File System(HDFS) 和 Hadoop MapReduce。
第一代 Hadoop MapReduce 是一个在计算机集群上分布式处理海量数据集的软件框架,包括一个 JobTracker 和必定数量的 TaskTracker。运行流程图如图 2 所示。
在最上层有 4 个独立的实体,即客户端、JobTracker、TaskTracker 和分布式文件系统。客户端提交 MapReduce 做业;JobTracker 协调做业的运行;JobTracker 是一个 Java 应用程序,它的主类是 JobTracker;TaskTracker 运行做业划分后的任务,TaskTracker 也是一个 Java 应用程序,它的主类是 TaskTracker。Hadoop 运行 MapReduce 做业的步骤主要包括提交做业、初始化做业、分配任务、执行任务、更新进度和状态、完成做业等 6 个步骤。
Spark 是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速。Spark 很是小巧玲珑,由加州伯克利大学 AMP 实验室的 Matei 为主的小团队所开发。使用的语言是 Scala,项目的核心部分的代码只有 63 个 Scala 文件,很是短小精悍。Spark 启用了内存分布数据集,除了可以提供交互式查询外,它还能够优化迭代工做负载。Spark 提供了基于内存的计算集群,在分析数据时将数据导入内存以实现快速查询,“速度比”基于磁盘的系统,如比 Hadoop 快不少。Spark 最初是为了处理迭代算法,如机器学习、图挖掘算法等,以及交互式数据挖掘算法而开发的。在这两种场景下,Spark 的运行速度能够达到 Hadoop 的几百倍。
Disco 是由 Nokia 研究中心开发的,基于 MapReduce 的分布式数据处理框架,核心部分由 Erlang 语言开发,外部编程接口为 Python 语言。Disco 是一个开放源代码的大规模数据分析平台,支持大数据集的并行计算,能运行在不可靠的集群计算机上。Disco 可部署在集群和多核计算机上,还可部署在 Amazon EC2 上。Disco 基于主/从架构 (Master/Slave),图 3 整体设计架构图展现了经过一台主节点 (Master) 服务器控制多台从节点 (Slave) 服务器的整体设计架构。
Disco 运行 MapReduce 步骤以下:
Phoenix 做为斯坦福大学 EE382a 课程的一类项目,由斯坦福大学计算机系统实验室开发。Phoenix 对 MapReduce 的实现原则和最初由 Google 实现的 MapReduce 基本相同。不一样的是,它在集群中以实现共享内存系统为目的,共享内存能最小化由任务派生和数据间的通讯所形成的间接成本。Phoenix 可编程多核芯片或共享内存多核处理器 (SMPs 和 ccNUMAs),用于数据密集型任务处理。
Mars 是香港科技大学与微软、新浪合做开发的基于 GPU 的 MapReduce 框架。目前已经包含字符串匹配、矩阵乘法、倒排索引、字词统计、网页访问排名、网页访问计数、类似性评估和 K 均值等 8 项应用,可以在 32 位与 64 位的 Linux 平台上运行。Mars 框架实现方式和基于 CPU 的 MapReduce 框架很是相似,也由 Map 和 Reduce 两个阶段组成,它的基本工做流程图如图 4 所示。
在开始每一个阶段以前,Mars 初始化线程配置,包括 GPU 上线程组的数量和每一个线程组中线程的数量。Mars 在 GPU 内使用大量的线程,在运行时阶段会均匀分配任务给线程,每一个线程负责一个 Map 或 Reduce 任务,以小数量的 key/value 对做为输入,并经过一种无锁的方案来管理 MapReduce 框架中的并发写入。
Mars 的工做流程主要有 7 个操做步骤:
上述步骤的 1,2,3,7 这四个步骤的操做由调度器来完成,调度器负责准备数据输入,在 GPU 上调用 Map 和 Reduce 阶段,并将结果返回给用户。
五种框架的优缺点比较
Hadoop MapReduce | Spark | Phoenix | Disco | Mars | |
---|---|---|---|---|---|
编程语言 | Java 为主 | Scala | C | Erlang | C++ |
构建平台 | 须要首先架构基于 Hadoop 的集群系统,经过 HDFS 完成运算的数据存储工做 | 能够的单独运行,也能够与 Hadoop 框架完整结合 | 独立运行,不须要提早部署集群,运行时系统的实现是创建在 PThread 之上的,也可方便地移植到其余共享内存线程库上 | 整个 Disco 平台由分布式存储系统 DDFS 和 MapReduce 框架组成,DDFS 与计算框架高度耦合,经过监控各个节点上的磁盘使用状况进行负载均衡 | 运行时为 Map 或 Reduce 任务初始化大量的 GPU 线程,并为每一个线程自动分配少许的 key/value 对来运行任务 |
功能特色 | 计算能力很是强,适合超大数据集的应用程序,可是因为系统开销等缘由,处理小规模数据的速度不必定比串行程序快,而且自己集群的稳定性不高 | 在保证容错的前提下,用内存来承载工做集,内存的存取速度快于磁盘多个数量级,从而能够极大提高性能 | 利用共享内存缓冲区实现通讯,从而避免了因数据复制产生的开销,但 Phoenix 也存在不能自动执行迭代计算、没有高效的错误发现机制等不足 | 由一个 Master 服务器和一系列 Worker 节点组成,Master 和 Worker 之间采用基于轮询的通讯机制,经过 HTTP 的方式传输数据。轮询的时间间隔很差肯定,若时间间隔设置不当,会显著下降程序的执行性能 | 因为 GPU 线程不支持运行时动态调度,因此给每一个 GPU 线程分配的任务是固定的,若输入数据划分布均匀,将致使 Map 或 Reduce 阶段的负载不均衡,使得整个系统性能急剧下降。同时因为 GPU 不支持运行时在设备内存中分配空间,须要预先在设备内存中分配好输入数据和输出数据的存放空间,可是 Map 和 Reduce 阶段输出数据大小是未知的,而且当多个 GPU 线程同时向共享输出区域中写数据时,易形成写冲突 |
单词计数 (WordCount) 是最简单也是最能体现 MapReduce 思想的程序之一,能够称为 MapReduce 版"Hello World"。单词计数主要完成功能是:统计一系列文本文件中每一个单词出现的次数。
本次实验的硬件资源基于 x86 服务器 1 台,配置为内存 32GB DDR三、E5 CPU/12 核、GPU,实验数据样本为 10M/50M/100M/500M/1000M 的文本文件五个,咱们使用 Hadoop MapReduce、Spark、Phoenix、Disco、Mars 等 MapReduce 框架分别运行文本分析程序,基于结果一致的前提下统计出运行时间、运行时 CPU 占有率、运行时内存占有率等数据,并采用这些数据绘制成柱状图。
首先须要将文件拆分红 splits,因为测试用的文件较小,因此每一个文件为一个 split,并将文件按行分割造成<key,value>对,图 12 分割过程图所示。这一步由 MapReduce 框架自动完成,其中偏移量(即 key 值)包括了回车所占的字符数(Windows 和 Linux 环境会不一样)。
将分割好的<key,value>对交给用户定义的 map 方法进行处理,生成新的<key,value>对,图 6 执行 map 方法所示。
获得 map 方法输出的<key,value>对后,Mapper 会将它们按照 key 值进行排序,并执行 Combine 过程,将 key 相同的 value 值累加,获得 Mapper 的最终输出结果。图 7Map 端排序及 Combine 过程所示。
Reducer 先对从 Mapper 接收的数据进行排序,再交由用户自定义的 reduce 方法进行处理,获得新的<key,value>对,并做为 WordCount 的输出结果,图 15Reduce 端排序及输出结果所示。
import java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.GenericOptionsParser; public class WordCount { public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); // 开始 Map 过程 public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); //遍历 Map 里面的字符串 while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } } public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); //开始 Reduce 过程 public void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length != 2) { System.err.println("Usage: wordcount <in> <out>"); System.exit(2); } Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setMapperClass(TokenizerMapper.class); job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(otherArgs[0])); FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); System.exit(job.waitForCompletion(true) ? 0 : 1); } }
Spark 与 Hadoop MapReduce 的最大区别是它把全部数据保存在内存中,Hadoop MapReduce 须要从外部存储介质里把数据读入到内存,Spark 不须要这一步骤。它的实现原理与 Hadoop MapReduce 没有太大区别,这里再也不重复原理,完整的运行代码如清单 2 所示。
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaRDD<String> lines = ctx.textFile(args[0], Integer.parseInt(args[1])); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(SPACE.split(s)); } }); //定义 RDD ones JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); //ones.reduceByKey(func, numPartitions) JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } },10); //输出 List List<Tuple2<String, Integer>> output = counts.collect(); Collections.sort(output, new Comparator<Tuple2<String, Integer>>() { @Override public int compare(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { if(t1._2 > t2._2) { return -1; } else if(t1._2 < t2._2) { return 1; } return 0; } });
MapReduce 框架因为 Disco 有分布式文件系统存在,因此通常状况下都不会单独使用,都是从分布式文件系统内取数据后读入内存,而后再切分数据、进入 MapReduce 阶段。首先须要调用 ddfs 的 chunk 命令把文件上传到 DDFS,而后开始编写 MapReduce 程序,Disco 外层应用程序采用 Python 编写。Map 程序实例如清单 3 所示,Reduce 程序示例如清单 4 所示。
def fun_map(line, params): for word in line.split(): yield word, 1
def fun_reduce(iter, params): from disco.util import kvgroup for word, counts in kvgroup(sorted(iter)): yield word, sum(counts)
from disco.core import Job, result_iterator def map(line, params): for word in line.split(): yield word, 1 def reduce(iter, params): from disco.util import kvgroup for word, counts in kvgroup(sorted(iter)): yield word, sum(counts) if __name__ == '__main__': job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"], map=map, reduce=reduce) for word, count in result_iterator(job.wait(show=True)): print(word, count) Note
Phoenix 是基于 CPU 的 MapReduce 框架,因此它也是采用将数据分割后读入内存,而后开始 MapReduce 处理阶段这样的传统方式。Phoenix 并不禁用户决定切分每一个 Map 分配到的数据块的大小,它是根据集群系统的实际 Cache 大小来切分的,这样能够避免出现分配到 Map 的数据块过大或者太小的状况出现。过大的数据快会致使 Map 执行较慢,太小的数据快会致使 Map 资源浪费,由于每次启动 Map 线程都须要消耗必定的系统资源。Map 阶段切分好的文本被多个 Map 并行执行,Phoenix 支持 100 个左右的 Map 并行执行,一个工做节点下能够有若干个 Map 并行执行。只有当一个工做节点上全部的 Map 任务都结束后才开始 Reduce 阶段。Reduce 阶段继续沿用了动态任务调度机制,同时容许用户自定义数据分区规则。
#include <stdio.h> #include <strings.h> #include <string.h> #include <stddef.h> #include <stdlib.h> #include <unistd.h> #include <assert.h> #include <sys/mman.h> #include <sys/stat.h> #include <sys/time.h> #include <fcntl.h> #include <ctype.h> #include <inttypes.h> #include "map_reduce.h" #include "stddefines.h" #include "sort.h" #define DEFAULT_DISP_NUM 10 typedef struct { int fpos; off_t flen; char *fdata; int unit_size; } wc_data_t; enum { IN_WORD, NOT_IN_WORD }; struct timeval begin, end; #ifdef TIMING unsigned int library_time = 0; #endif /** mystrcmp() * Comparison function to compare 2 words */ int mystrcmp(const void *s1, const void *s2) { return strcmp((const char *)s1, (const char *) s2); } /** mykeyvalcmp() * Comparison function to compare 2 ints */ int mykeyvalcmp(const void *v1, const void *v2) { keyval_t* kv1 = (keyval_t*)v1; keyval_t* kv2 = (keyval_t*)v2; intptr_t *i1 = kv1->val; intptr_t *i2 = kv2->val; if (i1 < i2) return 1; else if (i1 > i2) return -1; else { return strcmp((char *)kv1->key, (char *)kv2->key); //return 0; } } /** wordcount_分割器 () * 内存里面进行 Map 计算 */ int wordcount_splitter(void *data_in, int req_units, map_args_t *out) { wc_data_t * data = (wc_data_t *)data_in; assert(data_in); assert(out); assert(data->flen >= 0); assert(data->fdata); assert(req_units); assert(data->fpos >= 0); // End of file reached, return FALSE for no more data if (data->fpos >= data->flen) return 0; // Set the start of the next data out->data = (void *)&data->fdata[data->fpos]; // Determine the nominal length out->length = req_units * data->unit_size; if (data->fpos + out->length > data->flen) out->length = data->flen - data->fpos; // Set the length to end at a space for (data->fpos += (long)out->length; data->fpos < data->flen && data->fdata[data->fpos] != ' ' && data->fdata[data->fpos] != '\t' && data->fdata[data->fpos] != '\r' && data->fdata[data->fpos] != '\n'; data->fpos++, out->length++); return 1; } /** wordcount_locator() * Return the memory address where this map task would heavily access. */ void *wordcount_locator (map_args_t *task) { assert (task); return task->data; } /** wordcount_map() * 对文本进行计数 */ void wordcount_map(map_args_t *args) { char *curr_start, curr_ltr; int state = NOT_IN_WORD; int i; assert(args); char *data = (char *)args->data; assert(data); curr_start = data; for (i = 0; i < args->length; i++) { curr_ltr = toupper(data[i]); switch (state) { case IN_WORD: data[i] = curr_ltr; if ((curr_ltr < 'A' || curr_ltr > 'Z') && curr_ltr != '\'') { data[i] = 0; emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1); state = NOT_IN_WORD; } break; default: case NOT_IN_WORD: if (curr_ltr >= 'A' && curr_ltr <= 'Z') { curr_start = &data[i]; data[i] = curr_ltr; state = IN_WORD; } break; } } // Add the last word if (state == IN_WORD) { data[args->length] = 0; emit_intermediate(curr_start, (void *)1, &data[i] - curr_start + 1); } } /** wordcount_reduce() * 计算字符 */ void wordcount_reduce(void *key_in, iterator_t *itr) { char *key = (char *)key_in; void *val; intptr_t sum = 0; assert(key); assert(itr); while (iter_next (itr, &val)) { sum += (intptr_t)val; } emit(key, (void *)sum); } void *wordcount_combiner (iterator_t *itr) { void *val; intptr_t sum = 0; assert(itr); while (iter_next (itr, &val)) { sum += (intptr_t)val; } return (void *)sum; } int main(int argc, char *argv[]) { final_data_t wc_vals; int i; int fd; char * fdata; int disp_num; struct stat finfo; char * fname, * disp_num_str; struct timeval starttime,endtime; get_time (&begin); // 确保文件名 if (argv[1] == NULL) { printf("USAGE: %s <filename> [Top # of results to display]\n", argv[0]); exit(1); } fname = argv[1]; disp_num_str = argv[2]; printf("Wordcount: Running...\n"); // 读取文件 CHECK_ERROR((fd = open(fname, O_RDONLY)) < 0); // Get the file info (for file length) CHECK_ERROR(fstat(fd, &finfo) < 0); #ifndef NO_MMAP // 内存里面开始调用 map CHECK_ERROR((fdata = mmap(0, finfo.st_size + 1, PROT_READ | PROT_WRITE, MAP_PRIVATE, fd, 0)) == NULL); #else int ret; fdata = (char *)malloc (finfo.st_size); CHECK_ERROR (fdata == NULL); ret = read (fd, fdata, finfo.st_size); CHECK_ERROR (ret != finfo.st_size); #endif CHECK_ERROR((disp_num = (disp_num_str == NULL) ? DEFAULT_DISP_NUM : atoi(disp_num_str)) <= 0); wc_data_t wc_data; wc_data.unit_size = 5; // approx 5 bytes per word wc_data.fpos = 0; wc_data.flen = finfo.st_size; wc_data.fdata = fdata; CHECK_ERROR (map_reduce_init ()); map_reduce_args_t map_reduce_args; memset(&map_reduce_args, 0, sizeof(map_reduce_args_t)); map_reduce_args.task_data = &wc_data; map_reduce_args.map = wordcount_map; map_reduce_args.reduce = wordcount_reduce; map_reduce_args.combiner = wordcount_combiner; map_reduce_args.splitter = wordcount_splitter; map_reduce_args.locator = wordcount_locator; map_reduce_args.key_cmp = mystrcmp; map_reduce_args.unit_size = wc_data.unit_size; map_reduce_args.partition = NULL; // use default map_reduce_args.result = &wc_vals; map_reduce_args.data_size = finfo.st_size; map_reduce_args.L1_cache_size = atoi(GETENV("MR_L1CACHESIZE"));//1024 * 1024 * 2; map_reduce_args.num_map_threads = atoi(GETENV("MR_NUMTHREADS"));//8; map_reduce_args.num_reduce_threads = atoi(GETENV("MR_NUMTHREADS"));//16; map_reduce_args.num_merge_threads = atoi(GETENV("MR_NUMTHREADS"));//8; map_reduce_args.num_procs = atoi(GETENV("MR_NUMPROCS"));//16; map_reduce_args.key_match_factor = (float)atof(GETENV("MR_KEYMATCHFACTOR"));//2; printf("Wordcount: Calling MapReduce Scheduler Wordcount\n"); gettimeofday(&starttime,0); get_time (&end); #ifdef TIMING fprintf (stderr, "initialize: %u\n", time_diff (&end, &begin)); #endif get_time (&begin); CHECK_ERROR(map_reduce (&map_reduce_args) < 0); get_time (&end); #ifdef TIMING library_time += time_diff (&end, &begin); #endif get_time (&begin); gettimeofday(&endtime,0); printf("Wordcount: Completed %ld\n",(endtime.tv_sec - starttime.tv_sec)); printf("Wordcount: MapReduce Completed\n"); printf("Wordcount: Calling MapReduce Scheduler Sort\n"); mapreduce_sort(wc_vals.data, wc_vals.length, sizeof(keyval_t), mykeyvalcmp); CHECK_ERROR (map_reduce_finalize ()); printf("Wordcount: MapReduce Completed\n"); dprintf("\nWordcount: Results (TOP %d):\n", disp_num); for (i = 0; i < disp_num && i < wc_vals.length; i++) { keyval_t * curr = &((keyval_t *)wc_vals.data)[i]; dprintf("%15s - %" PRIdPTR "\n", (char *)curr->key, (intptr_t)curr->val); } free(wc_vals.data); #ifndef NO_MMAP CHECK_ERROR(munmap(fdata, finfo.st_size + 1) < 0); #else free (fdata); #endif CHECK_ERROR(close(fd) < 0); get_time (&end); #ifdef TIMING fprintf (stderr, "finalize: %u\n", time_diff (&end, &begin)); #endif return 0; }
Mars 框架中,Map 和 Reduce 的处理阶段都在 GPU 内进行,Map 和 Reduce 的分割数据阶段都在 CPU 内进行,这是与其余基于 CPU 的 MapReduce 框架的最大不一样。Mars 更多的是利用 CPU、GPU 缓存来替代内存,执行数据分割、处理过程。
具体的 Word count 的流程以下所示:
经过上述的 7 个步骤,WordCount 的计算过程所有完成而且输出结果集。
#ifndef __MAP_CU__ #define __MAP_CU__ #include "MarsInc.h" #include "global.h" __device__ int hash_func(char* str, int len) { int hash, i; for (i = 0, hash=len; i < len; i++) hash = (hash<<4)^(hash>>28)^str[i]; return hash; } __device__ void MAP_COUNT_FUNC//(void *key, void *val, size_t keySize, size_t valSize) { WC_KEY_T* pKey = (WC_KEY_T*)key; WC_VAL_T* pVal = (WC_VAL_T*)val; char* ptrBuf = pKey->file + pVal->line_offset; int line_size = pVal->line_size; char* p = ptrBuf; int lsize = 0; int wsize = 0; char* start = ptrBuf; while(1) { for (; *p >= 'A' && *p <= 'Z'; p++, lsize++); *p = '\0'; ++p; ++lsize; wsize = (int)(p - start); if (wsize > 6) { //printf("%s, wsize:%d\n", start, wsize); EMIT_INTER_COUNT_FUNC(wsize, sizeof(int)); } for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++); if (lsize >= line_size) break; start = p; } } __device__ void MAP_FUNC//(void *key, void val, size_t keySize, size_t valSize) { WC_KEY_T* pKey = (WC_KEY_T*)key; WC_VAL_T* pVal = (WC_VAL_T*)val; char* filebuf = pKey->file; char* ptrBuf = filebuf + pVal->line_offset; int line_size = pVal->line_size; char* p = ptrBuf; char* start = ptrBuf; int lsize = 0; int wsize = 0; while(1) { for (; *p >= 'A' && *p <= 'Z'; p++, lsize++); *p = '\0'; ++p; ++lsize; wsize = (int)(p - start); int* o_val = (int*)GET_OUTPUT_BUF(0); *o_val = wsize; if (wsize > 6) { //printf("%s, %d\n", start, wsize); EMIT_INTERMEDIATE_FUNC(start, o_val, wsize, sizeof(int)); } for (; (lsize < line_size) && (*p < 'A' || *p > 'Z'); p++, lsize++); if (lsize >= line_size) break; start = p; } } #endif //__MAP_CU__
#ifndef __REDUCE_CU__ #define __REDUCE_CU__ #include "MarsInc.h" __device__ void REDUCE_COUNT_FUNC//(void* key, void* vals, size_t keySize, size_t valCount) { } __device__ void REDUCE_FUNC//(void* key, void* vals, size_t keySize, size_t valCount) { } #endif //__REDUCE_CU__
图 9 实验运行时间比较图是分析不一样大小的文本文件所消耗的时间对比图。从上图能够看出,Hadoop MapReduce 的运行时间最长,缘由是 Hadoop 生态环境包含内容过多,因此每次任务启动时首先须要加载所需资源包,而后缓慢地发起任务,而且因为自己是用性能较差的 Java 语言编写的,因此致使总体计算时间长、性能差。Phoenix 因为采用汇编和 C 语言编写,内核很小,运行时所用资源不多,因此整个测试过程耗时也较少。Spark 框架在 WordCount 实验中消耗的时长较 Disco 稍少,可是比 Phoenix、Mars 耗时太多。耗时最短的两个框架是 Mars 和 Phoenix。须要时长从高到低分别是 Hadoop MapReduce、Disco、Spark、Phoenix、Mars。
图 10-CPU 使用率比较图是分析任务执行过程中 CPU 使用率状况图。从上图能够看出,Hadoop MapReduce、Disco 这两个框架须要占用的 CPU 资源在 1000M 文本处理时基本到达最大饱和度 (大于 90%),Apache Spark 的 CPU 使用率没有彻底伴随着文本文件增大而大幅上涨,Phoenix 和 Mars 基本控制在对 CPU 使用率较低的范围内。
图 11 内存使用率比较图是分析任务执行过程当中内存使用状况对比。从图中能够看出,Mars 和 Phoenix 这两款框架所使用的内存在文本数据较小时是最少的,随着文本数据的增大,Apache Spark 随着数据量增大而内存大幅增长,Mars 和 Phoenix 有必定幅度的内存使用增长趋势。当数据量达到本次测试最大的 1000M 文本时,Spark 框架对内存的消耗是最小的,Hadoop MapReduce 和 Disco 须要占用较多的内存。
从上面的测试结果咱们得出,若是用户只须要处理海量的文本文件,不须要考虑存储、二次数据挖掘等,采用 Phoenix 或者 Mars 是最大性价比的选择,可是因为 Mars 必须在 GPU 上运行,自己 GPU 因为价格因素,致使不太可能在实际应用场景里推广,因此综合来看 Phoenix 是性价比最高的框架。若是应用程序须要处理的数据量很是大,而且客户但愿计算出的数据能够被存储和二次计算或数据挖掘,那 Hadoop MapReduce 较好,由于整个 Hadoop 生态圈庞大,支持性很好。Apache Spark 因为架构层面设计不一样,因此对于 CPU、内存的使用率一直保持较低状态,它将来能够用于海量数据分析用途。
现实世界不少实例均可用 MapReduce 编程模型来表示,MapReduce 做为一个通用可扩展的、高容错性的并行处理模型,可有效地处理海量数据,不断地从中分析挖掘出有价值的信息。MapReduce 封装了并行处理、负载均衡、容错、数据本地化等技术难点细节。经过本文测试用例能够证实 MapReduce 适用于海量文本分析的应用场景,能够为处理大数据提供技术支撑。