spark任务java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE

crash:“Caused by: java.lang.RuntimeException: java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE”

这是 spark 的一个经典错误,很有可能就是 shuffle 的时候有太大的 key 或 value 造成的(当然,此 crash 也有可能涉及 序列化、反序列化、cache 等环节原因)。spark默认的each partition size is <= 2GB。

原来我分了300份报错,
var data = hc.sql(s"select pri_acct_no_conv, trans_id, trans_at, pdate, loc_trans_tm, acpt_ins_id_cd, trans_md, cross_dist_in,trans_id, " +
s"trim($srcColumn) as s r c C o l u m n , " + s " t r i m ( srcColumn," + s"trim( destColumn) as $destColumn " +
s"from t a b l e N a m e " + s " w h e r e p d a t e > = tableName " + s"where pdate>= beginDate and pdate<=$endDate").repartition(300).persist(StorageLevel.MEMORY_AND_DISK_SER)

在UI 界面,在执行persist(StorageLevel.MEMORY_AND_DISK_SER)的时候,会显示input进程,例如下图:
在这里插入图片描述
Input大约84.5G的时候,才进行了117/5360阶段,因此可以推算出总数据量大约在3800G左右。
这样一来,至少得分配3800/2 =1900个partition才够。
碰到的spark程序的错误(Size exceeds Integer.MAX_VALUE)以及其他一点优化

因此,我后面分配了3000的参数以后,程序就能正常运行了,大概在40分钟左右所有程序就结束了。
var data = hc.sql(s"select pri_acct_no_conv, trans_id, trans_at, pdate, loc_trans_tm, acpt_ins_id_cd, trans_md, cross_dist_in,trans_id, " +
s"trim($srcColumn) as s r c C o l u m n , " + s " t r i m ( srcColumn," + s"trim( destColumn) as $destColumn " +
s"from t a b l e N a m e " + s " w h e r e p d a t e > = tableName " + s"where pdate>= beginDate and pdate<=$endDate").repartition(3000).persist(StorageLevel.MEMORY_AND_DISK_SER)

根据某司机的经验,如果数据量没这么大的话,repartition的参数跟在spark-submit文件中配置的
–num-executors 保持差不多就够了,这样性能最好。

顺便说一句,persist和cache函数在数据量较大的情况下还是挺耗时间的,cache不是很明显,persist很明显,所以不要乱加persist。

哦,该程序中还发现
var dataFrame1=transferData_tmp.select(s" s r c C o l u m n " ) . d i s t i n c t ( ) v a r d a t a F r a m e 2 = t r a n s f e r D a t a t m p . s e l e c t ( s " {srcColumn}").distinct() var dataFrame2=transferData_tmp.select(s" {destColumn}").distinct()
var cardRdd_transfer= dataFrame1.unionAll(dataFrame2).distinct().map {_.getString(0)}

要比下面这样写法要快不少(感觉跟mapreduce中的combine函数的思路比较像吧。)
var dataFrame1=transferData_tmp.select(s" s r c C o l u m n " ) v a r d a t a F r a m e 2 = t r a n s f e r D a t a t m p . s e l e c t ( s " {srcColumn}") var dataFrame2=transferData_tmp.select(s" {destColumn}")
var cardRdd_transfer= dataFrame1.unionAll(dataFrame2).distinct().map {_.getString(0)}

在对一些RDD或者dataframe操作的时候,碰到下面这些问题

Container killed by YARN for exceeding memory limits. 5.6 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

可以尝试对这些RDD 进行repartition,然后再操作。我理解的repartition在两方面比较有用,1是当中间计算的结果过于分散,我们使用repartition可以适当地将数据进行集中,然后在进行计算加快计算速度。2是当partition过少,导致每个partition上的数据量过大的时候,repartition进行增大数量可以分散每个partition上的数量。