Hadoop要点总结

Hadoop是一个由Apache基金会所开发的分布式系统基础架构,主要解决,海量数据的存储和海量数据的分析计算问题。java

HDFS

NameNode工做机制

  1. 加载fsimage(镜像文件)和edits.001(编辑日志)到内存中
  2. 客户端向namenode发起增删改查请求
  3. namenode记录操做日志,更新滚动日志
  4. namenode在内存中对数据进行增删改查

Secondary NameNode工做机制

  1. 向namenode询问是否须要checkpointnode

    checkpoint触发条件:web

    1. 定时时间到了
    2. edits中数据满了
  2. 请求执行checkpoint
  3. namenode滚动正在写的edits
  4. 拷贝编辑日志、镜像文件到secondary namenode中
  5. secondary namenode把拷贝来的镜像文件和编辑日志合并
  6. 生成新的fsimage命名为fsimage.chkpoint
  7. 将fsimage.chkpoint拷贝到namenode
  8. 重命名生成fsimage

HDFS写数据流程

  1. 客户端向namenode请求上传文件,namenode检查目标文件是否已存在,父目录是否存在。
  2. namenode返回是否能够上传
  3. 客户端请求第一个block上传到哪几个datanode服务器上
  4. namenode返回三个datanode节点,分别为dn1,dn2,dn3
  5. 客户端请求dn1上传数据,dn1收到请求会继续调用dn2,而后dn2调用dn3,将这个通讯管道创建完成
  6. dn一、dn二、dn3逐级应答客户端
  7. 客户端开始往dn1上传第一个block,以packet为单位,dn1收到一个packet就会传给dn2,dn2传给dn3,dn1每传一个packet会放入一个应答队列等待应答。
  8. 当一个block传输完成以后,客户端再次请求namenode上传第二个block的服务器。(重复执行3-7步)

HDFS读数据流程

  1. 客户端向namenode请求下载文件,namenode经过查询元数据,找到文件块所在的datanode地址
  2. 挑选一台datanode(就近原则,而后随机)服务器,请求读取数据
  3. datanode开始传输数据给客户端(从磁盘里读取数据放入流,以packet为单位作校验)
  4. 客户端以packet单位接收,先缓存在本地,而后写入目标文件中

SecondaryNameNode目录结构

$HADOOP_HOME/data/tmp/dfs/namesecondary/current这个目录中查看SecondaryNameNode目录结构。
在主namenode发生故障时(假设没有及时备份数据),能够从SecondaryNameNode恢复数据:
方案一:将SecondaryNameNode中数据拷贝到namenode存储数据的目录。
方案二:使用-importCheckpoint选项启动namenode守护进程,从而将SecondaryNameNode中数据拷贝到namenode目录中。(极慢)数据库

DataNode工做机制

  1. 一个数据块在datanode上以文件形式存储在磁盘上,包括两个文件,一个是数据自己,一个是元数据包括数据块长度,块数据校验和以及时间戳
  2. datanode启动后向namenode注册,经过后,周期性(1小时)的向namenode上报全部的块信息。
  3. 心跳是3秒一次,心跳返回结果带有namenode给该datanode的命令如复制数据块到另外一台机器,或删除某个数据块。若是超过10分钟没有收到某个datanode的心跳,则认为该节点不可用。
  4. 集群运行中能够安全加入和退出一些机器。

服役新节点

1.在namenode主机的$HADOOP_HOME/etc/hadoop目录下建立dfs.hosts文件,在该文件中添加全部主机名(包括新节点)e.g.apache

hadoop102
hadoop103
hadoop104
hadoop105

2.在namenode的hdfs-site.xml配置文件中增长dfs.hosts属性浏览器

<property>
    <name>dfs.hosts</name>
    <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts</value>
</property>

3.刷新namenode:hdfs dfsadmin -refreshNodes
4.更新resourcemanager节点:yarn rmadmin -refreshNodes
5.在namenode的slaves文件中增长新主机名称
6.单独命令启动新的数据节点和节点管理器
hadoop-daemon.sh start datanode
yarn-daemon.sh start nodemanager缓存

退役旧节点

1.在namenode的/opt/module/hadoop-2.7.2/etc/hadoop目录下建立dfs.hosts.exclude文件,向其中添加要退役的主机名称
2.在namenode的hdfs-site.xml配置文件中增长dfs.hosts.exclude属性安全

<property>
    <name>dfs.hosts.exclude</name>
    <value>/opt/module/hadoop-2.7.2/etc/hadoop/dfs.hosts.exclude</value>
</property>

3.刷新namenode、刷新resourcemanager
hdfs dfsadmin -refreshNodes
yarn rmadmin -refreshNodes
4.在web浏览器端检查,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其余节点。
5.等待退役节点状态为decommissioned(全部块已经复制完成),中止该节点及节点资源管理器。
hadoop-daemon.sh stop datanode
yarn-daemon.sh stop nodemanager性能优化

PS:若是副本数是3,服役的节点小于等于3,是不能退役成功的,须要修改副本数后才能退役。

6.从include文件中删除退役节点,再运行刷新节点的命令
7.从namenode的slave文件中删除退役节点服务器

MapReduce

Writable序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带不少额外的信息(各类校验信息,header,继承体系等),不便于在网络中高效传输。因此,hadoop本身开发了一套序列化机制(Writable),精简、高效。

经常使用的数据类型对应的hadoop数据序列化类型

boolean --> BooleanWritable
byte --> ByteWritable
int --> IntWritable
float --> FloatWritable
long --> LongWritable
double --> DoubleWritable
string --> Text
map --> MapWritable
array --> ArrayWritable

自定义bean对象实现序列化接口

  • 必须实现Writable接口
  • 反序列化时,须要反射调用空参构造函数,因此必须有空参构造
  • 重写序列化方法
  • 重写反序列化方法(注意反序列化的顺序和序列化的顺序彻底一致)
  • 要想把结果显示在文件中,须要重写toString()
  • 若是须要将自定义的bean放在key中传输,则还须要实现comparable接口,由于mapreduce框中的shuffle过程必定会对key进行排序

MapTask并行度决定机制

  • 一个job的map阶段MapTask并行度(个数),由客户端提交job时的切片个数决定。
  • 每个切片分配一个MapTask并行实例
  • 切片大小默认=blocksize
  • 切片时针对每个文件单独切片,不考虑数据集总体

MapTask工做机制

Read阶段

maptask调用InputFormat,InputFormat又调用RecordReader从输入文件中解析出一个个K/V。

Map阶段

将解析出的K/V交给客户端map()方法处理,并产生新的K/V。

Collect阶段

当map()方法处理完数据后,通常会调用OutputCollector.collect()输出结果,在该函数内部调用Partitioner对K/V进行分区,且根据K进行分区内排序,并写入一个环形缓冲区中。

溢写阶段

当环形缓冲区达到80%时,会将数据写到本地磁盘上生成一个临时文件。

将数据写入本地磁盘以前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操做。

Combine阶段

当全部数据处理完成后,MapTask对全部临时文件进行一次合并,以确保最终只会生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

自定义InputFormat

编写一个类继承FileInputFormat

public class WholeInputFormat extends FileInputFormat<NullWritable, BytesWritable> {

    @Override
    public RecordReader<NullWritable, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        // 自定义recordreader
        WholeRecordReader recordReader = new WholeRecordReader();
        recordReader.initialize(split, context);
        return recordReader;
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }    
}

自定义一个RecordReader

public class WholeRecordReader extends RecordReader<NullWritable, BytesWritable> {

    private FileSplit split;
    private Configuration conf;
    private BytesWritable value = new BytesWritable();

    private boolean processed = false;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {

        this.split = (FileSplit) split;
        this.conf = context.getConfiguration();

    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {
            byte[] contents = new byte[(int) split.getLength()];
            Path path = split.getPath();
            FileSystem fs = path.getFileSystem(conf);
            FSDataInputStream fis = null;

            try {
                fis = fs.open(path);
                IOUtils.readFully(fis, contents, 0, contents.length);
                value.set(contents, 0, contents.length);
            } finally {
                IOUtils.closeStream(fis);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public NullWritable getCurrentKey() throws IOException, InterruptedException {
        return NullWritable.get();
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        return processed ? 1 : 0;
    }

    @Override
    public void close() throws IOException {

    }
}

自定义RecordReader重点是nextKeyValue()方法,它定义了如何封装向maptask输入的键值对,本例中是将每一个文件的全部内容做为value输入到maptask中。

大量小文件的切片优化(CombineTextInputFormat)

// 若是不设置InputFormat,它默认用的是TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m

通过以上设置,可有效减小切片数
number of splits

Shuffle机制

MapReduce保证每一个reducer的输入都是按键有序排列的,系统执行排序的过程(即将map输出做为输入传给reducer)称为shuffle。
shuffle

Partition分区

默认partition分区

public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}

默认采用的是org.apache.hadoop.mapreduce.lib.partition.HashPartitioner,当中的getPartition()方法是根据key的hashCode对reduceTasks个数取模获得的。

用户自定义分区

1.继承Partitioner,重写getPartition()方法

public class MyPartitioner extends Partitioner<K, V> {
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        int partition=xxx;

        if (exp0) {
            partition = 0;
        }else if (exp1) {
            partition = 1;
        }else if (exp2) {
            partition = 2;
        }else if (exp3) {
            partition = 3;
        }
        return partition;
    }
}

2.在job驱动中,设置自定义partitioner

job.setPartitionerClass(MyPartitioner.class)

3.根据自定义partitioner的逻辑设置相应数量的reduce task

job.setNumReduceTasks(5);
PS:
若是reduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
若是1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会产生异常;
若是reduceTask的数量=1,则无论mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件 part-r-00000。

WritableComparable排序

bean对象实现WritableComparable接口重写compareTo方法,就能够实现排序

@Override
public int compareTo(Bean o) {
    // 倒序排列,从大到小
    return this.xxx > o.getXxx() ? -1 : 1;
}

GroupingComparator分组

对reduce阶段的数据根据某一个或几个字段进行分组。

public class OrderGroupingComparator extends WritableComparator {

    protected OrderGroupingComparator() {
        // 记得此处设置true,不然会报空指针异常
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean abean = (OrderBean) a;
        OrderBean bbean = (OrderBean) b;
        return abean.getOrder_id().compareTo(bbean.getOrder_id());
    }
}

Combiner合并

combiner是MR程序中Mapper和Reducer以外的一种组件,combiner组件的父类就是Reducer,combiner和reducer的区别在于运行的位置:Combiner是在每个maptask所在的节点运行,Reducer是接收全局全部Mapper的输出结果。combiner的意义就是对每个maptask的输出进行局部汇总,以减少网络传输量。

自定义Combiner实现步骤

1.自定义一个combiner继承Reducer,重写reduce方法

public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable>{
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values,
            Context context) throws IOException, InterruptedException {
        int count = 0;
        for(IntWritable v :values){
            count = v.get();
        }
        context.write(key, new IntWritable(count));
    }
}

2.在job驱动类中设置

job.setCombinerClass(WordcountCombiner.class);
combiner可以应用的前提是不能影响最终的业务逻辑,并且,combiner的输出kv应该跟reducer的输入kv类型要对应起来。

从控制台观察使用combiner先后变化:
使用combiner前使用combiner后

数据倾斜&Distributedcache

数据倾斜缘由

若是是多张表的操做都是在reduce阶段完成,reduce端的处理压力太大,map节点的运算负载则很低,资源利用率不高,且在reduce阶段极易产生数据倾斜。

解决方案

在map端缓存多张表,提早处理业务逻辑,这样增长map端业务,减小reduce端数据的压力,尽量的减小数据倾斜。

  1. 在mapper的setup阶段,将文件读取到缓存集合中
  2. 在驱动函数中加载缓存
job.addCacheFile(new URI("file:/e:/cache/pd.txt")); // 缓存普通文件到task运行节点

自定义OutputFormat

1.自定义一个outputformat继承org.apache.hadoop.mapreduce.lib.output.FileOutputFormat

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable>{
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        // 建立一个RecordWriter
        return new FilterRecordWriter(job);
    }
}

2.具体的写数据RecordWriter

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {
    FSDataOutputStream atguiguOut = null;
    FSDataOutputStream otherOut = null;

    public FilterRecordWriter(TaskAttemptContext job) {
        // 1 获取文件系统
        FileSystem fs;
        try {
            fs = FileSystem.get(job.getConfiguration());
            // 2 建立输出文件路径
            Path atguiguPath = new Path("e:/atguigu.log");
            Path otherPath = new Path("e:/other.log");
            // 3 建立输出流
            atguiguOut = fs.create(atguiguPath);
            otherOut = fs.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // 判断是否包含“atguigu”输出到不一样文件
        if (key.toString().contains("atguigu")) {
            atguiguOut.write(key.toString().getBytes());
        } else {
            otherOut.write(key.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // 关闭资源
        if (atguiguOut != null) {
            atguiguOut.close();
        }
        
        if (otherOut != null) {
            otherOut.close();
        }
    }
}

数据压缩

在Map输出端采用压缩

// 开启map端输出压缩
configuration.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
configuration.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);

在reduce输出端压缩

// 设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);
//      设置压缩的方式
        FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); 
//        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class); 
//        FileOutputFormat.setOutputCompressorClass(job, Lz4Codec.class); 
//        FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

ReduceTask工做机制

Copy阶段

ReduceTask从各个MapTask远程拷贝一片数据,若是某一片数据大小超过阈值,则写到磁盘上,不然直接放在内存中。

Merge阶段

在远程拷贝数据时,reducetask后台启动了两个线程对内存和磁盘上的文件进行合并,以防止内存或硬盘使用过多。

Sort阶段

用户编写的reduce()方法输入数据是按照key进行汇集的,为了将key相同的数据聚在一块儿,Hadoop采用了基于排序的策略。因为各个MapTask已经实现对本身的处理结果进行了局部排序,所以,ReduceTask只需对全部数据进行一次归并排序便可。

Reduce阶段

reduce()函数将计算结果写到HDFS上。

MapReduce参数优化

资源相关参数

用户在mr应用程序中配置能够生效:

  • mapreduce.map.memory.mb 一个Map Task可以使用的资源上限(单位:MB),默认为1024。若是Map Task实际使用的资源量超过该值,则会被强制杀死。
  • mapreduce.reduce.memory.mb 一个Reduce Task可以使用的资源上限(单位:MB),默认为1024。若是Reduce Task实际使用的资源量超过该值,则会被强制杀死。
  • mapreduce.map.java.opts Map Task的JVM参数,你能够在此配置默认的java heap size等参数, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc" (@taskid@会被Hadoop框架自动换为相应的taskid), 默认值: ""
  • mapreduce.reduce.java.opts Reduce Task的JVM参数,你能够在此配置默认的java heap size等参数, e.g."-Xmx1024m -verbose:gc -Xloggc:/tmp/@taskid@.gc", 默认值: ""
  • mapreduce.map.cpu.vcores 每一个Map task可以使用的最多cpu core数目, 默认值: 1
  • mapreduce.reduce.cpu.vcores 每一个Reduce task可以使用的最多cpu core数目, 默认值: 1

应该在yarn启动以前就配置在服务器的配置文件中才能生效:

  • yarn.scheduler.minimum-allocation-mb 1024 给应用程序container分配的最小内存
  • yarn.scheduler.maximum-allocation-mb 8192 给应用程序container分配的最大内存

shuffle性能优化的关键参数,应在yarn启动以前就配置好:

  • mapreduce.task.io.sort.mb 100 shuffle的环形缓冲区大小,默认100m
  • mapreduce.map.sort.spill.percent 0.8 环形缓冲区溢出的阈值,默认80%

容错相关参数

  • mapreduce.map.maxattempts 每一个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
  • mapreduce.reduce.maxattempts 每一个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
  • mapreduce.map.failures.maxpercent 当失败的Map Task失败比例超过该值为,整个做业则失败,默认值为0. 若是你的应用程序容许丢弃部分输入数据,则该该值设为一个大于0的值,好比5,表示若是有低于5%的Map Task失败(若是一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个做业扔认为成功。
  • mapreduce.reduce.failures.maxpercent 当失败的Reduce Task失败比例超过该值为,整个做业则失败,默认值为0。
  • mapreduce.task.timeout Task超时时间,常常须要设置的一个参数,该参数表达的意思为:若是一个task在必定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,多是卡住了,也许永远会卡主,为了防止由于用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。若是你的程序对每条输入数据的处理时间过长(好比会访问数据库,经过网络拉取数据等),建议将该参数调大,该参数太小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

Yarn

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,至关于一个分布式的操做系统平台,而mapreduce等运算程序则至关于运行于操做系统之上的应用程序。

yarn工做机制

  1. MR程序提交到客户端所在节点
  2. yarnrunner向resourcemanager申请一个application
  3. rm将该应用程序的资源路径返回给yarnrunner
  4. 将改程序所须要的资源提交到HDFS上
  5. 程序资源提交完毕后,申请运行MrAppMaster
  6. mr将用户请求初始化成一个task
  7. 其中一个nodemanager领取到task任务
  8. 该nodemanager建立容器container并产生MRAppMaster
  9. container从HDFS上拷贝资源到本地
  10. MrAppMaster向RM申请运行maptask容器
  11. rm将运行maptask任务分配给另外两个nodemanager,另外两个nodemanager分别领取任务并建立容器
  12. MR向两个接收到任务的nodemanager发送程序启动脚本,这两个nodemanager分别启动maptask,maptask对数据分区排序
  13. MRAppMaster向rm申请两个容器,运行reducetask
  14. reducetask向maptask获取相应分区数据
  15. 程序运行结束后,MR向RM注销本身
相关文章
相关标签/搜索