Spark 系列(十)—— Spark SQL 外部数据源

1、简介

1.1 多数据源支持

Spark 支持如下六个核心数据源,同时 Spark 社区还提供了多达上百种数据源的读取方式,可以知足绝大部分使用场景。html

  • CSV
  • JSON
  • Parquet
  • ORC
  • JDBC/ODBC connections
  • Plain-text files

注:如下全部测试文件都可从本仓库的resources 目录进行下载java

1.2 读数据格式

全部读取 API 遵循如下调用格式:mysql

// 格式
DataFrameReader.format(...).option("key", "value").schema(...).load()

// 示例
spark.read.format("csv")
.option("mode", "FAILFAST")          // 读取模式
.option("inferSchema", "true")       // 是否自动推断 schema
.option("path", "path/to/file(s)")   // 文件路径
.schema(someSchema)                  // 使用预约义的 schema 
.load()
复制代码

读取模式有如下三种可选项:git

读模式 描述
permissive 当遇到损坏的记录时,将其全部字段设置为 null,并将全部损坏的记录放在名为 _corruption t_record 的字符串列中
dropMalformed 删除格式不正确的行
failFast 遇到格式不正确的数据时当即失败

1.3 写数据格式

// 格式
DataFrameWriter.format(...).option(...).partitionBy(...).bucketBy(...).sortBy(...).save()

//示例
dataframe.write.format("csv")
.option("mode", "OVERWRITE")         //写模式
.option("dateFormat", "yyyy-MM-dd")  //日期格式
.option("path", "path/to/file(s)")
.save()
复制代码

写数据模式有如下四种可选项:github

Scala/Java 描述
SaveMode.ErrorIfExists 若是给定的路径已经存在文件,则抛出异常,这是写数据默认的模式
SaveMode.Append 数据以追加的方式写入
SaveMode.Overwrite 数据以覆盖的方式写入
SaveMode.Ignore 若是给定的路径已经存在文件,则不作任何操做

2、CSV

CSV 是一种常见的文本文件格式,其中每一行表示一条记录,记录中的每一个字段用逗号分隔。sql

2.1 读取CSV文件

自动推断类型读取读取示例:数据库

spark.read.format("csv")
.option("header", "false")        // 文件中的第一行是否为列的名称
.option("mode", "FAILFAST")      // 是否快速失败
.option("inferSchema", "true")   // 是否自动推断 schema
.load("/usr/file/csv/dept.csv")
.show()
复制代码

使用预约义类型:apache

import org.apache.spark.sql.types.{StructField, StructType, StringType,LongType}
//预约义数据格式
val myManualSchema = new StructType(Array(
    StructField("deptno", LongType, nullable = false),
    StructField("dname", StringType,nullable = true),
    StructField("loc", StringType,nullable = true)
))
spark.read.format("csv")
.option("mode", "FAILFAST")
.schema(myManualSchema)
.load("/usr/file/csv/dept.csv")
.show()
复制代码

2.2 写入CSV文件

df.write.format("csv").mode("overwrite").save("/tmp/csv/dept2")
复制代码

也能够指定具体的分隔符:json

df.write.format("csv").mode("overwrite").option("sep", "\t").save("/tmp/csv/dept2")
复制代码

2.3 可选配置

为节省主文篇幅,全部读写配置项见文末 9.1 小节。api


3、JSON

3.1 读取JSON文件

spark.read.format("json").option("mode", "FAILFAST").load("/usr/file/json/dept.json").show(5)
复制代码

须要注意的是:默认不支持一条数据记录跨越多行 (以下),能够经过配置 multiLinetrue 来进行更改,其默认值为 false

// 默认支持单行
{"DEPTNO": 10,"DNAME": "ACCOUNTING","LOC": "NEW YORK"}

//默认不支持多行
{
  "DEPTNO": 10,
  "DNAME": "ACCOUNTING",
  "LOC": "NEW YORK"
}
复制代码

3.2 写入JSON文件

df.write.format("json").mode("overwrite").save("/tmp/spark/json/dept")
复制代码

3.3 可选配置

为节省主文篇幅,全部读写配置项见文末 9.2 小节。


4、Parquet

Parquet 是一个开源的面向列的数据存储,它提供了多种存储优化,容许读取单独的列非整个文件,这不只节省了存储空间并且提高了读取效率,它是 Spark 是默认的文件格式。

4.1 读取Parquet文件

spark.read.format("parquet").load("/usr/file/parquet/dept.parquet").show(5)
复制代码

2.2 写入Parquet文件

df.write.format("parquet").mode("overwrite").save("/tmp/spark/parquet/dept")
复制代码

2.3 可选配置

Parquet 文件有着本身的存储规则,所以其可选配置项比较少,经常使用的有以下两个:

读写操做 配置项 可选值 默认值 描述
Write compression or codec None,
uncompressed,
bzip2,
deflate, gzip,
lz4, or snappy
None 压缩文件格式
Read mergeSchema true, false 取决于配置项 spark.sql.parquet.mergeSchema 当为真时,Parquet 数据源将全部数据文件收集的 Schema 合并在一块儿,不然将从摘要文件中选择 Schema,若是没有可用的摘要文件,则从随机数据文件中选择 Schema。

更多可选配置能够参阅官方文档:spark.apache.org/docs/latest…


5、ORC

ORC 是一种自描述的、类型感知的列文件格式,它针对大型数据的读写进行了优化,也是大数据中经常使用的文件格式。

5.1 读取ORC文件

spark.read.format("orc").load("/usr/file/orc/dept.orc").show(5)
复制代码

4.2 写入ORC文件

csvFile.write.format("orc").mode("overwrite").save("/tmp/spark/orc/dept")
复制代码

6、SQL Databases

Spark 一样支持与传统的关系型数据库进行数据读写。可是 Spark 程序默认是没有提供数据库驱动的,因此在使用前须要将对应的数据库驱动上传到安装目录下的 jars 目录中。下面示例使用的是 Mysql 数据库,使用前须要将对应的 mysql-connector-java-x.x.x.jar 上传到 jars 目录下。

6.1 读取数据

读取全表数据示例以下,这里的 help_keyword 是 mysql 内置的字典表,只有 help_keyword_idname 两个字段。

spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")            //驱动
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")   //数据库地址
.option("dbtable", "help_keyword")                    //表名
.option("user", "root").option("password","root").load().show(10)
复制代码

从查询结果读取数据:

val pushDownQuery = """(SELECT * FROM help_keyword WHERE help_keyword_id <20) AS help_keywords"""
spark.read.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root").option("password", "root")
.option("dbtable", pushDownQuery)
.load().show()

//输出
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|             10|      ALTER|
|             11|    ANALYSE|
|             12|    ANALYZE|
|             13|        AND|
|             14|    ARCHIVE|
|             15|       AREA|
|             16|         AS|
|             17|   ASBINARY|
|             18|        ASC|
|             19|     ASTEXT|
+---------------+-----------+
复制代码

也可使用以下的写法进行数据的过滤:

val props = new java.util.Properties
props.setProperty("driver", "com.mysql.jdbc.Driver")
props.setProperty("user", "root")
props.setProperty("password", "root")
val predicates = Array("help_keyword_id < 10 OR name = 'WHEN'")   //指定数据过滤条件
spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql", "help_keyword", predicates, props).show() 

//输出:
+---------------+-----------+
|help_keyword_id|       name|
+---------------+-----------+
|              0|         <>|
|              1|     ACTION|
|              2|        ADD|
|              3|AES_DECRYPT|
|              4|AES_ENCRYPT|
|              5|      AFTER|
|              6|    AGAINST|
|              7|  AGGREGATE|
|              8|  ALGORITHM|
|              9|        ALL|
|            604|       WHEN|
+---------------+-----------+
复制代码

可使用 numPartitions 指定读取数据的并行度:

option("numPartitions", 10)
复制代码

在这里,除了能够指定分区外,还能够设置上界和下界,任何小于下界的值都会被分配在第一个分区中,任何大于上界的值都会被分配在最后一个分区中。

val colName = "help_keyword_id"   //用于判断上下界的列
val lowerBound = 300L    //下界
val upperBound = 500L    //上界
val numPartitions = 10   //分区综述
val jdbcDf = spark.read.jdbc("jdbc:mysql://127.0.0.1:3306/mysql","help_keyword",
                             colName,lowerBound,upperBound,numPartitions,props)
复制代码

想要验证分区内容,可使用 mapPartitionsWithIndex 这个算子,代码以下:

jdbcDf.rdd.mapPartitionsWithIndex((index, iterator) => {
    val buffer = new ListBuffer[String]
    while (iterator.hasNext) {
        buffer.append(index + "分区:" + iterator.next())
    }
    buffer.toIterator
}).foreach(println)
复制代码

执行结果以下:help_keyword 这张表只有 600 条左右的数据,原本数据应该均匀分布在 10 个分区,可是 0 分区里面却有 319 条数据,这是由于设置了下限,全部小于 300 的数据都会被限制在第一个分区,即 0 分区。同理全部大于 500 的数据被分配在 9 分区,即最后一个分区。

https://github.com/heibaiying

6.2 写入数据

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write
.format("jdbc")
.option("url", "jdbc:mysql://127.0.0.1:3306/mysql")
.option("user", "root").option("password", "root")
.option("dbtable", "emp")
.save()
复制代码

7、Text

Text 文件在读写性能方面并无任何优点,且不能表达明确的数据结构,因此其使用的比较少,读写操做以下:

7.1 读取Text数据

spark.read.textFile("/usr/file/txt/dept.txt").show()
复制代码

7.2 写入Text数据

df.write.text("/tmp/spark/txt/dept")
复制代码

8、数据读写高级特性

8.1 并行读

多个 Executors 不能同时读取同一个文件,但它们能够同时读取不一样的文件。这意味着当您从一个包含多个文件的文件夹中读取数据时,这些文件中的每个都将成为 DataFrame 中的一个分区,并由可用的 Executors 并行读取。

8.2 并行写

写入的文件或数据的数量取决于写入数据时 DataFrame 拥有的分区数量。默认状况下,每一个数据分区写一个文件。

8.3 分区写入

分区和分桶这两个概念和 Hive 中分区表和分桶表是一致的。都是将数据按照必定规则进行拆分存储。须要注意的是 partitionBy 指定的分区和 RDD 中分区不是一个概念:这里的分区表现为输出目录的子目录,数据分别存储在对应的子目录中。

val df = spark.read.format("json").load("/usr/file/json/emp.json")
df.write.mode("overwrite").partitionBy("deptno").save("/tmp/spark/partitions")
复制代码

输出结果以下:能够看到输出被按照部门编号分为三个子目录,子目录中才是对应的输出文件。

https://github.com/heibaiying

8.3 分桶写入

分桶写入就是将数据按照指定的列和桶数进行散列,目前分桶写入只支持保存为表,实际上这就是 Hive 的分桶表。

val numberBuckets = 10
val columnToBucketBy = "empno"
df.write.format("parquet").mode("overwrite")
.bucketBy(numberBuckets, columnToBucketBy).saveAsTable("bucketedFiles")
复制代码

8.5 文件大小管理

若是写入产生小文件数量过多,这时会产生大量的元数据开销。Spark 和 HDFS 同样,都不能很好的处理这个问题,这被称为“small file problem”。同时数据文件也不能过大,不然在查询时会有没必要要的性能开销,所以要把文件大小控制在一个合理的范围内。

在上文咱们已经介绍过能够经过分区数量来控制生成文件的数量,从而间接控制文件大小。Spark 2.2 引入了一种新的方法,以更自动化的方式控制文件大小,这就是 maxRecordsPerFile 参数,它容许你经过控制写入文件的记录数来控制文件大小。

// Spark 将确保文件最多包含 5000 条记录
df.write.option(“maxRecordsPerFile”, 5000)
复制代码

9、可选配置附录

9.1 CSV读写可选配置

读\写操做 配置项 可选值 默认值 描述
Both seq 任意字符 ,(逗号) 分隔符
Both header true, false false 文件中的第一行是否为列的名称。
Read escape 任意字符 \ 转义字符
Read inferSchema true, false false 是否自动推断列类型
Read ignoreLeadingWhiteSpace true, false false 是否跳过值前面的空格
Both ignoreTrailingWhiteSpace true, false false 是否跳过值后面的空格
Both nullValue 任意字符 “” 声明文件中哪一个字符表示空值
Both nanValue 任意字符 NaN 声明哪一个值表示 NaN 或者缺省值
Both positiveInf 任意字符 Inf 正无穷
Both negativeInf 任意字符 -Inf 负无穷
Both compression or codec None,
uncompressed,
bzip2, deflate,
gzip, lz4, or
snappy
none 文件压缩格式
Both dateFormat 任何能转换为 Java 的
SimpleDataFormat 的字符串
yyyy-MM-dd 日期格式
Both timestampFormat 任何能转换为 Java 的
SimpleDataFormat 的字符串
yyyy-MMdd’T’HH:mm:ss.SSSZZ 时间戳格式
Read maxColumns 任意整数 20480 声明文件中的最大列数
Read maxCharsPerColumn 任意整数 1000000 声明一个列中的最大字符数。
Read escapeQuotes true, false true 是否应该转义行中的引号。
Read maxMalformedLogPerPartition 任意整数 10 声明每一个分区中最多容许多少条格式错误的数据,超过这个值后格式错误的数据将不会被读取
Write quoteAll true, false false 指定是否应该将全部值都括在引号中,而不仅是转义具备引号字符的值。
Read multiLine true, false false 是否容许每条完整记录跨域多行

9.2 JSON读写可选配置

读\写操做 配置项 可选值 默认值
Both compression or codec None,
uncompressed,
bzip2, deflate,
gzip, lz4, or
snappy
none
Both dateFormat 任何能转换为 Java 的 SimpleDataFormat 的字符串 yyyy-MM-dd
Both timestampFormat 任何能转换为 Java 的 SimpleDataFormat 的字符串 yyyy-MMdd’T’HH:mm:ss.SSSZZ
Read primitiveAsString true, false false
Read allowComments true, false false
Read allowUnquotedFieldNames true, false false
Read allowSingleQuotes true, false true
Read allowNumericLeadingZeros true, false false
Read allowBackslashEscapingAnyCharacter true, false false
Read columnNameOfCorruptRecord true, false Value of spark.sql.column&NameOf
Read multiLine true, false false

9.3 数据库读写可选配置

属性名称 含义
url 数据库地址
dbtable 表名称
driver 数据库驱动
partitionColumn,
lowerBound, upperBoun
分区总数,上界,下界
numPartitions 可用于表读写并行性的最大分区数。若是要写的分区数量超过这个限制,那么能够调用 coalesce(numpartition) 重置分区数。
fetchsize 每次往返要获取多少行数据。此选项仅适用于读取数据。
batchsize 每次往返插入多少行数据,这个选项只适用于写入数据。默认值是 1000。
isolationLevel 事务隔离级别:能够是 NONE,READ_COMMITTED, READ_UNCOMMITTED,REPEATABLE_READ 或 SERIALIZABLE,即标准事务隔离级别。
默认值是 READ_UNCOMMITTED。这个选项只适用于数据读取。
createTableOptions 写入数据时自定义建立表的相关配置
createTableColumnTypes 写入数据时自定义建立列的列类型

数据库读写更多配置能够参阅官方文档:spark.apache.org/docs/latest…

参考资料

  1. Matei Zaharia, Bill Chambers . Spark: The Definitive Guide[M] . 2018-02
  2. spark.apache.org/docs/latest…

更多大数据系列文章能够参见 GitHub 开源项目大数据入门指南

相关文章
相关标签/搜索