这是我参与更文挑战的第1天,活动详情查看:更文挑战
在Spark中创建RDD的方式可以分为四种:
// Creating an RDD from an in-memory collection.
def main(args: Array[String]): Unit = {
  val sc: SparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName("Rdd-Mem")
  )
  try {
    // makeRDD is just a convenience alias for parallelize.
    val rdd1: RDD[Int] = sc.makeRDD(
      List(1, 2, 4, 5, 6)
    )
    val rdd2: RDD[Int] = sc.parallelize(
      Array(1, 2, 3, 4, 5, 6)
    )
    rdd1.collect().foreach(println)
    rdd2.collect().foreach(println)
  } finally {
    // Fix: the original never stopped the SparkContext, leaking its resources.
    sc.stop()
  }
}
复制代码
// Source of SparkContext.makeRDD (excerpted from Spark): it is nothing more
// than a convenience alias for `parallelize`.
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
// Delegates directly; numSlices defaults to the scheduler's default parallelism.
parallelize(seq, numSlices)
}
复制代码
// Creating RDDs from files on disk (textFile / wholeTextFiles).
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName("Rdd-File")
  )
  try {
    // One element per line across all files under "data".
    val rdd1: RDD[String] = sc.textFile("data")
    // wholeTextFiles: each Tuple's first field is the full file path,
    // the second field is the whole file content.
    val rdd2: RDD[(String, String)] = sc.wholeTextFiles("data/word*.txt")
    rdd1.collect().foreach(println)
    rdd2.collect().foreach(println)
  } finally {
    // Fix: the original never stopped the SparkContext, leaking its resources.
    sc.stop()
  }
}
复制代码
默认情况下,Spark可以将一个作业切分为多个任务后,发送给Executor节点并行计算,而可以并行计算的任务数量我们称之为并行度。这个数量可以在构建RDD时指定。记住,这里的并行执行的任务数量,并不是指切分任务的数量,不要混淆了。
// Parallelism demo: the second argument of makeRDD / textFile controls the
// number of partitions (for textFile it is the *minimum* partition count).
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sc = new SparkContext(sparkConf)
val dataRDD: RDD[Int] =
  sc.makeRDD(
    List(1, 2, 3, 4),
    4) // split the collection into 4 partitions
val fileRDD: RDD[String] =
  sc.textFile(
    "input",
    2) // read the files with at least 2 partitions
fileRDD.collect().foreach(println)
// Fix: the original called `sparkContext.stop()`, but no such variable exists —
// the context is named `sc`, so the snippet did not compile.
sc.stop()
复制代码
// Excerpt of ParallelCollectionRDD.slice (Spark source): splits `seq` into
// `numSlices` contiguous sub-sequences, one per partition. Ranges get a
// special fast path that avoids materialising the elements.
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of partitions required")
}
// Sequences need to be sliced at the same set of index positions for operations
// like RDD.zip() to behave as expected
// Computes the [start, end) index pair for every partition. Later partitions
// absorb the remainder, e.g. [1,2,3,4,5] over two partitions -> [1,2], [3,4,5].
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map { i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
// The actual slicing, dispatched on the concrete sequence type.
seq match {
case r: Range =>
positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
// If the range is inclusive, use inclusive range for the last slice
if (r.isInclusive && index == numSlices - 1) {
new Range.Inclusive(r.start + start * r.step, r.end, r.step)
}
else {
new Range(r.start + start * r.step, r.start + end * r.step, r.step)
}
}.toSeq.asInstanceOf[Seq[Seq[T]]]
case nr: NumericRange[_] =>
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
var r = nr
for ((start, end) <- positions(nr.length, numSlices)) {
val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
case _ =>
val array = seq.toArray // To prevent O(n^2) operations for List etc
positions(array.length, numSlices).map { case (start, end) =>
array.slice(start, end).toSeq
}.toSeq
}
}
复制代码
// Excerpt of HadoopRDD.getPartitions (Spark source, truncated below): for a
// file-based RDD, partitioning is delegated to the Hadoop InputFormat.
override def getPartitions: Array[Partition] = {
val jobConf = getJobConf()
SparkHadoopUtil.get.addCredentials(jobConf)
try {
// One Spark partition is created per Hadoop InputSplit.
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
..........
复制代码
// How splits are actually computed: Hadoop FileInputFormat.getSplits
// (truncated excerpt).
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
// Sum the sizes of all input files; plain directories are rejected.
long totalSize = 0;
for (FileStatus file: files) {
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
// Target bytes per split if the data were divided evenly across numSplits.
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
// Lower bound on a split's size: the configured minimum, at least 1 byte.
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
...
for (FileStatus file: files) {
...
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
// Final split size combines goal, minimum and block size — see computeSplitSize.
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
...
}
/**
 * Computes the size of one input split: the goal size capped by the HDFS
 * block size, but never smaller than the configured minimum split size.
 */
protected long computeSplitSize(long goalSize, long minSize,
    long blockSize) {
  final long cappedByBlock = Math.min(goalSize, blockSize);
  return Math.max(minSize, cappedByBlock);
}
复制代码
从计算的角度,算子之外的代码都是在Driver端执行,算子里面的代码都是在Executor端执行。那么在Scala的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就形成了闭包的效果。如果使用的算子外的数据无法序列化,就意味着无法传值给Executor端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列化,这个操作我们称之为闭包检测。Scala 2.12版本后闭包的编译方式发生了改变。
从计算的角度,算子之外的代码都是在Driver端执行,算子里面的代码都是在Executor端执行,代码如下:
// Closure-serialization demo: dept1/dept0 are created on the driver but used
// inside the map closure, so Dept must be serializable to ship to executors.
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("测试序列化")
  )
  try {
    val dept1 = new Dept(1, "研发部")
    val dept0 = new Dept(0, "未知")
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3), ("b", 1),
      ("b", 4), ("F", 5), ("K", 6)
    ))
    rdd.map(t => {
      t._2 match {
        case 1 => (t._1, dept1)
        case _ => (t._1, dept0)
      }
      // Fix: replaced deprecated postfix `collect() foreach println` with
      // explicit method calls.
    }).collect().foreach(println)
  } finally {
    // Fix: the original never stopped the SparkContext, leaking its resources.
    sc.stop()
  }
}
// Simple department bean; serializable so driver-side instances can travel
// inside RDD closures to the executors.
class Dept(var id: Int, var name: String) extends Serializable {
  // Tab-separated rendering, identical to `id + "\t" + name`.
  override def toString: String = s"$id\t$name"
}
复制代码
// Excerpt of ClosureCleaner.clean (Spark source, truncated): validates that a
// closure handed to an RDD operator can be serialized before it is shipped.
private def clean(
func: AnyRef,
checkSerializable: Boolean,
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]): Unit = {
..............
// Serialization check — fail fast on the driver instead of at task runtime.
if (checkSerializable) {
ensureSerializable(func)
}
}
// Round-trips the closure through the closure serializer; any failure is
// surfaced as a "Task not serializable" SparkException.
private def ensureSerializable(func: AnyRef): Unit = {
  try {
    val env = SparkEnv.get
    if (env != null) {
      env.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception => throw new SparkException("Task not serializable", ex)
  }
}
//不实现Serializable接口会抛出如下异常
//Exception in thread "main" org.apache.spark.SparkException: Task not serializable
// at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
// at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
// at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
复制代码
项目地址: github.com/EsotericSof…
Java的序列化能够序列化任何的类,但是比较重(字节多),序列化后,对象的体积也比较大。Spark出于性能的考虑,从Spark 2.0开始支持另一种Kryo序列化机制。Kryo的速度是Java序列化(Serializable)的10倍。当RDD在Shuffle数据的时候,简单数据类型、数组和字符串类型已经在Spark内部使用Kryo来序列化。
注意:即便使用Kryo序列化,也要继承Serializable接口。markdown
// Kryo demo: switch the serializer via spark.serializer and pre-register the
// classes that will cross the wire (registerKryoClasses) for compact output.
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf()
      .setMaster("local[*]")
      .setAppName("测试序列化")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Dept]))
  )
  try {
    val dept1 = new Dept(1, "研发部")
    val dept0 = new Dept(0, "未知")
    val rdd = sc.makeRDD(List(
      ("a", 1), ("a", 2), ("b", 3), ("b", 1),
      ("b", 4), ("F", 5), ("K", 6)
    ))
    rdd.map(t => {
      t._2 match {
        case 1 => (t._1, dept1)
        case _ => (t._1, dept0)
      }
      // Fix: replaced deprecated postfix `collect() foreach println` with
      // explicit method calls.
    }).collect().foreach(println)
  } finally {
    // Fix: the original never stopped the SparkContext, leaking its resources.
    sc.stop()
  }
}
// Department bean used by the Kryo demo. Note: even with Kryo enabled, Spark
// still requires the class to be serializable.
class Dept(var id: Int, var name: String) extends Serializable {
  // Joins the fields with a tab — identical output to `id + "\t" + name`.
  override def toString: String = List(id, name).mkString("\t")
}
复制代码