一个近期由Hudi PMC & Uber Senior Engineering Manager Nishith Agarwal分享的Talk数据库
关于Nishith Agarwal更详细的介绍,主要从事数据方面的工做,包括摄取标准化,数据湖原语等。网络
什么是数据湖?数据湖是一个集中式的存储,容许以任意规模存储结构化和非结构化数据。你能够存储原始数据,而不须要先转化为结构化的数据,基于数据湖之上能够运行多种类型的分析,如dashboard、大数据处理的可视化、实时分析、机器学习等。架构
接着看看对于构建PB级数据湖有哪些关键的要求并发
第一个要求:增量摄取(CDC)机器学习
企业中高价值的数据每每存储在OLTP中,例以下图中,users表包含用户ID,国家/地区,修改时间和其余详细信息,但OLTP系统并未针对大批量分析进行优化,所以可能须要引入数据湖。同时一些企业采用备份在线数据库的方式,并将其存储到数据湖中的方法来摄取数据,但这种方式没法扩展,同时它给上游数据库增长了沉重的负担,也致使数据重写的浪费,所以须要一种增量摄取数据的方法。异步
第二个要求:Log Event去重工具
考虑分析大规模时间序列数据的场景,这些事件被写入数据管道,而且数量很是大,可达数十亿,每秒可达百万的量。但流中可能有重复项,多是因为至少一次(atleast-once)保证,数据管道或客户端失败重试处理等发送了重复的事件,若是不对日志流进行重复处理,则对这些数据集进行的分析会有正确性问题。下图是一个示例日志事件流,其中事件ID为惟一键,带有事件时间和其余有效负载。oop
第三个要求:存储管理(自动管理DFS上文件)性能
咱们已经了解了如何摄取数据,那么如何管理数据的存储以扩展整个生态系统呢?其中小文件是个大问题,它们会致使查询引擎的开销并增长文件系统元数据的压力。而若是写入较大的文件,则可能致使摄取延迟增长。一种常见的策略是先摄取小文件,而后再进行合并,这种方法没有标准,而且在某些状况下是非原子行为,会致使一致性问题。不管如何,当咱们写小文件而且在合并这些文件以前,查询性能都会受到影响。学习
第四个要求:事务写(ACID能力)
传统数据湖在数据写入时的事务性方面作得不太好,但随着愈来愈多的业务关键处理流程移至数据湖,状况也在发生变化,咱们须要一种机制来原子地发布一批数据,即仅保存有效数据,部分失败必须回滚而不会损坏已有数据集。同时查询的结果必须是可重复的,查询端看不到任何部分提取的数据,任何提交的数据都必须可靠地写入。Hudi提供了强大的ACID能力。
第五个要求:更快地派生/ETL数据(增量处理)
仅仅能快速摄取数据还不够,咱们还须要具备计算派生数据的能力,没有这个能力,数据工程师一般会绕过原始表来构建其派生/ETL并最终破坏整个体系结构。下面示例中,咱们看到原始付款表(货币未标准化)和发生货币转换的派生表。
扩展此类数据管道时颇有挑战,如仅对变动进行计算,或者基于窗口的Join的挑战。对基础数据集进行大规模从新处理不太可能,这会浪费计算资源。须要在数据湖上进行抽象以支持对上游表中已更改的行(数据)进行智能计算。
第六个要求:法律合规/数据删除(更新&删除)
近年来随着新的数据保护法规生效,对数据保留有了严格的规定,须要删除原始记录,修复数据的正确性等,当须要在PB级数据湖中高效执行合规性时很是困难,如同大海捞针通常,须要高效的删除,如进行索引,对扫描进行优化,将删除记录有效地传播到下游表的机制。
要求回顾(汇总)
有没有能知足上面全部需求的系统呢?接下来咱们引入Apache Hudi,HUDI表明Hadoop Upserts Deletes and Incrementals。从高层次讲,HUDI容许消费数据库和kafa事件中的变动事件,也能够增量消费其余HUDI数据集中的变动事件,并将其提取到存储在Hadoop兼容,如HDFS和云存储中。在读取方面,它提供3种不一样的视图:增量视图,快照视图和实时视图。
HUDI支持2种存储格式:“写时复制”和“读时合并”。
首先来看看写时复制。以下图所示,HUDI管理了数据集,并尝试将一批数据写入数据湖,HUDI维护称为“提交时间轴(commit timeline)”的内容,以跟踪HUDI管理的数据集上发生的操做/更改,它在提交时间轴上标记了一个“inflight”文件,表示操做已开始,HUDI会写2个parquet文件,而后将“inflight”文件标记为已完成,这从原子上使该新数据写入HUDI管理的数据集中,并可用于查询。正如咱们提到的,RO视图优化查询性能,并提供parquet的基本原始列存性能,无需增长任何额外成本。
如今假设须要更新另外一批数据,HUDI在提交时间轴上标记了一个“inflight”文件,并开始合并这些更新并重写Parquet File1。此时,因为提交仍在进行中,所以用户看不到正在写入任何这些更新(这就是咱们称为“快照隔离”)。最终以原子方式发布提交后,就能够查询版本为C2的新合并的parquet文件。
COW已经在Uber投入运行多年,大多数数据集都位于COW存储类型上。
尽管COW服务于咱们的大多数用例,但仍有一些因素值得咱们关注。以Uber的行程表为例,能够想象这多是一个很大的表,它在旅程的整个生命周期中获取大量更新。每隔30分钟,咱们就会得到一组新旅行以及对旧旅行的一些更新,在Hive上的旅行数据是按天划分分区的,所以新旅行最终会在最新分区中写入新文件,而某些更新会在旧分区中写入文件。使用COW,咱们只能重写那些更新所涉及的文件,而且可以高效地更新。因为COW最终会重写某些文件,所以能够像合并和重写该数据同样快。在该用例中一般大于15分钟。再来看另一种状况,因为某些业务用例(例如GDPR),必须更新大量历史行程,这些更新涉及过去几个月数据,从而致使很高的写入延迟,并一遍又一遍地重写大量数据,写放大也会致使大量的IO。若为工做负载分配的资源不足,可能就会严重损害摄取延迟。
在真实场景中,会将ETL连接在一块儿来构建数据管道,问题会变得更加复杂。
对问题进行总结以下:在COW中,太多的更新(尤为是杂乱的跨分区/文件)会严重影响提取延迟(因为做业运行时间较长且没法追遇上入流量),同时还会引发巨大的写放大,从而影响HDFS(相同文件的48个版本+过多的IO)。合并更新和重写parquet文件会限制咱们的数据的新鲜度,由于完成此类工做须要时间 = (重写parquet文件所花费的时间*parquet文件的数量)/(并行性)。
在COW中,咱们实际上并无太大的parquet文件,由于即便只有一行更新也可能要重写整个文件,由于Hudi会选择写入小于预期大小的文件。
MergeOnRead将全部这些更新分组到一个文件中,而后在稍后的时刻建立一个新版本。对于重更新的表,重写大文件会致使开销变大。
如何解决上述写放大问题呢?除了将更新合并并重写parquet文件以外,咱们将更新写入增量文件中,这能够帮助咱们下降摄取延迟并得到更好的新鲜度。
将更新写入增量文件将须要在读取端作额外的工做以便可以读取增量文件中记录,这意味着咱们须要构建更智能,更智能的读取端。
首先来看看写时复制。以下图所示,HUDI管理了数据集,并尝试将一批数据写入数据湖,HUDI维护称为“提交时间轴(commit timeline)”的内容,以跟踪HUDI管理的数据集上发生的操做/更改,它在提交时间轴上标记了一个“inflight”文件,表示操做已开始,HUDI会写2个parquet文件,而后将“inflight”文件标记为已完成,这从原子上使该新数据写入HUDI管理的数据集中,并可用于查询。正如咱们提到的,RO视图优化查询性能,并提供parquet的基本原始列存性能,无需增长任何额外成本。
如今须要进行第二次更新,与合并和重写新的parquet文件(如在COW中同样)不一样,这些更新被写到与基础parquet文件对应的增量文件中。RO视图继续查询parquet文件(过期的数据),而RealTime View(Snapshot query)会合并了parquet中的数据和增量文件中的更新,以提供最新数据的视图。能够看到,MOR是在查询执行时间与较低摄取延迟之间的一个权衡。
那么,为何咱们要异步运行压缩?咱们实现了MERGE_ON_READ来提升数据摄取速度,咱们但愿尽快摄取较新的数据。而合并更新和建立列式文件是Hudi数据摄取的主要耗时部分。
所以咱们引入了异步Compaction步骤,该步骤能够与数据摄取同时运行,减小数据摄取延迟。
Hudi将事务引入到了大规模数据处理中,实际上,咱们是最先这样作的系统之一,最近,它已经过其余项目的相似方法得到了社区承认。
Hudi支持多行多分区的原子性提交,Hudi维护一个特殊的文件夹.hoodie,在该文件夹中记录以单调递增的时间戳表示的操做,Hudi使用此文件夹以原子方式公开已提交的操做;发生的部分故障会透明地回滚,而且不会影响读者和后面的写入;Hudi使用MVCC模型将读取与并发摄取和压缩隔离开来;Hudi提交协议和DFS存储保证了数据的持久写入。
下面介绍Hudi在Uber的使用状况
Hudi管理了超过150PB数据湖,超过10000张表,天天摄入5000亿条记录。
接着看看Hudi如何替代分析架构。利用Hudi的upsert原语,能够在摄取到数据湖中时实现<5分钟的新鲜度,而且能继续得到列式数据的原始性能(parquet格式),同时使用Hudi还能够得到实时视图,以5-10分钟的延迟提供dashboard,此外HUDI支持的增量视图有助于长尾效应对数据集的突变。
为方便用户能快速使用Hudi,Hudi提供了一些开箱即用的工具,如HoodieDeltaStreamer,在Uber内部,HoodieDeltaStreamer用来对全球网络进行近实时分析,可用来消费DFS/Kafka中的数据。
除了DeltaStreamer,Hudi还集成了Spark Datasource,也提供了开箱即用的能力,基于Spark,能够快速构建ETL管道,同时也可无缝使用Hudi + PySpark。
接着介绍更高级的原语和特性。
如何从损坏的数据中恢复?例如线上因为bug致使写入了不正确的数据,或者上游系统将某一列的值标记为null,Hudi也能够很好的处理上述场景,能够将表恢复到最近的一次正确时间,如Hudi提供的savepoint就能够将不一样的commit保存起来,用于后续恢复,注意MoR表暂时不支持savepoint;Hudi还提供了文件的版本号,便可以保存多个版本的文件,这对于CoW和MoR表都适用,可是会占用一些存储空间。
Hudi还提供便于增量ETL的高级特性,经过Spark/Spark即可以轻松增量拉取Hudi表的变动。
除了增量拉取,Hudi也提供了时间旅行特性,一样经过Spark/Hive即可以轻松查询指定版本的数据,其中对于Hive查询中指定hoodie.table_name.consume.end.timestamp
也立刻会获得支持。
下面看看对于线上的Hudi Spark做业如何调优。
下面列举了几个调优手段,设置Kryo序列化器,使用Shuffle Service,利用开源的profiler来进行内存调优,固然Hudi也提供了Hudi生产环境的调优配置,可参考【调优 | Apache Hudi应用调优指南】
下面介绍社区正在进行的工做,敬请期待。
即将发布的0.6.0版本,将企业中存量的parquet表高效导入Hudi中,与传统经过Spark读取Parquet表而后再写入Hudi方案相比,占用的资源和耗时都将大幅下降。以及对于查询计划的O(1)时间复杂度的处理,新增列索引及统一元数据管理以消除对DFS的文件list操做。
还有一些值得关注的特性,好比支持行级别的索引,该功能将极大下降upsert的延迟;异步数据clustering以优化存储和查询性能;支持Presto对MoR表的快照查询;Hudi集成Flink,经过Flink可将数据写入Hudi数据湖。
整个分享就介绍到这里,欢迎观看。