最近作一个oracle项目迁移工做,跟着spark架构师学着作,进行一些方法的总结。mysql
一、首先,建立SparkSession对象(老版本为sparkContext)sql
val session = SparkSession.builder().appName("app1").getOrCreate()数据库
二、数据的更新时间配置表,选用mysql,就是说每次结果数据计算写入mysql后,还会将这次数据的更新时间写入数据配置表。 那么在代码里,须要建立配置表的case class,配置与构造数据库schema信息,url,用户名密码等,随后根据配置表中的不一样app进行数据的过滤。缓存
val appId = "1"session
case class DBInformation(url:Stirng,schema:String,user:String,passwd:String)架构
val mysqlDB = DBInformation("jdbc:mysql://....",schema,user,passowrd)oracle
val tableName = mysqlDB.schema + "." + nameapp
val props = new Properties()函数
props.setProperty("user",mysqlDB.user)ui
props.setProperty("password",mysqlDB.passwd)
props.setProperty(JDBCOptions.JDBC_DRIVER_CLASS,"com.mysql.jdbc.Driver")
val record = session.read.jdbc(mysqlDB.url,tableName,props).filter(row => row.getAs[Int]("app_id") == appId).take(1)
//第一次写入,木有数据
if(0 == record.size){
DBInfoMation(null,null,null)
}else{
DBInfoMation(record(0).getTimestmap(1),recode(0).getTimestamp(2),recode(0)..getTimestamp(3))
三、注册UDF,因为原来是用oracle的语法,现现在转为sparksql,须要注册一些UDF,来兼容原有oracle的函数
def registerUDF(session:SparkSession) : Unit = {
session.udf.register("UDF",(value : String,modifieds:Array[String) => {
val filter = modifieds.filter(_!=null)
if(!filter.isEmpty){
filter.max
}else{
null
}
})
{
四、不少计算是须要过往的历史数据的,在第一次初始化的时候,先对历史数据进行缓存。这里有个知识点,会将一直计算的同步数据进行checkPoint落地磁盘,若是发现历史时间在同步时间以后,则加载历史数据,不然就加载同步数据。
val (updateTime,initData) = if(historyTime.after(syncTime)){
(historyTime,initFromHistory(tableName))
} else {
(syncTime,initFromCheckPoint(syncTime))
}
//记录schema
schema = initData.schema
//baseData为缓存在内存的数据,并根据数据量进行repartition
baseData = initData.repartition(numPartitions,_partitionColumns.map(new Column()):_*).rdd.persisit(storageLevel)
//触发action动做
baseData.foreach(_=>Unit)
五、有一种状况,下游三个表要关联生成一张大表,这三张表的数据来源于消息中间件中的三个topic,可是数据可能不是同时到来,那么就须要将历史加载的大表拆根据ID拆分为三个小表,而后逐个append到三个小表上,随后再根据ID关联起来,再组成最终表。
val table1 = new createUpdatingTable(session,"tableName1",topicConf,numPartitons,...)
val table2 = new createUpdatingTable (session,"tableName2",topicConf1,numPartitions,...)
val table3 = new createUpdatingTable(session,"tableName3","topicConf2,numPartitions,...)
val mergeBaseTable = (session,"mergeTableName",Array(table1,table2,table3),finallyColumn,finallyPartitions...)
mergeBaseTable.updateAndGetData(Some(genDataFilter(currentTime)))
//三表拆分与合并
val tmpPartitionKey = "pd_code"
if(baseData != null) {
val oldData = getOldData(baseData,keyDF.rdd,tmpPartitionKey)
oldDf = session.createDataFrame(oldData,schema)
.repartition(numPartitions,new Column(tmpPartitionKey))
.persist(storageLevel)
}
val table1 = updateShardTable(oldDf,inDfs(0)...).sparksession.createDataFrame(data,schema)
val table2 = ....
val table3 = ....
六、三表key进行合并,经过sql进行三来源表合并
val keySet = keys.collect()
val broadcastKeys = session.sparkContext.broadCast(keySet)
baseData.mapPartitions({iter =>
val set = broadcastKey.value.toSet
iter.filter(row=>set.contains(row.getAs[Any](keyCol)))
},true)
val sql ="select a.column,b.column,c.column.... from table1 a left join table2 b on a.pd_code = b.pd_code......
val finallyTable = session.sql(sql)
七、从历史数据中筛选出这次须要更新的数据(经过ID进行过滤),随后将新数据进行append
val new Data = baseData.zipPartitions(updateData,true){case(liter,riter)=>
val rset = new mutable.HashSet[Any]
for(row <- riter){
rset.add(row.getAs[Any](keyCol))
}
liter.filter(row=>!rset.contains(row.getAs[Any](keyCol))))
}.zipPartitions(updateData,true){case (liter,riter)=>
liter++riter
}.persisit(storageLevel)