MapReduce分布式计算框架简称MR,比较适合作数据离线计算;其他计算框架如spark 基于内存的迭代式计算,适合作实时计算框架;Storm适合作流计算。java
分布式离线计算框架程序员
主要适用于大批量的集群任务,因为是批量执行,故时效性偏低。面试
原生支持 Java 语言开发 MapReduce ,其它语言须要使用到 Hadoop Streaming 来开发。算法
Spark 是专为大规模数据处理而设计的快速通用的计算引擎,其是基于内存的迭代式计算。apache
Spark 保留了MapReduce 的优势,并且在时效性上有了很大提升,从而对须要迭代计算和有较高时效性要求的系统提供了很好的支持。网络
开发人员能够经过Java、Scala或者Python等语言进行数据分析做业编写,并使用超过80种高级运算符。架构
Spark与HDFS全面兼容,同时还能与其它Hadoop组件—包括YARN以及HBase并行协做。app
Spark能够被用于处理多种做业类型,好比实时数据分析、机器学习与图形处理。多用于能容忍小延时的推荐与计算系统。负载均衡
Storm是一个分布式的、可靠的、容错的流式计算框架。框架
Storm 一开始就是为实时处理设计,所以在实时分析/性能监测等须要高时效性的领域普遍采用。
Storm在理论上支持全部语言,只须要少许代码便可完成适配。
Storm把集群的状态存在Zookeeper或者本地磁盘,因此后台进程都是无状态的(不须要保存本身的状态,都在zookeeper上),能够在不影响系统健康运行的同时失败或重启。
Storm可应用于--数据流处理、持续计算(持续地向客户端发送数据,它们能够实时的更新以及展示数据,好比网站指标)、分布式远程过程调用(轻松地并行化CPU密集型操做)。
参考http://blog.51cto.com/ijiajia/1958741。
核心思想:移动计算而非移动数据;通俗说就是把预先写好的算法在不一样的节点运行,而数据不动。
步骤:
input:hdfs 存储的数据做为mr的输入,也称为原始数据,数据比较大,能够是视频 图片 文档等。。。
split: 切片,对输入数据进行分割 切片,分发到不一样的节点计算
map: 映射 也能够叫建模,对数据切片并行的进行建模,有多少个切片就有多少个map进程。
SM:sort&merge 合并排序,对map的而结果进行合并排序操做
shuff:对相同的key值的数据移动到同一个block中
redu:对shuff的结果计算,数据清洗和处理,
计算框架shuffer:
partiton:分区算法,能够由程序员自定义也可使用系统默认的哈希模运算。每一个对象取哈希值而后模reducer进程数获得结果,按照结果规则进行分区。分区是为了把mapper数据进行从新分配,达到负载均衡目的,解决数据倾斜问题。数据倾斜通常发生在reducer阶段,mapper不会发生数据倾斜问题。默认的partiton算法有可能发生数据倾斜问题。
sort:排序,系统默认的排序是按照对象的ascii码排序,也能够说是按照字典排序。
merge:合并,相同的K进行合并,若有combiner框架则按照框架规则合并,没有则按照系统默认的合并规则
最后把处理好的数据固化到磁盘,把数据拷贝到reducer节点,按照分区不一样拷贝到不一样的的reducer进程。而后按照相同的K进行合并,这些数值有可能来自于不一样的mapper进程。
partiton,sort和combiner在面试中常常会被问到。
若是客户端设置了combiner,那么将会使用combiner对数据合并,将相同的K合并,减小数据量(后面的reducer task 从task tracker 拷贝数据。)。拷贝过来的数据先存放在内存中,在内存合并的时候会对数据作排序
当整个maptask结束后在对磁盘中的这个maptask产生的临时文件作合并。
MR配置:
主节点 jobtracker配置:
conf/mapred-site.xml
<property> <name>mapred.job.tracker</name> <VALUE>localhost:9001</VALUE> </property>
从tasktracker 默认在DN节点,能够不用配置。
mapper函数:封装数据,构造map<Key,Value>键-值对。
Key 文本行号,hadoop自动生成。
Value 每一行文件内容。
context 封装map<Key,Value>输出给reduce函数。
reducer函数:接受mapper函数输出的map<Key,Value>值做为输入值,构造context输出。
* 1.定义做业
* 2.设置Job主函数
* 3.定义Job输入,输出路径
* 4.设置mapper,reducer函数,Job在运行的时候会主动去加载
* 5.设置输出Key,Value格式
程序打包成*.jar格式
执行 export HADOOP_CLASSPATH=../../*.jar
hadoop com.crbc.TimpJob input output
例如:hadoop -Xmx1024m com.crbc.TimpJob file:///D:\timpfile\*.gz D:\timpfile\out\
2.集群模式
上传*.jar到集群主机
将要处理的文件上传到hdfs文件系统
hadoop jar *.jar /input /output
Mapper---->Reducer------->Job
(构造) (计算) (运行)
mapper类:
package com.crbc.www; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /* * Mapper * LongWritable :输入参数 ,内部定义行号 * Text :输入参数,文件value值 * Text :输出参数,输出给reduce函数处理的 值 * IntWritable:输出参数,输出给reduce函数处理的值 * */ public class TimpMapper extends Mapper<LongWritable, Text,Text, IntWritable> { /* * 重写map函数 * LongWritable :内部定义行号 * Text :文件value值 * context:输出函数, */ protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException { String line=value.toString(); String year=line.substring(15,19); int airt; if(line.charAt(87)=='+') { airt=Integer.parseInt(line.substring(88,92)); }else { airt=Integer.parseInt(line.substring(87,92)); } String quality=line.substring(92,93); if(airt !=9999 && quality.matches("[01459]")) { //写上下文,maper函数输出做为reduce函数的输入值,封装map<Key,Values> context.write(new Text(year), new IntWritable(airt)); } } }
reducer类:
package com.crbc.www; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /* * Reducer函数 * * */ public class TimpReduces extends Reducer<Text, IntWritable, Text,IntWritable> { /* * Text :输入函数, * IntWritable:输入函数,可迭代 * * context:输出函数 */ protected void reduce(Text key, Iterable<IntWritable> value,Context context) throws IOException, InterruptedException { int maxValues=Integer.MIN_VALUE; for(IntWritable values:value) { maxValues = Math.max(maxValues, values.get()); } //写上下文,封装map<Key,Values>输出 context.write(key,new IntWritable(maxValues)); } }
Job类:
package com.crbc.www; import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; /* * 1.定义做业 * 2.设置Job主函数 * 3.定义Job输入,输出路径 * 4.设置mapper,reducer函数,Job在运行的时候会主动去加载 * 5.设置输出Key,Value格式 */ public class TimpJob { public static void main(String[] args) throws Exception { //定义一个做业 Job job = new Job(); //设置做业主函数 job.setJarByClass(TimpJob.class); //设置做业名称,便于调试 job.setJobName("MapperReducer"); //设置job输入参数,输入函数能够是一个文件路径 FileInputFormat.addInputPath(job,new Path(args[0]) ); //设置job输出参数,输出函数可使一个路径,把计算计算结果输出到此路径下。 //注意此路径是函数建立的,不能跟现有的重名 FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置Mapper函数 job.setMapperClass(TimpMapper.class); //设置Reduce函数 job.setReducerClass(TimpReduces.class); //设置输出key格式 job.setOutputKeyClass(Text.class); //设置输出Value格式 job.setOutputValueClass(IntWritable.class); //等待做业完成 job.waitForCompletion(true); } }