Apache Hudi是一个开源的数据湖框架,旨在简化增量数据处理和数据管道开发。借助Hudi能够在Amazon S三、Aliyun OSS数据湖中进行记录级别管理插入/更新/删除。AWS EMR集群已支持Hudi组件,而且能够与AWS Glue Data Catalog无缝集成。此特性可以使得直接在Athena或Redshift Spectrum查询Hudi数据集。html
对于企业使用AWS云的一种常见数据流如图1所示,即将数据实时复制到S3。
git
本篇文章将介绍如何使用Oracle GoldenGate
来捕获变动事件并利用Hudi格式写入S3数据湖。github
Oracle GG可使用多个处理程序和格式输出,请查看此处获取更多信息。sql
本篇文章中不关心处理程序,咱们假设使用Avro Operation格式,这种格式较为冗长,但有着普遍应用,由于其平衡了数据完整性和性能。如图2所示,此格式包含每一个记录的before
和after
版本。shell
即便完整且易于生成,此格式也不适合用Athena或Spectrum进行分析,从使用角度也没法替代源数据。此外你可能须要对历史数据进行分区处理以便快速检索。数据库
本文咱们将介绍如何利用Apache Hudi框架作到这一点,以构建易于分析的目标数据集。express
咱们不详细介绍如何将avro格式文件放入Replica S3
桶中,整个数据体系结构以下所示apache
Hudi代码运行在EMR集群中,从Replica S3
桶中读取avro数据,并将目标数据集存储到Target S3
桶中。架构
EMR软件配置以下oracle
硬件配置以下
因为插入/更新
始终保留最后一条记录,所以Hudi做业很是具备弹性, 所以能够利用Spot Instance(抢占式实例)
大大下降成本。
除此以外,还须要设置
配置完后须要确保EMR集群有读写权限。
若是你须要一些样例数据,能够点击此处获取。当设置好桶后,启动EMR集群并将这些样例数据导入Replica
桶。
为构建按时间划分的数据集,必须肯定不可变的日期类型字段。参照示例数据集(销售订单),咱们假设订单日期永远不会改变,所以咱们将DAT_ORDER
字段做为写入Hudi数据集的分区字段。
分区方式是YYYY/MM/DD,经过该方式,全部数据将被组织在嵌套的子文件夹中。Hudi框架将提供此分区信息,并将一个特定字段添加到关联的Hive/Glue表中。当查询时,该字段上的过滤条件将转换为超高效的分区修剪扫描条件。
实际上这是咱们必须对数据集作的惟一强假设,全部其余信息都在avro文件中(字段名称,字段类型,PK等)。
除此元数据外,GoldenGate一般还会添加一些其余信息,例如表名称,操做时间戳,操做类型(插入/更新/删除)和自定义标记。你能够利用这些字段来构造通用逻辑并构建灵活的迁移平台。
启动spark-shell
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
启动后能够运行以下代码:
val ggDeltaFiles = "s3://" + sourceBucket + "/" + sourceSubFolder + "/" + sourceSystem + "/" + inputTableName + "/"; val rootDataframe:DataFrame = spark.read.format("avro").load(ggDeltaFiles); // extract PK fields name from first line val pkFields: Seq[String] = rootDataframe.select("primary_keys").limit(1).collect()(0).getSeq(0); // take into account the "after." fields only val columnsPre:Array[String] = rootDataframe.select("after.*").columns; // exclude "_isMissing" fields added by Oracle GoldenGate // The second part of the expression will safely preserve all native "**_isMissing" fields val columnsPost:Array[String] = columnsPre.filter { x => (!x.endsWith("_isMissing")) || (!x.endsWith("_isMissing_isMissing") && (columnsPre.filter(y => (y.equals(x + "_isMissing")) ).nonEmpty))}; val columnsFinal:ArrayBuffer[String] = new ArrayBuffer[String](); columnsFinal += "op_ts"; columnsFinal += "pos"; // add the "after." prefix columnsPost.foreach(x => (columnsFinal += "after." + x)); // prepare the target dataframe with the partition additional column val preparedDataframe = rootDataframe.select("opTypeFieldName", columnsFinal.toArray:_*). withColumn("HUDI_PART_DATE", date_format(to_date(col("DAT_ORDER"), "yyyy-MM-dd"),"yyyy/MM/dd")). filter(col(opTypeFieldName).isin(admittedValues.toList: _*)); // write data preparedDataframe.write.format("org.apache.hudi"). options(hudiOptions). option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, pkFields.mkString(",")). mode(SaveMode.Append). save(hudiTablePath);
上述简化了部分代码,能够在此处找到完整的代码。
输出的S3对象结果以下所示
同时Glue数据目录将使该表可用于经过外部模式在Athena或Spectrum中进行查询分析,外部表具备咱们用于分区的hudi_part_date附加字段。