从这篇文章开始,我会开始系统性地输出在大数据踩坑过程当中的积累,后面会涉及到实战项目的具体操做,目前的规划是按照系列来更新,力争作到一个系列在5
篇文章以内总结出最核心的干货,若是是涉及到理论方面的文章,会以画图的方式来说解,若是是涉及到操做方面,会以实际的代码来演示。java
这篇是MapReduce
系列的第一篇,初识MapReduce
的应用场景,在文章后面会有关于代码的演示。python
Hadoop
做为Apache
旗下的一个以Java
语言实现的分布式计算开源框架,其由两个部分组成,一个是分布式的文件系统HDFS
,另外一个是批处理计算框架MapReduce
。这篇文章做为MapReduce
系列的第一篇文章,会从MapReduce
的产生背景、框架的计算流程、应用场景和演示Demo
来说解,主要是让你们对MapReduce
的这个批计算框架有个初步的了解及简单的部署和使用。git
MapReduce
的产生背景MapReduce
的计算流程MapReduce
的框架架构MapReduce
的生命周期Demo
Google
在2004年的时候在 MapReduce: Simplified Data Processing on Large Clusters 这篇论文中提出了MapReduce
的功能特性和设计理念,设计MapReduce
的出发点就是为了解决如何把大问题分解成独立的小问题,再并行解决。例如,MapReduce
的经典使用场景之一就是对一篇长文进行词频统计,统计过程就是先把文章分为一句一句,而后进行分割,最后进行词的数量统计。github
咱们来说解下不一样的组件做用算法
Client
的含义是指用户使用MapReduce
程序经过Client
来提交任务到Job Tracker
上,同时用户也可使用Client
来查看一些做业的运行状态。apache
这个负责的是资源监控和做业调度。JobTracker
会监控着TaskTracker
和做业的健康情况,会把失败的任务转移到其余节点上,同时也监控着任务的执行进度、资源使用量等状况,会把这些消息通知任务调度器,而调度器会在资源空闲的时候选择合适的任务来使用这些资源。编程
任务调度器是一个可插拔的模块,用户能够根据本身的须要来设计相对应的调度器。bash
TaskTracker
会周期性地经过Hearbeat
来向Job Tracker
汇报本身的资源使用状况和任务的运行进度。会接受来自于JobTaskcker
的指令来执行操做(例如启动新任务、杀死任务之类的)。架构
在TaskTracker
中经过的是slot
来进行等量划分一个节点上资源量,只用Task
得到slot
的时候才有机会去运行。调度器的做用就是进行将空闲的slot
分配给Task
使用,能够配置slot
的数量来进行限定Task上的并发度。并发
Task分为Map Task
和Reduce Task
,在MapReduce
中的 split
就是一个 Map Task
,split
的大小能够设置的,由 mapred.max.spilt.size
参数来设置,默认是 Hadoop
中的block
的大小,在Hadoop 2.x
中默认是128M
,在Hadoop 1.x
中默认是64M
。
在Task
中的设置能够这么设置,通常来说,会把一个文件设置为一个split
,若是是小文件,那么就会存在不少的Map Task
,这是特别浪费资源的,若是split
切割的数据块的量大,那么会致使跨节点去获取数据,这样也是消耗不少的系统资源的。
一共分为5个步骤:
由用户提交做业以前,须要先把文件上传到HDFS
上,JobClient
使用upload
来加载关于打包好的jar
包,JobClient
会RPC
建立一个JobInProcess
来进行管理任务,而且建立一个TaskProcess
来管理控制关于每个Task
。
JobTracker
会调度和管理任务,一发现有空闲资源,会按照一个策略选择一个合适的任务来使用该资源。
任务调度器有两个点:一个是保证做业的顺利运行,若是有失败的任务时,会转移计算任务,另外一个是若是某一个Task的计算结果落后于同一个Task的计算结果时,会启动另外一个Task来作计算,最后去计算结果最块的那个。
TaskTracker会为每个Task来准备一个独立的JVM从而避免不一样的Task在运行过程当中的一些影响,同时也使用了操做系统来实现资源隔离防止Task滥用资源。
每一个Task的任务进度经过RPC来汇报给TaskTracker,再由TaskTracker汇报给JobTracker。
先来看一张图,系统地了解下 MapReduce
的运算流程。
为了方便你们理解,从新画了一张新的图,演示的是关于如何进行把一个长句进行分割,最后进行词频的统计(已忽略掉标点符号)。
整个过程就是先读取文件,接着进行split
切割,变成一个一个的词,而后进行 map task
任务,排列出全部词的统计量,接着 sorting
排序,按照字典序来排,接着就是进行 reduce task
,进行了词频的汇总,最后一步就是输出为文件。例如图中的 spacedong
就出现了两次。
其中对应着的是 Hadoop Mapreduce
对外提供的五个可编程组件,分别是InputFormat
、Mapper
、Partitioner
、Reduce
和OutputFormat
,后续的文章会详细讲解这几个组件。
用一句话简单地总结就是,Mapreduce
的运算过程就是进行拆解-排序-汇总,解决的就是统计的问题,使用的思想就是分治的思想。
MapReduce
的产生是为了把某些大的问题分解成小的问题,而后解决小问题后,大问题也就解决了。那么通常有什么样的场景会运用到这个呢?那可多了去,简单地列举几个经典的场景。
URL
的访问频率搜索引擎的使用中,会遇到大量的URL的访问,因此,可使用 MapReduce
来进行统计,得出(URL
,次数)结果,在后续的分析中可使用。
Map
函数去分析文件格式是(词,文档号)的列表,Reduce
函数就分析这个(词,文档号),排序全部的文档号,输出(词,list
(文档号)),这个就能够造成一个简单的倒排索引,是一种简单的算法跟踪词在文档中的位置。
在各类的文档分析,或者是不一样的场景中,常常会遇到关于 Top K
的问题,例如输出这篇文章的出现前5
个最多的词汇。这个时候也可使用 MapReduce
来进行统计。
Demo
今天的代码演示从Python
和Java
两个版本的演示,Python
版本的话即是不使用封装的包,Java
版本的话则是使用了Hadoop
的封装包。接下来便进行演示一个MapReduce
的简单使用,如何进行词汇统计。
Java
版本代码txt
格式的。文件名是WordMRDemo.txt
,内容是下面简短的一句话,以空格分割开:hello my name is spacedong welcome to the spacedong thank you
Hadoop
的依赖包//这里使用的是2.6.5的依赖包,你可使用其余版本的
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.5</version>
</dependency>
复制代码
WordMapper.java
文件,代码的做用是进行以空格的形式进行分词。public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
@Override
protected void map(LongWritable key, Text value, Mapper.Context context)
throws java.io.IOException, InterruptedException {
String line = value.toString();
//StringTokenizer默认按照空格来切
StringTokenizer st = new StringTokenizer(line);
while (st.hasMoreTokens()) {
String world = st.nextToken();
//map输出
context.write(new Text(world), new IntWritable(1));
}
}
}
复制代码
WordReduce.java
文件,做用是进行词汇的统计。public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> iterator, Context context)
throws java.io.IOException ,InterruptedException {
int sum = 0 ;
for(IntWritable i:iterator){
sum+=i.get();
}
context.write(key, new IntWritable(sum));
}
}
复制代码
WordMRDemo.java
文件,做用是运行Job
,开始分析句子。public class WordMRDemo {
public static void main(String[] args) {
Configuration conf = new Configuration();
//设置mapper的配置,既就是hadoop/conf/mapred-site.xml的配置信息
conf.set("mapred.job.tracker", "hadoop:9000");
try {
//新建一个Job工做
Job job = new Job(conf);
//设置运行类
job.setJarByClass(WordMRDemo.class);
//设置要执行的mapper类
job.setMapperClass(WordMapper.class);
//设置要执行的reduce类
job.setReducerClass(WordReduce.class);
//设置输出key的类型
job.setMapOutputKeyClass(Text.class);
//设置输出value的类型
job.setMapOutputValueClass(IntWritable.class);
//设置ruduce任务的个数,默认个数为一个(通常reduce的个数越多效率越高)
//job.setNumReduceTasks(2);
//mapreduce 输入数据的文件/目录,注意,这里能够输入的是目录。
FileInputFormat.addInputPath(job, new Path("F:\\BigDataWorkPlace\\data\\input"));
//mapreduce 执行后输出的数据目录,不能预先存在,不然会报错。
FileOutputFormat.setOutputPath(job, new Path("F:\\BigDataWorkPlace\\data\\out"));
//执行完毕退出
System.exit(job.waitForCompletion(true) ? 0 : 1);
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
WordMRDemo.java
文件,而后获得的结果是out
文件夹内的内容,它长这个样子:
打开part-r-00000
文件的内容以下
map.py
文件,进行词汇的切割。for line in sys.stdin:
time.sleep(1000)
ss = line.strip().split(' ')
for word in ss:
print '\t'.join([word.strip(), '1'])
复制代码
red.py
文件,进行词汇的统计。cur_word = None
sum = 0
for line in sys.stdin:
ss = line.strip().split('\t')
if len(ss) != 2:
continue
word, cnt = ss
if cur_word == None:
cur_word = word
if cur_word != word:
print '\t'.join([cur_word, str(sum)])
cur_word = word
sum = 0
sum += int(cnt)
print '\t'.join([cur_word, str(sum)])
复制代码
run.sh
文件,直接运行便可。HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"
INPUT_FILE_PATH_1="/test.txt"
OUTPUT_PATH="/output"
$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH
# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH_1 \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python red.py" \
-file ./map.py \
-file ./red.py
复制代码
以上的是演示demo
的核心代码,完整的代码能够上github
的代码仓库上获取。 仓库地址为:https://github.com/spacedong/bigDataNotes
以上的文章是MapReduce
系列的第一篇,下篇预告是MapReduce的编程模型
,敬请期待!
参考资料:
Hadoop的技术内幕:深刻解析MapReduce架构设计及实现原理