Hive 集成 Hudi 实践(含代码)| 数据湖系列据湖系列


公众号后台愈来愈多人问关于数据湖相关的内容,看来你们对新技术仍是很感兴趣的。关于数据湖的资料网络上仍是比较少的,特别是实践系列,对于新技术来讲,基础的入门文档仍是颇有必要的,因此这一篇但愿可以帮助到想使用Hudi的同窗入门。

本篇的Hudi使用的是孵化版本 0.5.2;其余依赖 Spark-2.4.4,Hive-1.1.0

Hudi 服务器环境准备
    
      
    
    
     
     
              
     
     
 
      
wget https://github.com/apache/hudi/archive/release-0.5.2-incubating.tar.gztar zxvf release-0.5.2-incubating.tar.gzcd release-0.5.2-incubatingmvn clean package -DskipTests -DskipITscp ./hudi-hadoop-mr/target/hudi-hadoop-mr-0.5.2-incubating.jar $HIVE_HOME/lib/

拷贝依赖包到 Hive 路径是为了 Hive 可以正常读到 Hudi 的数据,至此服务器环境准备完毕,环境的初始化仍是比较简单的。php


用 Spark 写一段数据
一切准备完毕先写一段数据到 Hudi 里,首先数据源 ods.ods_user_event 的表结构为:
  
    
  
  
   
   
            
   
   

 
    

而后是 Maven 的依赖,详细代码后台回复 hudi 后便可获取。
CREATE TABLE ods.ods_user_event( uuid STRING, name STRING, addr STRING, update_time STRING, date STRING)stored as parquet;
  
    
  
  
   
   
            
   
   

 
    
<dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-spark_2.11</artifactId> <version>0.5.2-incubating</version> </dependency> <dependency> <groupId>org.apache.hudi</groupId> <artifactId>hudi-common</artifactId> <version>0.5.2-incubating</version> </dependency>
代码逻辑:
  1. 初始化 SparkSession,配置相关配置项
  2. 构建 DataFrame,你们能够自由发挥,这里的案例是从Hive读数据构建。
  3. DataFrame写入Hudi,这一块说到底就是把数据写入 HDFS 路径下,可是须要一堆配置,这些配置就体现了 Hudi 的特性:
  4. DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY :指定惟一id的列名
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY:指定更新时间,该字段数值大的数据会覆盖小的
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY :指定分区列,和Hive的分区概念相似
    HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH :设置当分区变动时,当前数据的分区目录是否变动
    HoodieIndexConfig.INDEX_TYPE_PROP :设置索引类型目前有 HBASE,INMEMORY,BLOOM,GLOBAL_BLOOM 四种索引。例子中,选择了 HoodieGlobalBloomIndex(全局索引),会在全部分区内查找指定的 recordKey。而 HoodieBloomIndex 只在指定的分区内查找。
  
    
  
  
   
   
            
   
   
 
    
def main(args: Array[String]): Unit = { val sss = SparkSession.builder.appName("hudi") .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("hive.metastore.uris", "thrift://ip:port") .enableHiveSupport().getOrCreate()
val sql = "select * from ods.ods_user_event" val df: DataFrame = sss.sql(sql)
df.write.format("org.apache.hudi") .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "recordKey") .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "update_time") .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date") .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true") .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name()) .option("hoodie.insert.shuffle.parallelism", "10") .option("hoodie.upsert.shuffle.parallelism", "10") .option(HoodieWriteConfig.TABLE_NAME, "ods.ods_user_event_hudi") .mode(SaveMode.Append) .save("/user/hudi/lake/ods.db/ods_user_event_hudi") }
执行成功后会有以下结果,由于咱们是按照date分区,每一天的数据会生成一个文件夹和Hive相似。
  
    
  
  
   
   
            
   
   
 
    
[hadoop@hadoop31 ~]# hdfs dfs -ls /user/hudi/lake/ods.db/ods_user_event_hudi/Found 4 itemsdrwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200501drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200502drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200503drwxr-xr-x   - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200504
另外,注意 recordKey 必须惟一,否则数据会被覆盖,且值不能为 null,不然会有如下报错。
  
    
  
  
   
   
            
   
   
 
    
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "user_uid" cannot be null or empty.


Hive 建立外部表读数据

上一步中 Spark 将数据写到了 hudi,想要经过Hive访问到这块数据,就须要建立一个Hive外部表了,由于 Hudi 配置了分区,因此为了能读到全部的数据,我们的外部表也得分区,分区字段名可随意配置。
  
    
  
  
   
   
            
   
   

 
    
CREATE TABLE ods.ods_user_event_hudi( uuid STRING, name STRING, addr STRING, update_time STRING, date STRING)PARTITIONED BY ( `dt` string)ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'LOCATION '/user/hudi/lake/ods.db/ods_user_event_hudi'
至此,直接读数据确定是空的,由于咱们建立的是个分区表,因此还须要指定分区。
  
    
  
  
   
   
            
   
   
 
    
alter table ods.ods_user_event_hudi add if not exists partition(dt='20200504') location '/user/hudi/lake/ods.db/ods_user_event_hudi/20200504'

那么这个时候问题来了,一年有365个分区,要一个一个创建手动建立分区吗?
抱歉我也没发现更好的办法,只能送你个简单的脚本了。
  
    
  
  
   
   
            
   
   
 
    
#!/bin/bashstart_date=20190101end_date=20200520start=`date -d "$start_date" "+%s"`end=`date -d "$end_date" "+%s"`for((i=start;i<=end;i+=86400)); do dt=$(date -d "@$i" "+%Y%m%d") hive -e "alter table ods.ods_user_event_hudi add if not exists partition(dt='${dt}') location '/user/hudi/lake/ods.db/ods_user_event_hudi/${dt}'; "done


后记

最后,执行 select * from ods.ods_user_event_hudi 要是没有数据你来找我。另外值得注意的是,若是此时直接用 Hive 将数据 insert into ods.ods_user_event_hudi,虽然数据会写入到 hudi 的目录下,可是相同的 recordKey 是不会覆盖原有数据的。

下一篇详细写 Spark 操做 Hudi 的相关内容,敬请期待。本篇详细代码后台回复 hudi 后便可获取。




- END -

有收获就点个「在看」吧 ▼

本文分享自微信公众号 - 老蒙大数据(simon_bigdata)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。git

相关文章
相关标签/搜索