前提已经安装好hadoop的hdfs集群,能够查看html
https://www.cnblogs.com/tree1123/p/10683570.htmlnode
Mapreduce是hadoop的运算框架,能够对hdfs中的数据分开进行计算,先执行不少maptask,在执行reducetask,这个过程当中任务的执行须要一个任务调度的平台,就是yarn。windows
1、安装YARN集群app
yarn集群中有两个角色:框架
主节点:Resource Manager 1台分布式
从节点:Node Manager N台ide
Resource Manager通常安装在一台专门的机器上oop
Node Manager应该与HDFS中的data node重叠在一块儿code
修改配置文件:yarn-site.xmlorm
<property> <name>yarn.resourcemanager.hostname</name> <value>主机名</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>2048</value> </property> <property> <name>yarn.nodemanager.resource.cpu-vcores</name> <value>2</value> </property>
而后scp到全部机器,修改主节点hadoop的slaves文件,列入要启动nodemanager的机器,配好免密
而后,就能够用脚本启动yarn集群:
sbin/start-yarn.sh
中止:
sbin/stop-yarn.sh
页面:http://主节点:8088 看看node manager节点是否识别
开发一个提交job到yarn的客户端类,mapreduce全部jar和自定义类,打成jar包上传到hadoop集群中的任意一台机器上,运行jar包中的(YARN客户端类
hadoop jar ......JobSubmitter
2、开发mapreduce程序
注意理解分而治之的思想,先进行map:映射,对应,个数不变。 reduce:化简,合并,将一系列数据,化简为一个值。
主要须要开发:
map阶段的进、出数据,
reduce阶段的进、出数据,
类型都应该是实现了HADOOP序列化框架的类型,如:
String对应Text
Integer对应IntWritable
Long对应LongWritable
例子wordcount代码:
WordcountMapper
public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 切单词 String line = value.toString(); String[] words = line.split(" "); for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } }
WordcountReducer
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ @Override protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException { int count = 0; Iterator<IntWritable> iterator = values.iterator(); while(iterator.hasNext()){ IntWritable value = iterator.next(); count += value.get(); } context.write(key, new IntWritable(count)); } } public class JobSubmitter { public static void main(String[] args) throws Exception { // 在代码中设置JVM系统参数,用于给job对象来获取访问HDFS的用户身份 System.setProperty("HADOOP_USER_NAME", "root"); Configuration conf = new Configuration(); // 一、设置job运行时要访问的默认文件系统 conf.set("fs.defaultFS", "hdfs://hdp-01:9000"); // 二、设置job提交到哪去运行 conf.set("mapreduce.framework.name", "yarn"); conf.set("yarn.resourcemanager.hostname", "hdp-01"); // 三、若是要从windows系统上运行这个job提交客户端程序,则须要加这个跨平台提交的参数 conf.set("mapreduce.app-submission.cross-platform","true"); Job job = Job.getInstance(conf); // 一、封装参数:jar包所在的位置 job.setJar("d:/wc.jar"); //job.setJarByClass(JobSubmitter.class); // 二、封装参数: 本次job所要调用的Mapper实现类、Reducer实现类 job.setMapperClass(WordcountMapper.class); job.setReducerClass(WordcountReducer.class); // 三、封装参数:本次job的Mapper实现类、Reducer实现类产生的结果数据的key、value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); Path output = new Path("/wordcount/output"); FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root"); if(fs.exists(output)){ fs.delete(output, true); } // 四、封装参数:本次job要处理的输入数据集所在路径、最终结果的输出路径 FileInputFormat.setInputPaths(job, new Path("/wordcount/input")); FileOutputFormat.setOutputPath(job, output); // 注意:输出路径必须不存在 // 五、封装参数:想要启动的reduce task的数量 job.setNumReduceTasks(2); // 六、提交job给yarn boolean res = job.waitForCompletion(true); System.exit(res?0:-1); } }
MR还有一些高级的用法:自定义类型,自定义Partitioner,Combiner,排序,倒排索引,自定义GroupingComparator
3、mapreduce与yarn的核心机制
yarn是一个分布式程序的运行调度平台
yarn中有两大核心角色:
一、Resource Manager
接受用户提交的分布式计算程序,并为其划分资源
管理、监控各个Node Manager上的资源状况,以便于均衡负载
二、Node Manager
管理它所在机器的运算资源(cpu + 内存)
负责接受Resource Manager分配的任务,建立容器、回收资源
Mapreduce工做机制:
划分输入切片——》 环形缓冲区 ——》 分区排序 ——》Combiner 局部聚合——》shuffle ——》GroupingComparator——》输出