深刻研究 Apache Spark 3.0 的新功能

分享嘉宾 Apache Spark PMC 李潇,就任于 Databricks,Spark 研发部主管,领导 Spark,Koalas,Databricks runtime,OEM 的研发团队,在直播中为你们深刻讲解了Apache Spark 3.0的新功能。
html


直播回放:https://developer.aliyun.com/live/2894web

如下是直播内容精华整理。sql


Spark3.0解决了超过3400个JIRAs,历时一年多,是整个社区集体智慧的成果。Spark SQL和Spark Cores是其中的核心模块,其他模块如PySpark等模块均是创建在二者之上。Spark3.0新增了太多的功能,没法一一列举,下图是其中24个相对来讲比较重要的新功能,下文将会围绕这些进行简单介绍。数据库

1、Performance

与性能相关的新功能主要有:apache

  • Adaptive Query Execution微信

  • Dynamic Partition Pruning架构

  • Query Complication Speedupapp

  • Join Hints框架

(一)Adaptive Query Execution

Adaptive Query Execution(AQE)在以前的版本里已经有所实现,可是以前的框架存在一些缺陷,致使使用不是不少,在Spark3.0中Databricks(Spark初创团队建立的大数据与AI智能公司)和Intel的工程师合做,解决了相关的问题。函数

在Spark1.0中全部的Catalyst Optimizer都是基于规则 (rule) 优化的。为了产生比较好的查询规则,优化器须要理解数据的特性,因而在Spark2.0中引入了基于代价的优化器 (cost-based optimizer),也就是所谓的CBO。然而,CBO也没法解决不少问题,好比:

  • 数据统计信息广泛缺失,统计信息的收集代价较高;

  • 储存计算分离的架构使得收集到的统计信息可能再也不准确;

  • Spark部署在某一单一的硬件架构上,cost很难被估计;

  • Spark的UDF(User-defined Function)简单易用,种类繁多,可是对于CBO来讲是个黑盒子,没法估计其cost。

总而言之,因为种种限制,Spark的优化器没法产生最好的Plan。也正是由于上诉缘由,运行期的自适应调整就变得至关重要,对于Spark更是如此,因而有了AQE,其基本方法也很是简单易懂。以下图所示,在执行完部分的查询规划后,Spark能够收集到结果的统计信息,而后利用这些信息再对查询规划从新进行优化。这个优化的过程不是一次性的,而是自适应的,也就是说随着查询规划的执行会不断的进行优化, 并且尽量地复用了现有优化器的已有优化规则。让整个查询优化变得更加灵活和自适应。

Spark3.0中AQE包括三个主要的运行期自适应功能:

  • 能够基于运行期的统计信息,将Sort Merge Join 转换为Broadcast Hash Join;

  • 能够基于数据运行中间结果的统计信息,减小reducer数量,避免数据在shuffle期间的过量分区致使性能损失;

  • 能够处理数据分布不均致使的skew join。

更多的信息你们能够经过搜索引擎查询了解。

若是你是一个Spark的资深用户,可能你读了不少的调优宝典,其中第一条就是让你的Join变得更快的方法就是尽量地使用Broadcast Hash Join。好比你能够增长spark.sql.autoBroadcastJoinThreshold 阈值,或者使用 broadcast HINT。可是这基本上属于艺高人胆大。首先,这种方法很难调,一不当心就会Out of Memory,甚至性能变得更差,即便如今产生了必定效果,可是随着负载的变化可能调优会彻底失败。

也许你会想:Spark为何不解决这个问题呢?这里有不少挑战,好比:

  • 统计信息的缺失,统计信息的不许确,那么就是默认依据文件大小来预估表的大小,可是文件每每是压缩的,尤为是列存储格式,好比parquet 和 ORC,而Spark是基于行处理,若是数据连续重复,file size可能和真实的行存储的真实大小,差异很是之大。这也是为什么提升autoBroadcastJoinThreshold,即便不是太大也可能会致使out of memory;

  • Filter复杂、UDFs的使用都会使Spark没法准确估计Join输入数据量的大小。当你的query plan异常大和复杂的时候,这点尤为明显。

其中,Spark3.0中基于运行期的统计信息,将Sort Merge Join 转换为Broadcast Hash Join的过程以下图所示。

也许你还会看到调优宝典告诉你调整shuffle产生的partitions的数量。而当前默认数量是200,可是这个200为何就不得而知了。然而,这个值设置为多少都不是最优的。其实在不一样shuffle,数据的输入大小和分布绝大多数都是不同。那么简单地用一个配置,让全部的shuffle来遵循,显然是很差的。要设得过小,每一个partition的大小就会太大,那么GC的压力就会很大,aggregation和sort会更有可能的去spill数据到磁盘。可是,要是设太大,partition的大小就会过小,partition的数量会大。这个会致使没必要要的IO,也让task调度器的压力剧增。那么调度器会致使全部task都变慢。这一系列问题在query plan复杂的时候变得尤其突出,还可能会影响到其余性能,最后耗时耗力却调优失败。

对于这个问题的解决,AQE就有优点了。以下图所示,AQE能够在运行期动态的调整partition来达到性能最优。

此外,数据分布不均是Spark调优的一个疑难杂症,它的表现有多种,好比若干task停滞不前,像是出现了bugs,又好比大量的disk spilling会致使不少节点都无事可作。此外,你也许会看到out of memory这种异常。其解决方法也不少,好比找到skew values而后重写query,或者在join的状况下增长skew keys来消除数据分布不均,可是不管哪一种方法,都很是浪费时间,且后期难以维护。AQE解决问题的方式以下,其经过shuffle落地后的中间数据结果判断哪些partition是skew的,若是partition过大,就将其分红若干较小的partition,经过分而治之,整体性能大幅提高。

AQE的发布能够说是一个时代的开始,将来将会更进一步发展,引入更多自适应规则,让Spark能够随着数据分布和特性的变化自动改变Query plan,让更多的query编译静态优化变成运行时的动态优化。

(二)Dynamic Partition Pruning

Dynamic Partition Pruning也是一个运行时的动态优化方法,简单来讲就是咱们能够经过Query的某些分支的中间结果来避免没必要要的partition读取,这种方法是没法经过编译期推测出来的,只能在运行时根据结果来判断,这种方法对数据仓库的star-schema效果很是明显,在TPC-DS得到了很是明显的加速,能够加速2-18倍。

(三)Join Hints

Join Hints是一个很是广泛的数据库的优化策略,在3.0以前已经有了Broadcast hash join,3.0以后的版本加了Sort-merge join、Shuffle hash join和 Shuffle nested loop join,可是要注意谨慎使用,由于数据的特性不一样,很难保证一直有效,即便有效,也不表明一直有效,随着时间的变化,你的数据变了,可能会让你的query 变慢,变得不稳定。整体来讲上面的四种Join的适用条件和特色以下所示,总而言之,使用Join Hints要谨慎。

2、Richer APIs

Spark3.0简化了开发,不但增长了更多的新功能,也改善了众多现有的功能,让更多的用法成为可能,主要有:

  • Accelerator-aware Scheduler

  • Built-in Functions

  • pandas UDF enhancements

  • DELETE/UPDATE/MERGE in Catalyst

(一)pandas UDF enhancements

pandas UDF应该说是PySPark用户中最喜好的特性之一,对于其功能和性能的提高应该都是喜闻乐见的,其发展历程以下图所示。

最新的pandas UDF和以前的不一样之处在于引入了Python Type Hints,如今用户可使用pandas中的数据类型好比pandas.Series等来表示pandas UDF的种类,再也不须要记住原来的UDF类型,只须要指定正确的输入和输出类型便可。此外,pandas UDF能够分为pandas UDF和pandas API。

(二)Accelerator-aware Scheduler

Accelerator-aware Scheduler是加速器的调度支持,狭义上也就是指GPU调度支持。加速器常常用来对特定负载作加速,目前,用户仍是须要指定什么应用须要加速器资源,可是在未来咱们会支持job或者stage级别的调度。Spark3.0中咱们已经支持大多调度器,此外,咱们还能够经过Web UI来监控GPU的使用,欢迎你们使用,更多详细资料你们能够到社区学习。

(三)Built-in Functions

为了让Spark3.0更方便实用,Spark社区按照其余的主流,好比数据库厂商等,内嵌了如上图所示的32个经常使用函数,这样用户就无须本身写UDF,而且速度更快。好比针对map类型,Spark3.0新增长了map_keys和map_values,更加地方便易用。其余新增长的更多内嵌函数你们能够到社区具体了解。

3、Monitoring and Debuggability

Spark3.0也增长了一些对监控和调优的改进,主要有:

  • Structured Streaming UI

  • DDL/DML Enhancements

  • Observable Metrics

  • Event Log Rollover

(一)Structured Streaming UI

Structured Streaming是在Spark2.0中发布的,在Spark3.0中加入了UI的配置。新的UI主要包括了两种统计信息:已完成的Streaming查询聚合信息和未完成的Streaming查询的当前信息,包括Input Rate、Process Rate、Batch Duration和Operate Duration。

(二)DDL/DML Enhancements

咱们还增长了各类DDL/DML命令,好比EXPLAIN和。
EXPLAIN是性能调优的必备工具,读取EXPLAIN是每一个用户的基本功,可是随着系统的运行,EXPLAIN的信息愈来愈多,并且信息多元、多样,在新的版本中咱们引入了新的FORMATTED模式,以下所示,在开头处有一个很是精简的树状图,且以后的每一个部分都有很详细的解释,更容易加更多的注意,这就从水平扩展变成了垂直扩展,更加的直观。

(三)Observable Metrics

咱们还引入了Observable Metrics用以观测数据的质量。要知道数据质量对于不少Spark应用都是至关重要的,一般定义数据质量的Metrics仍是很是容易的,好比用一些聚合参数,可是算出这个Metrics的值就很是麻烦,尤为对于流计算来讲。

4、SQL Compatibility

SQL兼容性也是Spark必不可提的话题,良好的兼容性更方便用户迁移到Spark平台,在Spark3.0中新增的主要功能有:

  • ANSI Store Assignment

  • Overflow Checking

  • Reserved Keywords in Parser

  • Proleptic Gregorian Calendar

也就是说,这个版本中咱们让insert遵照了ANSI Store Assignment,而且增长了运行时的overflow的检查,还提供了一个模式让SQL Parser来准确地遵照ANSI标准的保留字,还切换了Calendar,这样更加符合ANSI的SQL标准。好比说咱们想要插入两列数据,类型是int和string,若是将int插入到了string中,仍是能够的,不会发生数据精度的损失和数据丢失;可是若是咱们尝试将string类型插入到int类型中,就有可能发生数据损失甚至丢失。ANSI Store Assignment+Overflow Checking在输入不合法的时候就会在运行时抛出异常,须要注意的是这个设置默认是关闭的,能够根据我的须要打开。

5、Built-in Data Sources

在这个版本中咱们提高了预装的数据源,好比Parquet table,咱们能够对Nested Column作Column Pruning和Filter Pushdown,此外还支持了对CSV的Filter Pushdown,还引入了Binary Data Source来处理相似于二进制的图片文件。

6、Extensibility and Ecosystem

Spark3.0继续增强了对生态圈的建设:

  • 对Data Source V2 API的持续改善和catalog支持;

  • 支持Java 11;

  • 支持Hadoop 3;

  • 支持Hive 3。

(一)Data Source V2 API+Catalog Support

Spark3.0加上了对Catalog的支持来扩展Data Source API。Catalog plugin API可让用户注册本身的catalog来实现对元数据的处理,这样可让Spark用户更简单方便的使用数据源的表。对于没有实现Catalog plugin的数据源,用户须要先注册每一个外部数据源的表才能访问,可是实现了Catalog plugin API以后咱们只须要注册Catalog,而后就能够直接远程访问和操做catalog的表。对于数据源的开发者来讲,何时支Data Source V2 API呢?下面是几点建议:

不过这里须要注意,Data Source V2还不是很稳定,开发者可能在将来还须要调整相关API的实现。
大数据的发展至关迅速,Spark3.0为了能更方便的部署,咱们升级了对各个组件和环境版本的支持,可是要注意如下事项。

关于生态圈,这里要提一下Koalas,它是一个纯的Python库,用Spark实现了绝大部分的pandas API,让pandas用户除了能够处理小数据,也能够处理大数据。Koalas对于pandas用户来讲能够将pandas的代码扩展到大数据处理,使得学习PySpark变得更简单;对于现有的PySpark用户来讲,多了更多的选择,能够用pandas API来解决生产力问题。过去一年多,Koalas的下载量是惊人的,在pip的下载量单日已经超过了37000,并且还在不断增加,5月的下载量也达到了85万。Koalas的代码其实很少,主要是API的实现,执行仍是由Spark来作,因此Spark性能的提高对于Koalas用户来讲是直接受益的。Koalas的发布周期当频密,目前已经有33个发布,欢迎你们下载使用。

如何读和理解Spark UI对大多数新用户来讲是一个很大的挑战,尤为对SQL用户来讲,在Spark3.0中咱们增长了本身的UI文档https://spark.apache.org/docs/latest/web-ui.html
而且增长了SQL Reference ,https://spark.apache.org/docs/latest/sql-ref.html
等,更详细的文档使得用户上手Spark的时候更加容易,欢迎你们去试一试Spark3.0,感觉Spark的强大。


关键词:Spark3.0、SQL、PySpark、Koalas、pandas、UDF、AQE

相关内容推荐

自适应查询执行AQE:在运行时加速SparkSQL

Apache Spark 3.0中的SQL性能改进概览

Structured Streaming生产化实践及调优

Apache Spark 3.0对Prometheus监控的原生支持



阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,按期推送精彩案例,技术专家直播,问答区近万人Spark技术同窗在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

对开源大数据和感兴趣的同窗能够加小编微信(下图二维码,备注“进群”)进入技术交流微信群。

Apache Spark技术交流社区公众号,微信扫一扫关注


本文分享自微信公众号 - Apache Spark技术交流社区(E-MapReduce_Spark)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索