
wget https://github.com/apache/hudi/archive/release-0.5.2-incubating.tar.gz
tar zxvf release-0.5.2-incubating.tar.gz
cd release-0.5.2-incubating
mvn clean package -DskipTests -DskipITs
cp ./hudi-hadoop-mr/target/hudi-hadoop-mr-0.5.2-incubating.jar $HIVE_HOME/lib/
The dependency jar is copied into Hive's lib path so that Hive can read Hudi data properly. With that, the server environment is ready; the environment setup is fairly simple.
CREATE TABLE ods.ods_user_event(
  uuid STRING,
  name STRING,
  addr STRING,
  update_time STRING,
  date STRING
)
stored as parquet;
Next come the Maven dependencies; the complete code can be obtained by replying "hudi" to the public account.
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-spark_2.11</artifactId>
    <version>0.5.2-incubating</version>
</dependency>
<dependency>
    <groupId>org.apache.hudi</groupId>
    <artifactId>hudi-common</artifactId>
    <version>0.5.2-incubating</version>
</dependency>
- Initialize the SparkSession and set the relevant configuration options.
- Build a DataFrame. You can do this however you like; in this example it is built by reading data from Hive.
- Write the DataFrame to Hudi. At bottom this just means writing data to an HDFS path, but it takes a pile of options, and those options are where Hudi's features show up:
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val sss = SparkSession.builder.appName("hudi")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("hive.metastore.uris", "thrift://ip:port")
    .enableHiveSupport().getOrCreate()

  val sql = "select * from ods.ods_user_event"
  val df: DataFrame = sss.sql(sql)

  df.write.format("org.apache.hudi")
    // Column that uniquely identifies a record (used for upserts).
    .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "recordKey")
    // When two records share a key, the one with the larger update_time wins.
    .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "update_time")
    // Column whose value becomes the partition path under the base path.
    .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "date")
    // Global bloom index: a record can be updated even if its partition path changes.
    .option(HoodieIndexConfig.BLOOM_INDEX_UPDATE_PARTITION_PATH, "true")
    .option(HoodieIndexConfig.INDEX_TYPE_PROP, HoodieIndex.IndexType.GLOBAL_BLOOM.name())
    .option("hoodie.insert.shuffle.parallelism", "10")
    .option("hoodie.upsert.shuffle.parallelism", "10")
    .option(HoodieWriteConfig.TABLE_NAME, "ods.ods_user_event_hudi")
    .mode(SaveMode.Append)
    .save("/user/hudi/lake/ods.db/ods_user_event_hudi")
}
Found 4 items
drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200501
drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200502
drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200503
drwxr-xr-x - hadoop hadoop 0 2020-05-25 18:42 /user/hudi/lake/ods.db/ods_user_event_hudi/20200504
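The listing above shows one directory per `date` partition. To double-check the data itself, it can be read back through the Spark datasource. This is only a minimal sketch, assuming the 0.5.x read convention of globbing one level per partition plus one more for the data files; the column names come from the source table above.

// Snapshot-read the freshly written Hudi data back with the same SparkSession.
// One "*" for the single `date` partition level, one more for the data files.
val readBack = sss.read
  .format("org.apache.hudi")
  .load("/user/hudi/lake/ods.db/ods_user_event_hudi/*/*")

readBack.select("uuid", "name", "update_time", "date").show(10, truncate = false)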
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "user_uid" cannot be null or empty.
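This HoodieKeyException shows up when the column configured as the record key contains null or empty values; Hudi refuses to generate a key for such rows. A simple guard is to drop (or repair) those rows before the write. Below is a minimal sketch, assuming the record key column is user_uid as in the error message; the helper name is made up for illustration.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, length, trim}

// Hypothetical helper: keep only rows whose record key is non-null and non-empty,
// so the Hudi writer does not fail with HoodieKeyException.
def dropNullKeys(df: DataFrame, keyCol: String = "user_uid"): DataFrame =
  df.filter(col(keyCol).isNotNull && length(trim(col(keyCol))) > 0)

// Usage: write dropNullKeys(df) instead of df in the pipeline above.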
CREATE TABLE ods.ods_user_event_hudi(
  uuid STRING,
  name STRING,
  addr STRING,
  update_time STRING,
  date STRING
)
PARTITIONED BY (
  `dt` string
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION '/user/hudi/lake/ods.db/ods_user_event_hudi'
alter table ods.ods_user_event_hudi add if not exists partition(dt='20200504') location '/user/hudi/lake/ods.db/ods_user_event_hudi/20200504'
Sorry, I have not found a better way either, so the best I can offer is a simple script.
start_date=20190101
end_date=20200520
start=`date -d "$start_date" "+%s"`
end=`date -d "$end_date" "+%s"`
for((i=start;i<=end;i+=86400)); do
  dt=$(date -d "@$i" "+%Y%m%d")
  hive -e "alter table ods.ods_user_event_hudi add if not exists partition(dt='${dt}') location '/user/hudi/lake/ods.db/ods_user_event_hudi/${dt}';"
done
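Since the write already runs inside a Spark job with Hive support enabled, the same registration can also be done from that job instead of looping over hive -e. A minimal sketch, assuming the partition values are taken from the DataFrame that was just written:

// Register one Hive partition per distinct `date` value in the written DataFrame,
// reusing the SparkSession `sss` and the base path from the write job above.
val basePath = "/user/hudi/lake/ods.db/ods_user_event_hudi"
df.select("date").distinct().collect().map(_.getString(0)).foreach { dt =>
  sss.sql(
    s"alter table ods.ods_user_event_hudi add if not exists " +
    s"partition(dt='$dt') location '$basePath/$dt'")
}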


