MaxCompute不只仅在阿里集团内部被普遍的使用,也支撑着许多著名的互联网方面的厂商,以及关系到国计民生、国家安全方面的应用。
算法
一条SQL在分布式系统中的旅程
上图所示为运行SQL任务中的大概流程。首先使用一条SQL语句,经过Compile,能够生成一个逻辑的执行计划,这个逻辑的执行计划是计算机可以理解的,再通过Optimize过程,不管逻辑计划有多复杂,都要翻译成针对目前集群和运行时刻的Runtime最优的物理执行计划,对于每个Optimize不必定与原始的SQL相关。而后通过计算调度框架,使得合理快速的安排执行任务。由调度框架作的安排应用到每台机器以后,每台机器都会有一个SQL的运行时(Runtime Engine),它是真正可以理解物理执行计划的,而且一步一步把数据从Storage中读出来,再通过Shuffle获得结果,最后返回到Storage中。能够看出,运行时自己的性能是很是关键的,仅仅一条SQL语句有可能消耗几百T的data,这时,Storage的性能也是相当重要的。
SQL的功能shell
上图为SQL的一个脚本,上面是配置语句,下面是建立的表,每句都是SQL的语句,可是这些SQL语句均可以串在一个脚本里,当表述一个很是复杂的逻辑时,不须要把脚本写成嵌套的形式,这种方式更灵活,可以支撑更复杂的业务场景。阿里内部有很是复杂的业务场景,过去不支持这种方式时,用户是使用嵌套的方式,使脚本变得复杂和扭曲,而且有大量的重复,以至不能解决时,就会将其拆分,再经过外部调动的方式串连起来。由于人若是承受不了维护的代价,就要引用额外性能的开销,后面的语句就要引用前面的语句。无论脚本有多复杂,通过编译器以后,仍是一个单一完整的执行计划,并不会带来额外的开销。优化器看到的上下文越多,优化的机会越多,造成单一完整的执行计划以后,就能够以最高效的方式执行整个业务模式。此外,DataWorks也是支持这种模式的。经过脚本模式能够效仿C++或者Java来写SQL。安全
例如,在写C++或者Java时,常常在公共的逻辑中抽取出来一个函数,把公共的逻辑放到某一个模块里,这个过程视为代码的重用机制。可是,标准的SQL,尤为是大数据的SQL是缺少这种机制的。对于阿里这么复杂的场景,这种需求是很迫切的。底层的数据集提供了各个部门都须要的基础的数据,不一样的业务部门可能都要消耗这部分数据,消耗的方式是不一样的。这时,也想像C++或者Java那样抽取一个函数,在MaxCompute中是能够实现的。在MaxCompute里,上图中的红框除了做为普通的view,也能够封裝一些SQL复杂的逻辑和对数据的读取,能够把表的变量传入,这时就能够实现像C++或者Java中函数的功能,能够把SQL里公共的业务逻辑封装在一块儿,同时结合上文的脚本模式,参数化视图就能够组织很是复杂的SQL的业务逻辑用来支撑很是复杂的业务场景。性能优化
通常的大数据不支持IF/ELSE,但对于IF/ELSE是有需求的。例如,每周作一次全量的计算,但天天只作增量的计算,若是没有IF/ELSE的支持,就须要把脚本拆成两个,经过调度的框架串连起来。可是,在MaxCompute中结合脚本模式,能够直接的写入IF语句或SELECT语句,若是返回的是异常的结果,直接能够放在一个表达式里,决定执行SQL的分支。全部的SQL的功能都是针对复杂的应用场景的须要。服务器
普通的SQL都会有基本的数据类型,有时也有复杂类型,但都是属于给定范围的数据类型。当数据类型特别复杂时,在MaxCompute里能够直接使用。右侧框架是将Java和SQL无缝的融合在一块儿,无需UDF封装。左侧为SELECT TRANSFORM,是直接就能够在SQL里调用shell脚本,而且彻底兼容Hive。
SQL的性能网络
Adaptive Join包括Hash Join和Merge Join。Hash Join的性能是比较好的,但有时碰到不合适的场景时,特别是有很是多的Hash冲突时,性能就会变得不好。Merge Join的特色是可以提供一个性能的下限。能够经过动态的选择适合哪一种场景,以便作智能的选取。并发
Shuffle也有针对特定大规模系统的优化,包括提高Shuffle 70%的性能,提高大规模共享集群性能,提高稳定性,下降IO压力。具体包括如下优化方式:
一、Greysort模式(Mapper不排序,Reducer排序),增长与下游流水线机会;下游转化为HashJoin时消除排序
二、Encoding & Adaptive列式压缩,下降IO与Cache Miss
三、优化内存结构,下降Working Set Size并消除Pointer Chasingapp
打造企业级分布式调度执行系统
整个系统的发展有两个维度,一个维度是系统的规模,随着系统规模的不断成长,对于分布式调度执行系统要面对天天千万级须要解决的问题,在阿里这个大致量的数据下,单个分布式做业规模已经能达到数十万个计算节点,已经有上百亿链接和运行数万台的物理机。
另外一个维度是系统的成熟度,一个系统成为企业级的分布式执行调度系统就须要达到成熟度,包括三个阶段,第一个阶段是可用性(正确性),一个做业在单机系统上执行的结果和分布式系统上执行的结果是不同的,尤为是在系统的超大规模上,在面对系统各类各样的节点失败问题、网络层的失败问题和各类容灾问题时,怎样经过正确的方式能保证做业正确的产出是很重要的。第二阶段是够用,是指每个计算的系统都要锻造本身的性能,能在各类各样的benchmark上标准结果,经过此方法来提高性能。第三个阶段是好用(智能化),是指在动态执行过程当中拥有动态能力和自适应能力,能够根据做业的不一样特色来调整做业执行的计划。
企业级分布式计算调度框架
企业级分布式计算调度框架分为三个阶段:框架
上图所示为阿里的一个做业在离开优化器之后,在分布式系统里执行的过程。能够理解为从逻辑图到物理图映射的过程。
上图所示为三个阶段的做业,第一个阶段是做业提交开始运行,第二个阶段是根据实际产出动态调整并发,第三个阶段是产生所需数据提早结束做业。
上图所示为智能化DAG执行的动态逻辑图,包括Sorted Merge Join和Broadcast Join两种算法。其中Sorted Merge Join的特色包括经典分布式join算法,可支持大规模做业,可用范围广(slow but reliable),代价较昂贵 (full shuffle + sort),且shuffle可能带来数据倾斜。Broadcast Join的特色是只适用特定类型做业 (一路输入可载入单计算节点内存),非适用场景上可能致使OOM,做业失败。
对动态的选择执行计划,在理想状况下都但愿数据的分布是均匀的,而且能够理解数据的特性,因此优化器均可以作出“最佳”的计划,尤为是在作benchmark时,可是因为源数据统计不许确 、中间数据特性波动 ,所产生数据的特色是没有办法提早预估的,因此容许优化器来给一个非肯定的执行计划(Conditional Join),这时,优化器会给出两个执行路径的计划,调度执行框架能够根据上游实际产生的数据量,动态的调整逻辑图的执行。
上图所示为并发度的例子。简单的并发调整是根据上游总数据量直接取平均做为并发,仅支持向下调整,但问题是数据多是倾斜的,这种方法已再也不适用。下面给出两种新的调度方法:
一、依据分区数据统计调整:避免并发调整加剧数据倾斜,可向上向下调整。
二、分区统计基础上,自动切分大分区:双重调整,消除分区内的数据倾斜,并支持数据处理归并,以保留分区特性。机器学习
对于阿里如此大规模的做业,调度的敏捷度是十分重要的,由于集群规模很大,一个做业怎样理解各个计算节点和物理机的状态,作智能的容错和预判性的容错是阿里所作的一项工做。随着做业规模愈来愈大,一个很是优秀的调度框架能带来的性能提高会愈来愈明显。
阿里整个计算平台做为飞天的底座,不只仅运行SQL,也有可能运行其余。最经典的SQL是batch执行。离线和一体式的执行是资源利用率和性能优化的两个极端,做为一个用户,会同时关注执行性能和资源利用率,须要思考的问题是,怎样在两个点中达到平衡。所以,阿里也支持一种称为bubble的调度,所谓bubble调度是容许一个做业的子图同时调度,下游的子图分布调度,在不一样的SQL上会有不一样的效果。例如。在TPCH11的状况下,相对于离线(batch)会有66%的性能提高,相对于一体式(all-in-one)会节省3倍的资源,同时获取95%的性能。
在AliOrc的里程中,起点和终点都是在存储层,数据的读和写是AliOrc执行的开始和结束,存储引擎做为AliOrc的底座,承担着一个很是重要的做用。
基于Apache Orc的深度优化
整个计算引擎是基于列结构的,技术的出发点是Apache Orc。在此基础上,阿里作了不少深度的优化,包括I/O维度、内存优化、索引和数据编码压缩。其中有一部分已经贡献到了社区。
新一代列式存储引擎
新一代列式存储引擎包括如下技术方面:
对于有一系列的大数和小数,直接存放时会产生4个字节,而对于小数,前面会产生不少的零,这些零是没有意义的。并行化编码技术的主要思想就是将冗余的信息删掉,将真正有意义的batch留下,而且pack到一块儿。这种编码方式的好处是能实现并行化。此外,还进行了一些扩展,包括对有序数据的优化,以及对数据的编码优化。同时从新设计了编码存储格式,更利于内存对齐,以及列存储。
从测试的结果来看,此编码技术比传统游程编码速度快4到6倍,压缩率提高大概10%左右,在反应到TPC Benchmark表扫描效率提高24%。之因此有如此快的结果,是由于使用AVX256一条指令能够处理8个64位数,或者16个32位数,同时充分利用函数模板展开,最大程度避免循环和分支预测失败。
阿里是属于列存储引擎的,是指在同一个列是放在一块儿的,好处是在读数据时选择几个列放到存储引擎中去读,就不须要读全部的列。假设在上图中的场景中,有三个列为A、B、C。最先的IO模型是串行的,存在许多等待时间。所以,阿里作了一个改进为Prefetch模型,IO是不须要一个一个发出去的,在一开始时能够将三个读取引擎一块儿发出去,可是须要一个一个的等待它们回来,虽然有了一些提高,可是还仍然存在IO等待的时间。目前为止,改进的模型为Prefetch+Async Paraller IO,是将IO所有并行化,将三个一块儿发出去以后,并不须要按照原来A、B、C的顺序等待,能够按照回来的顺序作解压和解码。这样作能够对IO等待的时间降到最小。
如上图所示,异步并行IO与同步读取相比较,IO等待时间减小97%,端到端时间减小45%。
如上图所示为延迟读取的一个例子,经过只读取DEPT列,把ADDRESS以及SALARY列延迟到过滤以后读取,能够大幅减小了没必要要的数据读取。
对于字符串类型的列,有一种方法叫字典编码,是指将字符串里不同的Key找出而且给予ID,这时,数据在存放时是不须要存放整个字符串的,只须要存放ID就能够。可是使用此方法是很耗时的。由此,作了如下改进:
使用延迟解码,跳过解码步骤,直接在字典上匹配,再以ID到数据列搜索。好处是减小了字符串匹配次数以及减小了字典解码时间。
如上图所示,对打开延迟读写和没有打开延迟读写作了比较,横坐标为filter过滤的数据,“1”表示没有过滤,纵轴是花费的时间,实现延迟读取以后,读取数据量随Selectivity的提高而减小,读取时间也相应大幅下降。
本文为云栖社区原创内容,未经容许不得转载。