Hadoop之YARN简介

YARN的由来

从Hadoop2开始,官方把资源管理单独剥离出来,主要是为了考虑后期做为一个公共的资源管理平台,任何知足规则的计算引擎均可以在它上面执行。
因此YARN能够实现HADOOP集群的资源共享,不只仅能够跑MapRedcue,还能够跑Spark、Flink。java

YARN架构分析

我们以前部署Hadoop集群的时候也对YARN的架构有了基本的了解
YARN主要负责集群资源的管理和调度 ,支持主从架构,主节点最多能够有2个,从节点能够有多个
其中:ResourceManager:是主节点,主要负责集群资源的分配和管理
NodeManager:是从节点,主要负责当前机器资源管理node

YARN资源管理模型

YARN主要管理内存和CPU这两种资源类型
当NodeManager节点启动的时候自动向ResourceManager注册,将当前节点上的可用CPU信息和内存信息注册上去。 这样全部的nodemanager注册完成之后,resourcemanager就知道目前集群的资源总量了。linux

那咱们如今来看一下我这个一主两从的集群资源是什么样子的,打开yarn的8088界面

注意,这里面显示的资源是集群中全部从节点的资源总和,不包括主节点的资源,
那咱们再详细看一下每个从节点的资源信息
可是这个数值是对不上的,个人linux机器每台只给它分配了2G的内存,经过free -m能够看到
CPU只分配了1个,经过top命令能够看到

那为何在这里显示是内存是8G,CPU是8个呢?
看一下下面这2个参数web

yarn.nodemanager.resource.memory-mb:单节点可分配的物理内存总量,默认是8MB*1024,即8G
yarn.nodemanager.resource.cpu-vcores:单节点可分配的虚拟CPU个数,默认是8复制代码

看到没有,这都是默认单节点的内存和CPU信息,就算你这个机器没有这么多资源,可是在yarn-default.xml中有这些默认资源的配置,这样当nodemanager去上报资源的时候就会读取这两个参数的值,这也就是为何咱们在前面看到了单节点都是8G内存和8个cpu,其实咱们的linux机器是没有这么大资源的,那你这就是虚标啊,确定不能这样干,你实际有多少就是多少,因此咱们能够修改这些参数的值,修改的话就在yarn-site.xml中进行配置便可,改完以后就能够看到真实的信息了。面试

YARN中的调度器

接下来咱们来详细分析一下YARN中的调度器,这个是很是实用的东西,面试的时候也会常常问到。
你们能够想象一个场景,咱们集群的资源是有限的,在实际工做中会有不少人向集群中提交任务,那这时候资源如何分配呢?
若是你提交了一个很占资源的任务,这一个任务就把集群中90%的资源都占用了,后面别人再提交任务,剩下的资源就不够用了,这个时候怎么办?
让他们等你的任务执行完了再执行?仍是说你把你的资源匀出来一些分给他,你少占用一些,让他也能慢慢执行?apache

YARN中支持三种调度器:架构

  • 1:FIFO Scheduler:先进先出(first in, first out)调度策略
  • 2:Capacity Scheduler:FIFO Scheduler的多队列版本
  • 3:FairScheduler:多队列,多用户共享资源

下面来看图分析一下这三种调度器的特性app

  • FIFO Scheduler:是先进先出的,你们都是排队的,若是你的任务申请不到足够的资源,那你就等着,等前面的任务执行结束释放了资源以后你再执行。这种在有些时候是不合理的,由于咱们有一些任务的优先级比较高,咱们但愿任务提交上去马上就开始执行,这个就实现不了了。
  • CapacityScheduler:它是FifoScheduler的多队列版本,就是咱们先把集群中的整块资源划分红多份,咱们能够人为的给这些资源定义使用场景,例如图里面的queue A里面运行普通的任务,queueB中运行优先级比较高的任务。这两个队列的资源是相互对立的, 可是注意一点,队列内部仍是按照先进先出的规则。
  • FairScheduler:支持多个队列,每一个队列能够配置必定的资源,每一个队列中的任务共享其所在队列的全部资源,不须要排队等待资源, 具体是这样的,假设咱们向一个队列中提交了一个任务,这个任务刚开始会占用整个队列的资源,当你再提交第二个任务的时候,第一个任务会把他的资源释放出来一部分给第二个任务使用

在实际工做中咱们通常都是使用第二种,CapacityScheduler,从hadoop2开始,CapacityScheduler也是集群中的默认调度器了, 那下面咱们到集群上看一下,点击左侧的Scheduler查看ide

Capacity,这个是集群的调度器类型,
下面的root是根的意思,他下面目前只有一个队列,叫default,咱们以前提交的任务都会进入到这个队列中。 下面咱们来修改一下,增长多个队列函数

案例:YARN多资源队列配置和使用

咱们的需求是这样的,但愿增长2个队列,一个是online队列,一个是offline队列
而后向offline队列中提交一个mapreduce任务
online队列里面运行实时任务
offline队列里面运行离线任务,咱们如今学习的mapreduce就属于离线任务
实时任务咱们后面会学习,等讲到了再具体分析。
这两个队列其实也是咱们公司中最开始分配的队列,不过随着后期集群规模的扩大和业务需求的增长,后期又增长了多个队列。
在这里咱们先增长这2个队列,后期再增长多个也是同样的。
具体步骤以下:
修改集群中etc/hadoop,目录下的capacity-scheduler.xml配置文件,修改和增长如下参数,针对已有的参数,修改value中的值,针对没有的参数,则直接增长,这里的default是须要保留的,增长online,offline,这三个队列的资源比例为7:1:2
具体的比例须要根据实际的业务需求来,看大家那些类型的任务比较多,对应的队列中资源比例就调高一些,咱们如今暂时尚未online任务,因此我就把online队列的资源占比设置的小一些。
先修改bigdata01上的配置

[root@bigdata01 hadoop]# vi capacity-scheduler.xml 
<property> <name>yarn.scheduler.capacity.root.queues</name> <value>default,online,offline</value> <description>队列列表,多个队列之间使用逗号分割</description> </property> <property> <name>yarn.scheduler.capacity.root.default.capacity</name> <value>70</value> <description>default队列70%</description> </property> <property> <name>yarn.scheduler.capacity.root.online.capacity</name> <value>10</value> <description>online队列10%</description> </property> <property> <name>yarn.scheduler.capacity.root.offline.capacity</name> <value>20</value> <description>offline队列20%</description> </property> <property> <name>yarn.scheduler.capacity.root.default.maximum-capacity</name> <value>70</value> <description>Default队列可以使用的资源上限.</description> </property> <property> <name>yarn.scheduler.capacity.root.online.maximum-capacity</name> <value>10</value> <description>online队列可以使用的资源上限.</description> </property> <property> <name>yarn.scheduler.capacity.root.offline.maximum-capacity</name> <value>20</value><description>offline队列可以使用的资源上限.</description> </property>复制代码

修改好之后再同步到另外两个节点上
而后重启集群才能生效

[root@bigdata01 hadoop-3.2.0]# sbin/stop-all.sh 
[root@bigdata01 hadoop-3.2.0]# sbin/start-all.sh复制代码

进入yarn的web界面,查看最新的调度器队列信息

注意了,如今默认提交的任务仍是会进入default的队列,若是但愿向offline队列提交任务的话,须要指定队列名称,不指定就进默认的队列
在这里咱们还须要同步微调一下代码,不然咱们指定的队列信息 代码是没法识别的
拷贝WordCountJob类,新的类名为WordCountJobQueue
主要在job配置中增长一行代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

public class WordCountJobQueue {
    /**
     * 建立自定义mapper类
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        /**
         * 须要实现map函数
         * 这个map函数就是能够接收k1,v1, 产生k2,v2
         *
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            // k1表明的是每一行的行首偏移量,v1表明的是每一行内容
            // 对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
           // System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            for (String word : words) {
                // 迭代切割出来的单词数据
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                logger.info("k2:"+word+"...v2:1");
               // System.out.println("k2:"+word+"...v2:1");
                // 把<k2,v2>写出去 context.write(k2,v2);
                context.write(k2, v2);
            }
        }
    }

    /**
     * 建立自定义reducer类
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
        Logger logger = LoggerFactory.getLogger(MyReducer.class);
        /**
         * 针对<k2,{v2……}>的数据进行累加求和,而且最终把数据转化为k3,v3写出去
         *
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            long sum = 0L;
            for (LongWritable v2 : v2s) {
                logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                // System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                sum += v2.get();
            }
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
           // System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            context.write(k3, v3);
        }
    }

    public static void main(String[] args) {
        try {

            // job须要的配置参数
            Configuration conf = new Configuration();
            // 解析命令行中经过-D传递过来的参数,添加到conf中
            String[] remainingArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
            // 建立一个job
            Job job = Job.getInstance(conf);
            // 注意:这一行必须设置,不然在集群中执行的是找不到WordCountJob这个类
            job.setJarByClass(WordCountJobQueue.class);
            // 指定输入路径(能够是文件,也能够是目录)
            FileInputFormat.setInputPaths(job, new Path(remainingArgs[0]));
            // 指定输出路径(只能指定一个不存在的目录)
            FileOutputFormat.setOutputPath(job, new Path(remainingArgs[1]));
            // 指定map相关的代码
            job.setMapperClass(MyMapper.class);
            // 指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            // 指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);
            // 指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            // 指定k3的类型
            job.setOutputKeyClass(Text.class);
            // 指定v3的类型
            job.setOutputValueClass(LongWritable.class);
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}复制代码
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJobQueue -Dmapreduce.job.queuename=offline /test/hello.txt /out复制代码


不指定依旧是defalut

hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.cjt.mr.WordCountJobQueue  /test/hello.txt /out复制代码

相关文章
相关标签/搜索