Apache Hudihtml
Apache Hudi 在基于 HDFS/S3 数据存储之上,提供了两种流原语:git
通常来讲,咱们会将大量数据存储到HDFS/S3,新数据增量写入,而旧数据鲜有改动,特别是在通过数据清洗,放入数据仓库的场景。并且在数据仓库如 hive中,对于update的支持很是有限,计算昂贵。另外一方面,如果有仅对某段时间内新增数据进行分析的场景,则hive、presto、hbase等也未提供原生方式,而是须要根据时间戳进行过滤分析。sql
在此需求下,Hudi能够提供这两种需求的实现。第一个是对record级别的更新,另外一个是仅对增量数据的查询。且Hudi提供了对Hive、presto、Spark的支持,能够直接使用这些组件对Hudi管理的数据进行查询。shell
存储类型apache
咱们看一下 Hudi 的两种存储类型:app
视图异步
在了解这两种存储类型后,咱们再看一下Hudi支持的存储数据的视图(也就是查询模式):ide
在以上3种视图中,“读优化视图”与“增量视图”都可在“写时复制”与“读时合并”的存储类型下使用。而“实时视图“仅能在”读时合并“模式下使用。性能
存储类型优化 |
支持的视图 |
写时复制 |
读优化 + 增量 |
读时合并 |
读优化 + 增量 + 近实时 |
时间轴
最后介绍一下 Hudi 的核心 —— 时间轴。Hudi 会维护一个时间轴,在每次执行操做时(如写入、删除、合并等),均会带有一个时间戳。经过时间轴,能够实如今仅查询某个时间点以后成功提交的数据,或是仅查询某个时间点以前的数据。这样能够避免扫描更大的时间范围,并不是常高效地只消费更改过的文件(例如在某个时间点提交了更改操做后,仅query某个时间点以前的数据,则仍能够query修改前的数据)。
使用案例
下面咱们尝试使用Hudi API 进行读写。
写入数据
首先准备数据集,部分条目为:
1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794
875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7
1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10
26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681
1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605
…
启动spark-shell,并指定hudi jar包:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
加载指定包:
import scala.collection.JavaConversions._ import org.apache.spark.sql.SaveMode._ import org.apache.hudi.DataSourceReadOptions._ import org.apache.hudi.DataSourceWriteOptions._ import org.apache.hudi.config.HoodieWriteConfig._ import org.apache.hudi.hive.MultiPartKeysValueExtractor
指定建立的Hudi表名与路径:
val tableName = "hudi_table" val basePath = "s3://xxxx/xxx"
构造 DataFrame:
val lineRDD = sc.textFile("features.txt").map(_.split("\\|")).filter(_.length > 6) case class Record(id:Int, name:String, c_class:String, state:String, latitude:Float, longitude:String, elevation:Int) val RecordRDD = lineRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)) val featureDF=RecordRDD.toDF
插入数据到 Hudi(以及Hive):
featureDF.write.format("org.apache.hudi"). option(RECORDKEY_FIELD_OPT_KEY, "c_class"). option(PARTITIONPATH_FIELD_OPT_KEY, "state"). option(PRECOMBINE_FIELD_OPT_KEY, "id"). option(TABLE_NAME, tableName). option(HIVE_SYNC_ENABLED_OPT_KEY, "true"). option(HIVE_TABLE_OPT_KEY, "hivehudi"). option(HIVE_PARTITION_FIELDS_OPT_KEY, "state"). option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName). mode(Overwrite). save(basePath);
咱们能够看到目录结构相似于 Hive:
hudi/hudi_table/AR/44bfae35-056b-4bcd-8970-5f1271c3845d-0_18-215-89206_20191121100011.parquet
hudi/hudi_table/CA/2a591ee9-afa4-48d9-bd16-63376a1b8e06-0_38-215-89226_20191121100011.parquet
hudi/hudi_table/CT/911510f9-0655-405f-afad-be9c15429e81-0_46-215-89234_20191121100011.parquet
…
表名为hudi_table,分区键为 state,真正存储数据的文件为parquet。
查询数据
首先载入数据格式:
val toViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*")
咱们在上面插入数据的时候,同时建立了 Hive 表,因此有如下两种方式作查询:
spark.sql("select name from hivehudi where c_class='Summit'").show()
+--------------------+
| name|
+--------------------+
| High Knob|
| White Rock Mountain|
| Open Mine Hill|
…
2. 使用临时表:
roViewDF.registerTempTable("hudi_ro_table")
spark.sql("select id,name from hudi_ro_table where c_class='Stream'").show()
+-------+--------------------+
| id| name|
+-------+--------------------+
| 539931| Tiger Point Gully|
| 871801| Dry Brook|
| 847407| McClusky Creek|
| 637687| Shaw Drain|
| 749747| Duncan Creek|
|1502779| Brushy Lick|
…
更新数据
首先咱们看一条数据:
spark.sql("select id,name from hudi_ro_table where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
而后更新此数据(更新的数据存储在一个新的源文件中):
val updateRDD = sc.textFile("update.txt").map(_.split("\\|")).filter(_.length>6) val updateDF = updateRDD.map(x=>Record(x(0).toInt, x(1), x(2), x(3), x(4).toFloat, x(5), x(6).toInt)).toDF updateDF.write.format("org.apache.hudi"). option(RECORDKEY_FIELD_OPT_KEY, "c_class"). option(PARTITIONPATH_FIELD_OPT_KEY, "state"). option(PRECOMBINE_FIELD_OPT_KEY, "id"). option(TABLE_NAME, tableName). option(HIVE_SYNC_ENABLED_OPT_KEY, "true"). option(HIVE_TABLE_OPT_KEY, "hivehudi"). option(HIVE_PARTITION_FIELDS_OPT_KEY, "state"). option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, classOf[MultiPartKeysValueExtractor].getName). mode(Append). save(basePath);
能够看到咱们这里使用的模式由Overwrite 改成了 Append,也就是追加的模式,其他的基本不变。咱们首先分别看一下 hive 表与 hudi 表中的数据变化。
Hive 表中:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
Hudi 表中:
val appViewDF = spark.read.format("org.apache.hudi").load(basePath + "/*/*") appViewDF.registerTempTable("hudi_update_table") spark.sql("select id,name from hudi_update_table where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
|539931| Tiger-update|
+------+-----------------+
能够看到都可以查到更新后的数据。
对数据执行 select * 加上过滤条件:
能够看到表中有2个比较有意思的字段,分别为:_hoodie_commit_time, _hoodie_commit_seqno
上文咱们提到过 Hudi 有一个核心为时间轴,每次执行一个commit时,都会生成一个时间戳。这里 _hoodie_commit_time 即记录了commit 的时间戳。进一步的,Hudi 即是基于此实现了增量查询。
下面咱们尝试一下增量查询:
// 获取 commit 时间戳 val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_update_table order by commitTime").map(k => k.getString(0)).take(3) // 设置起始时间戳为上次时间戳 val beginTime = commits(commits.length - 2) // 增量查询 val incViewDF = spark. read. format("org.apache.hudi"). option(VIEW_TYPE_OPT_KEY, VIEW_TYPE_INCREMENTAL_OPT_VAL). option(BEGIN_INSTANTTIME_OPT_KEY, beginTime). load(basePath);
incViewDF.registerTempTable("hudi_incr_table") spark.sql("select * from hudi_incr_table where c_class='Stream' and id=539931").show()
这里咱们使用增量查询的选项 VIEW_TYPE_INCREMENTAL_OPT_VAL,以及设置了时间戳的起始时间。查询结果为:
能够看到查询到的数据仅为上次commit 后的数据。
固然,咱们也能够指定时间段内的数据进行查询,指定下面选项便可:
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
Hudi CLI
最后咱们看下一下 Hudi CLI
// 启动 hudi cli:
/usr/lib/hudi/cli/bin/hudi-cli.sh
// 链接hudi 数据表
connect --path s3://xxxx/hudi/hudi_table
接下来咱们能够查看提交过的 commit:
甚至回滚 commit:
commit rollback --commit 20191122073858
回滚后再次对 hive 表执行查询:
spark.sql("select id,name from hivehudi where c_class='Stream' and id=539931").show()
+------+-----------------+
| id| name|
+------+-----------------+
|539931|Tiger Point Gully|
+------+-----------------+
能够看到以前更新的数据已经被删除。
在 Hudi Cli 下,咱们也能够建立表(create)、列出commit时文件级别的信息(commit showfiles)等。更多 Hudi cli 的用法,能够在 Hudi Cli 下输入 help 获取更多信息。
References:
Apache Hudi 官方介绍:https://hudi.apache.org/index.html
Apache Hudi Quick Start:https://hudi.apache.org/quickstart.html
Apache Hudi CLI: https://hudi.apache.org/admin_guide.html
原文出处:https://www.cnblogs.com/zackstang/p/11912994.html