Master默认使用512M内存,当集群中运行的任务特别多时,就会挂掉,缘由是master会读取每一个task的event log日志去生成Spark ui,内存不足天然会OOM,能够在master的运行日志中看到,经过HA启动的master天然也会由于这个缘由失败。java
增长Master的内存占用,在Master节点spark-env.sh
中设置:node
export SPARK_DAEMON_MEMORY 10g # 根据你的实际状况
减小保存在Master内存中的做业信息python
spark.ui.retainedJobs 500 # 默认都是1000 spark.ui.retainedStages 500
有时候咱们还会在web ui中看到worker节点消失或处于dead状态,在该节点运行的任务则会报各类 lost worker
的错误,引起缘由和上述大致相同,worker内存中保存了大量的ui信息致使gc时失去和master之间的心跳。mysql
增长Master的内存占用,在Worker节点spark-env.sh
中设置:git
export SPARK_DAEMON_MEMORY 2g # 根据你的实际状况
减小保存在Worker内存中的Driver,Executor信息github
spark.worker.ui.retainedExecutors 200 # 默认都是1000 spark.worker.ui.retainedDrivers 200
Spark Shuffle FetchFailedException解决方案web
missing output locationsql
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
shuffle fetch faild数据库
org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/192.168.47.215:50268
当前的配置为每一个executor使用1core,5GRAM,启动了20个executorapache
这种问题通常发生在有大量shuffle操做的时候,task不断的failed,而后又重执行,一直循环下去,直到application失败。
通常遇到这种问题提升executor内存便可,同时增长每一个executor的cpu,这样不会减小task并行度。
启动的execuote数量为:7个
execuoterNum = spark.cores.max/spark.executor.cores
每一个executor的配置:
3core,15G RAM
消耗的内存资源为:105G RAM
15G*7=105G
能够发现使用的资源并无提高,可是一样的任务原来的配置跑几个小时还在卡着,改了配置后几分钟就能完成。
executor lost
WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost)
task lost
WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed
各类timeout
java.util.concurrent.TimeoutException: Futures timed out after [120 second] ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network. timeout if this is wrong
由网络或者gc引发,worker或executor没有接收到executor或task的心跳反馈。
提升 spark.network.timeout
的值,根据状况改为300(5min)或更高。
默认为 120(120s),配置全部网络传输的延时,若是没有主动设置如下参数,默认覆盖其属性
数据倾斜
任务倾斜
差距不大的几个task,有的运行速度特别慢。
大多数任务都完成了,还有那么一两个任务怎么都跑不完或者跑的很慢,分为数据倾斜和task倾斜两种。
数据倾斜
数据倾斜大多数状况是因为大量的无效数据引发,好比null或者”“,也有多是一些异常数据,好比统计用户登陆状况时,出现某用户登陆过千万次的状况,无效数据在计算前须要过滤掉。
数据处理有一个原则,多使用filter,这样你真正须要分析的数据量就越少,处理速度就越快。
sqlContext.sql("...where col is not null and col != ''")
任务倾斜
task倾斜缘由比较多,网络io,cpu,mem都有可能形成这个节点上的任务执行缓慢,能够去看该节点的性能监控来分析缘由。之前遇到过同事在spark的一台worker上跑R的任务致使该节点spark task运行缓慢。
或者能够开启spark的推测机制,开启推测机制后若是某一台机器的几个task特别慢,推测机制会将任务分配到其余机器执行,最后Spark会选取最快的做为最终结果。
堆内存溢出
java.lang.OutOfMemoryError: Java heap space
内存不够,数据太多就会抛出OOM的Exeception,主要有driver OOM和executor OOM两种
driver OOM
通常是使用了collect操做将全部executor的数据聚合到driver致使。尽可能不要使用collect操做便可。
executor OOM
能够按下面的内存优化的方法增长code使用内存空间
spark.executor.memory
的值org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
若是你在worker中调用了driver中定义的一些变量,Spark就会将这些变量传递给Worker,这些变量并无被序列化,因此就会看到如上提示的错误了。
val x = new X() //在driver中定义的变量 dd.map{r => x.doSomething(r) }.collect //map中的代码在worker(executor)中执行
除了上文的map,还有filter,foreach,foreachPartition等操做,还有一个典型例子就是在foreachPartition中使用数据库建立链接方法。这些变量没有序列化致使的任务报错。
下面提供三种解决方法:
sparkConf
,SparkContext
,都用 @transent
进行注解,表示这些变量不须要被序列化Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 374 tasks (1026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
spark.driver.maxResultSize默认大小为1G 每一个Spark action(如collect)全部分区的序列化结果的总大小限制,简而言之就是executor给driver返回的结果过大,报这个错说明须要提升这个值或者避免使用相似的方法,好比countByValue,countByKey等。
将值调大便可
spark.driver.maxResultSize 2g
WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.
这个WARN可能还会致使ERROR
Caused by: java.lang.RuntimeException: Failed to commit task Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit
若是你比较了解spark中的stage是如何划分的,这个问题就比较简单了。
一个Stage中包含的task过大,通常因为你的transform过程太长,所以driver给executor分发的task就会变的很大。
因此解决这个问题咱们能够经过拆分stage解决。也就是在执行过程当中调用cache.count
缓存一些中间数据从而切断过长的stage。
driver节点内存不足
driver内存不足致使没法启动application,将driver分配到内存足够的机器上或减小driver-memory
Java HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x0000000680000000, 4294967296, 0) failed;
error=’Cannot allocate memory’ (errno=12)
hdfs空间不够
hdfs空间不足,event_log没法写入,因此 ListenerBus会报错
,增长hdfs空间(删除无用数据或增长节点)
Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/spark-history/app-20151228095652-0072.inprogress could only be replicated to 0 nodes instead of minReplication (=1) ERROR LiveListenerBus: Listener EventLoggingListener threw an exception java.lang.reflect.InvocationTargetException
spark编译包与Hadoop版本不一致
下载对应hadoop版本的spark包或本身编译。
java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID
driver机器端口使用过多
在一台机器上没有指定端口的状况下,提交了超过15个任务。
16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI java.net.BindException: 地址已在使用: Service 'SparkUI' failed after 16 retries!
提交任务时指定app web ui端口号解决:
--conf spark.ui.port=xxxx
java.io.UIException: Cannot run program "python2.7": error=2,没有那个文件或目录
spark使用的Python版本为2.7,centOS默认python版本为2.6,升级便可。
部分节点上有错误提示
java.io.IOExeception: Cannot run program "python2.7": error=13, 权限不够
新加的节点运维装2.7版本的python,python命令是正确的,python2.7却没法调用,只要改改环境变量就行了。
TypeError: ('__cinit__() takes exactly 8 positional arguments (11 given)', <type 'sklearn.tree._tree.Tree'>, (10, array([1], dtype=int32), 1, <sklearn.tree._tree.RegressionCriterion object at 0x100077480>, 50.0, 2, 1, 0.1, 10, 1, <mtrand.RandomState object at 0x10a55da08>))
该pickle文件是在0.17版本的scikit-learn下训练出来的,有些机器装的是0.14版本,版本不一致致使,升级可解决,记得将老版本数据清理干净,不然会报各类Cannot import xxx
的错误。
有时候你会看到部分executor并无在执行任务,为何呢?
(1) 任务partition数过少,
要知道每一个partition只会在一个task上执行任务。改变分区数,能够经过 repartition
方法,即便这样,在 repartition
前仍是要从数据源读取数据,此时(读入数据时)的并发度根据不一样的数据源受到不一样限制,经常使用的大概有如下几种:
hdfs - block数就是partition数 mysql - 按读入时的分区规则分partition es - 分区数即为 es 的 分片数(shard)
(2) 数据本地性的反作用
taskSetManager在分发任务以前会先计算数据本地性,优先级依次是:
process(同一个executor) -> node_local(同一个节点) -> rack_local(同一个机架) -> any(任何节点)
会优先执行其它 executor
上高优先级的任务,这几台数据本地性没作好的机器在这个阶段常常处于干瞪眼的状态(不执行任务)。
判断的公式为:
curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)
其中 curTime
为系统当前时间,lastLaunchTime
为在某优先级下最后一次启动task的时间
若是知足这个条件则会进入下一个优先级的时间判断,直到 any
,不知足则分配当前优先级的任务。
数据本地性任务分配的源码在 taskSetManager.scala
,有兴趣的同窗能够去看看
若是该问题很严重,能够下降如下参数的值,默认都是3s。
spark.locality.wait.process spark.locality.wait.node spark.locality.wait.rack
当你数据本地性不好,可适当提升上述值。
有可能哪台worker节点出现了故障,task执行失败后会在该 executor
上不断重试,达到最大重试次数后会致使整个 application
执行失败,咱们能够设置失败黑名单(task在该节点运行失败后会换节点重试),能够看到在源码中默认设置的是 0
,
private val EXECUTOR_TASK_BLACKLIST_TIMEOUT = conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
在 spark-default.sh
中设置
spark.scheduler.executorTaskBlacklistTime 30000
当 task
在该 executor
运行失败后会在其它 executor
中启动,同时此 executor
会进入黑名单30s(不会分发任务到该executor)。
若是你的任务shuffle量特别大,同时rdd缓存比较少能够更改下面的参数进一步提升任务运行速度。
spark.storage.memoryFraction
- 分配给rdd缓存的比例,默认为0.6(60%),若是缓存的数据较少能够下降该值。
spark.shuffle.memoryFraction
- 分配给shuffle数据的内存比例,默认为0.2(20%)
剩下的20%内存空间则是分配给代码生成对象等。
若是任务运行缓慢,jvm进行频繁gc或者内存空间不足,或者能够下降上述的两个值。
"spark.rdd.compress","true"
- 默认为false,压缩序列化的RDD分区,消耗一些cpu减小空间的使用
spark.default.parallelism
发生shuffle时的并行度,在standalone模式下的数量默认为core的个数,也可手动调整,数量设置太大会形成不少小任务,增长启动任务的开销,过小,运行大数据量的任务时速度缓慢。
spark.sql.shuffle.partitions
sql聚合操做(发生shuffle)时的并行度,默认为200
若是该值过小会致使OOM,executor丢失,任务执行时间过长的问题
相同的两个任务:
spark.sql.shuffle.partitions=300:
spark.sql.shuffle.partitions=500:
速度变快主要是大量的减小了gc的时间。
可是设置过大会致使性能恶化,致使某些task会hang住没法执行。
修改map阶段并行度主要是在代码中使用rdd.repartition(partitionNum)
来操做。