Spark 1.3 引入了初版的数据源 API,咱们可使用它将常见的数据格式整合到 Spark SQL 中。可是,随着 Spark 的不断发展,这一 API 也体现出了其局限性,故而 Spark 团队不得不加入愈来愈多的专有代码来编写数据源,以得到更好的性能。Spark 2.3 中,新一版的数据源 API 初见雏形,它克服了上一版 API 的种种问题,原来的数据源代码也在逐步重写。本文将演示这两版 API 的使用方法,比较它们的不一样之处,以及新版 API 的优点在哪里。html
V1 API 由一系列的抽象类和接口组成,它们位于 spark/sql/sources/interfaces.scala 文件中。主要的内容有:java
1 2 3 4 5 6 7 8 9 10 11 12 |
trait RelationProvider { def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation } abstract class BaseRelation { def sqlContext: SQLContext def schema: StructType } trait TableScan { def buildScan(): RDD[Row] } |
经过实现 RelationProvider
接口,代表该类是一种新定义的数据源,能够供 Spark SQL 取数所用。传入 createRelation
方法的参数能够用来作初始化,如文件路径、权限信息等。BaseRelation
抽象类则用来定义数据源的表结构,它的来源能够是数据库、Parquet 文件等外部系统,也能够直接由用户指定。该类还必须实现某个 Scan
接口,Spark 会调用 buildScan
方法来获取数据源的 RDD,咱们将在下文看到。mysql
下面咱们来使用 V1 API 实现一个经过 JDBC 读取数据库的自定义数据源。为简单起见,表结构信息是直接写死在代码里的,咱们先从整表扫描开始。完整的代码能够在 GitHub(连接)中找到,数据源表则能够在这个 连接 中查看。git
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
class JdbcSourceV1 extends RelationProvider { override def createRelation(parameters: Map[String, String]): BaseRelation = { new JdbcRelationV1(parameters("url")) } } class JdbcRelationV1(url: String) extends BaseRelation with TableScan { override def schema: StructType = StructType(Seq( StructField("id", IntegerType), StructField("emp_name", StringType) )) override def buildScan(): RDD[Row] = new JdbcRDD(url) } class JdbcRDD(url: String) extends RDD[Row] { override def compute(): Iterator[Row] = { val conn = DriverManager.getConnection(url) val stmt = conn.prepareStatement("SELECT * FROM employee") val rs = stmt.executeQuery() new Iterator[Row] { def hasNext: Boolean = rs.next() def next: Row = Row(rs.getInt("id"), rs.getString("emp_name")) } } } |
JdbcRDD#compute
负责实际的读取操做,它从上游获取到数据库链接信息、选取的字段、以及过滤条件,拼装 SQL 后执行,并返回一个 Row
类型的迭代器对象,每一行数据的结构和请求的字段列表相符。定义好数据源后,咱们就能够用 DataFrame
对象来直接操做了:github
1 2 3 4 5 6 7 |
val df = spark.read .format("JdbcSourceV2") .option("url", "jdbc:mysql://localhost/spark") .load() df.printSchema() df.show() |
上述代码输出的内容是:sql
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
root |-- id: integer (nullable = true) |-- emp_name: string (nullable = true) |-- dep_name: string (nullable = true) |-- salary: decimal(7,2) (nullable = true) |-- age: decimal(3,0) (nullable = true) +---+--------+----------+-------+---+ | id|emp_name| dep_name| salary|age| +---+--------+----------+-------+---+ | 1| Matthew|Management|4500.00| 55| | 2| Olivia|Management|4400.00| 61| | 3| Grace|Management|4000.00| 42| | 4| Jim|Production|3700.00| 35| | 5| Alice|Production|3500.00| 24| +---+--------+----------+-------+---+ |
咱们能够看到,V1 API 使用起来很是方便,所以可以知足 Spark SQL 初期的需求,但也难免存在不少局限性:数据库
createRelation
接收 SQLContext
做为参数;buildScan
方法返回的是 RDD[Row]
类型;而在实现写操做时,insert
方法会直接接收 DataFrame
类型的参数:apache
1 2 3 |
trait InsertableRelation { def insert(data: DataFrame, overwrite: Boolean): Unit } |
这些类型都属于较为上层的 Spark API,其中某些类已经发生了变化,如 SQLContext
已被 SparkSession
取代,而 DataFrame
也改成了 Dataset[Row]
类型的一个别称。这些改变不该该体现到底层的数据源 API 中。api
除了上文中的 TableScan
接口,V1 API 还提供了 PrunedScan
接口,用来裁剪不须要的字段;PrunedFilteredScan
接口则能够将过滤条件下推到数据源中。在 JdbcSourceV1
示例中,这类下推优化会体如今 SQL 语句里:服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class JdbcRelationV1 extends BaseRelation with PrunedFilteredScan { def buildScan(requiredColumns: Array[String], filters: Array[Filter]) = { new JdbcRDD(requiredColumns, filters) } } class JdbcRDD(columns: Array[String], filters: Array[Filter]) { def compute() = { val wheres = filters.flatMap { case EqualTo(attribute, value) => Some(s"$attribute = '$value'") case _ => None } val sql = s"SELECT ${columns.mkString(", ")} FROM employee WHERE ${wheres.mkString(" AND ")}" } } |
若是咱们想添加新的优化算子(如 LIMIT 子句),就难免要引入一系列的 Scan
接口组合:
1 2 3 4 5 6 7 8 9 10 11 |
trait LimitedScan { def buildScan(limit: Int): RDD[Row] } trait PrunedLimitedScan { def buildScan(requiredColumns: Array[String], limit: Int): RDD[Row] } trait PrunedFilteredLimitedScan { def buildScan(requiredColumns: Array[String], filters: Array[Filter], limit: Int): RDD[Row] } |
对于支持数据分区的数据源,如 HDFS、Kafka 等,V1 API 没有提供原生的支持,于是也不能利用数据局部性(Data Locality)。咱们须要本身继承 RDD 来实现,好比下面的代码就对 Kafka 数据源进行了分区,并告知 Spark 能够将数据读取操做放入 Kafka Broker 所在的服务器上执行,以提高效率:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
case class KafkaPartition(partitionId: Int, leaderHost: String) extends Partition { def index: Int = partitionId } class KafkaRDD(sc: SparkContext) extends RDD[Row](sc, Nil) { def getPartitions: Array[Partition] = Array( // populate with Kafka PartitionInfo KafkaPartition(0, "broker_0"), KafkaPartition(1, "broker_1") ) override def getPreferredLocations(split: Partition): Seq[String] = Seq( split.asInstanceOf[KafkaPartition].leaderHost ) } |
此外,相似 Cassandra 这样的数据库,会按主键对数据进行分片。那么,若是查询语句中包含了按该主键进行分组的子句,Spark 就能够省去一次 Shuffle 操做。这在 V1 API 中也是不支持的,而 V2 API 则提供了 SupportsReportPartitioning
接口来支持。
Spark 任务是有可能失败的,使用 V1 API 时就会留下部分写入的数据。固然,对于 HDFS 这样的文件系统来讲问题不大,由于能够用 _SUCCESS
来标记该次写操做是否执行成功。但这一逻辑也须要最终用户来实现,而 V2 API 则提供了明确的接口来支持事务性的写操做。
Spark SQL 目前已支持列存储和流式计算,但二者都不是用 V1 API 实现的。ParquetFileFormat
和 KafkaSource
等类型都使用了专有代码和内部 API。这些特性也在 V2 API 中获得支持。
V2 API 首先使用了一个标记性的 DataSourceV2
接口,实现了该接口的类还必须实现 ReadSupport
或 WriteSupport
,用来表示自身支持读或写操做。ReadSupport
接口中的方法会被用来建立 DataSourceReader
类,同时接收到初始化参数;该类继而建立 DataReaderFactory
和 DataReader
类,后者负责真正的读操做,接口中定义的方法和迭代器相似。此外,DataSourceReader
还能够实现各种 Support
接口,代表本身支持某些优化下推操做,如裁剪字段、过滤条件等。WriteSupport
API 的层级结构与之相似。这些接口都是用 Java 语言编写,以得到更好的交互支持。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public interface DataSourceV2 {} public interface ReadSupport extends DataSourceV2 { DataSourceReader createReader(DataSourceOptions options); } public interface DataSourceReader { StructType readSchema(); List<DataReaderFactory<Row>> createDataReaderFactories(); } public interface SupportsPushDownRequiredColumns extends DataSourceReader { void pruneColumns(StructType requiredSchema); } public interface DataReaderFactory<T> { DataReader<T> createDataReader(); } public interface DataReader<T> extends Closeable { boolean next(); T get(); } |
可能你会注意到,DataSourceReader#createDataReaderFactories
仍然捆绑了 Row
类型,这是由于目前 V2 API 只支持 Row
类型的返回值,且这套 API 仍处于进化状态(Evolving)。
让咱们使用 V2 API 来重写 JDBC 数据源。下面是一个整表扫描的示例,完整代码能够在 GitHub(连接)上查看。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
class JdbcDataSourceReader extends DataSourceReader { def readSchema = StructType(Seq( StructField("id", IntegerType), StructField("emp_name", StringType) )) def createDataReaderFactories() = { Seq(new JdbcDataReaderFactory(url)).asJava } } class JdbcDataReader(url: String) extends DataReader[Row] { private var conn: Connection = null private var rs: ResultSet = null def next() = { if (rs == null) { conn = DriverManager.getConnection(url) val stmt = conn.prepareStatement("SELECT * FROM employee") rs = stmt.executeQuery() } rs.next() } def get() = Row(rs.getInt("id"), rs.getString("emp_name")) } |
经过实现 SupportsPushDownRequiredColumns
接口,Spark 会调用其 pruneColumns
方法,传入用户所指定的字段列表(StructType
),DataSourceReader
能够将该信息传给 DataReader
使用。
1 2 3 4 5 6 7 8 9 10 11 |
class JdbcDataSourceReader with SupportsPushDownRequiredColumns { var requiredSchema = JdbcSourceV2.schema def pruneColumns(requiredSchema: StructType) = { this.requiredSchema = requiredSchema } def createDataReaderFactories() = { val columns = requiredSchema.fields.map(_.name) Seq(new JdbcDataReaderFactory(columns)).asJava } } |
咱们能够用 df.explain(true)
来验证执行计划。例如,SELECT emp_name, age FROM employee
语句的执行计划在优化先后是这样的:
1 2 3 4 5 6 7 8 9 |
== Analyzed Logical Plan == emp_name: string, age: decimal(3,0) Project [emp_name#1, age#4] +- SubqueryAlias employee +- DataSourceV2Relation [id#0, emp_name#1, dep_name#2, salary#3, age#4], datasource.JdbcDataSourceReader@15ceeb42 == Optimized Logical Plan == Project [emp_name#1, age#4] +- DataSourceV2Relation [emp_name#1, age#4], datasource.JdbcDataSourceReader@15ceeb42 |
能够看到,字段裁剪被反映到了数据源中。若是咱们将实际执行的 SQL 语句打印出来,也能看到字段裁剪下推的结果。
相似的,实现 SupportsPushDownFilters
接口能够将过滤条件下推到数据源中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
class JdbcDataSourceReader with SupportsPushDownFilters { var filters = Array.empty[Filter] var wheres = Array.empty[String] def pushFilters(filters: Array[Filter]) = { val supported = ListBuffer.empty[Filter] val unsupported = ListBuffer.empty[Filter] val wheres = ListBuffer.empty[String] filters.foreach { case filter: EqualTo => { supported += filter wheres += s"${filter.attribute} = '${filter.value}'" } case filter => unsupported += filter } this.filters = supported.toArray this.wheres = wheres.toArray unsupported.toArray } def pushedFilters = filters def createDataReaderFactories() = { Seq(new JdbcDataReaderFactory(wheres)).asJava } } |
createDataReaderFactories
返回的是列表类型,每一个读取器都会产生一个 RDD 分区。若是咱们想开启多个读取任务,就能够生成多个读取器工厂,并为每一个读取器限定主键范围:
1 2 3 4 5 6 |
def createDataReaderFactories() = { Seq((1, 6), (7, 11)).map { case (minId, maxId) => val partition = s"id BETWEEN $minId AND $maxId" new JdbcDataReaderFactory(partition) }.asJava } |
V2 API 提供了两组 commit
/ abort
方法,用来实现事务性的写操做:
1 2 3 4 5 6 7 8 9 10 |
public interface DataSourceWriter { void commit(WriterCommitMessage[] messages); void abort(WriterCommitMessage[] messages); } public interface DataWriter<T> { void write(T record) throws IOException; WriterCommitMessage commit() throws IOException; void abort() throws IOException; } |
DataSourceWriter
在 Spark Driver 中执行,DataWriter
则运行在其余节点的 Spark Executor 上。当 DataWriter
成功执行了写操做,就会将提交信息传递给 Driver;当 DataSourceWriter
收集到了全部写任务的提交信息,就会执行最终的提交操做。若是某个写任务失败了,它的 abort
方法会获得执行;若是通过多轮重试后仍然失败,则全部写任务的 abort
方法都会被调用,进行数据清理操做。
这两个特性仍处于实验性阶段,在 Spark 中尚未获得使用。简单来讲,DataSourceReader
类能够实现 SupportsScanColumnarBatch
接口来声明本身会返回 ColumnarBatch
对象,这个对象是 Spark 内部用来存放列式数据的。对于流式数据,则有 MicroBatchReader
和 ContinuousReader
这两个接口,感兴趣的读者能够到 Spark 单元测试 代码中查看。