package quality.db import java.util import java.util.{Locale, Properties, ResourceBundle} import common.qualityUtils.QualityUtils import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode, SparkSession} import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import scala.collection.JavaConversions._ import scala.io.Source object DBQuality { val rb = ResourceBundle.getBundle("application", Locale.getDefault) val warehouseLocation = "D:/kingbaseDown/nihaoshijie/sparkData/spark-warehouse" def main(args: Array[String]): Unit = { val spark = SparkSession.builder() .master("local[2]") .appName("Spark SQL Quality of Database") .config("spark.sql.warehouse.dir", warehouseLocation) .getOrCreate() runBaseQualitySql(spark) spark.stop() } case class Oreder(var id:String ,var name:String) private def runBaseQualitySql(spark: SparkSession): Unit = { val json = Source.fromFile("C:\\Users\\Administrator\\Desktop\\json.txt", "GBK").mkString; val map = QualityUtils.getDataBaseMap(json) val jdbcDF = spark.read.format("jdbc").options(map).load() jdbcDF.createOrReplaceTempView(map("dbtable")) val sqlMap = QualityUtils.getSql(json) val flag = QualityUtils.getIncFlag(json) var last_val = "" for ((key, value) <- sqlMap) { var result: util.List[String] = null if(flag){ if("".equals(last_val)){ } val df = spark.sql(value) val array = df.columns val headDF = df.select(array(0)) val lastDF = headDF.orderBy(headDF(array(0)).desc).take(1) if(!lastDF.isEmpty){ last_val = lastDF(0)(0).toString } val seq: Seq[String] = array.drop(1) result = df.select(seq.head,seq.tail:_*).toJSON.collectAsList() val javardd = df.select(seq.head,seq.tail:_*).toJavaRDD }else{ result = spark.sql(value).toJSON.collectAsList() } if( result== null || result.isEmpty){ return } val schema = StructType( List( StructField("id", IntegerType, true), StructField("name", StringType, true) ) ) val rdd = spark.sparkContext.makeRDD(Array(Row(1, result.toString))) val DF = spark.createDataFrame(rdd, schema) val prop = new Properties() prop.setProperty("user", rb.getString("mysql.username")) prop.setProperty("password", rb.getString("mysql.password")) prop.setProperty("driver", rb.getString("mysql.driver")) DF.write.format("jdbc").mode(SaveMode.Append).jdbc(rb.getString("mysql.url"), "spark_test1", prop)} } }