A few words before we start:
I'm a software engineering student, and I use this blog to record what I'm learning, in the hope that it also helps other students working through the same material.
Everyone runs into all kinds of difficulties and setbacks in life; avoiding them solves nothing, and the only way forward is to meet life's challenges with optimism.
"Youth grows old easily, yet learning is hard to complete; not an inch of time should be taken lightly."
My favorite saying: finish today's work today.
A Real Requirement, from Python Crawler to Spark Data Preprocessing [Part 1]
A Real Requirement, from Python Crawler to Spark Data Preprocessing [Part 2]
A Real Requirement, from Python Crawler to Spark Data Preprocessing [Part 3]
A Real Requirement, from Python Crawler to Spark Data Preprocessing [Part 4]
A Real Requirement, from Python Crawler to Spark Data Preprocessing [Part 5]
This installment uses Spark to process the crawled data.

Note: what follows is the main content of this article; the example can serve as a reference.

The goal: several source tables share a set of common fields, and those become individual columns in the target table, while each table's remaining, table-specific fields are packed together into a single JSON-formatted column.
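Before the full jobs, here is a minimal sketch of that idea (the table and column names here are illustrative only, not the real crawl tables). Spark's built-in to_json and struct functions can pack the table-specific columns into one JSON string column while the shared columns stay as ordinary columns:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_json}

object JsonColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("json-column-sketch").getOrCreate()
    import spark.implicits._

    // Hypothetical source rows: skuid and name stand in for the fields shared
    // by every table; sales and material exist only in this one table.
    val src = Seq(
      ("1001", "oil filter", "120", "steel"),
      ("1002", "air filter", "95", "paper")
    ).toDF("skuid", "name", "sales", "material")

    // Shared fields keep their columns; the rest collapse into one JSON column.
    val unified = src.select(
      $"skuid", $"name",
      to_json(struct($"sales", $"material")).as("description"))

    unified.show(false) // description: {"sales":"120","material":"steel"}
    spark.stop()
  }
}

The jobs below build the JSON string by hand instead, which keeps explicit control over how the crawler's "NULL" markers are handled; to_json is simply worth knowing as the built-in alternative (available since Spark 2.1, so it works with the 2.2.0 dependency declared below).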
First, the Maven pom.xml for the project:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>SparkConnectMysql</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Repository locations: aliyun, cloudera, and jboss, in that order -->
    <repositories>
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
        <repository>
            <id>jboss</id>
            <url>http://repository.jboss.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
        <hadoop.version>2.7.4</hadoop.version>
        <spark.version>2.2.0</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <!--
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.2.0-cdh5.14.0</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.3.3</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.38</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
            <!-- Java compiler plugin -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
            </plugin>
            <!-- Scala compiler plugin -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>
            <!-- Shade plugin: builds a fat jar and strips signature files -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The case class that defines the unified target schema, shared by all the jobs below:

case class TRulet(
  id: Int,
  govid: String,
  shortname: String,
  brand: String,
  govurl: String,
  price: String,
  name: String,
  picurl: String,
  description: String
)
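Since toDF() derives column names from the case-class fields, the names and order above are exactly what lands in xxuan_car_jd_test_product. A quick way to confirm the derived schema (a sketch; the object name is mine, and it assumes TRulet above is compiled in the same package):

import org.apache.spark.sql.SparkSession

object TRuletSchemaCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("schema-check").getOrCreate()
    import spark.implicits._
    // A single dummy row is enough to inspect the schema Spark derives.
    val probe = Seq(TRulet(0, "1001", "short", "brand", "http://example.com", "9.9",
      "full name", "pic.jpg", "{}")).toDF()
    probe.printSchema() // id, govid, shortname, brand, govurl, price, name, picurl, description
    spark.stop()
  }
}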
The first job reads xxuan_car_jd_hhs_product, keeps the shared fields as columns, packs the rest into a JSON string, and appends the result to xxuan_car_jd_test_product:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlHHS {

  // The crawler stores missing values as the literal string "NULL".
  // For JSON fields: quote the value, mapping "NULL" to an empty JSON string.
  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  // For plain columns: map "NULL" to a quoted empty string, pass the rest through.
  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._ // Java <-> Scala collection conversions

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_hhs_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlConnect = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlConnect.select($"skuid", $"name", $"brand", $"price", $"url",
      $"commodity_Name", $"image", $"sales", $"material", $"type", $"ArticleNumbera", $"GrossWeight")

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared fields keep their own columns in the target table.
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Table-specific fields are quoted and packed into one JSON string.
      val sales = setdata_args(row.getAs[Any]("sales").toString)
      val material = setdata_args(row.getAs[Any]("material").toString)
      val types = setdata_args(row.getAs[Any]("type").toString)
      val ArticleNumbera = setdata_args(row.getAs[Any]("ArticleNumbera").toString)
      val GrossWeight = setdata_args(row.getAs[Any]("GrossWeight").toString)
      val allJson = "{\"sales\":" + sales + ",\"material\":" + material + ",\"types\":" + types +
        ",\"ArticleNumbera\":" + ArticleNumbera + ",\"GrossWeight\":" + GrossWeight + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = allJson)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    // Append the unified rows to the target table.
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}
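One caveat about the hand-built JSON above: a value that itself contains a double quote or a backslash is concatenated unescaped, which makes the description column invalid JSON. If the crawled data can contain such characters, a small escaping helper (a hypothetical addition, not part of the original job) could be applied before quoting:

object JsonEscape {
  // Escape backslashes and double quotes so a raw value can be embedded
  // safely inside a hand-built JSON string.
  def escapeJson(raw: String): String =
    raw.replace("\\", "\\\\").replace("\"", "\\\"")

  // Drop-in variant of setdata_args with escaping applied.
  def setdata_args_safe(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + escapeJson(data) + "\""
}

The five remaining jobs follow the same pattern, differing only in the source table and in which table-specific fields go into the JSON column.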
The second job does the same for xxuan_car_jd_mobil_product:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlJY {

  // Same NULL-handling helpers as in the first job.
  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_mobil_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlConnect = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlConnect.select($"skuid", $"name", $"brand", $"price", $"type",
      $"url", $"originplace", $"netweight", $"commodity_Name", $"image", $"viscosity", $"volume")

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared fields.
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Table-specific fields, packed into JSON.
      val types = setdata_args(row.getAs[Any]("type").toString)
      val originplace = setdata_args(row.getAs[Any]("originplace").toString)
      val netweight = setdata_args(row.getAs[Any]("netweight").toString)
      val viscosity = setdata_args(row.getAs[Any]("viscosity").toString)
      val volume = setdata_args(row.getAs[Any]("volume").toString)
      val allJson = "{\"types\":" + types + ",\"originplace\":" + originplace + ",\"netweight\":" +
        netweight + ",\"viscosity\":" + viscosity + ",\"volume\":" + volume + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = allJson)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}
The third job handles xxuan_car_jd_lt_product, the widest of the six tables:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlLT {

  // Same NULL-handling helpers as in the first job.
  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    // Note: spark.driver.memory set programmatically has no effect once the
    // local driver JVM is already running; set it via spark-submit / JVM options.
    val spark = SparkSession.builder().master("local[*]").appName("Mysql")
      .config("spark.driver.memory", "6g").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_lt_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlConnect = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlConnect.select($"skuid", $"name", $"brand", $"url", $"price",
      $"commodity_Name", $"image", $"netweight", $"originplace", $"size", $"width", $"number",
      $"performance", $"Flattening", $"characteristics", $"type")

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared fields.
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Table-specific fields, packed into JSON.
      val netweight = setdata_args(row.getAs[Any]("netweight").toString)
      val originplace = setdata_args(row.getAs[Any]("originplace").toString)
      val size = setdata_args(row.getAs[Any]("size").toString)
      val width = setdata_args(row.getAs[Any]("width").toString)
      val number = setdata_args(row.getAs[Any]("number").toString)
      val performance = setdata_args(row.getAs[Any]("performance").toString)
      val Flattening = setdata_args(row.getAs[Any]("Flattening").toString)
      val characteristics = setdata_args(row.getAs[Any]("characteristics").toString)
      val types = setdata_args(row.getAs[Any]("type").toString)
      val allJson = "{\"netweight\":" + netweight + ",\"originplace\":" + originplace +
        ",\"size\":" + size + ",\"width\":" + width + ",\"number\":" + number +
        ",\"performance\":" + performance + ",\"Flattening\":" + Flattening +
        ",\"characteristics\":" + characteristics + ",\"types\":" + types + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = allJson)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}
The fourth job handles xxuan_car_jd_scp_product:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlSCP {

  // Same NULL-handling helpers as in the first job.
  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_scp_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlConnect = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlConnect.select($"skuid", $"name", $"brand", $"price", $"url",
      $"commodity_Name", $"image", $"ArticleNumber", $"boiling", $"package", $"GrossWeight",
      $"CommodityOrigin", $"process", $"Installation", $"type", $"texture")

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared fields.
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Table-specific fields, packed into JSON.
      val ArticleNumber = setdata_args(row.getAs[Any]("ArticleNumber").toString)
      val boiling = setdata_args(row.getAs[Any]("boiling").toString)
      val packages = setdata_args(row.getAs[Any]("package").toString)
      val GrossWeight = setdata_args(row.getAs[Any]("GrossWeight").toString)
      val CommodityOrigin = setdata_args(row.getAs[Any]("CommodityOrigin").toString)
      val process = setdata_args(row.getAs[Any]("process").toString)
      val Installation = setdata_args(row.getAs[Any]("Installation").toString)
      val types = setdata_args(row.getAs[Any]("type").toString)
      val texture = setdata_args(row.getAs[Any]("texture").toString)
      val allJson = "{\"ArticleNumber\":" + ArticleNumber + ",\"boiling\":" + boiling +
        ",\"packages\":" + packages + ",\"GrossWeight\":" + GrossWeight +
        ",\"CommodityOrigin\":" + CommodityOrigin + ",\"process\":" + process +
        ",\"Installation\":" + Installation + ",\"type\":" + types + ",\"texture\":" + texture + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = allJson)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}
The fifth job handles xxuan_car_jd_tjj_product:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlTJJ {

  // Same NULL-handling helpers as in the first job.
  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_tjj_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlConnect = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlConnect.select($"skuid", $"name", $"brand", $"price", $"url",
      $"commodity_Name", $"image", $"Additivetype", $"TypesOfAdditives", $"NetContent",
      $"ArticleNumber", $"GrossWeight", $"CommodityOrigin")

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared fields.
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Table-specific fields, packed into JSON.
      val Additivetype = setdata_args(row.getAs[Any]("Additivetype").toString)
      val TypesOfAdditives = setdata_args(row.getAs[Any]("TypesOfAdditives").toString)
      val NetContent = setdata_args(row.getAs[Any]("NetContent").toString)
      val ArticleNumber = setdata_args(row.getAs[Any]("ArticleNumber").toString)
      val GrossWeight = setdata_args(row.getAs[Any]("GrossWeight").toString)
      val CommodityOrigin = setdata_args(row.getAs[Any]("CommodityOrigin").toString)
      val allJson = "{\"Additivetype\":" + Additivetype + ",\"TypesOfAdditives\":" + TypesOfAdditives +
        ",\"NetContent\":" + NetContent + ",\"ArticleNumber\":" + ArticleNumber +
        ",\"GrossWeight\":" + GrossWeight + ",\"CommodityOrigin\":" + CommodityOrigin + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = allJson)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}
The sixth job handles xxuan_car_jd_ycj_product; here the final write is left commented out, so running it only previews the unified rows:

import java.util.Properties

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object ReadMysqlToMysqlYCJ {

  // Same NULL-handling helpers as in the first job.
  def setdata_args(data: String): String =
    if (data == "NULL") "\"\"" else "\"" + data + "\""

  def setdata_NULL_args(data: String): String =
    if (data == "NULL") "\"\"" else data

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Mysql").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    import spark.implicits._
    import scala.collection.JavaConverters._

    val mysqlUrl = "jdbc:mysql://localhost:3306/jd_qipei"
    val table = "xxuan_car_jd_ycj_product"
    val properties = new Properties()
    properties.put("user", "root")
    properties.put("password", "root")

    val mysqlConnect = spark.read.jdbc(mysqlUrl, table, properties)
    val ALLDF: DataFrame = mysqlConnect.select($"skuid", $"name", $"brand", $"url", $"price",
      $"commodity_Name", $"image", $"freezing", $"originplace", $"netweight", $"category",
      $"package", $"boiling", $"sales", $"installation", $"transmission")

    val ss: List[TRulet] = ALLDF.map(row => {
      // Shared fields.
      val skuid = setdata_NULL_args(row.getAs[Any]("skuid").toString)
      val name = setdata_NULL_args(row.getAs[Any]("name").toString)
      val brand = setdata_NULL_args(row.getAs[Any]("brand").toString)
      val url = setdata_NULL_args(row.getAs[Any]("url").toString)
      val price = setdata_NULL_args(row.getAs[Any]("price").toString)
      val commodity_Name = setdata_NULL_args(row.getAs[Any]("commodity_Name").toString)
      val image = setdata_NULL_args(row.getAs[Any]("image").toString)
      // Table-specific fields, packed into JSON.
      val freezing = setdata_args(row.getAs[Any]("freezing").toString)
      val originplace = setdata_args(row.getAs[Any]("originplace").toString)
      val netweight = setdata_args(row.getAs[Any]("netweight").toString)
      val category = setdata_args(row.getAs[Any]("category").toString)
      val packages = setdata_args(row.getAs[Any]("package").toString)
      val boiling = setdata_args(row.getAs[Any]("boiling").toString)
      val sales = setdata_args(row.getAs[Any]("sales").toString)
      val installation = setdata_args(row.getAs[Any]("installation").toString)
      val transmission = setdata_args(row.getAs[Any]("transmission").toString)
      val allJson = "{\"freezing\":" + freezing + ",\"originplace\":" + originplace +
        ",\"netweight\":" + netweight + ",\"category\":" + category + ",\"packages\":" + packages +
        ",\"boiling\":" + boiling + ",\"sales\":" + sales + ",\"installation\":" + installation +
        ",\"transmission\":" + transmission + "}"
      TRulet(id = 0, govid = skuid, shortname = name, brand = brand, govurl = url,
        price = price, name = commodity_Name, picurl = image, description = allJson)
    }).collectAsList().asScala.toList

    val result_Json = ss.toDF()
    result_Json.show()
    // Uncomment to append these rows to the target table as well:
    // result_Json.write.mode(SaveMode.Append).jdbc(mysqlUrl, "xxuan_car_jd_test_product", properties)
  }
}
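After the jobs have run, a quick sanity check is to read the unified table back and look at the row count and a few rows. This is a sketch under the same local MySQL assumptions as above; the object name is mine:

import java.util.Properties

import org.apache.spark.sql.SparkSession

object CheckMergedTable {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("check-merged").getOrCreate()
    val props = new Properties()
    props.put("user", "root")
    props.put("password", "root")
    // Read the unified table back over the same JDBC connection settings.
    val merged = spark.read.jdbc("jdbc:mysql://localhost:3306/jd_qipei",
      "xxuan_car_jd_test_product", props)
    println(s"merged rows: ${merged.count()}")
    merged.show(5, false) // spot-check a few unified rows and their JSON column
    spark.stop()
  }
}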