spark ui说明

sparkstreaming ui

图一java

第一行(标记为 [A])展现了Streaming应用程序当前的状态;在这个例子中,应用已经以1秒的批处理间隔运行了将近40分钟;在它下面是输入速率(Input rate)的时间轴(标记为 [B]),显示了Streaming应用从它全部的源头以大约49 events每秒的速度接收数据。在这个例子中,时间轴显示了在中间位置(标记为[C])平均速率有明显的降低,在时间轴快结束的地方应用又恢复了。若是你想获得更多详细的信息,你能够点击 Input Rate旁边(靠近[B])的下拉列表来显示每一个源头各自的时间轴,正以下面图2所示:sql

图二api

图2显示了这个应用有两个来源,(SocketReceiver-0和 SocketReceiver-1),其中的一个致使了整个接收速率的降低,由于它在接收数据的过程当中中止了一段时间。缓存

  这一页再向下(在图1中标记为 [D] ),处理时间(Processing Time)的时间轴显示,这些批次大约在平均20毫秒内被处理完成,和批处理间隔(在本例中是1s)相比花费的处理时间更少,意味着调度延迟(被定义为:一个批次等待以前批次处理完成的时间,被标记为 [E])几乎是零,由于这些批次在建立的时候就已经被处理了。调度延迟是你的Streaming引用程序是否稳定的关键所在,UI的新功能使得对它的监控更加容易。分布式

  批次细节性能

  再次参照图1,你可能很好奇,为何向右的一些批次花费更长的时间才能完成(注意图1中的[F])。你能够经过UI轻松的分析缘由。首先,你能够点击时间轴视图中批处理时间比较长的点,这将会在页面下方产生一个关于完成批次的详细信息列表。测试

图三优化

它将显示这个批次的全部主要信息(在上图3中以绿色高亮显示)。正如你所看到的,这个批次较之其余批次有更长的处理时间。另外一个很明显的问题是:究竟是哪一个spark job引发了这个批次的处理时间过长。你能够经过点击Batch Time(第一列中的蓝色连接),这将带你看到对应批次的详细信息,向你展现输出操做和它们的spark job,正如图4所示。ui

图四spa

图4显示有一个输出操做,它产生了3个spark job。你能够点击job ID连接继续深刻到stages和tasks作更深刻的分析。此后跟spark sql的ui差很少,以下。

spark sql ui

主页面

上面就是Spark的UI主页,首先进来能看到的是Spark当前应用的job页面,在上面的导航栏:

  • 1 表明job页面,在里面能够看到当前应用分析出来的全部任务,以及全部的excutors中action的执行时间。
  • 2 表明stage页面,在里面能够看到应用的全部stage,stage是按照宽依赖来区分的,所以粒度上要比job更细一些
  • 3 表明storage页面,咱们所作的cache persist等操做,都会在这里看到,能够看出来应用目前使用了多少缓存
  • 4 表明environment页面,里面展现了当前spark所依赖的环境,好比jdk,lib等等
  • 5 表明executors页面,这里能够看到执行者申请使用的内存以及shuffle中input和output等数据
  • 6 这是应用的名字,代码中若是使用setAppName,就会显示在这里
  • 7 是job的主页面。

模块讲解

下面挨个介绍一下各个页面的使用方法和实践,为了方便分析,我这里直接使用了分布式计算里面最经典的helloworld程序——WordCount,这个程序用于统计某一段文本中一个单词出现的次数。原始的文本以下:

for the shadow of lost knowledge at least protects you from many illusions

 上面这句话是有一次逛知乎,一个标题为 读那么多书,最后也没记住多少,还为何读书?其中有一个回复,引用了上面的话,也是我最喜欢的一句。意思是:“知识,哪怕是知识的幻影,也会成为你的铠甲,保护你不被愚昧反噬”(来自知乎——《为何读书?》)

程序代码以下:

public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setMaster("local[2]");
        sparkConf.setAppName("test-for-spark-ui");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        //知识,哪怕是知识的幻影,也会成为你的铠甲,保护你不被愚昧反噬。
        JavaPairRDD<String,Integer> counts = sc.textFile( "C:\\Users\\xinghailong\\Desktop\\你为何要读书.txt" )
                .flatMap(line -> Arrays.asList(line.split(" ")))
                .mapToPair(s -> new Tuple2<String,Integer>(s,1))
                .reduceByKey((x,y) -> x+y);

        counts.cache();
        List<Tuple2<String,Integer>> result = counts.collect();
        for(Tuple2<String,Integer> t2 : result){
            System.out.println(t2._1+" : "+t2._2);
        }
        sc.stop();
}

这个程序首先建立了SparkContext,而后读取文件,先使用` `进行切分,再把每一个单词转换成二元组,再根据key进行累加,最后输出打印。为了测试storage的使用,我这对计算的结果添加了缓存。

job页面

主页能够分为两部分,一部分是event timeline,另外一部分是进行中和完成的job任务。

第一部分event timeline展开后,能够看到executor建立的时间点,以及某个action触发的算子任务,执行的时间。经过这个时间图,能够快速的发现应用的执行瓶颈,触发了多少个action。

第二部分的图表,显示了触发action的job名字,它一般是某个count,collect等操做。有spark基础的人都应该知道,在spark中rdd的计算分为两类,一类是transform转换操做,一类是action操做,只有action操做才会触发真正的rdd计算。具体的有哪些action能够触发计算,能够参考api。collect at test2.java:27描述了action的名字和所在的行号,这里的行号是精准匹配到代码的,因此经过它能够直接定位到任务所属的代码,这在调试分析的时候是很是有帮助的。Duration显示了该action的耗时,经过它也能够对代码进行专门的优化。最后的进度条,显示了该任务失败和成功的次数,若是有失败的就须要引发注意,由于这种状况在生产环境可能会更广泛更严重。点击能进入该action具体的分析页面,能够看到DAG图等详细信息。

stage页面

在Spark中job是根据action操做来区分的,另外任务还有一个级别是stage,它是根据宽窄依赖来区分的。

窄依赖是指前一个rdd计算能出一个惟一的rdd,好比map或者filter等;宽依赖则是指多个rdd生成一个或者多个rdd的操做,好比groupbykey reducebykey等,这种宽依赖一般会进行shuffle。

所以Spark会根据宽窄依赖区分stage,某个stage做为专门的计算,计算完成后,会等待其余的executor,而后再统一进行计算。

stage页面的使用基本上跟job相似,不过多了一个DAG图。这个DAG图也叫做血统图,标记了每一个rdd从建立到应用的一个流程图,也是咱们进行分析和调优很重要的内容。好比上面的wordcount程序,就会触发acton,而后生成一段DAG图:

从这个图能够看出,wordcount会生成两个dag,一个是从读数据到切分到生成二元组,第二个进行了reducebykey,产生shuffle。

点击进去还能够看到详细的DAG图,鼠标移到上面,能够看到一些简要的信息。

storage页面

storage页面能看出目前使用的缓存,点击进去能够看到具体在每一个机器上,使用的block的状况。

environment页面

这个页面通常不太用,由于环境基本上不会有太多差别的,不用时刻关注它。

excutors页面

这个页面比较经常使用了,一方面经过它能够看出来每一个excutor是否发生了数据倾斜,另外一方面能够具体分析目前的应用是否产生了大量的shuffle,是否能够经过数据的本地性或者减少数据的传输来减小shuffle的数据量。

调优经验总结

1 输出信息

在Spark应用里面能够直接使用System.out.println把信息输出出来,系统会直接拦截out输出到spark的日志。像咱们使用的yarn做为资源管理系统,在yarn的日志中就能够直接看到这些输出信息了。这在数据量很大的时候,作一些show()(默认显示20),count() 或者 take(10)的时候会很方便。

2 内存不够

当任务失败,收到sparkContext shutdown的信息时,基本都是执行者的内存不够。这个时候,一方面能够调大--excutor-memory参数,另外一方面仍是得回去看看程序。若是受限于系统的硬件条件,没法加大内存,能够采用局部调试法,检查是在哪里出现的内存问题。好比,你的程序分红几个步骤,一步一步的打包运行,最后检查出现问题的点就能够了。

3 ThreadPool

线程池不够,这个是由于--excutor-core给的太少了,出现线程池不够用的状况。这个时候就须要调整参数的配置了。

4 physical memory不够

这种问题通常是driver memory不够致使的,driver memory一般存储了以一些调度方面的信息,这种状况颇有多是你的调度过于复杂,或者是内部死循环致使。

5 合理利用缓存

在Spark的计算中,不太建议直接使用cache,万一cache的量很大,可能致使内存溢出。能够采用persist的方式,指定缓存的级别为MEMORY_AND_DISK,这样在内存不够的时候,能够把数据缓存到磁盘上。另外,要合理的设计代码,恰当地使用广播和缓存,广播的数据量太大会对传输带来压力,缓存过多未及时释放,也会致使内存占用。通常来讲,你的代码在须要重复使用某一个rdd的时候,才须要考虑进行缓存,而且在不使用的时候,要及时unpersist释放。

6 尽可能避免shuffle

这个点,在优化的过程当中是很重要的。好比你须要把两个rdd按照某个key进行groupby,而后在进行leftouterjoin,这个时候必定要考虑大小表的问题。若是把大表关联到小表,那么性能极可能会很惨。而只须要简单的调换一下位置,性能就可能提高好几倍。

相关文章
相关标签/搜索