Spark RDD、DataFrame原理及操做详解

RDD是什么?

  RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的所有或部分能够缓存在内存中,在屡次计算间重用。html

  RDD内部能够有许多分区(partitions),每一个分区又拥有大量的记录(records)。java

五个特征:sql

  dependencies:创建RDD的依赖关系,主要rdd之间是宽窄依赖的关系,具备窄依赖关系的rdd能够在同一个stage中进行计算。数据库

  partition:一个rdd会有若干个分区,分区的大小决定了对这个rdd计算的粒度,每一个rdd的分区的计算都在一个单独的任务中进行。json

  preferedlocations:按照“移动数据不如移动计算”原则,在spark进行任务调度的时候,优先将任务分配到数据块存储的位置数组

  compute:spark中的计算都是以分区为基本单位的,compute函数只是对迭代器进行复合,并不保存单次计算的结果。缓存

  partitioner:只存在于(K,V)类型的rdd中,非(K,V)类型的partitioner的值就是None。网络

 

  rdd的算子action会触发真正的做业提交,而transformation算子是不会当即触发做业提交的。app

  在Spark中,全部RDD的转换都是是惰性求值的。RDD的转换操做transformation会生成新的RDD,新的RDD的数据依赖于原来的RDD的数据,每一个RDD又包含多个分区。那么一段程序实际上就构造了一个由相互依赖的多个RDD组成的有向无环图(DAG)。并经过在RDD上执行action动做将这个有向无环图做为一个Job提交给Spark执行分布式

  在DAG中又进行stage的划分,划分的依据是依赖算子是不是shuffle(如reduceByKey,Join等)的,每一个stage又能够划分红若干task。接下来的事情就是driver发送task到executor,executor本身的线程池去执行这些task,完成以后将结果返回给driver。action算子是划分不一样job的依据。

  Spark对于有向无环图Job进行调度,肯定阶段(Stage),分区(Partition),流水线(Pipeline),任务(Task)和缓存(Cache),进行优化,并在Spark集群上运行Job。RDD之间的依赖分为宽依赖(依赖多个分区)和窄依赖(只依赖一个分区),在肯定阶段时,须要根据宽依赖shuffle划分阶段。根据分区划分任务。

  Spark支持故障恢复的方式也不一样,提供两种方式,Linage,经过数据的血缘关系,再执行一遍前面的处理,Checkpoint,将数据集存储到持久存储中。  Spark为迭代式数据处理提供更好的支持。每次迭代的数据能够保存在内存中,而不是写入文件

 

这里注意两个算子coalesce()和repartition()

coalesce

def coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T] 
该函数用于将RDD进行重分区,使用HashPartitioner。 
第一个参数为重分区的数目,第二个为是否进行shuffle,默认为false。

repartition

def repartition(numPartitions: Int): RDD[T] 
该函数其实就是coalesce函数第二个参数为true的实现。

使用注意

他们两个都是RDD的分区进行从新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,须要从新划分红M个分区) 
  1)N < M。通常状况下N个分区有数据分布不均匀的情况,利用HashPartitioner函数将数据从新分区为M个,这时须要将shuffle设置为true。 
  2)若是N > M而且N和M相差很少,(假如N是1000,M是100)那么就能够将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时能够将shuff设置为false,在shuffl为false的状况下,若是M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。 
  3)若是N > M而且二者相差悬殊,这时若是将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个stage中,就可能形成Spark程序的并行度不够,从而影响性能,若是在M为1的时候,为了使coalesce以前的操做有更好的并行度,能够讲shuffle设置为true。

  总之:若是shuff为false时,若是传入的参数大于现有的分区数目,RDD的分区数不变,也就是说不通过shuffle,是没法将RDDde分区数变多的

参考:

Spark算子:RDD基本转换操做(2)–coalesce、repartition

更多RDD算子内容推荐参考

    Spark函数详解系列之RDD基本转换

    Spark经常使用函数讲解之键值RDD转换

    Spark经常使用函数讲解之Action操做

 

Spark的RDD原理以及2.0特性的介绍

 

 

窄依赖和宽依赖

  shuffle 是划分 DAG 中 stage 的标识,同时影响 Spark 执行速度的关键步骤
  RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操做.窄依赖跟宽依赖的区别是是否发生 shuffle(洗牌) 操做.宽依赖会发生 shuffle 操做. 窄依赖是子 RDD的各个分片(partition)不依赖于其余分片,可以独立计算获得结果,宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,因此会形成父 RDD 的各个分片在集群中从新分片  

以下图所示:map就是一种窄依赖,而join则会致使宽依赖

 

如上面的map,filter,union属于第一类窄依赖,而join with inputs co-partitioned(对输入进行协同划分的join操做,也就是说先按照key分组而后shuffle write的时候一个父分区对应一个子分区)则为第二类窄依赖

 groupByKey和对输入未协同划分的join操做就是宽依赖,这是shuffle类操做。

细说:

  首先,窄依赖容许在单个集群节点上流水线式执行,这个节点能够计算全部父级分区。例如,能够逐个元素地依次执行filter操做和map操做。相反,宽依赖须要全部的父RDD数据可用而且数据已经经过类MapReduce的操做shuffle完成。 
  其次,在窄依赖中,节点失败后的恢复更加高效。由于只有丢失的父级分区须要从新计算,而且这些丢失的父级分区能够并行地在不一样节点上从新计算。与此相反,在宽依赖的继承关系中,单个失败的节点可能致使一个RDD的全部先祖RDD中的一些分区丢失,致使计算的从新执行。

 

// Map: "cat" -> c, cat
val rdd1 = rdd.Map(x => (x.charAt(0), x))
// groupby same key and count
val rdd2 = rdd1.groupBy(x => x._1).
                Map(x => (x._1, x._2.toList.length))

 

第一个 Map 操做将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,能够在集群的各个内存中独立计算,也就是并行化,第二个 groupby 以后的 Map 操做,为了计算相同 key 下的元素个数,须要把相同 key 的元素汇集到同一个 partition 下,因此形成了数据在内存中的从新分布,即 shuffle 操做.shuffle 操做是 spark 中最耗时的操做,应尽可能避免没必要要的 shuffle.

根据是否发生 shuffle 操做可以将其分红以下的 stage 类型

(join 须要针对同一个 key 合并,因此须要 shuffle) 
  运行到每一个 stage 的边界时,数据在父 stage 中按照 Task 写到磁盘上,而在子 stage 中经过网络从上一个 Task 中去读取数据。这些操做会致使很严重的网络传输以及磁盘的I/O,因此 stage 的边界是很是占资源的,在编写 Spark 程序的时候须要尽可能避免的 。父 stage 中 partition 个数与子 stage 的 partition 个数可能不一样,因此那些产生 stage 边界的 Transformation 经常须要接受一个 numPartition 的参数来以为子 stage 中的数据将被切分为多少个 partition。 
PS:shuffle 操做的时候能够用 combiner 压缩数据,减小 IO 的消耗

 

参考:那些年咱们对Spark RDD的理解

 

DataFrame是什么?
  在Spark中,DataFrame是一种以RDD为基础的分布式数据集,相似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及做用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提高运行时效率的目标。反观RDD,因为无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
更多参考

 

一:DataFrame建立

SparkSQL能够以其余RDD对象、parquet文件、json文件、hive表,以及经过JDBC链接到其余关系型数据库做为数据源来生成DataFrame对象。

1)jdbc

【读】

postgresUrl="jdbc:postgresql://127.0.0.1:5432/testdb" dimDF = sqlContext.read.format('jdbc').options(url=postgresUrl,dbtable=tableName,user="root",password="root") .load() dimDF.registerTempTable(tmpTableName)

 【写】

 
 
self.postgresURL = str(self.postgresIP) + ":" + str(self.postgresPort) + "/" + str(self.postgresDB)
self.postgresqlDatasource = {
"url" : "jdbc:postgresql://" + self.postgresURL,
"user" : self.postgresUser,
"password" : self.postgresPwd
}
resultDF.coalesce(int(partitionNum)).write.jdbc(url=postgresqlDatasource["url"], table=reportTable, mode='append', properties=postgresqlDatasource)

 

2)parquet

【读】

telematicFilePath = "/user/spark/test/telematic.parquet/key=" + handleRecordDateStr if( common.fileExist(telematicFilePath, self.sc) ): df = self.sqlContext.read.schema(TELEMATIC_PARQUET_SCHEMA).parquet(telematicFilePath).coalesce(int(self.partitionNum))
# schema for /user/spark/test/telematic.parquet
TELEMATIC_PARQUET_SCHEMA = SQLType.StructType([
  SQLType.StructField('dm_transct_date_hr_key', SQLType.LongType(), True), SQLType.StructField('dm_vehicle_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_driver_dim_key', SQLType.IntegerType(), True), SQLType.StructField('dm_company_dim_key', SQLType.IntegerType(), True), SQLType.StructField('deviceId', SQLType.StringType(), True), SQLType.StructField('companyId', SQLType.StringType(), True)])

 【写】

df.write.parquet(parquetPath, mode="overwrite")

 

3)json 

df = sqlContext.read.json(path)

 

4)list列表

dataList = resultDF.collect()
resultDF = self.sqlContext.createDataFrame(dataList)

 

5)Rdd

if rddSchema is None:
    df = sqlContext.createDataFrame(rdd)
else:
    df = sqlContext.createDataFrame(rdd, rddSchema)
rdd = sc.parallelize(resultList)
df = self.sqlContext.createDataFrame(rdd)

 

二:Transform操做

三:Action操做

一、 collect() ,返回一个数组,包括dataframe集合全部的行

df = sqlContext.createDataFrame(parquetRecordList, PARQUET_FILE_SCHEMA) for key in df.rdd.map(lambda x: x["key"]).distinct().collect(): filePath = "/user/spark/test.parquet/key=20171110" df.filter("key="+str(key)).drop("key").write.parquet(filePath, mode="append")

 

二、 collectAsList() 返回值是一个java类型的数组,返回dataframe集合全部的行
三、 count() 返回一个number类型的,返回dataframe集合的行数
四、 toJson
五、 first() 返回第一行 ,类型是row类型
六、 head() 返回第一行 ,类型是row类型
七、 head(n:Int)返回n行  ,类型是row 类型
八、 show()返回dataframe集合的值 默认是20行,返回类型是unit
九、 show(n:Int)返回n行,,返回值类型是unit
十、table(n:Int) 返回n行  ,类型是row 类型

 

dataframe的基本操做
一、 cache()同步数据的内存

 

data = self.sqlContext.sql(queryStr).toJSON().cache().collect()


二、 columns 返回一个string类型的数组,返回值是全部列的名字
三、 dtypes返回一个string类型的二维数组,返回值是全部列的名字以及类型
四、 explan()打印执行计划  物理的
五、 toJSON 转换为json格式数据
六、 isLocal 返回值是Boolean类型,若是容许模式是local返回true 不然返回false
七、 persist(newlevel:StorageLevel) 返回一个dataframe.this.type 输入存储模型类型

稍后详解
八、 printSchema() 打印出字段名称和类型 按照树状结构来打印
九、 registerTempTable(tablename:String) 返回Unit ,将df的对象只放在一张表里面,这个表随着对象的删除而删除了
十、 schema 返回structType 类型,将字段名称和类型按照结构体类型返回
十一、 toDF()返回一个新的dataframe类型的
十二、 toDF(colnames:String*)将参数中的几个字段返回一个新的dataframe类型的,
1三、 unpersist() 返回dataframe.this.type 类型,去除模式中的数据
1四、 unpersist(blocking:Boolean)返回dataframe.this.type类型 true 和unpersist是同样的做用false 是去除RDD

 

集成查询:
一、 agg(expers:column*) 返回dataframe类型 ,按每一个device分组查最小时间

df = sqlContext.createDataFrame(tensRdd)
resultDF = df.groupBy("device_id").agg({RegularDataEtlConstants.TIME: 'min'})
resultDF.repartition(self._partitionNum).foreachPartition(lambda iterator: self.__saveToHBase(iterator))

 

startTime = df.filter((df.startTime != "") & (df.startTime >= minStartTimeCurrent)).agg({"startTime": "min"}).collect()[0][0]


四、 apply(colName: String) 返回column类型,捕获输入进去列的对象
五、 as(alias: String) 返回一个新的dataframe类型,就是原来的一个别名
六、 col(colName: String)  返回column类型,捕获输入进去列的对象
七、 cube(col1: String, cols: String*) 返回一个GroupedData类型,根据某些字段来汇总
八、 distinct 去重 返回一个dataframe类型
九、 drop(col: Column) 删除某列 返回dataframe类型

 

columnList = ['key', 'type', 'timestamp', 'data']
df = sqlContext.createDataFrame(dataList[index], columnList)
for key in df.rdd.map(lambda x: x["key"]).distinct().collect():
   parquetPath = parquetList[index] + "/key=" + str(key)
   df.filter("key="+str(key)).drop("key").write.parquet(parquetPath, mode="append", partitionBy="type")


十、 dropDuplicates(colNames: Array[String]) 删除相同的列 返回一个dataframe
十一、 except(other: DataFrame) 返回一个dataframe,返回在当前集合存在的在其余集合不存在的


十二、 explode[A, B](inputColumn: String, outputColumn: String)行转列

根据c3字段中的空格将字段内容进行分割,分割的内容存储在新的字段c3_中
jdbcDF.explode( "c3" , "c3_" ){time: String => time.split( " " )}

 


1三、 filter(conditionExpr: String): 刷选部分数据,返回dataframe类型

df.filter("age>10").show(); 
df.filter(df("age")>10).show();  
df.where(df("age")>10).show();

 

1四、 groupBy(col1: String, cols: String*) 分组  

dfgroupBy("age").avg().show();
1五、 intersect(other: DataFrame) 返回一个dataframe,在2个dataframe都存在的元素
1六、 join(right: DataFrame, joinExprs: Column, joinType: String)
一个是关联的dataframe,第二个关联的条件,第三个关联的类型:inner, outer, left_outer, right_outer, leftsemi
df.join(ds,df("name")===ds("name") and  df("age")===ds("age"),"outer").show();
1七、 limit(n: Int) 返回dataframe类型  去n 条数据出来
1八、 na: DataFrameNaFunctions ,能够调用dataframenafunctions的功能区作过滤 df.na.drop().show(); 删除为空的行
1九、 orderBy(sortExprs: Column*) 作alise排序
20、 select(cols:string*) dataframe 作字段的刷选 df.select($"colA", $"colB" + 1)
2一、 selectExpr(exprs: String*) 作字段的刷选 df.selectExpr("name","name as names","upper(name)","age+1").show();
2二、 sort(sortExprs: Column*) 排序 df.sort(df("age").desc).show(); 默认是asc
2三、 unionAll(other:Dataframe) 合并 

 

df = df.unionAll(dfTemp).coalesce(int(self.partitionNum))


2四、 withColumnRenamed(existingName: String, newName: String) 修改列表 df.withColumnRenamed("name","names").show();
2五、 withColumn(colName: String, col: Column) 增长一列

df中新增一个名为aa的列,值与列name的同样

 

df.withColumn("aa",df("name")).show(); 
将该列时间值计算加上时区偏移值 mergeDF
= mergeDF.withColumn("dm_transct_date_hr_key", functions.lit(self.__datehandle(mergeDF["dm_transct_date_hr_key"], self.timezoneOffset)))

 

Spark-SQL之DataFrame操做大全

http://blog.csdn.net/mtj66/article/details/52064827

相关文章
相关标签/搜索