大数据框架面试

sparkjava

spark的优点:node

  (1)每个做业独立调度,能够把全部的做业作一个图进行调度,各个做业之间相互依赖,在调度过程当中一块儿调度,速度快。mysql

  (2)全部过程都基于内存,因此一般也将Spark称做是基于内存的迭代式运算框架。web

  (3)spark提供了更丰富的算子,让操做更方便。算法

为何Spark比Map Reduced运算速度快:sql

  (1)Spark计算比MapReduce快的根本缘由在于DAG计算模型。通常而言,DAG相比Hadoop的MapReduce在大多数状况下能够减小shuffle次数。数据库

  (2)Hadoop每次计算的结果都要保存到hdfs,而后每次计算都须要从hdfs上读书数据,磁盘上的I/O开销比较大。 spark一次读取数据缓存在内存中,内存的数据读取比磁盘数据读取快不少。还有一点就是spark的RDD数据结构,RDD在每次transformation后并不当即执行,并且action后才执行,有进一步减小了I/O操做。apache

  (3)MR它必须等map输出的全部数据都写入本地磁盘文件之后,才能启动reduce操做,由于mr要实现默认的根据Key的排序!因此要排序确定得写完全部数据,才能排序,而后reduce来拉取。可是spark不须要,spark默认状况下,是不会对数据进行排序的。所以shufflemaptask每写入一点数据,resulttask就能够拉取一点数据,而后再本地执行咱们定义的聚合函数和算子,进行计算.编程

spark的DAG有向无循环图:api

  DAG叫作有向无环图,原始的RDD经过一系列的转换就造成了DAG,根据RDD之间依赖关系的不一样将DAG划分红不一样的Stage(调度阶段)。对于窄依赖,partition的转换处理在一个Stage中完成计算。对于宽依赖,因为有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,所以宽依赖是划分Stage的依据。

spark如何分区:  分区是RDD内部并行计算的一个计算单元,RDD的数据集在逻辑上被划分为多个分片,每个分片称为分区,分区的格式决定了并行计算的粒度,而每一个分区的数值计算都是在一个任务中进行的,所以任务的个数,也是由RDD(准确来讲是做业最后一个RDD)的分区数决定。spark默认分区方式是HashPartitioner.

  只有Key-Value类型的RDD才有分区的,非Key-Value类型的RDD分区的值是None,每一个RDD的分区ID范围:0~numPartitions-1,决定这个值是属于那个分区的。

  (1)HashPartitioner分区:partition = key.hashCode () % numPartitions,若是余数小于0,则用余数+分区的个数,最后返回的值就是这个key所属的分区ID。

               缺点:可能致使每一个分区中数据量的不均匀,极端状况下会致使某些分区拥有RDD的所有数据

  (2)RangePartitioner分区(范围分区):RangePartitioner会对key值进行排序,而后将key值按照分区个数进行划分分区.尽可能保证每一个分区中数据量的均匀,并且分区与分区之间是有序的,一个分区中的元素确定都是比另外一个分区内的元素小或者大;可是分区内的元素是不能保证顺序的。分界的算法尤其重要。算法对应的函数是rangeBounds.

  (3)CustomPartitioner自定义分区:须要继承org.apache.spark.Partitioner类,sc.parallelize(List((1,'a'),(1,'aa'),(2,'b'),(2,'bb'),(3,'c')), 3).partitionBy(new CustomPartitioner(3))

spark从HDFS中读取数据是如何分区的:

  Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),HDFS中的block是分布式存储的最小单元。若是咱们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,所以该文件在HDFS上会被分为235块(30GB/128MB);Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。

 通常合理的分区数设置为总核数的2~3倍

spark数据倾斜出现的缘由:

  根本缘由是分区不均匀,在执行shuffle操做的时候,是按照key,来进行values的数据的输出、拉取和聚合的。同一个key的values,必定是分配到一个reduce task进行处理的。某个或者某些key对应的数据,远远的高于其余的key。定位数据倾斜就是看哪些地方用了会产生shuffle的算子,groupByKey、countByKey、reduceByKey、join

数据倾斜发生的现象:

  (1)大部分的task执行的特别快,剩下的几个task执行的特别慢.

  (2)运行一段时间后,其余task都已经执行完成,可是有的task可能会出现OOM异常由于task的所分配的数据量太大,并且task每处理一条数据还要建立大量的对象,内存存储不下.

解决数据倾斜的方法:

  (1)聚合源数据:在数据的源头将数据聚合成一个key对应多个value值.这样在进行操做时就可能不会出现shuffle过程.

  (2)将致使数据倾斜的key提取出来,如果key对应的null或者无效数据,就将其删除,如果正常的数据,就将其单独处理,再与正常处理的数据进行union操做.

  (3)提升shuffle操做reduce的并行度:将reduce task的数量变多,好比groupByKey、countByKey、reduceByKey。在调用的时候,传入进去一个参数。那个数字,就表明了那个shuffle操做的reduce端的并行度。那么在进行shuffle操做的时候,就会对应着建立指定数量的reduce task。

  (4)对key先添加随机值,进行操做后,去掉随机值,再进行一次操做。将原始的 key 转化为 key + 随机值(例如Random.nextInt),对数据进行操做后,将 key + 随机值 转成 key.

reduceByKey与groupByKey的区别:

  pairRdd.reduceByKey(_+_).collect.foreach(println) 等价于pairRdd.groupByKey().map(t => (t._1,t._2.sum)).collect.foreach(println)

  reduceByKey的结果:(hello,2)(world,3)    groupByKey的结果:(hello,(1,1))(world,(1,1,1))

  使用reduceByKey()的时候,本地的数据先进行merge而后再传输到不一样节点再进行merge,最终获得最终结果。而使用groupByKey()的时候,并不进行本地的merge,所有数据传出,获得所有数据后才会进行聚合成一个sequence.groupByKey()传输速度明显慢于reduceByKey()。虽然groupByKey().map(func)也能实现reduceByKey(func)功能,可是,优先使用reduceByKey(func).

Spark运行的全流程:

  (1)首先经过spark-submit提交Application应用,后台就会建立相应的driver进程,driver进程会运行Application中的代码,

  (2)初始化Sparkcontext,Sparkcontext是用户通向spark集群的入口,在初始化sparkContext时,同时的会初始化DAGScheduler、TaskScheduler,初始化TaskScheduler的同时,会建立两个很是重要的对象,分别是 DriverActor 和 ClientActor.

  (3)clientActor向master注册Application,master收到Application的注册请求后,会使用本身的资源调度算法,通知相应的worker节点为Application启动多个Executor.

  (4)多个Executor启动以后,会反向注册到DriverActor,以后driver结束sparkcontext的初始化,继续执行接下来的代码.

  (5)在接下来的代码中,将所遇到的对RDD的全部操做造成一个DAG有向无循环图,每执行到action操做就会建立一个job到DAGScheduler中,而job又根据RDD的依赖关系划分红多个stage,每一个stage里根据最后一个RDD的分区数目来建立task,多个task造成一个taskset

  (6)将taskset送到taskscheduler中,而后taskscheduler对task进行序列化,而后将序列化好的task封装到launchTask中,而后将launchTask发送给指定的executor中运行.

  (7)executor接收到了DriverActor 发送过来的launchTask 时,会对launchTask 进行反序列化,封装到一个TaskRunner 中,而后从executor这个线程池中获取一个线程来执行指定的任务.

  (8)最终当全部的task任务完成以后,整个application执行完成,关闭sparkContext对象。

spark处理任务的过程:

  (1)构建DAG(有向无环图)(调用RDD上的方法)

  (2)DAGScheduler将DAG切分Stage(切分的依据是Shuffle),将Stage中生成的Task以TaskSet的形式给TaskScheduler

Spark 的运行模式中有哪几种:

  (1)本地模式:driver和Executors处于同一个jvm

  (2)standalone模式:基本Spark内置的集群搭建模式,运行时要开起master和worker的守护进程.适合于不太依赖Hadoop的运算环境.

  (3)基于yarn-cluster模式:做业调度、资源调度由Yarn分配。Yarn在这方面作得比Spark standalone集群好,适用于存储计算合一,或者须要依赖MR、Hive等做业的场景,通常用于生产模式.

  (4)基于yarn-client模式,通常用来测试,传输消耗大,方便调试.

spark的yarn-cluster模式:

  在 Yarn-Cluster 模式中,当用户向 Yarn 中提交一个应用程序后, Yarn 将分两个阶段运行该应用程序:第一个阶段是把 Spark 的 Driver 做为一个 ApplicationMaster 在 Yarn 集群中先启动;第二个阶段是由 ApplicationMaster 建立应用程序,而后为它向 ResourceManager 申请资源,并启动 Executor 来运行 Task,同时监控它的整个运行过程,直到运行完成。(过程相似于mapreduce)

spark的shuffle过程:

未优化:  

  (1)每个ShufflleMapTask会为每个ReduceTask建立一个bucket缓存,而且会为每个bucket建立一个文件。这个bucket存放的数据就是通过Partitioner操做(默认是HashPartitioner)以后找到对应的bucket而后放进去,最后将bucket缓存的数据刷新到磁盘上,即对应的block file.

  (2)而后ShuffleMapTask将输出做为MapStatus发送到DAGScheduler的MapOutputTrackerMaster,每个MapStatus包含了每个ResultTask要拉取的数据的位置和大小.

  (3)ResultTask而后去利用BlockStoreShuffleFetcher向MapOutputTrackerMaster获取MapStatus,看哪一份数据是属于本身的,而后底层经过BlockManager将数据拉取过来.

  (4)拉取过来的数据会组成一个内部的ShuffleRDD,优先放入内存,内存不够用则放入磁盘,而后ResulTask开始进行聚合,最后生成咱们但愿获取的那个MapPartitionRDD

  缺点:如上图所示:在这里有1个worker,2个executor,每个executor运行2个ShuffleMapTask,有三个ReduceTask,因此总共就有4 * 3=12个bucket和12个bucket和12个block file。

    若是数据量较大,将会生成M*R个小文件,好比ShuffleMapTask有100个,ResultTask有100个,这就会产生100*100=10000个小文件

     bucket缓存很重要,须要将ShuffleMapTask全部数据都写入bucket,才会刷到磁盘,那么若是Map端数据过多,这就很容易形成内存溢出,尽管后面有优化,bucket写入的数据达到刷新到磁盘的阀值以后,就会将数据一点一点的刷新到磁盘,可是这样磁盘I/O就多了.与MR彻底不同的是,MR它必须将全部的数据都写入本地磁盘文件之后,才能启动reduce操做,来拉取数据。由于mr要实现默认的根据Key的排序!因此要排序确定得写完全部数据,才能排序,而后reduce来拉取。可是spark不须要,spark默认状况下,是不会对数据进行排序的。所以shufflemaptask每写入一点数据,resulttask就能够拉取一点数据,而后再本地执行咱们定义的聚合函数和算子.spark这种机制的好处在于速度比mr快多了.因为这种事实拉取的机制,一次提供不了直接处理key对应的valur的算子,只能经过reducebykey,先shuffle,有一个maptartitionsRDD,而后用map算子,来处理每一个key对应的values.

优化后:

  (1)每个Executor进程根据核数,决定Task的并发数量,好比executor核数是2,就是能够并发运行两个task,若是是一个则只能运行一个task,

  (2)假设executor核数是1,ShuffleMapTask数量是M,那么executor依然会根据ResultTask的数量R,建立R个bucket缓存,而后对key进行hash,数据进入不一样的bucket中,每个bucket对应着一个block file,用于刷新bucket缓存里的数据

  (3)而后下一个task运行的时候,那么不会再建立新的bucket和block file,而是复用以前的task已经建立好的bucket和block file。即所谓同一个Executor进程里全部Task都会把相同的key放入相同的bucket缓冲区中

这样的话,生成文件的数量就是(本地worker的executor数量*executor的cores*ResultTask数量)如上图所示,即2 * 1* 3 = 6个文件,每个Executor的shuffleMapTask数量100,ReduceTask数量为100,那么  未优化的HashShuffle的文件数是2 *1* 100*100 =20000,优化以后的数量是2*1*100 = 20

  缺点:缺点:若是 Reducer 端的并行任务或者是数据分片过多的话则 Core * Reducer Task 依旧过大,也会产生不少小文件。

spark的checkpoint操做:

  checkpoint的意思就是创建检查点,相似于快照,例如在spark计算里面 计算流程DAG特别长,服务器须要将整个DAG计算完成得出结果,可是若是在这很长的计算流程中忽然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头至尾计算一遍,这样子就很费性能,固然咱们能够将中间的计算结果经过cache或者persist放到内存或者磁盘中,可是这样也不能保证数据彻底不会丢失,存储的这个内存出问题了或者磁盘坏了,也会致使spark从头再根据RDD计算一遍,因此就有了checkpoint,其中checkpoint的做用就是将DAG中比较重要的中间数据作一个检查点将结果存储到一个高可用的地方(一般这个地方就是HDFS里面)

spark的cache和persist的区别:

  计算流程DAG特别长,服务器须要将整个DAG计算完成得出结果,可是若是在这很长的计算流程中忽然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头至尾计算一遍,这样子就很费性能,固然咱们能够将中间的计算结果经过cache或者persist放到内存或者磁盘中

  cache最终调用了persist方法,默认的存储级别仅是存储内存中的,persist有好几个存储级别,persist是最根本的底层函数,executor执行时,60%用来缓存RDD,40%用来存放数据.

spark中transform与action操做的区别:

  transformation是获得一个新的RDD,方式不少,好比从数据源生成一个新的RDD,从RDD生成一个新的RDD,action是获得一个值,或者一个结果(直接将RDD cache到内存中).

  全部的transformation都是采用的懒策略,就是若是只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。

spark的RDD与DataFrame以及Dataset的区别:

  spark的基本数据结构:RDD是弹性分布式数据集,编译时类型安全,具备面向对象的风格RDD是一组表示数据的Java或Scala对象,可是序列化性能开销大,须要频繁非删除对象致使GC性能开销大.

        弹性:RDD的每一个分区在spark节点上存储时默认是放在内存中的,若内存存储不下,则存储在磁盘中

        分布性:每一个RDD中的数据能够处在不一样的分区中,而分区能够处在不一样的节点中.

        容错性:当一个RDD出现故障时,能够根据RDD之间的依赖关系来从新计算出发生故障的RDD.

从如下方面是区别三者之间的关系:

  (1)数据的表示形式:RDD是数据元素的分布式集合,RDD是一组表示数据的Java或Scala对象;DataFrame是以列方式构成的分布式数据集合,相似于关系数据库中的表;Dataset是DataFrame API的扩展.

  (2)数据格式:RDD能够轻松有效地处理结构化和非结构化的数据,DataFrame仅适用于结构化和半结构化数据,Dataset能够有效地处理结构化和非结构化数据它表示行(row)的JVM对象或行对象集合形式的数据.

  (3)编译时类型安全:RDD提供了一种熟悉的面向对象编程风格,具备编译时类型安全性。DataFrame尝试访问表中不存在的列,则持编译错误仅在运行时检测属性错误,DataSet能够在编译时检查类型, 它提供编译时类型安全性。

  (4)性能开销:RDD:分发数据或者将数据写入磁盘时,会使用java序列化,序列化单个Java或者scala对象的开销较大,销毁单个对象时,会致使垃圾回收.

          DataFrame:能够将数据序列化为二进制的格式存储在堆外内存中,而后直接在内存中进行转换,无需使用java序列化来编码数据.避免在为数据集中的每一行构造单个对象时引发的垃圾回收。

          Dataset:在序列化数据时,它使用spark内部Tungsten二进制格式存储表格表示,由于序列化是经过Tungsten进行的,它使用了off heap()数据序列化,不须要垃圾回收器来摧毁对象

spark广播变量及其原理:

  当在Executor端用到了Driver变量,不使用广播变量,在每一个Executor中有多少个task就有多少个Driver端变量副本。若是使用广播变量在每一个Executor端中只有一份Driver端的变量副本。广播变量在Driver定义,在Exector端不可改变,在Executor端不能定义

  原理:实际上就是Executor端用到了driver端的变量,若是在executor端你使用到了driver端的广播变量,若是不使用广播变量,在每一个executor中有多少task就有多少变量副本。使用了广播变量,实际上就是为了减小executor端的备份,最终减小executor端的内存。

spark streaming从kafka中读数据的两种方式:

  Receiver方式是经过zookeeper来链接kafka队列,Direct方式是直接链接到kafka的节点上获取数据

  (1)receiver方式:Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(若是忽然数据暴增,大量batch堆积,很容易出现内存溢出的问题),而后Spark Streaming启动的job会去处理那些数据。 然而,在默认的配置下,这种方式可能会由于底层的失败而丢失数据。若是要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(好比HDFS)上的预写日志中。因此,即便底层节点出现了失败,也可使用预写日志中的数据进行恢复。

  然而,在默认的配置下,这种方式可能会由于底层的失败而丢失数据。若是要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(好比HDFS)上的预写日志中。因此,即便底层节点出现了失败,也可使用预写日志中的数据进行恢复。

  注意:Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。因此,在一、KafkaUtils.createStream()中,提升partition的数量,只会增长一个Receiver中,读取partition的线程的数量。不会增长Spark处理数据的并行度。 能够建立多个Kafka输入DStream,使用不一样的consumer group和topic,来经过多个receiver并行接收数据。

  (2)基于Direct方式:这种方式会周期性地查询Kafka,来得到每一个topic+partition的最新的offset,从而定义每一个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

  优势:简化并行读取:若是要读取多个partition,不须要建立多个输入DStream而后对它们进行union操做。Spark会建立跟Kafka partition同样多的RDD partition,而且会并行从Kafka中读取数据。因此在Kafka partition和RDD partition之间,有一个一对一的映射关系。

    高性能:若是要保证零数据丢失,在基于receiver的方式中,须要开启WAL机制。这种方式其实效率低下,由于数据实际上被复制了两份,Kafka本身自己就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不须要开启WAL机制,只要Kafka中做了数据的复制,那么就能够经过Kafka的副本进行恢复。
receiver与和direct的比较:

  (1)基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制能够保证数据零丢失的高可靠性,可是却没法保证数据被处理一次且仅一次,可能会处理两次。由于Spark和ZooKeeper之间多是不一样步的。

  (2)基于direct的方式,使用kafka的简单api,Spark Streaming本身就负责追踪消费的offset,并保存在checkpoint中。Spark本身必定是同步的,所以能够保证数据是消费一次且仅消费一次。

spark的内存管理机制:

堆内内存:  

  做为一个 JVM 进程,Executor 的内存管理创建在 JVM 的内存管理之上,Spark 对 JVM 的堆内(On-heap)空间进行了更为详细的分配,以充分利用内存。同时,Spark 引入了堆外(Off-heap)内存,使之能够直接在工做节点的系统内存中开辟空间,进一步优化了内存的使用。

  堆内内存:堆内内存的大小,由 Spark 应用程序启动时的 –executor-memory参数配置,一个Executor中的内存分为三块,一块是execution内存,一块是storage内存,一块是other内存。

  (1)execution内存是执行内存,文档中说join,aggregate都在这部份内存中执行,shuffle的数据也会先缓存在这个内存中,满了再写入磁盘,可以减小IO。其实map过程也是在这个内存中执行的。

  (2)storage内存是存储broadcast,cache,persist数据的地方。

  (3)other内存是程序执行时预留给本身的内存。

堆外内存:

  Off-heap memory不在 JVM 内申请内存,而是调用 Java 的 unsafe 相关 API (相似于malloc()函数)直接向操做系统申请内存因为这种方式不进过 JVM 内存管理,因此能够避免频繁的 GC,这种内存申请的缺点是必须本身编写内存申请和释放的逻辑。堆外内存只区分 Execution 内存和 Storage 内存.

  不管堆内和堆外内存目前 Execution 内存和 Storage 内存能够互相共享的。也就是说,若是 Execution 内存不足,而 Storage 内存有空闲,那么 Execution 能够从 Storage 中申请空间;反之亦然.

 Spark中的OOM问题不外乎如下两种状况:

  (1)map执行中内存溢出如flatMap,filter,mapPatitions

    map端过程产生大量对象致使内存溢出:这种溢出的缘由是在单个map中产生了大量的对象致使的针对这种问题,在不增长内存的状况下,能够经过减小每一个Task的大小,以便达到每一个Task即便产生大量的对象Executor的内存也可以装得下。具体作法能够在会产生大量对象的map操做以前调用repartition方法,分区成更小的块传入map。

  (2)shuffle后内存溢出如join,reduceByKey,repartition

    shuffle内存溢出的状况能够说都是shuffle后,单个文件过大致使的。在shuffle的使用,须要传入一个partitioner,大部分Spark中的shuffle操做,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数.这个参数spark.default.parallelism只对HashPartitioner有效.若是是别的partitioner致使的shuffle内存溢出就须要重写partitioner代码了.

task之间的内存分配:

  为了更好地使用使用内存,Executor 内运行的 Task 之间共享着 Execution 内存。具体的,Spark 内部维护了一个 HashMap 用于记录每一个 Task 占用的内存。当 Task 须要在 Execution 内存区域申请 numBytes 内存,其先判断 HashMap 里面是否维护着这个 Task 的内存使用状况,若是没有,则将这个 Task 内存使用置为0,而且以 TaskId 为 key,内存使用为 value 加入到 HashMap 里面。以后为这个 Task 申请 numBytes 内存,若是 Execution 内存区域正好有大于 numBytes 的空闲内存,则在 HashMap 里面将当前 Task 使用的内存加上 numBytes,而后返回;若是当前 Execution 内存区域没法申请到每一个 Task 最小可申请的内存,则当前 Task 被阻塞,直到有其余任务释放了足够的执行内存,该任务才能够被唤醒。每一个 Task 可使用 Execution 内存大小范围为 1/2N ~ 1/N,其中 N 为当前 Executor 内正在运行的 Task 个数。一个 Task 可以运行必须申请到最小内存为 (1/2N * Execution 内存);当 N = 1 的时候,Task 可使用所有的 Execution 内存。好比若是 Execution 内存大小为 10GB,当前 Executor 内正在运行的 Task 个数为5,则该 Task 能够申请的内存范围为 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范围。

Hadoop

HA(高可用集群):

  (1)脑裂现象:指在一个高可用(HA)系统中,当联系着的两个节点断开联系时,原本为一个总体的系统,分裂为两个独立节点,这时两个节点开始争抢共享资源,结果会致使系统混乱,数据损坏。

  (2)如何防止脑裂:仲裁:当两个节点出现分歧时,由第3方的仲裁者决定听谁的。这个仲裁者,多是一个锁服务,一个共享盘.

            fencing: 当不能肯定某个节点的状态时,经过fencing把对方干掉,确保共享资源被彻底释放,前提是必需要有可靠的fence设备。

hdfs的优势:

  (1)高容错性,保存多个副本,且提供容错机制。副本丢失或宕机自动恢复。默认存3份。

  (2)流式数据访问,一次写入,屡次读取,高吞吐量,因此能够同时处理大量数据

  (3)运行在廉价的机器上,经过副本提升可靠性,提供了容错和恢复机制

  (4)能够存储大量文件,如PB级

 缺点:不擅长低延迟数据访问,不擅长小文件的分区,不擅长并发写入,文件随机修改

namenode与datanode:

  namenode:namenode管理文件系统的命名空间。它维护着文件系统树及整棵树内的全部文件和目录。这些信息以两个文件形式永久保存在本地磁盘上:命名空间镜像文件和编辑日志文件。namenode也记录着每一个文件中各个块所在的数据节点信息,可是它并不会永久保存块的位置信息,由于这些信息会在系统启动时根据数据节点信息重建。

  datanode:一个数据块在datanode上以文件形式存储在磁盘上,包括两个文件,一个是数据自己,一个是元数据包括数据块的长度,块数据的校验和,以及时间戳。心跳是每3秒一次,心跳返回结果带有namenode给该datanode的命令如复制块数据到另外一台机器,或删除某个数据块。若是超过10分钟没有收到某个datanode的心跳,则认为该节点不可用。

如何肯定map与reduce的任务个数:

map个数取决于文件的个数,能够设置map的数量,可是不能小于文件分块的数量.

正确的reduce任务的个数应该是0.95或者1.75 *(节点数 ×mapred.tasktracker.tasks.maximum参数值)。Reduce任务也可以与 map任务同样,经过设定JobConf 的conf.setNumReduceTasks(int num)方法来增长任务个数。

hdfs的高可用性:

  配置了一对活动备用namenode,当活动namenode失效,备用namenode就会接管它的任务并开始服务于来自客户端的请求.

hdfs的容错性:

  (1)心跳机制:namenode 和datanode是维护心跳的检测。可能网络故障,致使namenode接收不到datanode的心跳包,namenode就不会将任何新的I/O操做派发给那个datanode,namenode会检测到文件块的副本数目小于设置值,若是小于就自动开始复制新的副本并分发到其余的datanode.

  (2)集群的负载均衡:节点的丢失或者增长会使数据分布不均,当某个Datanode节点的空闲空间大于一个临界点值时,HDFS会自动从其余节点把数据迁移过来

  (3)当namenode的损坏时,会利用secondNamenode来恢复namenode(为namenode内存中的文件系统元数据建立检查点).

  (4)datanode的容错性:DataNode是以数据块做为容错单元,默认状况下,每一个数据块会被备份在三分,分别存在不一样的DataNode上。当一个数据块访问失效,则会从备份的DataNode中选取一个,并备份该数据块,以保证数据

为namenode内存中的元数据建立检查点机制:

  (1)secondNamenode请求主namenode中止使用正在进行中的edit文件,这样新的编辑操做记录到一个新文件中edit.new

  (2)secondNamenode从主nomanode获取最近的fsimage和edit文件.

  (3)SecondNamenode将fsimage文件载入内存,逐一执行edit文件中的操做.建立新的合并的fsimage.

  (4)辅助namenode将新的fsimage文件发送回namenode,主namenode将其保存为临时的.ckpt

  (5)主namenode从新命名为临时的fsimage文件,以便之后使用.

利用检查点进行故障恢复:

  当namenode发生故障时,能够从辅助namenode恢复数据.

  (1)方法1:将相关的存储目录恢复到新的namenode中.

  (2)方法2:启动namenode的辅助进程,将secondNamenode用做新的主namenode.

hdfs上传写数据的过程:

  (1)client跟NameNode通讯请求上传文件,NameNode检查目标文件是否存在以及client是否有新建此文件的权限.

  (2)若经过检查,NameNode就会为建立一条记录,记录该新建文件的信息.若未经过检查就会抛出异常.

  (3)而后NameNode向客户端返回能够新建文件的信息以及相应的一组datanode,Client就近原则请求3台中的一台DataNode 1上传数据(本质上是一个RPC调用,创建pipeline),DataNode 1收到请求会继续调用DataNode 2,而后DataNode 2调用DataNode 3,将整个pipeline(管线)创建完成,而后逐级返回客户端

  (4)Client开始往DataNode 1上传第一个block,以pocket为单位.DataNode 1收到一个packet就会传给DataNode 2,DataNode 2传给DataNode 3,DataNode 1每传一个pocket会放入一个应答队列等待应答.以此类推.

  (5)当DataNode 3写完最后一个pocket时,就会返回确认消息给DataNode 2,DataNode 2返回确认消息给DataNode 1,而后返回给客户端,最后由客户端返回给namenode.

hdfs读取数据的过程:

  (1)client与NameNode通讯查询元数据,找到文件块所在的DataNode,namenode将相应的元数据信息返回给client.

  (2)client根据元数据信息挑选一台DataNode(网络拓扑上的就近原则,若是都同样,则随机挑选一台DataNode)服务器,请求创建socket流.以packet(一个packet为64kb)为单位进行发送.

  (3)DataNode开始发送数据,客户端以packet为单位接收,先在本地缓存,而后写入目标文件

  HDFS的块划分:FileInputFormat对文件的切分是严格按照偏移量来的,FileInputFormat默认为文件在HDFS上的每个Block生成一个对应的FileSplit。那么天然,FileSplit.start 就是对应Block在文件中的Offset、FileSplit.length就是对应Block的Length、FileSplit.hosts就是对应Block的Location(寻找距离最近的location)。划分后数据如何读取:在Map执行的时候,会使用InputFormat.getRecordReader()所返回的RecordReader对象来读取Split中的每一条记录.

hadoop实现冗余备份的难点:

  对于不一样的数据备份,须要放到不一样的节点上面,利用Hash函数,这样能够把每一个备份id对应到一个哈希值,而后再将这个哈希值与某个节点对应起来,就完成了一个数据备份的分配。这样作在时间复杂度上只有O(1),可是不少哈希函数有一个问题,就是不稳定。这里所谓的不稳定是指,当节点个数发生变化的时候,原来被分配到节点K上的数据备份可能就会被分配到另外一个节点上。经常使用的哈希函数为:hash(x) = x % N,其中N为节点个数,x为备份id,这样当集群中节点出现故障或者扩展新的节点时,原来的计算的哈希值几乎全都变了,那么对于整个系统中的数据访问来讲,无疑是一个灾难,由于访问位置全都得改变,而且须要从新迁移数据。

  那么有没有可能在N变化的侯,原有数据备份的哈希值不改变呢?这就是一致性哈希的优点所在。一致性哈希的原理能够这么理解:原来哈希是用x%N,如今是用x%S且N%S,这里的S表示哈希函数自己能够表示的哈希值范围,好比它的范围是0~2^32 - 1,那么S=2^32。

  以下图若是选取的哈希函数取值范围在0到2^32 - 1之间(Hash Range),那么咱们能够同时把Data Blocks和Data Nodes同时哈希到这个范围里面,这些Nodes会把Hash Range划分为若干区域,规定每一个Node存储与其相邻的前一个区域中的Blocks,从而完成数据的分配。这种方式的好处在于,即便出现Data Nodes数量变化的状况,也不会影响其它Nodes和Blocls的位置状况,最可能是在被删除节点或者新增节点的附近进行调整,好比将原有区域中的Blocks进一步划分或者合并。

 

上图展现的方式中,三个Nodes将Hash Range分为了4个区域,显然不方便分配,因此提出一致性哈希环的概念,即将Hash Range的首位相连,而后在一个环路上面进行划分,N个Nodes必定可以划分出N个区域,而后让每一个Node存储前一个相邻区域便可。以下图所示:

 

 

 

 

 

如何解决client和hdfs读写延迟太高:

  (1)Chunk缓冲机制:数据会被写入一个chunk缓冲数组,这个chunk是一个512字节大小的数据片断,这个缓冲数组能够容纳多个chunk大小的数据在里面缓冲。

  (2)Packet数据包机制:当chunk缓冲数组都写满了以后,就会把这个chunk缓冲数组进行一下chunk切割,切割为一个一个的chunk,一个chunk是一个数据片断。而后多个chunk会直接一次性写入另一个内存缓冲数据结构,就是Packet数据包,一个Packet数据包,设计为能够容纳127个chunk

  (3)当一个Packet被塞满了chunk以后,就会将这个Packet放入一个内存队列来进行排队。而后有一个DataStreamer线程会不断的获取队列中的Packet数据包,经过网络传输直接写一个Packet数据包给DataNode。

mapreduce的排序:

  在Hadoop中,排序是MapReduce框架中最重要的操做之一,Map Task和Reduce Task都会对数据按照key排序,无论逻辑上是否真的须要排序,任何程序中的数据都会被排序,这是Haddoop的默认行为。MapReduce中使用了两种排序算法:快速排序和优先队列。在Map和Reduce Task的缓冲区使用的是快速排序,而对磁盘上的IFile文件合并使用的是优先队列

  优先队列:本质上是最小堆,在优先队列中,元素被赋予优先级。当访问元素时,具备最高优先级的元素最早删除。优先队列具备最高级先出 (first in, largest out)的行为特征。一般采用堆数据结构来实现。优先队列的两个核心函数就是 upHeap() 和 downHeap(),全部优先队列的操做都是围绕这两个函数进行的。upHeap()函数名里的up表明目的方向,实际是从down 到 up 的 过程。downHeap()函数名里的down也表明目的方向,实际是从up到 down 的 过程。upHeap() 是建小顶堆,downHeap() 从新调整节点,从新成为小顶堆。

YARN结构:

  yarn经过两类长期运行的守护进程提供本身的核心服务:管理集群上的资源使用的资源管理器(ResourceManager),以及运行在集群中的全部节点上且可以启动和监控容器的节点管理器(nodeManager).容器Container 是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源即是用Container表示的。YARN会为每一个任务分配一个Container,且该任务只能使用该Container中描述的资源。

  YARN 采用了一种分层的集群框架,经过将资源管理和应用程序管理两部分分剥离开,分别由ResouceManager和ApplicationMaster负责,其中,ResouceManager专管资源管理和调度,而ApplicationMaster则负责与具体应用程序相关的任务切分、任务调度和容错等,每一个应用程序对应一个ApplicationMaster.

YARN的运行机制:

  (1)客户端提交application到资源管理器,要求资源管理器运行ApplicationMaster进程.

  (2)资源管理器找到一个可以在容器中启动application的节点管理器.若是是一个简单的计算过程,就会在资源管理器所处的容器中进行计算,将计算结果返回给客户端.

  (3)当计算任务比较复杂时,就会向资源管理器请求更多的节点的容器,以用于运行一个分布式计算.

yarn在什么层面调度,内存调度是什么怎么调度的,若是考虑CPU怎么调度的。如何实现隔离的:

  一些分布式框架MR和Spark做为YARN的应用运行在集群计算层和集群的存储层上的.

   Yarn的资源隔离是指为运行着不一样任务的“Container”提供可独立使用的计算资源,以免它们之间相互干扰。目前支持两种类型的资源隔离:CPU和内存,对于这两种类型的资源,Yarn使用了不一样的资源隔离方案。

  (1)CPU的隔离:对于CPU而言,它是一种“弹性”资源,使用量大小不会直接影响到应用程序的存亡,所以CPU的资源隔离方案采用了Linux Kernel提供的轻量级资源隔离技术ControlGroup;yarn使用cgroup的两种方式来控制cpu资源分配分别是严格按核数隔离资源和按比例隔离资源.

  (2)内存的隔离:对于内存而言,它是一种“限制性”资源,使用量大小直接决定着应用程序的存亡,Cgroup会严格限制应用程序的内存使用上限,一旦使用量超过预先定义的上限值,就会将该应用程序“杀死”,所以没法使用Cgroup进行内存资源隔离,而是选择了线程监控的方式。

yarn的资源调度器:

  YARN中有三种调度器可用:FIFO调度器(FIFO Scheduler),容量调度器(Capacity Scheduler)和公平调度器(Fair Scheduler)。

  (1)FIFO调度器将应用放置在第一个队列中,而后按照提交的顺序(先进先出)运行应用。首先为队列中第一个应用的请求分配资源,第一个应用的请求被知足后再一次为队列中下一个应用服务。优势是简单易懂不须要任何配置,可是不适合共享集群,当使用FIFO调度器(i)时,小做业一直被阻塞,直至大做业完成。

  (2)容量调度器时,一个独立的专门队列保证小做业一提交就能够启动,因为队列容量是为那个队列中的做业所保留的,所以这种策略是以整个集群的利用率为代价的。这意味着与使用FIFO调度器相比,大做业执行的时间要长。

  (3)公平调度器不须要预留必定量的资源,由于调度器会在全部运行的做业之间动态平衡资源。第一个(大)做业启动时,它是惟一运行的做业,于是得到集群中全部的资源。当第二个(小)做业启动时,它被分配到集群的一半资源,这样每一个做业都能公平共享资源。

公平调度器支持抢占功能:

  所谓抢占就是容许调度器终止那些占用资源超过了其公平共享份额的队列的容器,这些容器资源释放后能够分配给资源数量低于应得份额的队列。注意,抢占会下降整个集群的效率,由于被终止的containers须要从新执行。若是队列在指定的时间内未得到的资源仍然低于其公平共享份额的一半,那么调度器就会抢占其余容器。

任务的分配:

  若是不是小做业, 那么应用管理器向资源管理器请求container来运行全部的map任务和reduce任务,每一个任务对应一个container,且只能在该container上运行。这些请求是经过心跳来传输的, 包括每一个map任务的数据位置,好比存放输入分片的主机名和机架(rack). 调度器利用这些信息来调度任务, 尽可能将任务分配给存储数据的节点, 或者退而分配给和存放输入分片的节点相同机架的节点。 请求也包括了任务的内存需求, 默认状况下map和reduce任务的内存需求都1024MB.

Mapreduce中的shuffle过程:

shuffle 开始和结束时间:

  开始时间:map执行完成有输出文件产生,shuffle开始;

  结束时间:reduce输入文件最终肯定了,shuffle结束;

  shuffle优化核心:减小拉取数据的量(io操做)及尽可能使用内存而不是磁盘。

  每一个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候须要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的全部临时文件作合并,生成最终的正式输出文件,而后等待reduce task来拉数据。

具体分为如下步骤:

  shuffle前半段:

  (1)在通过mapper的运行后,mapper的输出结果是key/value对。MapReduce提供Partitioner接口,它根据key决定当前的这对输出数据最终应该交由哪一个reduce task处理。默认是对key进行 hashcode后再以reduce task数量取模。

  (2)接下来,须要将数据写入内存缓冲区中,缓冲区的做用是批量收集map结果,减小磁盘IO的影响。咱们的key/value对以及Partition的结果都会被写入环形缓冲区。环形缓冲区分为两个部分,数据区和索引区。数据区是存放用户真实的数据,索引区存放数据对应的key值,partition和位置信息。整个环形内存缓冲区就是一个字节数组,当map task的输出结果不少时,就可能会撑爆内存,因此须要在达到设定的环形缓冲区的阈值后将缓冲区中的数据临时写入磁盘,而后从新利用这块缓冲区。在溢写前,会将数据根据key和partition进行排序,这个从内存往磁盘写数据的过程被称为Spill(溢写),这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不该该阻止map的结果输出,因此整个缓冲区有个溢写的比例默认是0.8。也就是当缓冲区的数据已经达到阈值,溢写线程启动,锁定这80%的内存,执行溢写过程。Map task的输出结果还能够往剩下的20%内存中写,互不影响。  当溢写线程启动后,须要对这80MB空间内的key作排序(Sort)。若是有一个combiner函数,就在排序后的输出上运行。(运行combiner函数使map的结果更加紧凑,所以减小到写到磁盘上的数据和传递给reduce的数据)

  (3)merger溢写文件:每次溢写会在磁盘上生成一个溢写文件,若是map的输出结果然的很大,有屡次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当map task真正完成时,内存缓冲区中的数据也所有溢写到磁盘中造成一个溢写文件。而后将这些溢写文件归并到一块儿

    至此,map端的全部工做都已结束,最终生成的这个文件也存放在TaskTracker够得着的某个本地目录内。每一个reduce task不断地经过RPC从JobTracker那里获取map task是否完成的信息,若是reduce task获得通知,获知某台TaskTracker上的map task执行完成,Shuffle的后半段过程开始启动。

  Shuffle后半段:

  (4)Copy过程:简单地拉取数据,将Copy过来的数据会先放入reduce端的内存缓冲区中。Reduce进程启动一些数据copy线程(Fetcher),经过HTTP方式请求map task所在的TaskTracker获取map task的输出文件。由于map task早已结束,这些文件就归TaskTracker管理在本地磁盘中。

  (5)Merge阶段:合并不一样map端copy来的数值。

  (6)Reducer的输入文件:不断地merge后,最后会生成一个“最终文件”,这个文件可能存在于磁盘上,也可能存在于内存中。对咱们来讲,固然但愿它存放于内存中,直接做为Reducer的输入,但默认状况下,这个文件是存放于磁盘中的。

java的序列化方式,hadoop的序列化方式:

    序列化就是把内存中的对象的状态信息转换成字节序列,以便于存储(持久化)和网络传输,反序列化就是就将收到的字节序列或者是硬盘的持久化数据,转换成内存中的对象。

  (1)java序列化:只要实现了serializable接口就能实现序列化与反序列化,必定要加上序列化版本ID serialVersionUID,这个是用来识别序列化的以前的类究竟是哪个。好比但愿类的不一样版本对序列化兼容,须要确保类的不一样版本具备相同的serialVersionUID;

  (2)hadoop的序列化信息:hadoop原生的序列化类须要实现一个叫Writeable的接口,相似于serializable接口,实现Writable接口必须实现两个方法:write(DataOutputStream out);readField(DataInputStream in)方法。

      特色:紧凑,对象可重用(  java序列化每次序列化都要从新建立对象,内存消耗大。Writable能够重用)可拓展性(hadoop本身写序列化很容易,能够利用实现hadoop的Writable接口 实现了直接比较字符流以肯定两个Writable对象的大小)

hive

hive的做用:

  hive是基于Hadoop的一个数据仓库工具,能够将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能,能够将sql语句转换为MapReduce任务进行运行。 相对于mapreduce 离线计算须要写不少java代码去实现数据提取,hive能够经过类SQL语句快速实现简单的MapReduce统计,没必要开发专门的MapReduce应用开发程序,更适合数据仓库的统计分析。

hive的架构:

  (1)用户接口:主要有:CLI,client,wui.

  (2)元数据存储:Hive 将元数据存储在数据库中,如 mysql、derby。Hive 中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),表的数据所在目录等

  (3)解释器,编译器,优化器,执行器:生成的查询计划存储在 HDFS 中,并在随后由 MapReduce 调用执行。

hive是如何将HQL语句转化为mapreduce的:

  接收到用户的指令(HQL),使用本身的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。HiveSQL ->AST(抽象语法树) -> QB(查询块) ->OperatorTree(操做树)->优化后的操做树->mapreduce任务树->优化后的mapreduce任务树

hive的数据倾斜问题:

  缘由:key分布不均匀,业务数据自己的特性,建表时考虑不周,某些SQL语句自己就有数据倾斜.

  解决方法:

    (1)参数调节:hive.map.aggr = true,Map 端部分聚合,至关于Combiner

            hive.groupby.skewindata=true,数据倾斜的时候进行负载均衡,当项设定为 true,生成的查询计划会有两个 MR Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到 Reduce 中,每一个 Reduce 作部分聚合操做,并输出结果,这样处理的结果是相同的 Group By Key 有可能被分发到不一样的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程能够保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成最终的聚合操做。

    (2)HQL语句调节:使用map join让小的维度表(1000条如下的记录条数) 先进内存。在map端完成reduce.

    (3)空值产生的数据倾斜:赋与空值分新的key值.

udf(用户自定义函数)

  虽然有不少内置的函数,可是生产上确定不够那么全面,全部,用户须要自定义函数来知足自身的要求.只须要继承UDF类或者GenericUDF类,重写evaluate方法.而后打包上传jar包,在终端中自定义临时或者永久函数,create关键字.基础UDF的函数读取和返回基本类型,即Hadoop和Hive的基本类型。如,Text、IntWritable、LongWritable、DoubleWritable等。复杂的GenericUDF能够处理Map、List、Set类型.

Hivesql内置函数:

  数学函数:round():四舍五入,ceil():向上取整,floor():向下取整.

  字符函数:lower():转小写,concat():字符串拼接.

kafka

kafka的三种消费模式(消息接收):

  (1)自动提交offset:在建立一个消费者时,默认是自动提交偏移量,这种方式也被称为【at most once】,fetch到消息后就能够更新offset,不管是否消费成功。

  (2)手动提交:对偏移量实行更加精确的管理,以保证消息不被重复消费以及消息不被丢失。这种方式称为【at least once】。fetch到消息后,等消费完成再调用方法【consumer.commitSync()】,手动更新offset;若是消费失败,          则offset也不会更新,此条消息会被重复消费一次。

     手动提交又能够分为:(1)同步提交:同步模式下提交失败时一直尝试提交,直到遇到没法重试的状况下才会结束,同时,同步方式下消费者线程在拉取消息时会被阻塞,直到偏移量提交操做成功或者在提交过程当中发生错误。

               (2)异步提交:而异步方式下消费者线程不会被阻塞,可能在提交偏移量操做的结果还未返回时就开始进行下一次的拉取操做,在提交失败时也不会尝试提交。

 

 kafka在什么地方须要用到zookeeper:

Kafka将元数据信息保存在Zookeeper中,可是发送给Topic自己的数据是不会发到Zk上的

   (1)kafka使用zookeeper来实现动态的集群扩展,不须要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。

  (2) 而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并做出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)

  (3)Broker端使用zookeeper来注册broker信息,以及监测partitionleader存活性.

  (4) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partitionleader创建socket链接,并获取消息.

  (5)Zookeer和Producer没有创建关系,只和Brokers、Consumers创建关系以实现负载均衡,即同一个ConsumerGroup中的Consumers能够实现负载均衡(由于Producer是瞬态的,能够发送后关闭,无需直接等待)

 kafka的高效读写:

  (1)顺序磁盘写:producer生产出来的数据一直是追加到文件末端,不是在文件的随机位置来修改数据.顺序读写能省去类大量磁头寻址的时间.

  (2)零拷贝技术:一般是指计算机在网络上发送文件时,不须要将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中传输到网络的方式。根本没有把数据复制到咱们的应用缓存中,实际上只复制一次到cpu

 kafka结构,为何最新版本不用zookeeper来维护offset:

  之前的版本是将offset 存储在zookeeper上的,kafka在传输数据时,数据消费成功就会修改偏移量,这样就能够保证数据不会丢失而致使传输出错;可是这也存在一个问题:那就是每次消费数据时都要将数据的offset写入一次,效率比较低,并且zookeeper与kafka的offset变化确认也须要走网络IO,这样就会给offset的维护带来不稳定性和低效。

  offset的使用了内部的roker来管理,这样仅仅只须要broker,而不要zookeeper来维护,都是将topic提交给__consumer_offsets函数来执行。

consumer group:

  (1)consumer group下能够有一个或多个consumer instance,consumer instance能够是一个进程,也能够是一个线程

  (2)consumer instance共享一个公共的ID,即group ID。

  (3)consumer group下订阅的topic下的每一个分区只能分配给某个group下的一个consumer(固然该分区还能够被分配给其余group)

 kafka完整运行流程:

  https://blog.csdn.net/qq_35641192/article/details/80956244

相关文章
相关标签/搜索