Apache Hudi与Delta Lake对比

1. 引入

在类Hadoop系统上支持ACID有了更大的吸引力,其中Databricks的Delta Lake和Uber开源的Hudi也成为了主要贡献者和竞争对手。二者都经过在“parquet”文件格式中提供不一样的抽象以解决主要问题;很难选择一个比另外一个更好。此博客将使用一个很是基本的示例来了解这些工具的工做原理,并让读者来比较二者的优缺点。html

咱们将使用与本系列下一篇文章中相反的方法,后面咱们将讨论Hadoop上Data Lake的重要性,以及为何会出现对诸如Delta/Hudi之类的系统的需求,以及数据工程师在过去如何为Lakes孤立地构建易错的ACID系统。web

2. 初始化

2.1 环境

源数据库:AWS RDS MySQL
CDC工具:AWS DMS
Hudi:AWS EMR 5.29.0
Delta:Databricks运行时6.1
对象/文件存储:AWS S3sql

上面的工具集主要用于演示;也可使用如下工具替代shell

源数据库:任何传统/基于云的RDBMS
CDC工具:Attunity,Oracle Golden Gate,Debezium,Fivetran,自定义Binlog解析器
Hudi:开源/企业Hadoop上的Apache Hudi
Delta:开源/企业Hadoop上的Delta Lake
对象/文件存储:ADLS / HDFS数据库

2.2 数据准备步骤

create database demo;
use demo;
create table hudi_delta_test
(
pk_id integer,
name varchar(255),
value integer,
updated_at timestamp default now() on update now(),
created_at timestamp default now(),
constraint pk primary key(pk_id)
);
insert into hudi_delta_test(pk_id,name,valuevalues(1,’apple’,10);
insert into hudi_delta_test(pk_id,name,valuevalues(2,’samsung’,20);
insert into hudi_delta_test(pk_id,name,valuevalues(3,’dell’,30);
insert into hudi_delta_test(pk_id,name,valuevalues(4,’motorola’,40);

如今使用DMS将数据加载到S3中的某个位置,并使用文件夹名称full_load来标识该位置。为了更贴合标题,咱们将跳过DMS的设置和配置。加载到S3后以下图所示。
apache

接着在MySQL表中执行一些插入/更新/删除操做
insert into hudi_delta_test(pk_id,name,valuevalues(4,’motorola’,40);
update hudi_delta_test set value = 201 where pk_id=2;
delete from hudi_delta_test where pk_id=3;

继续略过DMS阶段,将CDC数据按如下方式加载到S3,以下图所示微信

意:DMS将填充一个名为“ Op”的附加字段,表示“操做”,Op取值I/U/D,分别对应插入、更新和删除。如下图所示显示了CDC数据的内容。app

df = spark.read.parquet('s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test')
df.show()

完成了数据准备后正式开始比对。DMS将持续将CDC事件传送到S3(供Hudi和Delta Lake使用),此S3为数据源。两种工具的最终状态都旨在得到一致的统一视图,如上图MySQL所示。
框架

3. 使用Apache HUDI

Hudi有两种方式处理UPSERTS [1]工具

  • 写时复制(CoW):数据以列格式(Parquet)存储,而且在更新时会建立文件的新版本。此存储类型最适合于读繁重的工做负载,由于数据集的最新版本始终在有效的列格式文件中可用。

  • 读时合并(MoR):数据以列(Parquet)和基于行(Avro)的格式存储;更新记录到基于行的“增量文件”中,并在之后进行压缩,以建立列文件的新版本。此存储类型最适合于写繁重的工做负载,由于新提交会以增量文件的形式快速写入,可是读取数据集须要合并列文件与增量文件。

3.1 启动Spark Shell

使用如下命令打开Spark Shell并进行相关导入.

spark-shell — conf “spark.serializer=org.apache.spark.serializer.KryoSerializer” — conf “spark.sql.hive.convertMetastoreParquet=false” — jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._ 
import org.apache.hudi.DataSourceWriteOptions 
import org.apache.hudi.config.HoodieWriteConfig 
import org.apache.hudi.hive.MultiPartKeysValueExtractor

3.2 使用CoW

val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test”
val hudiTableName = “hudi_cow”
val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_cow”
val hudiOptions = Map[String,String]
 (
 DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”,
 DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, 
 HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
 DataSourceWriteOptions.OPERATION_OPT_KEY ->
 DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
 DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “COPY_ON_WRITE”,
 DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, 
 DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, 
 DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
 DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, 
 DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”,
 DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
 )
val temp = spark.read.format(“parquet”).load(inputDataPath)
val fullDF = temp.withColumn(“Op”,lit(‘I’))
fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

因为在Hudi选项中使用了Hive自动同步配置,所以会在Hive中建立一个名为“ hudi_cow”的表。该表使用具备Hoodie格式的Parquet SerDe建立,表结构以下图所示。

表数据以下图所示

val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)
updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)

进行更新操做,表“hudi_cow”将有最新的更新数据,以下图所示

如CoW定义中所述,当咱们以hudi格式将updateDF写入同一S3位置时,更新的数据在写时被复制,而且快照和增量数据使用同一张表。

3.3 使用MoR

val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test”
val hudiTableName = “hudi_mor”
val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_mor”
val hudiOptions = Map[String,String]
 (
 DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”,
 DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”, 
 HoodieWriteConfig.TABLE_NAME -> hudiTableName, 
 DataSourceWriteOptions.OPERATION_OPT_KEY ->
 DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL, 
 DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “MERGE_ON_READ”,
 DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”, 
 DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”, 
 DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName, 
 DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”, 
 DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”,
 DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
 )
val temp = spark.read.format(“parquet”).load(inputDataPath)
val fullDF = temp.withColumn(“Op”,lit(‘I’))
fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)

仍是开启了Hive自动同步,将在Hive中建立两张名为“hudi_mor”和“ hudi_mor_rt”的表。hudi_mor是通过读优化的表,具备快照数据,而hudi_mor_rt将具备增量和实时合并数据。数据将会以频繁的压缩间隔被压缩,并提供给hudi_mor。hudi_mor_rt利用Avro格式存储增量数据。正如MoR定义所示,经过hudi_mor_rt读取数据时将即时合并。这对于高更新源表颇有用,同时还提供一致且非最新的读优化表。

注意:“ hudi_mor”和“ hudi_mor_rt”都指向相同的S3存储桶,只是定义了不一样的存储格式。

能够看到加载后两表内容相同,内容以下所示

val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)
updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)

表hudi_mor在很短期内就具备相同的内容(由于演示中的数据很小,而且很快会被压缩),只要merge成功,表hudi_mor_rt就会有最新数据。

如今看看这些Hudi格式表的S3日志的变化。底层存储格式为parquet,同时经过日志方式管理ACID。一般生成如下类型的文件:

  • hoodie_partition_metadata:这是一个小文件,包含有关给定分区中partitionDepth和最后一次commitTime的信息

  • hoodie.properties:存储表名称、存储类型信息

  • commit和clean:文件统计信息和有关正在写入的新文件的信息,以及诸如numWrites,numDeletes,numUpdateWrites,numInserts和一些其余相关审计字段之类的信息,存储在这些文件中。这些文件在每次提交后生成

以上3个文件对于CoW和MoR类型的表都是通用的。另外对于MoR表,额外有为UPSERTED分区建立的avro格式的日志文件。以下所示的第一个log文件是CoW表中不存在的日志文件。

4. 使用Delta Lake

使用下面的代码片断,咱们以parquet格式读取完整的数据,并以delta格式将其写入不一样的位置

from pyspark.sql.functions import *
inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"
deltaTablePath = "s3://development-dl/demo/hudi-delta-demo/delta_table"
fullDF = spark.read.format("parquet").load(inputDataPath)
fullDF = fullDF.withColumn("Op",lit('I'))
fullDF.write.format("delta").mode("overwrite").save(deltaTablePath)

在Databricks Notebook的SQL界面中使用如下命令能够建立一个Hive外表,“using delta”关键字会包含基础SERDE和FILE格式的定义。

%sql
create table delta_table 
using delta
location 's3://development-dl/demo/hudi-delta-demo/delta_table'

该表的DDL以下所示。

%sql
show create table delta_table

表会包含与完整加载文件相同的全部记录。

%sql
select * from delta_table

使用如下命令读取CDC数据并在Hive中注册为临时视图

updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test")
updateDF.createOrReplaceTempView("temp")

MERGE命令:下面是执行UPSERT的MERGE SQL,它做为SQL很方便地被执行,也能够在spark.sql()方法调用中执行

%sql
MERGE INTO delta_table target
USING 
(SELECT Op,latest_changes.pk_id,name,value,updated_at,created_at
  FROM temp latest_changes
 INNER JOIN (
   SELECT pk_id,  max(updated_at) AS MaxDate
   FROM temp
   GROUP BY pk_id
) cm ON latest_changes.pk_id = cm.pk_id AND latest_changes.updated_at = cm.MaxDate) as source
ON source.pk_id == target.pk_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED  THEN INSERT *

MERGE以后,Hive中delta_table的内容也更新了。

%sql
select * from delta_table

与Hudi同样,Delta Lake基本文件存储格式也是“parquet”。Delta提供带有日志和版本控制的ACID功能。接着看看S3在装载和CDC合并后的变化。

增量日志包含JSON格式的日志,文件中包含每次提交后的schema和最新文件的信息。

在CDC合并的状况下,因为能够插入/更新或删除多条记录。初始parquet文件的内容分为多个较小的parquet文件,这些较小的文件会被重写。若是对表进行了分区,则仅与更新的分区相对应的CDC数据将受到影响。初始parquet文件仍存在于该文件夹中,但已重新的日志文件中删除。若是咱们在此表上运行VACUUM,则能够物理删除该文件。也可使用OPTIMIZE命令[6]来串联这些较小的文件。

Delta日志附加了另外一个JSON格式的日志文件,该文件存储schema和指向最新文件的文件指针。

5. 总结

上述两个示例中都按原样保留了删除的记录,并经过Op ='D'标识删除,这是故意而为以显示DMS的功能,下面的参考资料显示了如何将这种软删除转换为硬删除。

但愿这是一个有用的比较,有助于作出合理的选择,选择合适的数据湖框架。


参考资料

  1. https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/

  2. https://databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dms.html

  3. https://hudi.apache.org/

  4. https://docs.delta.io/

  5. https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html

  6. https://docs.databricks.com/delta/optimizations/index.html



阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,按期推送精彩案例,技术专家直播,问答区近万人Spark技术同窗在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!

对开源大数据和感兴趣的同窗能够加小编微信(下图二维码,备注“进群”)进入技术交流微信群。Apache

Spark技术交流社区公众号,微信扫一扫关注


本文分享自微信公众号 - Delta Lake技术圈(deltalake-emr2020)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索