From Python Crawler to Spark Data Preprocessing: A Real Requirement [Part 5] (Spark)

A few words first:
The author is a software engineering student who uses this blog to record what he has learned, in the hope that it also helps classmates who are still studying.
Life brings all kinds of difficulties and setbacks; running away solves nothing. The only way is to face life's challenges with an optimistic spirit.
Youth fades quickly and learning is hard to master; not an inch of time should be taken lightly.
My favorite saying: finish today's work today.


I have only just started working with crawlers, so please bear with any shortcomings, and I would welcome your guidance.

Series contents

From Python Crawler to Spark Data Preprocessing: A Real Requirement [Part 1]
From Python Crawler to Spark Data Preprocessing: A Real Requirement [Part 2]
From Python Crawler to Spark Data Preprocessing: A Real Requirement [Part 3]
From Python Crawler to Spark Data Preprocessing: A Real Requirement [Part 4]
From Python Crawler to Spark Data Preprocessing: A Real Requirement [Part 5]



Preface

This installment uses Spark to process the data.


Note: the main content of the article follows; the examples below are for reference.

Data

(Screenshots of the three source MySQL tables were shown here.)
The requirement: the columns that the tables have in common are kept as individual columns, while each table's remaining, table-specific columns are packed into a single JSON-formatted column.
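To make the target layout concrete, here is a minimal sketch, assuming a hypothetical engine-oil row (all values are invented for illustration): the shared columns map one-to-one onto the target columns, and the table-specific columns collapse into a single JSON string that lands in the description column.

object TargetRowSketch {
  // Shared source columns map onto the target columns used later by the TRulet case class:
  //   skuid -> govid, name -> shortname, brand -> brand, url -> govurl,
  //   price -> price, commodity_Name -> name, image -> picurl
  // The remaining, table-specific columns become one JSON string stored in description.
  // The value below is a made-up example for an engine-oil row:
  val description: String =
    """{"types":"全合成机油","originplace":"德国","netweight":"4L","viscosity":"5W-40","volume":"4L"}"""

  def main(args: Array[String]): Unit = println(description)
}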

Code

The pom.xml file

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SparkConnectMysql</artifactId>
    <version>1.0-SNAPSHOT</version>
    <!-- Repository locations: aliyun, cloudera, and jboss, in that order -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.2.0</spark.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!-- <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
             <version>${spark.version}</version>
         </dependency>-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <!--<dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>-->

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- Plugin for compiling Java sources -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Plugin for compiling Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Create a case class

case class TRulet(id:Int,govid:String,shortname:String,brand:String,govurl:String,price:String,name:String,picurl:String,description:String)

Read the data and process it

Spark plugs

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlHHS {
  def setdata_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
    }else{
      rulet= "\""+data+"\""
    }
    return rulet
  }
  def setdata_NULL_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
      return rulet
    }else{
      return data
    }
  }
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")
    import  spark.implicits._
    // Java <-> Scala collection converters
    import scala.collection.JavaConverters._
    // Spark SQL built-in functions
    import org.apache.spark.sql.functions._
    val mysqlUrl="jdbc:mysql://localhost:3306/jd_qipei"
    var table="xxuan_car_jd_hhs_product"
    var properties = new Properties()
    properties.put("user","root")
    properties.put("password","root")
    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'price, 'url, 'commodity_Name, 'image, 'sales, 'material, 'type,
      'ArticleNumbera, 'GrossWeight)
    val ss: List[TRulet] = ALLDF.map(row => {
      var skuid = row.getAs("skuid").toString
      skuid=setdata_NULL_args(skuid)
      var name = row.getAs("name").toString
      name=setdata_NULL_args(name)
      var brand = row.getAs("brand").toString
      brand=setdata_NULL_args(brand)
      var url = row.getAs("url").toString
      url=setdata_NULL_args(url)
      var price = row.getAs("price").toString
      price=setdata_NULL_args(price)
      var commodity_Name = row.getAs("commodity_Name").toString
      commodity_Name=setdata_NULL_args(commodity_Name)
      var image = row.getAs("image").toString
      image=setdata_NULL_args(image)

      var sales = row.getAs("sales").toString
      sales=setdata_args(sales)

      var material = row.getAs("material").toString
      material=setdata_args(material)

      var types = row.getAs("type").toString
      types=setdata_args(types)

      var ArticleNumbera = row.getAs("ArticleNumbera").toString
      ArticleNumbera=setdata_args(ArticleNumbera)

      var GrossWeight = row.getAs("GrossWeight").toString
      GrossWeight=setdata_args(GrossWeight)

      var all_Josn="{\"sales\":"+sales+","+"\"material\":"+material+","+"\"types\":"+types+","+"\"ArticleNumbera\":"+ArticleNumbera+","+
        "\"GrossWeight\":"+GrossWeight+"}"

      TRulet(id=0,govid=skuid,shortname=name,brand = brand, govurl=url , price = price, name = commodity_Name, picurl = image,description = all_Josn)
    }).collectAsList().asScala.toList
    val result_Json = ss.toDF()
    result_Json.show()
    val sqlconnect = result_Json.write.mode(saveMode = SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)

  }
}
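The program above assembles the description JSON by hand with string concatenation and collects every row to the driver. As a side note, the same column could be built with Spark SQL's built-in to_json and struct functions (already imported via org.apache.spark.sql.functions._) and written out without collecting. The following is only a rough sketch under the same database, table, and credential assumptions as above, not code from the original project; note that to_json drops keys whose value is null instead of emitting empty strings.

import java.util.Properties

import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, lit, struct, to_json}

object ReadMysqlToMysqlHHSBuiltin {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val src = spark.read.jdbc(mysqlUrl, "xxuan_car_jd_hhs_product", properties)

    // Rename the shared columns to the target schema and serialize the
    // table-specific columns into a single JSON string with to_json(struct(...)).
    val result = src.select(
      lit(0).as("id"),
      col("skuid").as("govid"),
      col("name").as("shortname"),
      col("brand"),
      col("url").as("govurl"),
      col("price"),
      col("commodity_Name").as("name"),
      col("image").as("picurl"),
      to_json(struct(
        col("sales"), col("material"), col("type").as("types"),
        col("ArticleNumbera"), col("GrossWeight")
      )).as("description")
    )

    result.show()
    result.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}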

Engine oil

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlJY {
  def setdata_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
    }else{
      rulet= "\""+data+"\""
    }
    return rulet
  }
  def setdata_NULL_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
      return rulet
    }else{
      return data
    }
  }
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")
    import  spark.implicits._
    // Java <-> Scala collection converters
    import scala.collection.JavaConverters._
    // Spark SQL built-in functions
    import org.apache.spark.sql.functions._
    val mysqlUrl="jdbc:mysql://localhost:3306/jd_qipei"
    var table="xxuan_car_jd_mobil_product"
    var properties = new Properties()
    properties.put("user","root")
    properties.put("password","root")
    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'price, 'type, 'url, 'originplace, 'netweight, 'commodity_Name, 'image, 'viscosity, 'volume)
    val ss: List[TRulet] = ALLDF.map(row => {
      var skuid = row.getAs("skuid").toString
      skuid=setdata_NULL_args(skuid)
      var name = row.getAs("name").toString
      name=setdata_NULL_args(name)
      var brand = row.getAs("brand").toString
      brand=setdata_NULL_args(brand)
      var url = row.getAs("url").toString
      url=setdata_NULL_args(url)
      var price = row.getAs("price").toString
      price=setdata_NULL_args(price)
      var commodity_Name = row.getAs("commodity_Name").toString
      commodity_Name=setdata_NULL_args(commodity_Name)
      var image = row.getAs("image").toString
      image=setdata_NULL_args(image)

      var types = row.getAs("type").toString
      types=setdata_args(types)

      var originplace = row.getAs("originplace").toString
      originplace=setdata_args(originplace)

      var netweight = row.getAs("netweight").toString
      netweight=setdata_args(netweight)

      var viscosity = row.getAs("viscosity").toString
      viscosity=setdata_args(viscosity)

      var volume = row.getAs("volume").toString
      volume=setdata_args(volume)

      var all_Josn="{\"types\":"+types+","+"\"originplace\":"+originplace+","+"\"netweight\":"+netweight+","+"\"viscosity\":"+viscosity+","+
        "\"volume\":"+volume+"}"

      TRulet(id=0,govid=skuid,shortname=name,brand = brand, govurl=url , price = price, name = commodity_Name, picurl = image,description = all_Josn)
    }).collectAsList().asScala.toList
    val result_Json = ss.toDF()
    result_Json.show()
    val sqlconnect = result_Json.write.mode(saveMode = SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)

  }
}

Tires

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlLT {
  def setdata_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
    }else{
      rulet= "\""+data+"\""
    }
    return rulet
  }
  def setdata_NULL_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
      return rulet
    }else{
      return data
    }
  }
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("Mysql").config("spark.driver.memory", "6g").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")

    import  spark.implicits._
    // Java <-> Scala collection converters
    import scala.collection.JavaConverters._
    // Spark SQL built-in functions
    import org.apache.spark.sql.functions._
    val mysqlUrl="jdbc:mysql://localhost:3306/jd_qipei"
    var table="xxuan_car_jd_lt_product"
    var properties = new Properties()
    properties.put("user","root")
    properties.put("password","root")
    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'url, 'price, 'commodity_Name, 'image, 'netweight, 'originplace,
      'size,
      'width, 'number, 'performance, 'Flattening, 'characteristics, 'type)
    val ss: List[TRulet] = ALLDF.map(row => {
      var skuid = row.getAs("skuid").toString
      skuid=setdata_NULL_args(skuid)
      var name = row.getAs("name").toString
      name=setdata_NULL_args(name)
      var brand = row.getAs("brand").toString
      brand=setdata_NULL_args(brand)
      var url = row.getAs("url").toString
      url=setdata_NULL_args(url)
      var price = row.getAs("price").toString
      price=setdata_NULL_args(price)
      var commodity_Name = row.getAs("commodity_Name").toString
      commodity_Name=setdata_NULL_args(commodity_Name)
      var image = row.getAs("image").toString
      image=setdata_NULL_args(image)

      var netweight = row.getAs("netweight").toString
      netweight=setdata_args(netweight)

      var originplace = row.getAs("originplace").toString
      originplace=setdata_args(originplace)

      var size = row.getAs("size").toString
      size=setdata_args(size)

      var width = row.getAs("width").toString
      width=setdata_args(width)

      var number = row.getAs("number").toString
      number=setdata_args(number)

      var performance = row.getAs("performance").toString
      performance=setdata_args(performance)

      var Flattening = row.getAs("Flattening").toString
      Flattening=setdata_args(Flattening)

      var characteristics = row.getAs("characteristics").toString
      characteristics=setdata_args(characteristics)

      var types = row.getAs("type").toString
      types=setdata_args(types)
      var all_Josn="{\"netweight\":"+netweight+","+"\"originplace\":"+originplace+","+"\"size\":"+size+","+"\"width\":"+width+","+
        "\"number\":"+number+","+"\"performance\":"+performance+","+"\"Flattening\":"+Flattening+","+"\"characteristics\":"+characteristics+","+"\"types\":"+types+"}"

      TRulet(id=0,govid=skuid,shortname=name,brand = brand, govurl=url , price = price, name = commodity_Name, picurl = image,description = all_Josn)
    }).collectAsList().asScala.toList
    val result_Json = ss.toDF()
    result_Json.show()
    val sqlconnect = result_Json.write.mode(saveMode = SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)

  }
}

Brake pads

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlSCP {
  def setdata_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
    }else{
      rulet= "\""+data+"\""
    }
    return rulet
  }
  def setdata_NULL_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
      return rulet
    }else{
      return data
    }
  }
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")
    import  spark.implicits._
    // Java <-> Scala collection converters
    import scala.collection.JavaConverters._
    // Spark SQL built-in functions
    import org.apache.spark.sql.functions._
    val mysqlUrl="jdbc:mysql://localhost:3306/jd_qipei"
    var table="xxuan_car_jd_scp_product"
    var properties = new Properties()
    properties.put("user","root")
    properties.put("password","root")
    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'price, 'url, 'commodity_Name, 'image, 'ArticleNumber, 'boiling, 'package,
      'GrossWeight, 'CommodityOrigin, 'process, 'Installation, 'type, 'texture)
    val ss: List[TRulet] = ALLDF.map(row => {
      var skuid = row.getAs("skuid").toString
      skuid=setdata_NULL_args(skuid)
      var name = row.getAs("name").toString
      name=setdata_NULL_args(name)
      var brand = row.getAs("brand").toString
      brand=setdata_NULL_args(brand)
      var url = row.getAs("url").toString
      url=setdata_NULL_args(url)
      var price = row.getAs("price").toString
      price=setdata_NULL_args(price)
      var commodity_Name = row.getAs("commodity_Name").toString
      commodity_Name=setdata_NULL_args(commodity_Name)
      var image = row.getAs("image").toString
      image=setdata_NULL_args(image)

      var ArticleNumber = row.getAs("ArticleNumber").toString
      ArticleNumber=setdata_args(ArticleNumber)

      var boiling = row.getAs("boiling").toString
      boiling=setdata_args(boiling)

      var packages = row.getAs("package").toString
      packages=setdata_args(packages)

      var GrossWeight = row.getAs("GrossWeight").toString
      GrossWeight=setdata_args(GrossWeight)

      var CommodityOrigin = row.getAs("CommodityOrigin").toString
      CommodityOrigin=setdata_args(CommodityOrigin)

      var process = row.getAs("process").toString
      process=setdata_args(process)

      var Installation = row.getAs("Installation").toString
      Installation=setdata_args(Installation)

      var types = row.getAs("type").toString
      types=setdata_args(types)

      var texture = row.getAs("texture").toString
      texture=setdata_args(texture)


      var all_Josn="{\"ArticleNumber\":"+ArticleNumber+","+"\"boiling\":"+boiling+","+"\"packages\":"+packages+","+"\"GrossWeight\":"+GrossWeight+","+
        "\"CommodityOrigin\":"+CommodityOrigin+","+"\"process\":"+process+","+"\"Installation\":"+Installation+","+"\"type\":"+types+","+"\"texture\":"+texture+"}"

      TRulet(id=0,govid=skuid,shortname=name,brand = brand, govurl=url , price = price, name = commodity_Name, picurl = image,description = all_Josn)
    }).collectAsList().asScala.toList
    val result_Json = ss.toDF()
    result_Json.show()
    val sqlconnect = result_Json.write.mode(saveMode = SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)

  }
}

Additives

import java.util
import java.util.Properties

import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

object ReadMysqlToMysqlTJJ {
  def setdata_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
    }else{
      rulet= "\""+data+"\""
    }
    return rulet
  }
  def setdata_NULL_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
      return rulet
    }else{
      return data
    }
  }
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")
    import  spark.implicits._
    // Java <-> Scala collection converters
    import scala.collection.JavaConverters._
    // Spark SQL built-in functions
    import org.apache.spark.sql.functions._
    val mysqlUrl="jdbc:mysql://localhost:3306/jd_qipei"
    var table="xxuan_car_jd_tjj_product"
    var properties = new Properties()
    properties.put("user","root")
    properties.put("password","root")
    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlCooenct.select('skuid, 'name, 'brand, 'price, 'url, 'commodity_Name, 'image, 'Additivetype, 'TypesOfAdditives, 'NetContent, 'ArticleNumber,'GrossWeight, 'CommodityOrigin)
    val ss: List[TRulet] = ALLDF.map(row => {
      var skuid = row.getAs("skuid").toString
      skuid=setdata_NULL_args(skuid)
      var name = row.getAs("name").toString
      name=setdata_NULL_args(name)
      var brand = row.getAs("brand").toString
      brand=setdata_NULL_args(brand)
      var url = row.getAs("url").toString
      url=setdata_NULL_args(url)
      var price = row.getAs("price").toString
      price=setdata_NULL_args(price)
      var commodity_Name = row.getAs("commodity_Name").toString
      commodity_Name=setdata_NULL_args(commodity_Name)
      var image = row.getAs("image").toString
      image=setdata_NULL_args(image)

      var Additivetype = row.getAs("Additivetype").toString
      Additivetype=setdata_args(Additivetype)

      var TypesOfAdditives = row.getAs("TypesOfAdditives").toString
      TypesOfAdditives=setdata_args(TypesOfAdditives)

      var NetContent = row.getAs("NetContent").toString
      NetContent=setdata_args(NetContent)

      var ArticleNumber = row.getAs("ArticleNumber").toString
      ArticleNumber=setdata_args(ArticleNumber)

      var GrossWeight = row.getAs("GrossWeight").toString
      GrossWeight=setdata_args(GrossWeight)

      var CommodityOrigin = row.getAs("CommodityOrigin").toString
      CommodityOrigin=setdata_args(CommodityOrigin)


      var all_Josn="{\"Additivetype\":"+Additivetype+","+"\"TypesOfAdditives\":"+TypesOfAdditives+","+"\"NetContent\":"+NetContent+","+"\"ArticleNumber\":"+ArticleNumber+","+
      "\"GrossWeight\":"+GrossWeight+","+"\"CommodityOrigin\":"+CommodityOrigin+"}"

      TRulet(id=0,govid=skuid,shortname=name,brand = brand, govurl=url , price = price, name = commodity_Name, picurl = image,description = all_Josn)
    }).collectAsList().asScala.toList
    val result_Json = ss.toDF()
    result_Json.show()
    val sqlconnect = result_Json.write.mode(saveMode = SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)

  }
}

OEM parts

import java.util
import java.util.Properties

import org.apache.spark.sql.{DataFrame, Dataset, SaveMode, SparkSession}

object ReadMysqlToMysqlYCJ {
  def setdata_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
    }else{
      rulet= "\""+data+"\""
    }
    return rulet
  }
  def setdata_NULL_args( data:String): String ={
    var rulet=""
    if (data=="NULL"){
      rulet="\"\""
      return rulet
    }else{
      return data
    }
  }
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    val sparkContext = spark.sparkContext
    sparkContext.setLogLevel("WARN")
    import  spark.implicits._
    // Java <-> Scala collection converters
    import scala.collection.JavaConverters._
    // Spark SQL built-in functions
    import org.apache.spark.sql.functions._
    val mysqlUrl="jdbc:mysql://localhost:3306/jd_qipei"
    var table="xxuan_car_jd_ycj_product"
    var properties = new Properties()
    properties.put("user","root")
    properties.put("password","root")
    val mysqlCooenct = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlCooenct.select( 'skuid, 'name, 'brand, 'url, 'price, 'commodity_Name, 'image,'freezing, 'originplace, 'netweight, 'category, 'package, 'boiling, 'sales, 'installation, 'transmission)
    val ss: List[TRulet] = ALLDF.map(row => {
      var skuid = row.getAs("skuid").toString
      skuid=setdata_NULL_args(skuid)

      var name = row.getAs("name").toString
      name=setdata_NULL_args(name)

      var brand = row.getAs("brand").toString
      brand=setdata_NULL_args(brand)

      var url = row.getAs("url").toString
      url=setdata_NULL_args(url)

      var price = row.getAs("price").toString
      price=setdata_NULL_args(price)

      var commodity_Name = row.getAs("commodity_Name").toString
      commodity_Name=setdata_NULL_args(commodity_Name)

      var image = row.getAs("image").toString
      image=setdata_NULL_args(image)

      var freezing = row.getAs("freezing").toString
      freezing=setdata_args(freezing)

      var originplace = row.getAs("originplace").toString
      originplace=setdata_args(originplace)

      var netweight = row.getAs("netweight").toString
      netweight=setdata_args(netweight)

      var category = row.getAs("category").toString
      category=setdata_args(category)

      var packages = row.getAs("package").toString
      packages=setdata_args(packages)

      var boiling = row.getAs("boiling").toString
      boiling=setdata_args(boiling)

      var sales = row.getAs("sales").toString
      sales=setdata_args(sales)

      var installation = row.getAs("installation").toString
      installation=setdata_args(installation)

      var transmission = row.getAs("transmission").toString
      transmission=setdata_args(transmission)

      var all_Josn="{\"freezing\":"+freezing+","+"\"originplace\":"+originplace+","+"\"netweight\":"+netweight+","+"\"category\":"+category+","+
      "\"packages\":"+packages+","+"\"boiling\":"+boiling+","+"\"sales\":"+sales+","+"\"installation\":"+installation+","+"\"transmission\":"+transmission+"}"

      TRulet(id=0,govid=skuid,shortname=name,brand = brand, govurl=url , price = price, name = commodity_Name, picurl = image,description = all_Josn)
    }).collectAsList().asScala.toList
    val result_Json = ss.toDF()
    result_Json.show()
// val sqlconnect = result_Json.write.mode(saveMode = SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)

  }
}

Summary

That's a wrap. Thank you all for reading; if you need any help, feel free to message me.