1. 引入
在类Hadoop系统上支持ACID有了更大的吸引力,其中Databricks的Delta Lake和Uber开源的Hudi也成为了主要贡献者和竞争对手。二者都经过在“parquet”文件格式中提供不一样的抽象以解决主要问题;很难选择一个比另外一个更好。此博客将使用一个很是基本的示例来了解这些工具的工做原理,并让读者来比较二者的优缺点。html
咱们将使用与本系列下一篇文章中相反的方法,后面咱们将讨论Hadoop上Data Lake的重要性,以及为何会出现对诸如Delta/Hudi之类的系统的需求,以及数据工程师在过去如何为Lakes孤立地构建易错的ACID系统。web
2. 初始化
2.1 环境
源数据库:AWS RDS MySQL
CDC工具:AWS DMS
Hudi:AWS EMR 5.29.0
Delta:Databricks运行时6.1
对象/文件存储:AWS S3sql
上面的工具集主要用于演示;也可使用如下工具替代shell
源数据库:任何传统/基于云的RDBMS
CDC工具:Attunity,Oracle Golden Gate,Debezium,Fivetran,自定义Binlog解析器
Hudi:开源/企业Hadoop上的Apache Hudi
Delta:开源/企业Hadoop上的Delta Lake
对象/文件存储:ADLS / HDFS数据库
2.2 数据准备步骤
create database demo;
use demo;
create table hudi_delta_test
(
pk_id integer,
name varchar(255),
value integer,
updated_at timestamp default now() on update now(),
created_at timestamp default now(),
constraint pk primary key(pk_id)
);
insert into hudi_delta_test(pk_id,name,value) values(1,’apple’,10);
insert into hudi_delta_test(pk_id,name,value) values(2,’samsung’,20);
insert into hudi_delta_test(pk_id,name,value) values(3,’dell’,30);
insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);
如今使用DMS将数据加载到S3中的某个位置,并使用文件夹名称full_load来标识该位置。为了更贴合标题,咱们将跳过DMS的设置和配置。加载到S3后以下图所示。
apache
insert into hudi_delta_test(pk_id,name,value) values(4,’motorola’,40);
update hudi_delta_test set value = 201 where pk_id=2;
delete from hudi_delta_test where pk_id=3;
继续略过DMS阶段,将CDC数据按如下方式加载到S3,以下图所示微信
注意:DMS将填充一个名为“ Op”的附加字段,表示“操做”,Op取值I/U/D,分别对应插入、更新和删除。如下图所示显示了CDC数据的内容。app
df = spark.read.parquet('s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test')
df.show()
完成了数据准备后正式开始比对。DMS将持续将CDC事件传送到S3(供Hudi和Delta Lake使用),此S3为数据源。两种工具的最终状态都旨在得到一致的统一视图,如上图MySQL所示。
框架
3. 使用Apache HUDI
Hudi有两种方式处理UPSERTS [1]工具
写时复制(CoW):数据以列格式(Parquet)存储,而且在更新时会建立文件的新版本。此存储类型最适合于读繁重的工做负载,由于数据集的最新版本始终在有效的列格式文件中可用。
读时合并(MoR):数据以列(Parquet)和基于行(Avro)的格式存储;更新记录到基于行的“增量文件”中,并在之后进行压缩,以建立列文件的新版本。此存储类型最适合于写繁重的工做负载,由于新提交会以增量文件的形式快速写入,可是读取数据集须要合并列文件与增量文件。
3.1 启动Spark Shell
使用如下命令打开Spark Shell并进行相关导入.
spark-shell — conf “spark.serializer=org.apache.spark.serializer.KryoSerializer” — conf “spark.sql.hive.convertMetastoreParquet=false” — jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
3.2 使用CoW
val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test”
val hudiTableName = “hudi_cow”
val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_cow”
val hudiOptions = Map[String,String]
(
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”,
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”,
HoodieWriteConfig.TABLE_NAME -> hudiTableName,
DataSourceWriteOptions.OPERATION_OPT_KEY ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “COPY_ON_WRITE”,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”,
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”,
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”,
DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”,
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
val temp = spark.read.format(“parquet”).load(inputDataPath)
val fullDF = temp.withColumn(“Op”,lit(‘I’))
fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
因为在Hudi选项中使用了Hive自动同步配置,所以会在Hive中建立一个名为“ hudi_cow”的表。该表使用具备Hoodie格式的Parquet SerDe建立,表结构以下图所示。
val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)
updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
进行更新操做,表“hudi_cow”将有最新的更新数据,以下图所示
如CoW定义中所述,当咱们以hudi格式将updateDF写入同一S3位置时,更新的数据在写时被复制,而且快照和增量数据使用同一张表。
3.3 使用MoR
val inputDataPath = “s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test”
val hudiTableName = “hudi_mor”
val hudiTablePath = “s3://development-dl/demo/hudi-delta-demo/hudi_mor”
val hudiOptions = Map[String,String]
(
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> “pk_id”,
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> “created_at”,
HoodieWriteConfig.TABLE_NAME -> hudiTableName,
DataSourceWriteOptions.OPERATION_OPT_KEY ->
DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> “MERGE_ON_READ”,
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> “updated_at”,
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> “true”,
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> “created_at”,
DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY -> “false”,
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
val temp = spark.read.format(“parquet”).load(inputDataPath)
val fullDF = temp.withColumn(“Op”,lit(‘I’))
fullDF.write.format(“org.apache.hudi”).options(hudiOptions).mode(SaveMode.Overwrite).save(hudiTablePath)
仍是开启了Hive自动同步,将在Hive中建立两张名为“hudi_mor”和“ hudi_mor_rt”的表。hudi_mor是通过读优化的表,具备快照数据,而hudi_mor_rt将具备增量和实时合并数据。数据将会以频繁的压缩间隔被压缩,并提供给hudi_mor。hudi_mor_rt利用Avro格式存储增量数据。正如MoR定义所示,经过hudi_mor_rt读取数据时将即时合并。这对于高更新源表颇有用,同时还提供一致且非最新的读优化表。
注意:“ hudi_mor”和“ hudi_mor_rt”都指向相同的S3存储桶,只是定义了不一样的存储格式。
能够看到加载后两表内容相同,内容以下所示
val updateDF = spark.read.parquet(“s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test”)
updateDF.write.format(“org.apache.hudi”).options(hudiOptions).option(DataSourceWriteOptions.OPERATION_OPT_KEY,DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL).mode(SaveMode.Append).save(hudiTablePath)
表hudi_mor在很短期内就具备相同的内容(由于演示中的数据很小,而且很快会被压缩),只要merge成功,表hudi_mor_rt就会有最新数据。
如今看看这些Hudi格式表的S3日志的变化。底层存储格式为parquet,同时经过日志方式管理ACID。一般生成如下类型的文件:
hoodie_partition_metadata:这是一个小文件,包含有关给定分区中partitionDepth和最后一次commitTime的信息
hoodie.properties:存储表名称、存储类型信息
commit和clean:文件统计信息和有关正在写入的新文件的信息,以及诸如numWrites,numDeletes,numUpdateWrites,numInserts和一些其余相关审计字段之类的信息,存储在这些文件中。这些文件在每次提交后生成
以上3个文件对于CoW和MoR类型的表都是通用的。另外对于MoR表,额外有为UPSERTED分区建立的avro格式的日志文件。以下所示的第一个log文件是CoW表中不存在的日志文件。
4. 使用Delta Lake
使用下面的代码片断,咱们以parquet格式读取完整的数据,并以delta格式将其写入不一样的位置
from pyspark.sql.functions import *
inputDataPath = "s3://development-dl/demo/hudi-delta-demo/raw_data/full_load/demo/hudi_delta_test"
deltaTablePath = "s3://development-dl/demo/hudi-delta-demo/delta_table"
fullDF = spark.read.format("parquet").load(inputDataPath)
fullDF = fullDF.withColumn("Op",lit('I'))
fullDF.write.format("delta").mode("overwrite").save(deltaTablePath)
在Databricks Notebook的SQL界面中使用如下命令能够建立一个Hive外表,“using delta”关键字会包含基础SERDE和FILE格式的定义。
%sql
create table delta_table
using delta
location 's3://development-dl/demo/hudi-delta-demo/delta_table'
该表的DDL以下所示。
%sql
show create table delta_table
表会包含与完整加载文件相同的全部记录。
%sql
select * from delta_table
使用如下命令读取CDC数据并在Hive中注册为临时视图
updateDF = spark.read.parquet("s3://development-dl/demo/hudi-delta-demo/raw_data/cdc_load/demo/hudi_delta_test")
updateDF.createOrReplaceTempView("temp")
MERGE命令:下面是执行UPSERT的MERGE SQL,它做为SQL很方便地被执行,也能够在spark.sql()方法调用中执行
%sql
MERGE INTO delta_table target
USING
(SELECT Op,latest_changes.pk_id,name,value,updated_at,created_at
FROM temp latest_changes
INNER JOIN (
SELECT pk_id, max(updated_at) AS MaxDate
FROM temp
GROUP BY pk_id
) cm ON latest_changes.pk_id = cm.pk_id AND latest_changes.updated_at = cm.MaxDate) as source
ON source.pk_id == target.pk_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
MERGE以后,Hive中delta_table的内容也更新了。
%sql
select * from delta_table
与Hudi同样,Delta Lake基本文件存储格式也是“parquet”。Delta提供带有日志和版本控制的ACID功能。接着看看S3在装载和CDC合并后的变化。
增量日志包含JSON格式的日志,文件中包含每次提交后的schema和最新文件的信息。
在CDC合并的状况下,因为能够插入/更新或删除多条记录。初始parquet文件的内容分为多个较小的parquet文件,这些较小的文件会被重写。若是对表进行了分区,则仅与更新的分区相对应的CDC数据将受到影响。初始parquet文件仍存在于该文件夹中,但已重新的日志文件中删除。若是咱们在此表上运行VACUUM,则能够物理删除该文件。也可使用OPTIMIZE命令[6]来串联这些较小的文件。
Delta日志附加了另外一个JSON格式的日志文件,该文件存储schema和指向最新文件的文件指针。
5. 总结
上述两个示例中都按原样保留了删除的记录,并经过Op ='D'标识删除,这是故意而为以显示DMS的功能,下面的参考资料显示了如何将这种软删除转换为硬删除。
但愿这是一个有用的比较,有助于作出合理的选择,选择合适的数据湖框架。
参考资料
https://aws.amazon.com/blogs/aws/new-insert-update-delete-data-on-s3-with-amazon-emr-and-apache-hudi/
https://databricks.com/blog/2019/07/15/migrating-transactional-data-to-a-delta-lake-using-aws-dms.html
https://hudi.apache.org/
https://docs.delta.io/
https://databricks.com/blog/2019/08/21/diving-into-delta-lake-unpacking-the-transaction-log.html
https://docs.databricks.com/delta/optimizations/index.html
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,按期推送精彩案例,技术专家直播,问答区近万人Spark技术同窗在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
对开源大数据和感兴趣的同窗能够加小编微信(下图二维码,备注“进群”)进入技术交流微信群。Apache
Spark技术交流社区公众号,微信扫一扫关注
本文分享自微信公众号 - Delta Lake技术圈(deltalake-emr2020)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。