Spark SQL: reading from a database and saving the results

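The complete example below reads a source table over JDBC, runs a set of data-quality SQL statements defined in a local JSON file, and appends the collected results to a MySQL table.
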
package quality.db

import java.util
import java.util.{Locale, Properties, ResourceBundle}

import common.qualityUtils.QualityUtils
import org.apache.spark.sql.{Column, DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import scala.collection.JavaConversions._ // implicit Java/Scala collection interop (deprecated since Scala 2.12)
import scala.io.Source

object DBQuality {

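  // MySQL sink connection settings come from application.properties on the classpath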
  val rb = ResourceBundle.getBundle("application", Locale.getDefault)
  val warehouseLocation = "D:/kingbaseDown/nihaoshijie/sparkData/spark-warehouse"   def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Spark SQL Quality of Database")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .getOrCreate()
    runBaseQualitySql(spark)
    spark.stop()
  }
  case class Order(var id: String, var name: String) // declared but not used below


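  /**
   * Loads the source table described in the JSON config over JDBC, runs each
   * configured quality SQL (optionally tracking an incremental key column),
   * and appends the collected JSON results to a MySQL table.
   */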
  private def runBaseQualitySql(spark: SparkSession): Unit = {
    // Read the task definition (data source, SQL statements, incremental flag) from a local JSON file
    val json = Source.fromFile("C:\\Users\\Administrator\\Desktop\\json.txt", "GBK").mkString
    val map = QualityUtils.getDataBaseMap(json)
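    // Load the source table over JDBC and register it as a temp view named
    // after the source table, so the quality SQL can refer to it by name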
    val jdbcDF = spark.read.format("jdbc").options(map).load()
    jdbcDF.createOrReplaceTempView(map("dbtable"))
    val sqlMap = QualityUtils.getSql(json)
    val flag = QualityUtils.getIncFlag(json)
    // Highest value of the incremental key column seen so far
    var last_val = ""
    for ((key, value) <- sqlMap) {
      var result: util.List[String] = null
      if (flag) {
        if ("".equals(last_val)) {
          // TODO: seed last_val (e.g. from a saved checkpoint) on the first run
        }
        val df = spark.sql(value)
        val array = df.columns
        // The first column is assumed to be the incremental key; remember its max value
        val headDF = df.select(array(0))
        val lastDF = headDF.orderBy(headDF(array(0)).desc).take(1)
        if (!lastDF.isEmpty) {
          last_val = lastDF(0)(0).toString
        }
        // Collect every column except the incremental key as JSON strings
        // (assumes the query returns at least two columns)
        val seq: Seq[String] = array.drop(1)
        result = df.select(seq.head, seq.tail: _*).toJSON.collectAsList()
      } else {
        result = spark.sql(value).toJSON.collectAsList()
      }
      if (result == null || result.isEmpty) {
        // Nothing to write; note this return exits runBaseQualitySql entirely,
        // skipping any remaining entries in sqlMap
        return
      }
      // Wrap the collected JSON into a single-row DataFrame matching the sink schema
      val schema = StructType(
        List(
          StructField("id", IntegerType, true),
          StructField("name", StringType, true)
        )
      )
      val rdd = spark.sparkContext.makeRDD(Array(Row(1, result.toString)))
      val resultDF = spark.createDataFrame(rdd, schema)

      // Append the result row to the spark_test1 table in MySQL
      val prop = new Properties()
      prop.setProperty("user", rb.getString("mysql.username"))
      prop.setProperty("password", rb.getString("mysql.password"))
      prop.setProperty("driver", rb.getString("mysql.driver"))
      resultDF.write.mode(SaveMode.Append).jdbc(rb.getString("mysql.url"), "spark_test1", prop)
    }
  }
}
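
For reference, QualityUtils.getDataBaseMap is project-specific and not shown here; whatever map it returns must carry Spark's standard JDBC reader options. A minimal hand-built sketch of an equivalent map (all connection values below are placeholders, and spark is the session built in main):

    val options = Map(
      "url"      -> "jdbc:mysql://localhost:3306/source_db", // placeholder
      "dbtable"  -> "orders",                                // placeholder
      "user"     -> "root",                                  // placeholder
      "password" -> "secret",                                // placeholder
      "driver"   -> "com.mysql.jdbc.Driver"
    )
    val jdbcDF = spark.read.format("jdbc").options(options).load()

Likewise, the ResourceBundle lookup assumes an application.properties on the classpath providing at least the four mysql.* keys read above; a sketch with placeholder values:

    mysql.username=root
    mysql.password=secret
    mysql.driver=com.mysql.jdbc.Driver
    mysql.url=jdbc:mysql://localhost:3306/quality_results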