Spark程序排错

1.shuffle相关

报错提示

org.apache.spark.shuffle.MetadataFetchFailedException: 
Missing an output location for shuffle 0
 
org.apache.spark.shuffle.FetchFailedException:
Failed to connect to hostname/192.168.xx.xxx:50268

java.lang.AssertionError: assertion failed
at scala.Predef$.assert(Predef.scala:165)
at org.apache.spark.memory.UnifiedMemoryManager.acquireExecutionMemory(UnifiedMemoryManager.scala:80)    java

原理分析

shuffle分为shuffle writeshuffle read两部分。
shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。sql

shuffle write能够简单理解为相似于saveAsLocalDiskFile的操做,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。apache

shuffle read的时候数据的分区数则是由spark提供的一些参数控制。能够想到的是,若是这个参数值设置的很小,同时shuffle read的量很大,那么将会致使一个task须要处理的数据很是大。结果致使JVM crash,从而致使取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即便不会致使JVM crash也会形成长时间的gc。ssh

解决办法

知道缘由后问题就好解决了,主要从shuffle的数据量和处理shuffle数据的分区数两个角度入手。ide

  1. 减小shuffle数据ui

    思考是否可使用map side join或是broadcast join来规避shuffle的产生。spa

    将没必要要的数据在shuffle前进行过滤,好比原始数据有20个字段,只要选取须要的字段进行处理便可,将会减小必定的shuffle数据。.net

  2. SparkSQL和DataFrame的join,group by等操做scala

    经过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度提升这个值。code

  3. Rdd的join,groupBy,reduceByKey等操做

    经过spark.default.parallelism控制shuffle read与reduce处理的分区数,默认为运行任务的core的总数(mesos细粒度模式为8个,local模式为本地的core总数),官方建议为设置成运行任务的core的2-3倍。

  4. 提升executor的内存

    经过spark.executor.memory适当提升executor的memory值。

  5. 是否存在数据倾斜的问题

    空值是否已通过滤?异常数据(某个key数据特别大)是否能够单独处理?考虑改变数据分区规则。

参考

http://blog.csdn.net/lsshlsw/article/details/51213610

相关文章
相关标签/搜索