EMR 打造高效云原生数据分析引擎

本场视频连接:EMR打造高效云原生数据分析引擎算法

本场ppt材料:https://www.slidestalk.com/AliSpark/2019___0926_110365sql


基于开源体系打造云上数据分析平台

客户选择开源方案的缘由主要有如下几点:

• 灵活多样的业务场景:目前即使是一个小企业,其数据存储也多是多种多样的,好比业务数据、日志数据和图数据等,这种状况下,须要有一个高度定制化的系统来串联不一样的业务场景;
• 本身有专业的运维能力:开源系统有充足的人才储备,丰富的网上资料与开源的强大后盾,能够确保公司业务的顺利开展;
• 多种业务需求vs成本压力:每种云上产品有本身的使用场景,对于中小企业来讲购买多种云产品将会形成很大的成本压力,而经过开源体系维护一套系统,在集成用户业务中所需组件的同时,能够下降用户的成本。
image.png数据库

下图是阿里巴巴EMR系统的产品架构图。用户上云的方式主要有两种,一种是购买ECS资源本身搭建一套开源系统;另外一种是直接选择阿里巴巴的EMR系统。第一种方式因为开源系统组件多,涉及到了Spark、Hive、Flink和TensorFlow等,从零搭建一套完整的大数据系统对于用户来说很是复杂,尤为是成百上千的集群规模也给运维形成了很大的挑战。而使用EMR系统具备如下优势:
1) 阿里云EMR系统能够帮助用户一键化自动部署、配置相关组件,开箱即用,同时还会根据用户的机器类型进行参数的自动推荐调优。
2) 阿里云EMR系统与阿里云其余产品实现了打通,好比数据存放在OSS,EMR系统无需额外再作认证配置,即可以很方便地读取OSS上的数据;
3) 阿里云EMR系统集成了不少自研插件,这些插件在其余产品中是没有的;
4) 阿里云EMR系统的全部组件兼容开源但优于开源,如Flink集成了阿里云自研的Blink和TensorFlow(PAI),这也是阿里云为社区作的一点贡献,目的是为了让用户能用到阿里云内部的技术;
5) 阿里云EMR系统提供了全平台的做业诊断与告警组件APM来实现自动化运维,大大下降集群运维的复杂性;
6) 阿里云EMR系统还与DataWorks对接,用户能够以DataWorks为入口,傻瓜式地使用EMR系统。apache

image.png

EMR系统的目标主要有如下三个:

• 平台化:将EMR作成一个统一的云上数据分析平台,帮助用户打造全栈式的大数据解决方案,支持全系列VM容器化,提供企业级HAS和大数据APM;
• 技术社区&深度:持续深耕技术社区,打造大数据友好的云 Native 存储,同时将技术回馈给社区,为社区作贡献;
• 生态:EMR系统将结合阿里云其余产品构建一个生态,接入Blink、PAI,集成OSS、OTS方案。缓存

image.png

EMR-Jindo:云原生高效数据分析引擎

下图展现了TPC-DS的基准测试报告,能够发如今2019年3月份10TB的测试中,性能指标得分是182万左右,成本是0.31 USD;而2019年十月份一样的测试性能指标得分已经变成526万,成本降低到0.53 CNY,也就是说通过半年左右性能提高了2.9倍,成本缩减到原来的四分之一。同时阿里巴巴还成为了首个提交TPC-DS测试100TB测试报告的厂商。这些成绩的背后是EMR-Jindo引擎的支持。服务器

image.png

EMR-Jindo引擎架构主要分为两部分:

• Jindo-Spark:EMR内部全面优化的Spark高效计算引擎,能够处理多种计算任务;
• Jindo-FS:自研的云原生存储引擎,兼容开源HDFS的接口,兼顾性能与价格。网络

image.png

1) Jindo-Spark

Jindo-Spark高效计算引擎对Spark采起了一系列优化措施,好比Runtime Filter支持自适应的运行时数据裁剪;Enhanced Join Reorder来解决外链接重排等问题;TopK支持推理并下推 TopK 逻辑,帮助尽早地过滤数据;File Index支持文件级别过滤和min/max/bloom/倒排等;自研开发了Relational Cache,实现使用一套引擎就能够将查询从分钟级提高为亚秒级;针对特定的场景推出Spark Transaction功能,为Spark引入Full ACID支持;实现了Smart Shuffle功能,从底层来减小sort-merge 次数,提高Shuffle的效率。session

image.png

• Runtime Filter
相似于Spark中的Dynamic Partition Pruning(DPP),可是其比DPP功能更强大。除了DPP能处理的分析表以外,Runtime Filter还能够处理非分析表。其基本原理是运行时动态裁剪数据,避免没必要要的计算。好比,面对一个join查询,没法经过value下推到存储层而将数据过滤,逻辑推算的时候没法预知最后的数据量级。这种状况下若是是分析表,Runtime Filter首先会估计其中一个表中参与join操做的数据量,若是数据量较小,则提早进行数据筛选,再推送到另外一侧作数据过滤;而对于非分析表,会引入Filter,如BloomFilter得到Min或Max的统计信息,架构

image.png

根据这些统计信息,将备选数据比较少的一侧提取出来,推到另外一侧进行过滤。Runtime Filter的成本很小,只须要在优化器中进行简单评估,却能够带来显著的性能提高。以下图所示,Runtime Filter实现了35%左右的总体性能提高。该特性已经在Spark提交了PR(SPARK-27227)。
• Enhanced Join Recorder
你们都知道,算子执行顺序可能会极大地影响sql的执行效率,这种状况下优化的核心原则是改变算子的执行顺序,尽早地过滤数据。运维

好比下图左上角的例子中,若是最底层两个表很是大的话,则这两张表join的开销会很是大,join后的大数据再去join小表,大数据一层一层地传递下去,就会影响整个流程的执行效率。此时,优化的思想是先将大表中一些无关的数据过滤掉,减小往下游传递的数据量。针对该问题,Spark使用的是动态规划算法,但其只适用于表的数量比较少的状况,若是表的数量大于12,该算法就一筹莫展。面对表的数量比较多的状况,EMR提供了多表join的遗传算法,其能够将原来的动态规划算法的2n的复杂度降到线性的量级,能完成成百上千张表的join。

下图右上角能够看到,Query64有18个表参与join,动态规划算法优化时间就须要耗费1400秒,而多表join的遗传算法仅须要20秒左右就可完成。Join Recorder另一个重要的功能是外链接重排算法,你们都知道sql中外链接不能随意交换顺序的,但这并不表明不能交换顺序,好比A left join B, 而后再left join C,事实上在某种条件下其顺序是可交换的。在Spark中,外链接的优化是直接被放弃掉,而EMR则根据现有研究找到了顺序可交换的充分必要条件,实现了外链接重排算法(以下图左下角所示),对外链接的执行效率有了质的提高(下图右下角)
image.png

• Relational Cache
Spark本来的Cache存在几个局限点,其一Spark的Cache是session级别,若是发现某一个Query的片断使用比较频繁,就会对为这个session建立一个cache,可是session结束后,cache就会消失;其二Spark的Cache是存储在本机上,而不是分布式存储,所以没法作到通用。在此基础上,EMR平台实现了Relational Cache,对任意Spark表,视图或者Dataset等关系型数据抽象的数据实体都建立cache, 相似于物化视图(Materialized View),可是比物化视图功能要丰富。Relational Cache的使用场景包括a)亚秒级响应MOLAP引擎;b)交互式BI,Dashboard;c)数据同步;d)数据预组织。

image.png

Relational Cache的建立过程以下,其语法与Spark sql常见的DDL相似。首先CACHE一个表或视图,而后指定Relational Cache的更新策略(DEMAND或COMMIT)、是否用于后续优化、Cache数据的存储方式以及Cache的视图逻辑。Relational Cache支持cache任意Table、View,支持cache到内存、HDFS、OSS等任意数据源,JSON、ORC、Parquet等任意数据格式。

image.png

Relational Cache还支持对用户输入的sql的优化。原来的Spark sql Cache对于用户输入的sql优化很是僵硬死板,用户输入的sql必须精确匹配上Cache才能使用。而Relational Cache则彻底不一样,若是有a、b、c、d四个表join的cache,当又有a、b、e三个表join的状况下,a、b join的结果即可以从四个表join时生成的Cache数据中读取。下图中右侧展现了Cache和没有Cache的基准测试结果,能够看出Relational Cache能够保证测试的响应时间在亚秒级。
请参考 Spark Relational Cache实现亚秒级响应的交互式分析

image.png

• Spark Transaction:有些用户可能会使用Hive表,Hive表有事务支持,然而Spark在事务这一项上是不兼容Hive的。所以,为了知足用户数据订正/删除以及数据流导入的场景支持,EMR平台提供了Spark Transaction支持事务的ACID支持。

传统的数据导入是分批的,好比一天一导入,而流数据导入场景下数据是实时写入的原始数据,并未通过任何处理,所以会有delete和update的需求。Spark Transaction总体来说是一种锁+MVCC的实现形式,MVCC与底层的存储密不可分。大数据在Hive和Spark兼容的状况下,都是文件的形式存在目录中,文件的版本经过行来控制,写入的每一行都会加上Meta Columns,如op、original_write-id、bucket id和row_id等,来标识这是全表惟一的一行。当须要更新某一行的时候,并不会原地更新该行,而是将该行取出来,重写后产生新的版本进行存储。读取的时候,多版本会进行合并后返回给用户。

image.png

###2) Jindo-FS
EMR早期推出了一种本地盘机型,使用这种机型来部署集群相似于用本地集群在云下部署大数据发行版,价格较高;此外因为当时HDFS有元数据瓶颈,本地存储的动态化伸缩面临很大的挑战。针对这方面的问题,解决的方案是计算与存储分离,将数据存储在OSS上,可是这种分离带来的直接结果就是性能变差,由于OSS元数据操做耗时,读取数据跨网络,传输带宽也会严重影响性能。

image.png

进而的解决方案是将数据从远端拉取到计算侧进行缓存,这也是Jindo-FS作的事情。Jindo-FS是相似于HDFS的系统,其架构也相似于HDFS的Master-Slave架构,分红Name Service 和Storage Service。它支持将某些访问频率比较高的表能够放到RocksDB中进行多级缓存。Jindo-FS总体不一样于HDFS的Master结点, Jindo-FS的“Master”(Name Service)是一个分布式集群,使用raft 协议,提供入口服务;提供多Name Space支持;元数据以kv形式存放于高性能kv store 中;由于其自己不存储数据,真实数据在OSS和OTS中,所以支持数据的弹性扩展和销毁重建。

image.png

Jindo-FS底层的元数据管理会将数据拆成一系列的kv,经过递增的id来逐层查询。如/home/Hadoop/file1.txt须要读三次OTS。下图右侧的测试结果说明Jindo-FS在元数据操做方面相对于OSS有较好的性能提高。

image.png

Jindo-FS使用Storage Service来进行底层存储,在写流程中Storage Service将要写的文件同时存储到本地和OSS中,而后再返回给用户写的结果,同时还会在集群结点内进行多副本传输;而读操做和HDFS相似,若是命中本地,则在本地存储中读取,不然要进行远程读取。Storage Service具有高性能、高可靠、高可用、弹性存储等特性,为了支持高性能,Jindo-FS创建了数据流高速通道,同时还有一系列的策略,如减小内存拷贝次数等。

image.png

Jindo-FS中Name Service如何实现高可靠、如何进行热点数据发现与缓存替换、块存储模式与缓存模式;以及Storage Service如何应对读写失败、数据块如何设计并存储、如何实现高速数据通道等问题,请参见大数据生态专场《云上大数据的高效能数据库的存储方案》的分享。

image.png

相关文章:
JindoFS概述:云原生的大数据计算存储分离方案

JindoFS解析 - 云上大数据高性能数据湖存储方案

 

双11福利来了!先来康康#怎么买云服务器最便宜# [并不简单]参团购买指定配置云服务器仅86元/年,开团拉新享三重礼:1111红包+瓜分百万现金+31%返现,爆款必买清单,还有iPhone 11 Pro、卫衣、T恤等你来抽,立刻来试试手气👉 https://www.aliyun.com/1111/2019/home?utm_content=g_1000083110

 

阅读原文

本文为云栖社区原创内容,未经容许不得转载。

相关文章
相关标签/搜索