SparkSQL Data Sources in Practice


Author: Yin Zhengjie

Copyright notice: this is an original work; reproduction is prohibited, and violations will be pursued legally.

 

 

 

 

I. Generic load/save methods

1>. Test data shipped with Spark

[root@hadoop101.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/examples/src/main/resources/            # This directory contains sample files in various formats shipped with Spark
total 44
-rw-r--r-- 1 yinzhengjie yinzhengjie  130 May 30 08:02 employees.json
-rw-r--r-- 1 yinzhengjie yinzhengjie  240 May 30 08:02 full_user.avsc
-rw-r--r-- 1 yinzhengjie yinzhengjie 5812 May 30 08:02 kv1.txt
-rw-r--r-- 1 yinzhengjie yinzhengjie   49 May 30 08:02 people.csv
-rw-r--r-- 1 yinzhengjie yinzhengjie   73 May 30 08:02 people.json
-rw-r--r-- 1 yinzhengjie yinzhengjie   32 May 30 08:02 people.txt
-rw-r--r-- 1 yinzhengjie yinzhengjie  185 May 30 08:02 user.avsc
-rw-r--r-- 1 yinzhengjie yinzhengjie  334 May 30 08:02 users.avro
-rw-r--r-- 1 yinzhengjie yinzhengjie  547 May 30 08:02 users.orc
-rw-r--r-- 1 yinzhengjie yinzhengjie  615 May 30 08:02 users.parquet
[root@hadoop101.yinzhengjie.org.cn ~]# 

2>. Manually specifying the data source format

  Spark SQL's DataFrame interface supports operating on a variety of data sources. A DataFrame can be operated on with RDD-style transformations, and it can also be registered as a temporary table; once registered, you can run SQL queries against it.

  Spark SQL's default data source is Parquet. When the source is a Parquet file, Spark SQL can run all of its operations on it directly. The default can be changed via the configuration option spark.sql.sources.default; for any other format, the data source format has to be specified manually.

  The format must be specified by its fully qualified name (e.g. org.apache.spark.sql.parquet); for built-in formats the short name is enough: json, parquet, jdbc, orc, libsvm, csv, or text.

  Data can be loaded generically with the read.load method provided by SparkSession and saved with write and save. In addition, SQL can be run directly on files.
[root@hadoop101.yinzhengjie.org.cn ~]# spark-shell 
20/07/15 01:09:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop101.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594746601368).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val df = spark.read.load("file:///yinzhengjie/softwares/spark/examples/src/main/resources/users.parquet")      # No format needs to be specified when loading a Parquet file, because Parquet is the default
df: org.apache.spark.sql.DataFrame = [name: string, favorite_color: string ... 1 more field]

scala> df.show
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+


scala> 
scala> val df = spark.read.format("json").load("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")    # Load a JSON file by specifying the format explicitly
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> 
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")        # Shorthand for the above: the json method reads JSON directly, no format needs to be specified
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.show
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+


scala> 
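The paragraph above also mentions running SQL directly on files, which the session does not demonstrate. A minimal sketch using the same sample Parquet file (the variable name sqlDF is just for illustration):

val sqlDF = spark.sql("SELECT * FROM parquet.`file:///yinzhengjie/softwares/spark/examples/src/main/resources/users.parquet`")   // query the file in place, no explicit load step
sqlDF.show()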

3>. File save options (SaveMode)

  Saving can be done with a SaveMode, which defines how existing data is handled. Note that these save modes do not use any locking and are not atomic.

  In addition, when Overwrite mode is used, the existing data is deleted before the new data is written out. The SaveMode values are summarized below:

  Scala/Java                        Any Language        Meaning
  SaveMode.ErrorIfExists (default)  "error" (default)   Throw an error if the data already exists
  SaveMode.Append                   "append"            Append to the existing data
  SaveMode.Overwrite                "overwrite"         Overwrite the existing data
  SaveMode.Ignore                   "ignore"            Ignore the write if the data already exists
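The session below passes the mode as a string; the equivalent typed form uses the SaveMode enum on the DataFrameWriter. A minimal sketch, assuming df is the DataFrame read from people.json in the session that follows:

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Overwrite).save("file:///tmp/output")                 // same effect as .mode("overwrite")
df.write.mode(SaveMode.Ignore).format("json").save("file:///tmp/output2")    // silently do nothing if the path already exists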
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")          # Read the JSON file through the spark session variable
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.save("file:///tmp/output")                    # Save df to local disk; the default save format is still Parquet

scala>


[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output/            # The file extension on local disk is .parquet, confirming that Parquet is the default save format
total 4
-rw-r--r-- 1 root root 687 Jul 15 01:34 part-00000-00ce4157-82e7-438b-a0b6-bdbaa29d0f4f-c000.snappy.parquet
-rw-r--r-- 1 root root   0 Jul 15 01:34 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# 
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")           # When saving we can specify the output format, here "json"

scala> 


[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/             # As expected, the saved files are indeed JSON
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]# 
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")              # The first save succeeds

scala> df.write.format("json").save("file:///tmp/output2")              # The second save to the same directory fails because the path already exists
org.apache.spark.sql.AnalysisException: path file:/tmp/output2 already exists.;
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:114)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:677)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:677)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:286)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:272)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:230)
  ... 49 elided

scala> df.write.format("json").mode("append").save("file:///tmp/output2")      # With "append" mode, writing to the same path as before no longer fails with a path-already-exists error

scala> 



[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/                # Files after the first save
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/                 # Files after the second (append) save
total 8
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root 71 Jul 15 01:42 part-00000-a668e03a-c098-4eb9-b44d-c195c6557ec0-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:42 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# 
scala> val df = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> df.write.format("json").save("file:///tmp/output2")

scala> df.write.format("json").mode("append").save("file:///tmp/output2")

scala> df.write.format("json").mode("overwrite").save("file:///tmp/output2")      # Write to the path in overwrite mode; any data previously stored there is wiped

scala> 



[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/        # After the first write
total 4
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:38 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/        # After the second (append) write
total 8
-rw-r--r-- 1 root root 71 Jul 15 01:38 part-00000-a52115a6-aede-4a2e-aa20-d31495f4b1cb-c000.json
-rw-r--r-- 1 root root 71 Jul 15 01:42 part-00000-a668e03a-c098-4eb9-b44d-c195c6557ec0-c000.json
-rw-r--r-- 1 root root  0 Jul 15 01:42 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# ll /tmp/output2/        # After the third (overwrite) write
total 4
-rw-r--r-- 1 root root 71 Jul 15 02:01 part-00000-7ea74899-2f1d-43bd-8c63-4ba17032974b-c000.json
-rw-r--r-- 1 root root  0 Jul 15 02:01 _SUCCESS
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# 

 

II. JSON files

  Spark SQL can automatically infer the schema of a JSON dataset and load it as a Dataset[Row]. A JSON file can be loaded with SparkSession.read.json().

  Note:
    This is not a conventional JSON file; every line must be a complete JSON string (JSON Lines format).
[root@hadoop101.yinzhengjie.org.cn ~]# vim /tmp/user.json
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# cat /tmp/user.json
{"name":"yinzhengjie","passwd":"2020","age":18}
{"name":"Jason","passwd":"666666","age":27}
{"name":"Liming","passwd":"123","age":49}
{"name":"Jenny","passwd":"456","age":23}
{"name":"Danny","passwd":"789","age":56} 
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# 
scala> val path = "/tmp/user.json"
path: String = /tmp/user.json

scala> val userDF = spark.read.json(path)              # Read the JSON file
userDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]

scala> userDF.createOrReplaceTempView("users")            # Create a temporary view

scala> val teenagerNamesDF = spark.sql("SELECT name FROM users WHERE age BETWEEN 13 AND 19")      # Run SQL against the view
teenagerNamesDF: org.apache.spark.sql.DataFrame = [name: string]

scala> teenagerNamesDF.show()                       # Show the query result
+-----------+
|       name|
+-----------+
|yinzhengjie|
+-----------+


scala> 
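Besides files, spark.read.json can also consume a Dataset[String] in which every element is one JSON string (Spark 2.2+). A minimal sketch; the record below is made up for illustration:

import spark.implicits._

val jsonDS = Seq("""{"name":"Tom","passwd":"abc","age":15}""").toDS()   // one JSON string per element
val otherDF = spark.read.json(jsonDS)                                   // schema is inferred just like for a file
otherDF.show()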

 

III. Parquet files

  Parquet is a popular columnar storage format that can efficiently store records with nested fields.

  The Parquet format is widely used in the Hadoop ecosystem, and it supports all of Spark SQL's data types.

  Spark SQL provides methods for reading and writing Parquet files directly.
scala> val peopleDF = spark.read.json("file:///yinzhengjie/softwares/spark/examples/src/main/resources/people.json")      # Read the local JSON file
peopleDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> peopleDF.write.parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people.parquet")      # Save the data to HDFS in Parquet format

scala> val parquetFileDF = spark.read.parquet("hdfs://hadoop101.yinzhengjie.org.cn:9000/yinzhengjie/spark/resources/people.parquet")    # Read the Parquet file back from HDFS
parquetFileDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

scala> parquetFileDF.createOrReplaceTempView("parquetFile")          # Create a temporary view

scala> val namesDF = spark.sql("SELECT name FROM parquetFile WHERE age BETWEEN 13 AND 19")      # Run an SQL query against the temporary view
namesDF: org.apache.spark.sql.DataFrame = [name: string]

scala> namesDF.map(attributes => "Name: " + attributes(0)).show()      # Show the query result
+------------+
|       value|
+------------+
|Name: Justin|
+------------+


scala> 


[root@hadoop101.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/examples/src/main/resources/people.json     # This file ships with Spark
-rw-r--r-- 1 yinzhengjie yinzhengjie 73 May 30 08:02 /yinzhengjie/softwares/spark/examples/src/main/resources/people.json
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# cat /yinzhengjie/softwares/spark/examples/src/main/resources/people.json     
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# hdfs dfs -ls /yinzhengjie/spark/resources/people.parquet              # The data is written to the HDFS cluster, so make sure your Hadoop cluster is running
Found 2 items
-rw-r--r--   3 root supergroup          0 2020-07-15 02:21 /yinzhengjie/spark/resources/people.parquet/_SUCCESS
-rw-r--r--   3 root supergroup        687 2020-07-15 02:21 /yinzhengjie/spark/resources/people.parquet/part-00000-3ead21bf-d453-4161-8d6f-08c069d4cb50-c000.snappy.parquet
[root@hadoop101.yinzhengjie.org.cn ~]# 
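A Parquet-specific feature worth knowing is schema merging. A minimal sketch modeled on the standard Spark example (the path /tmp/test_table is hypothetical):

import spark.implicits._

// Write two partitions with different but compatible schemas
val squaresDF = spark.sparkContext.makeRDD(1 to 5).map(i => (i, i * i)).toDF("value", "square")
squaresDF.write.parquet("file:///tmp/test_table/key=1")
val cubesDF = spark.sparkContext.makeRDD(6 to 10).map(i => (i, i * i * i)).toDF("value", "cube")
cubesDF.write.parquet("file:///tmp/test_table/key=2")

// mergeSchema reconciles the two schemas; the partition column key is picked up as well
val mergedDF = spark.read.option("mergeSchema", "true").parquet("file:///tmp/test_table")
mergedDF.printSchema()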

 

IV. JDBC

  Spark SQL can create a DataFrame by reading data from a relational database over JDBC; after a series of computations on the DataFrame, the data can be written back to the relational database.

  Note:
    The relevant JDBC driver jar must be placed on Spark's classpath.

1>. Create the MySQL database

Install MariaDB:
[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install mariadb-server

Enable the database to start at boot:
[root@hadoop101.yinzhengjie.org.cn ~]# systemctl enable mariadb
Created symlink from /etc/systemd/system/multi-user.target.wants/mariadb.service to /usr/lib/systemd/system/mariadb.service.
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# systemctl start mariadb
[root@hadoop101.yinzhengjie.org.cn ~]# 

Log in to the database, create the spark database, and grant a user access:
MariaDB [(none)]> CREATE SCHEMA IF NOT EXISTS spark DEFAULT CHARACTER SET = utf8mb4;
Query OK, 1 row affected (0.00 sec)

MariaDB [(none)]> 
MariaDB [(none)]> CREATE USER jason@'172.200.%' IDENTIFIED BY 'yinzhengjie';
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> 
MariaDB [(none)]> GRANT ALL ON spark.* TO jason@'172.200.%';
Query OK, 0 rows affected (0.00 sec)

MariaDB [(none)]> 
[root@hadoop105.yinzhengjie.org.cn ~]# mysql -u jason -pyinzhengjie -h 172.200.4.101
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 7
Server version: 5.5.65-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| spark              |
| test               |
+--------------------+
3 rows in set (0.01 sec)

MariaDB [(none)]> 
MariaDB [(none)]> quit
Bye
[root@hadoop105.yinzhengjie.org.cn ~]# 

2>. Place the JDBC driver on Spark's classpath

[root@hadoop105.yinzhengjie.org.cn ~]# ll
total 188288
-rw-r--r-- 1 root root      8409 Dec 12  2018 jce_policy-8.zip
-rw-r--r-- 1 root root 191817140 Mar 25  2019 jdk-8u201-linux-x64.tar.gz
-rw-r--r-- 1 root root    972009 Mar  1 22:52 mysql-connector-java-5.1.36-bin.jar
drwxrwxr-x 2 root root        24 Jan 21 01:36 UnlimitedJCEPolicyJDK8
[root@hadoop105.yinzhengjie.org.cn ~]# 
[root@hadoop105.yinzhengjie.org.cn ~]# cp mysql-connector-java-5.1.36-bin.jar /yinzhengjie/softwares/spark/jars/
[root@hadoop105.yinzhengjie.org.cn ~]# 
[root@hadoop105.yinzhengjie.org.cn ~]# ll /yinzhengjie/softwares/spark/jars/ | grep mysql
-rw-r--r-- 1 root root   972009 Jul 15 03:33 mysql-connector-java-5.1.36-bin.jar
[root@hadoop105.yinzhengjie.org.cn ~]# 
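The sessions below query a users table inside the spark database. How that table was populated is not shown here; one plausible sketch is to write the /tmp/user.json data from the JSON section there over JDBC (the id column seen in the later output would come from the table definition or a separate step and is omitted in this sketch):

val userDF = spark.read.json("file:///tmp/user.json")
userDF.write
  .format("jdbc")
  .option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark")
  .option("dbtable", "users")
  .option("user", "jason")
  .option("password", "yinzhengjie")
  .save()   // creates the table if it does not exist yet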

3>. Loading data from MySQL, method 1

[root@hadoop105.yinzhengjie.org.cn ~]# spark-shell 
20/07/15 03:33:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop105.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594755234548).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").option("dbtable", "users").option("user", "jason").option("password", "yinzhengjie").load()
jdbcDF: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> jdbcDF.show
+---+-----------+---+------+
| id|       name|age|passwd|
+---+-----------+---+------+
|  1|yinzhengjie| 18|  2020|
|  2|      Jason| 27|666666|
|  3|     Liming| 49|   123|
|  4|      Jenny| 23|   456|
|  5|      Danny| 56|   789|
+---+-----------+---+------+


scala> 
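The read above runs in a single partition. For larger tables, the JDBC source can split the read across partitions on a numeric column; a sketch with illustrative bounds:

val partitionedDF = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark")
  .option("dbtable", "users")
  .option("user", "jason")
  .option("password", "yinzhengjie")
  .option("partitionColumn", "id")   // must be a numeric column
  .option("lowerBound", "1")
  .option("upperBound", "5")
  .option("numPartitions", "2")
  .load()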

4>. Loading data from MySQL, method 2

[root@hadoop105.yinzhengjie.org.cn ~]# spark-shell 
20/07/15 03:39:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Spark context Web UI available at http://hadoop105.yinzhengjie.org.cn:4040
Spark context available as 'sc' (master = local[*], app id = local-1594755548606).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.6
      /_/
         
Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_201)
Type in expressions to have them evaluated.
Type :help for more information.

scala> val connectionProperties = new java.util.Properties()
connectionProperties: java.util.Properties = {}

scala> connectionProperties.put("user", "jason")
res0: Object = null

scala> connectionProperties.put("password", "yinzhengjie")
res1: Object = null

scala> val jdbcDF2 = spark.read.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users", connectionProperties)
jdbcDF2: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 2 more fields]

scala> jdbcDF2.show
+---+-----------+---+------+
| id|       name|age|passwd|
+---+-----------+---+------+
|  1|yinzhengjie| 18|  2020|
|  2|      Jason| 27|666666|
|  3|     Liming| 49|   123|
|  4|      Jenny| 23|   456|
|  5|      Danny| 56|   789|
+---+-----------+---+------+


scala> 
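The jdbc() method also has an overload that takes an array of predicates, producing one partition per predicate. A minimal sketch reusing connectionProperties from above (the predicates are illustrative):

val predicates = Array("age < 30", "age >= 30")
val jdbcDF3 = spark.read.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users", predicates, connectionProperties)
jdbcDF3.rdd.getNumPartitions   // one partition per predicate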

5>. Writing data to MySQL, method 1

Operate directly on the DataFrame that was read earlier:

scala> jdbcDF.write.format("jdbc").option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark").option("dbtable", "users2").option("user", "jason").option("password", "yinzhengjie").save()

scala>
Observe the changes in the MySQL database:
[root@hadoop105.yinzhengjie.org.cn ~]# mysql -ujason -pyinzhengjie -h 172.200.4.101
Welcome to the MariaDB monitor.  Commands end with ; or \g.
Your MariaDB connection id is 20
Server version: 5.5.65-MariaDB MariaDB Server

Copyright (c) 2000, 2018, Oracle, MariaDB Corporation Ab and others.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

MariaDB [(none)]> use spark
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
MariaDB [spark]> 
MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
+-----------------+
1 row in set (0.00 sec)

MariaDB [spark]> 
MariaDB [spark]> 
MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
+-----------------+
2 rows in set (0.00 sec)

MariaDB [spark]> select * from users2;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 | 2020   |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 | 123    |
|    4 | Jenny       |   23 | 456    |
|    5 | Danny       |   56 | 789    |
+------+-------------+------+--------+
5 rows in set (0.00 sec)

MariaDB [spark]> 
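As with file sinks, JDBC writes default to ErrorIfExists, so re-running the save above against the existing users2 table would fail. A sketch of appending instead:

jdbcDF.write.format("jdbc")
  .option("url", "jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark")
  .option("dbtable", "users2")
  .option("user", "jason")
  .option("password", "yinzhengjie")
  .mode("append")   // add rows to the existing table instead of failing
  .save()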

6>. Writing data to MySQL, method 2

Operate directly on the DataFrame that was read earlier:
scala> jdbcDF2.write.jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users3", connectionProperties)

scala>
Observe the changes in the MySQL database:
MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
+-----------------+
2 rows in set (0.00 sec)

MariaDB [spark]> select * from users2;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 | 2020   |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 | 123    |
|    4 | Jenny       |   23 | 456    |
|    5 | Danny       |   56 | 789    |
+------+-------------+------+--------+
5 rows in set (0.00 sec)

MariaDB [spark]> 
MariaDB [spark]> show tables;
+-----------------+
| Tables_in_spark |
+-----------------+
| users           |
| users2          |
| users3          |
+-----------------+
3 rows in set (0.00 sec)

MariaDB [spark]> 
MariaDB [spark]> select * from users3;
+------+-------------+------+--------+
| id   | name        | age  | passwd |
+------+-------------+------+--------+
|    1 | yinzhengjie |   18 | 2020   |
|    2 | Jason       |   27 | 666666 |
|    3 | Liming      |   49 | 123    |
|    4 | Jenny       |   23 | 456    |
|    5 | Danny       |   56 | 789    |
+------+-------------+------+--------+
5 rows in set (0.00 sec)
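When Spark creates the target table itself, the column types can be customized with the createTableColumnTypes write option. A sketch (the table name users4 is hypothetical):

jdbcDF2.write
  .option("createTableColumnTypes", "name VARCHAR(64), passwd VARCHAR(32)")   // override the default types for these columns
  .jdbc("jdbc:mysql://hadoop101.yinzhengjie.org.cn:3306/spark", "users4", connectionProperties)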

 

V. SparkSQL data source: the Hive database

  Recommended reading:
    https://www.cnblogs.com/yinzhengjie2020/p/13216504.html