故障描述
java
前段时间在测试Spark的RDD转换的lazy特性是发现了一个Spark内部对taskSet在executor的运行分配不均匀问题。先上两张图出现问题时间点的图,你们估计就明白怎么回事了:
apache
再看看简单的测试代码:
dom
import org.apache.spark._ import org.apache.spark.storage.StorageLevel /** * Created by zhaozhengzeng on 2015/1/5. */ import java.util.Random import org.apache.hadoop.io.compress.CompressionCodec import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ object JoinTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("Spark count test"). set("spark.kryoserializer.buffer.max.mb", "128"). set("spark.shuffle.manager", "sort") // set("spark.default.parallelism","1000") val sc = new SparkContext(sparkConf) //链接表1 val textFile1 = sc.textFile("/user/hive/warehouse/test1.db/st_pc_lifecycle_list/dt=2014-07-01").map(p => { val line = p.split("\\|") (line(10), 1) } ).reduceByKey((x, y) => x + y) //测试RDD的lozy特性 val textFile3 = sc.textFile("/user/hive/warehouse/test1.db/st_pc_lifecycle_list/dt=2014-09-*").map(p => { val line = p.split("\\|") (line(11),"") }) val textFile2 = sc.textFile("/user/hive/warehouse/test1.db/st_pc_lifecycle_list/*").mapPartitions({ it => for { line <- it } yield (line.split("\\|")(10), "") }) val count = textFile1.join(textFile2).count() println("join 以后的记录数据:" + count) //textFile1.saveAsTextFile("/user/hive/warehouse/test1.db/testRs/rs2") sc.stop() } }
描述下,上面代码主要测试RDD的Join转换,以及测试textFile3的translation的lazy特性。在整个测试过程经过观察Spark UI看到上面这种TaskSet分布不均匀状况。第一个图中的Active Task为0的executor中在运行第一个stage的taskSet后,spark不会讲第二个stage的taskSet分配到这些executor中执行了。可是奇怪的是这种状况并非常常会出现,我再接下来的N次重跑做业又不会出现这种状况,具体什么缘由暂时没法找到,连重现的机会都没有,哈哈。这里先记录下吧,再观察...ide