MR计算框架学习笔记-持续更新

MapReduce分布式计算框架简称MR,比较适合作数据离线计算;其他计算框架如spark 基于内存的迭代式计算,适合作实时计算框架;Storm适合作流计算。java

MapReduce

  • 分布式离线计算框架程序员

  • 主要适用于大批量的集群任务,因为是批量执行,故时效性偏低。面试

  • 原生支持 Java 语言开发 MapReduce ,其它语言须要使用到 Hadoop Streaming 来开发。算法

Spark

  • Spark 是专为大规模数据处理而设计的快速通用的计算引擎,其是基于内存的迭代式计算。apache

  • Spark 保留了MapReduce 的优势,并且在时效性上有了很大提升,从而对须要迭代计算和有较高时效性要求的系统提供了很好的支持。网络

  • 开发人员能够经过Java、Scala或者Python等语言进行数据分析做业编写,并使用超过80种高级运算符。架构

  • Spark与HDFS全面兼容,同时还能与其它Hadoop组件—包括YARN以及HBase并行协做。app

  • Spark能够被用于处理多种做业类型,好比实时数据分析、机器学习与图形处理。多用于能容忍小延时的推荐与计算系统。负载均衡

Storm

  • Storm是一个分布式的、可靠的、容错的流式计算框架。框架

  • Storm 一开始就是为实时处理设计,所以在实时分析/性能监测等须要高时效性的领域普遍采用。

  • Storm在理论上支持全部语言,只须要少许代码便可完成适配。

  • Storm把集群的状态存在Zookeeper或者本地磁盘,因此后台进程都是无状态的(不须要保存本身的状态,都在zookeeper上),能够在不影响系统健康运行的同时失败或重启。

  • Storm可应用于--数据流处理、持续计算(持续地向客户端发送数据,它们能够实时的更新以及展示数据,好比网站指标)、分布式远程过程调用(轻松地并行化CPU密集型操做)。

参考http://blog.51cto.com/ijiajia/1958741。

核心思想:移动计算而非移动数据;通俗说就是把预先写好的算法在不一样的节点运行,而数据不动。

步骤:

input:hdfs 存储的数据做为mr的输入,也称为原始数据,数据比较大,能够是视频 图片 文档等。。。

split: 切片,对输入数据进行分割 切片,分发到不一样的节点计算

map: 映射 也能够叫建模,对数据切片并行的进行建模,有多少个切片就有多少个map进程。

SM:sort&merge 合并排序,对map的而结果进行合并排序操做

shuff:对相同的key值的数据移动到同一个block中

redu:对shuff的结果计算,数据清洗和处理,

计算框架shuffer:

  • mapeper和reducer中间步骤
  • 把mapper输出结果按照某种k-v切分组合,数据处理以后输出到reducer
  • 简化reducer过程

partiton:分区算法,能够由程序员自定义也可使用系统默认的哈希模运算。每一个对象取哈希值而后模reducer进程数获得结果,按照结果规则进行分区。分区是为了把mapper数据进行从新分配,达到负载均衡目的,解决数据倾斜问题。数据倾斜通常发生在reducer阶段,mapper不会发生数据倾斜问题。默认的partiton算法有可能发生数据倾斜问题。

sort:排序,系统默认的排序是按照对象的ascii码排序,也能够说是按照字典排序。

merge:合并,相同的K进行合并,若有combiner框架则按照框架规则合并,没有则按照系统默认的合并规则

最后把处理好的数据固化到磁盘,把数据拷贝到reducer节点,按照分区不一样拷贝到不一样的的reducer进程。而后按照相同的K进行合并,这些数值有可能来自于不一样的mapper进程。

partiton,sort和combiner在面试中常常会被问到。

若是客户端设置了combiner,那么将会使用combiner对数据合并,将相同的K合并,减小数据量(后面的reducer task 从task tracker 拷贝数据。)。拷贝过来的数据先存放在内存中,在内存合并的时候会对数据作排序

当整个maptask结束后在对磁盘中的这个maptask产生的临时文件作合并。

MR架构:主从架构  (1.0)  

主jobtracker:

  • 负责调度分配每个子任务运行在tasktracker上,若是发现有失败的task就从新分配任务到其余节点,一个集群只有一个jobtracker节点,通常运行在master节点上。

从tasktracker:

  • tasktracker主动与jobtracker通讯,接收做业,负责执行每个任务,为了减小    网络带宽tasktracker最好运行在hdfs上的DN节点上。

MR配置: 

主节点 jobtracker配置:

conf/mapred-site.xml

<property>
            <name>mapred.job.tracker</name>
            <VALUE>localhost:9001</VALUE>
        </property>

从tasktracker 默认在DN节点,能够不用配置。

 

 

MR 简单实现:

mapper函数:封装数据,构造map<Key,Value>键-值对。
Key 文本行号,hadoop自动生成。
Value 每一行文件内容。
context 封装map<Key,Value>输出给reduce函数。


reducer函数:接受mapper函数输出的map<Key,Value>值做为输入值,构造context输出。

Job函数:

  •  * 1.定义做业

  •  * 2.设置Job主函数

  •  * 3.定义Job输入,输出路径

  •  * 4.设置mapper,reducer函数,Job在运行的时候会主动去加载

  •  * 5.设置输出Key,Value格式

 
 调用方法:

 

  1. 单节点

 程序打包成*.jar格式
 执行 export HADOOP_CLASSPATH=../../*.jar
    hadoop com.crbc.TimpJob input   output 
    例如:hadoop -Xmx1024m  com.crbc.TimpJob file:///D:\timpfile\*.gz  D:\timpfile\out\   
 2.集群模式
       上传*.jar到集群主机
       将要处理的文件上传到hdfs文件系统
   hadoop jar *.jar /input /output 

Mapper---->Reducer------->Job
(构造)    (计算)              (运行)

mapper类:

package com.crbc.www;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*
 * Mapper 
 *  LongWritable :输入参数 ,内部定义行号
 *  Text :输入参数,文件value值
 *  Text :输出参数,输出给reduce函数处理的 值
 *  IntWritable:输出参数,输出给reduce函数处理的值
 * 
 */
public class TimpMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
	 
	/*
	 * 重写map函数
	  *  LongWritable :内部定义行号
	  *  Text :文件value值 
	  *  context:输出函数,
	 */
	protected void map(LongWritable key, Text value,Context context)
			throws IOException, InterruptedException {
		String line=value.toString();
		String year=line.substring(15,19);
		int airt;
		if(line.charAt(87)=='+') {
			airt=Integer.parseInt(line.substring(88,92));
		}else {
			airt=Integer.parseInt(line.substring(87,92));
		}
		String quality=line.substring(92,93);
		if(airt !=9999 && quality.matches("[01459]")) {
			//写上下文,maper函数输出做为reduce函数的输入值,封装map<Key,Values>
			context.write(new Text(year), new IntWritable(airt));
		}
	}
 
}

reducer类:

package com.crbc.www;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/*
 * Reducer函数
 *  
 * 
 */
public class TimpReduces extends Reducer<Text, IntWritable, Text,IntWritable> {
 
	/*
	 * Text :输入函数,
	 * IntWritable:输入函数,可迭代
	 * 
	 * context:输出函数
	 */
protected void reduce(Text key, Iterable<IntWritable> value,Context context) throws IOException, InterruptedException {
		int maxValues=Integer.MIN_VALUE;
		for(IntWritable values:value) {
			maxValues = Math.max(maxValues, values.get());
		}
		//写上下文,封装map<Key,Values>输出
		context.write(key,new IntWritable(maxValues));
}

}

Job类:

package com.crbc.www;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/*
 * 1.定义做业
 * 2.设置Job主函数
 * 3.定义Job输入,输出路径
 * 4.设置mapper,reducer函数,Job在运行的时候会主动去加载
 * 5.设置输出Key,Value格式
 */
public class TimpJob {

	public static void main(String[] args) throws Exception {
		
	 //定义一个做业
	 Job job = new Job();
	 
	 //设置做业主函数
	 job.setJarByClass(TimpJob.class);
	 
	 //设置做业名称,便于调试
	 job.setJobName("MapperReducer");
	 
	 //设置job输入参数,输入函数能够是一个文件路径
	 FileInputFormat.addInputPath(job,new Path(args[0]) );
	 
	 //设置job输出参数,输出函数可使一个路径,把计算计算结果输出到此路径下。
	 //注意此路径是函数建立的,不能跟现有的重名
	 FileOutputFormat.setOutputPath(job, new Path(args[1]));
	 
	 //设置Mapper函数
     job.setMapperClass(TimpMapper.class);
     
     //设置Reduce函数
     job.setReducerClass(TimpReduces.class);
     
     //设置输出key格式
     job.setOutputKeyClass(Text.class);
     
     //设置输出Value格式
     job.setOutputValueClass(IntWritable.class);
     
     //等待做业完成
     job.waitForCompletion(true);
	}

}
相关文章
相关标签/搜索