目前开源社区有许多并行计算模型和框架可供选择,按照实现方式、运行机制、依附的产品生态圈等可以被划分为几个类型,每个类型各有优缺点,如果能够对各类型的并行计算框架都进行深入研究及适当的缺点修复,就可以为不同硬件环境下的海量数据分析需求提供不同的软件层面的解决方案。
并行计算或称平行计算是相对于串行计算来说的。它是一种一次可执行多个指令的算法,目的是提高计算速度,以及通过扩大问题求解规模,解决大型而复杂的计算问题。所谓并行计算可分为时间上的并行和空间上的并行。时间上的并行就是指流水线技术,而空间上的并行则是指用多个处理器并发的执行计算。并行计算(Parallel Computing)是指同时使用多种计算资源解决计算问题的过程,是提高计算机系统计算速度和处理能力的一种有效手段。它的基本思想是用多个处理器来协同求解同一问题,即将被求解的问题分解成若干个部分,各部分均由一个独立的处理机来并行计算。并行计算系统既可以是专门设计的、含有多个处理器的超级计算机,也可以是以某种方式互连的若干台的独立计算机构成的集群。通过并行计算集群完成数据的处理,再将处理的结果返回给用户。
欧美发达国家对于并行计算技术的研究要远远早于我国,从最初的并行计算逐渐过渡到网格计算,随着 Internet 网络资源的迅速膨胀,因特网容纳了海量的各种类型的数据和信息。海量数据的处理对服务器 CPU、IO 的吞吐都是严峻的考验,不论是处理速度、存储空间、容错性,还是在访问速度等方面,传统的技术架构和仅靠单台计算机基于串行的方式越来越不适应当前海量数据处理的要求。国内外学者提出很多海量数据处理方法,以改善海量数据处理存在的诸多问题。
目前已有的海量数据处理方法在概念上较容易理解,然而由于数据量巨大,要在可接受的时间内完成相应的处理,只有将这些计算进行并行化处理,通过提取出处理过程中存在的可并行工作的分量,用分布式模型来实现这些并行分量的并行执行过程。随着技术的发展,单机的性能有了突飞猛进的发展变化,尤其是内存和处理器等硬件技术,但是硬件技术的发展在理论上总是有限度的,如果说硬件的发展在纵向上提高了系统的性能,那么并行技术的发展就是从横向上拓展了处理的方式。
2003 年美国 Google 公司对外发布了 MapReduce、GFS、BigData 三篇论文,至此正式将并行计算框架落地为 MapReduce 框架。
我国的并行和分布式计算技术研究起源于 60 年代末,按照国防科技大学周兴铭院士提出的观点,到目前为止已经三个阶段了。第一阶段,自 60 年代末至 70 年代末,主要从事大型机内的并行处理技术研究;第二阶段,自 70 年代末至 90 年代初,主要从事向量机和并行多处理器系统研究;第三阶段,自 80 年代末至今,主要从事 MPP(Massively Parallel Processor) 系统研究。
尽管我国在并行计算方面开展的研究和应用较早,目前也拥有很多的并行计算资源,但研究和应用的成效相对美国还存在较大的差距,有待进一步的提高和发展。
MapReduce 是由谷歌推出的一个编程模型,是一个能处理和生成超大数据集的算法模型,该架构能够在大量普通配置的计算机上实现并行化处理。MapReduce 编程模型结合用户实现的 Map 和 Reduce 函数。用户自定义的 Map 函数处理一个输入的基于 key/value pair 的集合,输出中间基于 key/value pair 的集合,MapReduce 库把中间所有具有相同 key 值的 value 值集合在一起后传递给 Reduce 函数,用户自定义的 Reduce 函数合并所有具有相同 key 值的 value 值,形成一个较小 value 值的集合。一般地,一个典型的 MapReduce 程序的执行流程如图 1 所示。
MapReduce 执行过程主要包括:
任务成功完成后,MapReduce 的输出存放在 R 个输出文件中,一般情况下,这 R 个输出文件不需要合并成一个文件,而是作为另外一个 MapReduce 的输入,或者在另一个可处理多个分割文件的分布式应用中使用。
受 Google MapReduce 启发,许多研究者在不同的实验平台上实现了 MapReduce 框架,本文将对 Apache Hadoop MapReduce、Apache、Spark、斯坦福大学的 Phoenix,Nokia 研发的 Disco,以及香港科技大学的 Mars 等 5 个 MapReduce 实现框架进行逐一介绍和各方面对比。
Hadoop 的设计思路来源于 Google 的 GFS 和 MapReduce。它是一个开源软件框架,通过在集群计算机中使用简单的编程模型,可编写和运行分布式应用程序处理大规模数据。Hadoop 包含三个子项目:Hadoop Common、Hadoop Distributed File System(HDFS) 和 Hadoop MapReduce。
第一代 Hadoop MapReduce 是一个在计算机集群上分布式处理海量数据集的软件框架,包括一个 JobTracker 和一定数量的 TaskTracker。运行流程图如图 2 所示。
在最上层有 4 个独立的实体,即客户端、JobTracker、TaskTracker 和分布式文件系统。客户端提交 MapReduce 作业;JobTracker 协调作业的运行;JobTracker 是一个 Java 应用程序,它的主类是 JobTracker;TaskTracker 运行作业划分后的任务,TaskTracker 也是一个 Java 应用程序,它的主类是 TaskTracker。Hadoop 运行 MapReduce 作业的步骤主要包括提交作业、初始化作业、分配任务、执行任务、更新进度和状态、完成作业等 6 个步骤。
Spark 是一个基于内存计算的开源的集群计算系统,目的是让数据分析更加快速。Spark 非常小巧玲珑,由加州伯克利大学 AMP 实验室的 Matei 为主的小团队所开发。使用的语言是 Scala,项目的核心部分的代码只有 63 个 Scala 文件,非常短小精悍。Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 提供了基于内存的计算集群,在分析数据时将数据导入内存以实现快速查询,“速度比”基于磁盘的系统,如比 Hadoop 快很多。Spark 最初是为了处理迭代算法,如机器学习、图挖掘算法等,以及交互式数据挖掘算法而开发的。在这两种场景下,Spark 的运行速度可以达到 Hadoop 的几百倍。
Disco 是由 Nokia 研究中心开发的,基于 MapReduce 的分布式数据处理框架,核心部分由 Erlang 语言开发,外部编程接口为 Python 语言。Disco 是一个开放源代码的大规模数据分析平台,支持大数据集的并行计算,能运行在不可靠的集群计算机上。Disco 可部署在集群和多核计算机上,还可部署在 Amazon EC2 上。Disco 基于主/从架构 (Master/Slave),图 3 总体设计架构图展示了通过一台主节点 (Master) 服务器控制多台从节点 (Slave) 服务器的总体设计架构。
Disco 运行 MapReduce 步骤如下:
Phoenix 作为斯坦福大学 EE382a 课程的一类项目,由斯坦福大学计算机系统实验室开发。Phoenix 对 MapReduce 的实现原则和最初由 Google 实现的 MapReduce 基本相同。不同的是,它在集群中以实现共享内存系统为目的,共享内存能最小化由任务派生和数据间的通信所造成的间接成本。Phoenix 可编程多核芯片或共享内存多核处理器 (SMPs 和 ccNUMAs),用于数据密集型任务处理。
Mars 是香港科技大学与微软、新浪合作开发的基于 GPU 的 MapReduce 框架。目前已经包含字符串匹配、矩阵乘法、倒排索引、字词统计、网页访问排名、网页访问计数、相似性评估和 K 均值等 8 项应用,能够在 32 位与 64 位的 Linux 平台上运行。Mars 框架实现方式和基于 CPU 的 MapReduce 框架非常类似,也由 Map 和 Reduce 两个阶段组成,它的基本工作流程图如图 4 所示。
在开始每个阶段之前,Mars 初始化线程配置,包括 GPU 上线程组的数量和每个线程组中线程的数量。Mars 在 GPU 内使用大量的线程,在运行时阶段会均匀分配任务给线程,每个线程负责一个 Map 或 Reduce 任务,以小数量的 key/value 对作为输入,并通过一种无锁的方案来管理 MapReduce 框架中的并发写入。
Mars 的工作流程主要有 7 个操作步骤:
上述步骤的 1,2,3,7 这四个步骤的操作由调度器来完成,调度器负责准备数据输入,在 GPU 上调用 Map 和 Reduce 阶段,并将结果返回给用户。
五种框架的优缺点比较
单词计数 (WordCount) 是最简单也是最能体现 MapReduce 思想的程序之一,可以称为 MapReduce 版"Hello World"。单词计数主要完成功能是:统计一系列文本文件中每个单词出现的次数。
本次实验的硬件资源基于 x86 服务器 1 台,配置为内存 32GB DDR3、E5 CPU/12 核、GPU,实验数据样本为 10M/50M/100M/500M/1000M 的文本文件五个,我们使用 Hadoop MapReduce、Spark、Phoenix、Disco、Mars 等 MapReduce 框架分别运行文本分析程序,基于结果一致的前提下统计出运行时间、运行时 CPU 占有率、运行时内存占有率等数据,并采用这些数据绘制成柱状图。
首先需要将文件拆分成 splits,由于测试用的文件较小,所以每个文件为一个 split,并将文件按行分割形成<key,value>对,图 12 分割过程图所示。这一步由 MapReduce 框架自动完成,其中偏移量(即 key 值)包括了回车所占的字符数(Windows 和 Linux 环境会不同)。
将分割好的<key,value>对交给用户定义的 map 方法进行处理,生成新的<key,value>对,图 6 执行 map 方法所示。
得到 map 方法输出的<key,value>对后,Mapper 会将它们按照 key 值进行排序,并执行 Combine 过程,将 key 相同的 value 值累加,得到 Mapper 的最终输出结果。图 7Map 端排序及 Combine 过程所示。
Reducer 先对从 Mapper 接收的数据进行排序,再交由用户自定义的 reduce 方法进行处理,得到新的<key,value>对,并作为 WordCount 的输出结果,图 15Reduce 端排序及输出结果所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
|
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<
Object
, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
// 开始 Map 过程
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
//遍历 Map 里面的字符串
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<
Text
,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
//开始 Reduce 过程
public void reduce(Text key, Iterable<
IntWritable
> values,Context context)
throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: wordcount <
in
> <
out
>");
System.exit(2);
}
Job job = new Job(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
|
Spark 与 Hadoop MapReduce 的最大区别是它把所有数据保存在内存中,Hadoop MapReduce 需要从外部存储介质里把数据读入到内存,Spark 不需要这一步骤。它的实现原理与 Hadoop MapReduce 没有太大区别,这里不再重复原理,完整的运行代码如清单 2 所示。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaRDD<
String
> lines = ctx.textFile(args[0], Integer.parseInt(args[1]));
JavaRDD<
String
> words = lines.flatMap(new FlatMapFunction<
String
, String>() {
@Override
public Iterable<
String
> call(String s) {
return Arrays.asList(SPACE.split(s));
}
});
//定义 RDD ones
JavaPairRDD<
String
, Integer> ones = words.mapToPair(new PairFunction<
String
, String, Integer>() {
@Override
public Tuple2<
String
, Integer> call(String s) {
return new Tuple2<
String
, Integer>(s, 1);
}
});
//ones.reduceByKey(func, numPartitions)
JavaPairRDD<
String
, Integer> counts = ones.reduceByKey(new Function2<
Integer
, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
},10);
//输出 List
List<
Tuple2
<String, Integer>> output = counts.collect();
Collections.sort(output, new Comparator<
Tuple2
<String, Integer>>() {
@Override
public int compare(Tuple2<
String
, Integer> t1,
Tuple2<
String
, Integer> t2) {
if(t1._2 > t2._2) {
return -1;
} else if(t1._2 < t2._2) {
return 1;
}
return 0;
}
});
|
MapReduce 框架由于 Disco 有分布式文件系统存在,所以一般情况下都不会单独使用,都是从分布式文件系统内取数据后读入内存,然后再切分数据、进入 MapReduce 阶段。首先需要调用 ddfs 的 chunk 命令把文件上传到 DDFS,然后开始编写 MapReduce 程序,Disco 外层应用程序采用 Python 编写。Map 程序实例如清单 3 所示,Reduce 程序示例如清单 4 所示。
1
2
3
|
def fun_map(line, params):
for word in line.split():
yield word, 1
|
1
2
3
4
|
def fun_reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
from disco.core import Job, result_iterator
def map(line, params):
for word in line.split():
yield word, 1
def reduce(iter, params):
from disco.util import kvgroup
for word, counts in kvgroup(sorted(iter)):
yield word, sum(counts)
if __name__ == '__main__':
job = Job().run(input=["http://discoproject.org/media/text/chekhov.txt"],
map=map,
reduce=reduce)
for word, count in result_iterator(job.wait(show=True)):
print(word, count)
Note
|
Phoenix 是基于 CPU 的 MapReduce 框架,所以它也是采用将数据分割后读入内存,然后开始 MapReduce 处理阶段这样的传统方式。Phoenix 并不由用户决定切分每个 Map 分配到的数据块的大小,它是根据集群系统的实际 Cache 大小来切分的,这样可以避免出现分配到 Map 的数据块过大或者过小的情况出现。过大的数据快会导致 Map 执行较慢,过小的数据快会导致 Map 资源浪费,因为每次启动 Map 线程都需要消耗一定的系统资源。Map 阶段切分好的文本被多个 Map 并行执行,Phoenix 支持 100 个左右的 Map 并行执行,一个工作节点下可以有若干个 Map 并行执行。只有当一个工作节点上所有的 Map 任务都结束后才开始 Reduce 阶段。Reduce 阶段继续沿用了动态任务调度机制,同时允许用户自定义数据分区规则。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
65
66
67
|