1、MR计算模型的由来html
MapReduce最先是由Google公司研究提出的一种面向大规模数据处理的并行计算模型和方法。Google公司设计MapReduce的初衷,主要是为了解决其搜索引擎中大规模网页数据的并行化处理。java
Google公司发明了MapReduce以后,首先用其从新改写了其搜索引擎中的Web文档索引处理系统。但因为MapReduce能够广泛应用于不少大规模数据的计算问题,所以自发明MapReduce之后,Google公司内部进一步将其普遍应用于不少大规模数据处理问题。到目前为止,Google公司内有上万个各类不一样的算法问题和程序都使用MapReduce进行处理。 linux
2003年和2004年,Google公司在国际会议上分别发表了两篇关于Google分布式文件系统和MapReduce的论文,公布了 Google的GFS和MapReduce的基本原理和主要设计思想。程序员
2004年,开源项目Lucene(搜索索引程序库)和Nutch(搜索引擎)的创始人Doug Cutting发现MapReduce正是其所须要的解决大规模Web数据处理的重要技术,于是模仿Google MapReduce,基于Java设计开发了一个称为Hadoop的开源MapReduce并行计算框架和系统。web
自此,Hadoop成为Apache开源组织下最重要的项目,自其推出后很快获得了全球学术界和工业界的广泛关注,并获得推广和普及应用。 MapReduce的推出给大数据并行处理带来了巨大的革命性影响,使其已经成为事实上的大数据处理的工业标准。算法
2、MapReduce基本设计思想apache
对付大数据并行处理:分而治之:编程
一个大数据若能够分为具备一样计算过程的数据块,而且这些数据块之间不存在数据依赖关系,则提升处理速度的最好办法就是采用“分而治之”的策略进行并行化计算。性能优化
MapReduce采用了这种“分而治之”的设计思想,对相互间不具备或者有较少数据依赖关系的大数据,用必定的数据划分方法对数据分片,而后将每一个数据分片交由一个节点去处理,最后汇总处理结果。服务器
上升到抽象模型:Map与Reduce:
MapReduce借鉴了函数式程序设计语言Lisp的设计思想。
用Map和Reduce两个函数提供了高层的并行编程抽象模型和接口,程序员只要实现这两个基本接口便可快速完成并行化程序的设计。
MapReduce的设计目标是能够对一组顺序组织的数据元素/记录进行处理。
现实生活中,大数据每每是由一组重复的数据元素/记录组成,例如,一个Web访问日志文件数据会由大量的重复性的访问日志构成,对这种顺序式数据元素/记录的处理一般也是顺序式扫描处理。
MapReduce提供了如下的主要功能:
数据划分和计算任务调度:系统自动将一个做业(Job)待处理的大数据划分为不少个数据块,每一个数据块对应于一个计算任务(Task),并自动调度计算节点来处理相应的数据块。做业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并负责Map节点执行的同步控制。
数据/代码互定位:为了减小数据通讯,一个基本原则是本地化数据处理,即一个计算节点尽量处理其本地磁盘上所分布存储的数据,这实现了代码向数据的迁移;当没法进行这种本地化数据处理时,再寻找其余可用节点并将数据从网络上传送给该节点(数据向代码迁移),但将尽量从数据所在的本地机架上寻 找可用节点以减小通讯延迟。
系统优化:为了减小数据通讯开销,中间结果数据进入Reduce节点前会进行必定的合并处理;一个Reduce节点所处理的数据可能会来自多个Map节点,为了不Reduce计算阶段发生数据相关性,Map节点输出的中间结果需使用必定的策略进行适当的划分处理,保证相关性数据发送到同一个Reduce节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者做为结果。
出错检测和恢复:以低端商用服务器构成的大规模MapReduce计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,所以 MapReduce须要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提 高数据存储的可靠性,并能及时检测和恢复出错的数据
3、MapReduce的编写
1.加入依赖jar包--编写pom.xml
<properties> <org.apache.hadoop.version>2.7.5</org.apache.hadoop.version> </properties> <!--分布式计算--> <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> <!--分布式存储--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>${org.apache.hadoop.version}</version> </dependency> </dependencies>
2.在resources中添加core-site.xml文件,配置内容以下:
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://master2:9000</value> </property> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>yarn.resourcemanager.scheduler.address</name> <value>master2:8030</value> </property> <property> <name>fs.hdfs.impl</name> <value>org.apache.hadoop.hdfs.DistributedFileSystem</value> </property> </configuration>
master2:表示我集群中NameNode的主机名,填写主机名须要在本地机中的hosts中添加IP配置,
若是不配置,请填写主机名所对应的IP。
3..mapreduce函数(类)的编写,有三个类分别是表明map、reduce、job
3.1:编写WCMapper类(map)继承Mapper并重写map这个方法,具体内容以下:
package com.day01; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; /** * java基本数据类型步支持序列化--存放在内存中不在磁盘中(不能被持久化) * 用来处理map任务:映射 * map任务接收的是kv,输出的也是kv 1(行号),hello world * 第一个泛型表示:输入key的数据类型 输入的数据至关于文件开头的偏移量(行号)没有实际意义 * 第二个泛型表示:输入value的数据类型 输入的文件的一行内容 * 第三个泛型表示:输出key的数据类型 输出的key是一个字符串 * 第四个泛型表示:输出value的数据类型 * * LongWritable:等价于java中的long * Text :等价与java中的string * IntWritable:等价于java中的int * * XXXWritable 是hadoop定义的基本数据类型,至关于对java中的数据类型作一个封装,同时序列化(能够网络传输以及存储到磁盘上) * */ public class WCMapper extends Mapper<LongWritable,Text,Text,IntWritable> { //map方法每次执行一行数据,会被循环调用map方法(有多少行就调用多少次) @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //将一行数据(text类型)变为string类型 String line = value.toString(); String[] words = line.split(" "); //定义value IntWritable one = new IntWritable(1); //便利单词,输出word 1 for (int i = 0; i < words.length; i++) { Text keyOut = new Text(words[i]); //输出word 1 context.write(keyOut,one); } } }
3.二、编写WCReducer类(reduce)继承Reducer并重写reduce这个方法,具体内容以下:
package com.day01; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; /** * 用来处理reduce任务:合并 * 在reduce端框架会将相同的key的value放在一个集合(迭代器) */ public class WCReducer extends Reducer<Text,IntWritable,Text,IntWritable> { //每次处理一个key,会被循环调用,有多少个key就会调用几回 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //获取迭代器 Iterator<IntWritable> iterator = values.iterator(); int count = 0; while (iterator.hasNext()){ IntWritable one = iterator.next(); count+=one.get(); } //context的write只接受hadoop的数据类型,不接受java的数据类型 context.write(key,new IntWritable(count)); } }
3.三、编写WCJob类(job),具体内容以下:
package com.day01; import com.google.common.io.Resources; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.io.InputStreamReader; import java.io.LineNumberReader; import java.util.ArrayList; //mapred是hadoop的1.X的包,mapreduce是2.X的API /** * 测试-设定任务的运行 * 输入与输出的路径 */ public class WCJob { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration coreSiteConf = new Configuration(); coreSiteConf.addResource(Resources.getResource("core-site.xml")); //设置一个任务,后面是job的名称 Job job = Job.getInstance(coreSiteConf, "wc"); //设置job的运行类,就是此类 job.setJarByClass(WCJob.class); //设置Map和Reduce处理类 job.setMapperClass(WCMapper.class); job.setReducerClass(WCReducer.class); //设置map输出类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //设置job/reduce输出类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //设置任务的输入路径 FileInputFormat.addInputPath(job, new Path("/wc")); //设置任务的输出路径--保存结果(这个目录必须是不存在的目录) //删除存在的文件 deleteFileName("/wcout"); FileOutputFormat.setOutputPath(job, new Path("/wcout")); //运行任务 true:表示打印详情 boolean flag = job.waitForCompletion(true); if (flag){ System.out.println(flag); readContent("/wcout/part-r-00000"); }else { System.out.println(flag+",读取文件失败"); } } //删除已经存在在hdfs上面的文件文件 private static void deleteFileName(String path) throws IOException { //将要删除的文件 Path fileName = new Path(path); Configuration entries = new Configuration(); //解析core-site-master2.xml文件 entries.addResource(Resources.getResource("core-site.xml")); //获取客户端文件系统 FileSystem fileSystem = FileSystem.get(entries); if (fileSystem.exists(fileName)){ System.out.println(fileName+"已经存在,正在删除它..."); boolean flag = fileSystem.delete(fileName, true); if (flag){ System.out.println(fileName+"删除成功"); }else { System.out.println(fileName+"删除失败"); return; } } //关闭资源 fileSystem.close(); } //读取文件内容 private static void readContent(String path) throws IOException { //将要读取的文件路径 Path fileName = new Path(path); ArrayList<String> returnValue = new ArrayList<String>(); Configuration configuration = new Configuration(); configuration.addResource(Resources.getResource("core-site.xml")); //获取客户端系统文件 FileSystem fileSystem = FileSystem.get(configuration); //open打开文件--获取文件的输入流用于读取数据 FSDataInputStream inputStream = fileSystem.open(fileName); InputStreamReader inputStreamReader = new InputStreamReader(inputStream); //一行一行的读取数据 LineNumberReader lineNumberReader = new LineNumberReader(inputStreamReader); //定义一个字符串变量用于接收每一行的数据 String str = null; //判断什么时候没有数据 while ((str=lineNumberReader.readLine())!=null){ returnValue.add(str); } //打印数据到控制台 System.out.println("文件内容以下:"); for (String read : returnValue) { System.out.println(read); } //关闭资源 lineNumberReader.close(); inputStream.close(); inputStreamReader.close(); } }
-----在本地运行
a.在你的集群的hdfs上建立/wc
hadoop fs -mkdir /wc
b.将两个文件写入内容,内容间以空格隔开,并将文件put到hdfs中的/wc上
c.在C:\Windows\System32中添加hadoop.dll与winutils.exe文件
hadoop.dll与winutils.exe这两个文件的连接:https://pan.baidu.com/s/10xa7wC3BwlH3oF7DoYFE1A
提取码:c7ie
d.在D:\soft\hadoop\hadoop-2.7.5\bin中添加hadoop.dll与winutils.exe文件
e.将idea中core-site.xml中关于yarn的配置删除掉、
-通过以上a-b-c-d-e操做以后,直接Run,出现如下内容就恭喜你成功!!!!!!!
----------在linux上运行
将写的项目打包
1.点击左下角的方框,再店家maven projects-->找到想要打包的项目
-->点击Lifecycle-->双击package就会开始打包
2.将jar包上传并更名(可改可不改)
3.开始运行jar包
例如运行mrdemo.jar中的WEJob类
hadoop jar mrdemo.jar com.day01.WCJob
也能够在web中输入http://192.168.228.13:8088/cluster/apps/FINISHED
192.168.228.13:修改为本身的主机IP,出现SUCCEEDED表示执行成功
过程当中出现下面异常,在网上查看说是hadoop的一个BUG
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1252)
at java.lang.Thread.join(Thread.java:1326)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:716)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:476)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:652)
异常问题:
保证本地运行成功
上传jar包到hdfs上运行找不到类异常--没有将第三方依赖一块儿打包
---将第三方jar包放在lib里面,再打包(就会自动加载lib里面的jar包)就解决了--以下图
还有其余异常,欢迎分享一块儿解决....