A Resilient Distributed Dataset (RDD) is the basic abstraction in Spark. It represents an immutable, partitioned collection of elements that can be operated on in parallel. This class contains the basic operations available on all RDDs, such as map, filter, and persist. In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such as groupByKey and join; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can be saved as SequenceFiles.
All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit conversions.
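The enrichment pattern behind this can be sketched in plain Scala, without Spark. This is not Spark's actual code; PairOps and groupByKeyLocal are illustrative names, standing in for PairRDDFunctions and groupByKey, to show how an implicit conversion makes key-value methods appear only on collections of pairs:

```scala
object ImplicitDemo {
  // Implicit class: any Seq[(K, V)] is silently wrapped in PairOps,
  // so pair-only methods become available on it automatically
  implicit class PairOps[K, V](pairs: Seq[(K, V)]) {
    // groupByKey-like operation, defined only when the elements are pairs
    def groupByKeyLocal: Map[K, Seq[V]] =
      pairs.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
  }
}

object ImplicitDemoMain {
  import ImplicitDemo._

  def main(args: Array[String]): Unit = {
    val data = Seq(("a", 1), ("a", 2), ("b", 3))
    // The method appears on the plain Seq through the implicit conversion
    println(data.groupByKeyLocal) // Map(a -> List(1, 2), b -> List(3))
  }
}
```

A Seq[Int] would not get groupByKeyLocal at all, which mirrors how groupByKey is only available on RDDs of key-value pairs.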
Internally, each RDD is characterized by five main properties:
 - A list of partitions
 - A function for computing each split
 - A list of dependencies on other RDDs
 - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
All of the scheduling and execution in Spark is done based on these methods, allowing each RDD to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for reading data from a new storage system) by overriding these functions.
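The five-property contract can be sketched as a toy model in plain Scala. This is not Spark's actual API; MiniRDD, Split, and SeqRDD are illustrative names only, showing roughly what a custom RDD for a new storage system would override:

```scala
// One partition of the dataset, identified by its index
case class Split(index: Int)

// Toy model of the five-property RDD contract (names are illustrative)
trait MiniRDD[T] {
  def getPartitions: Array[Split]                          // 1. list of partitions
  def compute(split: Split): Iterator[T]                   // 2. function to compute each split
  def dependencies: Seq[MiniRDD[_]] = Nil                  // 3. dependencies on other RDDs
  def partitioner: Option[AnyRef] = None                   // 4. optional Partitioner (key-value RDDs)
  def preferredLocations(split: Split): Seq[String] = Nil  // 5. optional locality hints
}

// A minimal "storage system" reader: exposes an in-memory Seq as N round-robin splits
class SeqRDD[T](data: Seq[T], numSplits: Int) extends MiniRDD[T] {
  def getPartitions: Array[Split] =
    (0 until numSplits).map(i => Split(i)).toArray
  def compute(split: Split): Iterator[T] =
    data.zipWithIndex.collect { case (x, i) if i % numSplits == split.index => x }.iterator
}
```

The scheduler-side view is then just "get the partitions, compute each one": iterating `getPartitions` and calling `compute` on each split reproduces the whole dataset.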
1. If you write a file to HDFS and that file already exists, an error is raised.
2. A complete DAG is formed only when an Action is triggered; the job is then submitted to the cluster for execution.
3. Before the job is submitted to the cluster, some preparation work is required, all of which is done on the Driver side.
4. RDDs depend on one another, and there are two kinds of dependencies (narrow and wide).
6. Broadcast variables
7. Collecting the data to the Driver and then writing it to MySQL is not the best choice, for the following reasons:
01. it wastes network bandwidth;
02. the writes to the database cannot happen in parallel.
8. result.foreach(data => {
  // Don't create a new JDBC connection for every record written; that wastes a lot of resources
  //val conn: Connection = DriverManager.getConnection("……")
})
9. The correct approach is to take one partition at a time, referenced through an iterator. A connection is then created once per partition written, which greatly reduces resource consumption.
result.foreachPartition(part => {
  dataToMysql(part)
})
import java.sql.{Connection, DriverManager, PreparedStatement}

def dataToMysql(part: Iterator[(String, Int)]): Unit = {
  // pass the URL, then the username and password
  val conn: Connection = DriverManager.getConnection(
    "jdbc:mysql://localhost:3306/……characterEncoding=utf8", "root", "")
  val prepareStatement: PreparedStatement = conn.prepareStatement(
    "INSERT INTO access_log (province, counts) VALUES (?, ?)")
  // write the data
  part.foreach(data => {
    prepareStatement.setString(1, data._1)
    prepareStatement.setInt(2, data._2)
    prepareStatement.executeUpdate()
  })
  prepareStatement.close()
  conn.close()
}
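The per-partition pattern can be sketched without Spark or a real database. CountingSink below is a hypothetical stand-in for the JDBC connection; it only counts how many "connections" get opened, which is the expensive step the pattern is meant to minimize:

```scala
object PartitionDemo {
  var connectionsOpened = 0

  // Stand-in for a JDBC connection: opening one is the expensive step
  class CountingSink {
    connectionsOpened += 1
    def write(record: (String, Int)): Unit = () // pretend to execute an INSERT
    def close(): Unit = ()
  }

  // Analogue of dataToMysql: one sink per partition, every record written through it
  def writePartition(part: Iterator[(String, Int)]): Unit = {
    val sink = new CountingSink
    part.foreach(sink.write)
    sink.close()
  }

  def main(args: Array[String]): Unit = {
    // Four records split into two "partitions" of two records each
    val partitions = Seq(("bj", 10), ("sh", 20), ("gz", 30), ("sz", 40)).grouped(2)
    partitions.foreach(p => writePartition(p.iterator))
    println(connectionsOpened) // 2 partitions -> 2 connections, not 4
  }
}
```

With the per-record foreach version, four connections would have been opened; per-partition, it is only as many as there are partitions.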