实时分为处理的实时和数据的实时
即席分析是要求对数据实时的处理,立刻要获得对应的结果
Flink、Spark Streaming是用来对实时数据的实时处理,数据要求实时,处理也要迅速
数据不实时,处理也不及时的场景则是咱们的数仓T+1数据java
而本文探讨的Apache Hudi,对应的场景是数据的实时,而非处理的实时。它旨在将Mysql中的时候以近实时的方式映射到大数据平台,好比Hive中。mysql
传统的离线数仓,一般数据是T+1的,不能知足对当日数据分析的需求
而流式计算通常是基于窗口,而且窗口逻辑相对比较固定。
而笔者所在的公司有一类特殊的需求,业务分析比较熟悉现有事务数据库的数据结构,而且但愿有不少即席分析,这些分析包含当日比较实时的数据。惯常他们是基于Mysql从库,直接经过Sql作相应的分析计算。但不少时候会遇到以下障碍git
所以,一些弥合在OLTP和OLAP之间的技术框架出现,典型有TiDB。它能同时支持OLTP和OLAP。而诸如Apache Hudi和Apache Kudu则至关于现有OLTP和OLAP技术的桥梁。他们可以以现有OLTP中的数据结构存储数据,支持CRUD,同时提供跟现有OLAP框架的整合(如Hive,Impala),以实现OLAP分析github
Apache Kudu,须要单独部署集群。而Apache Hudi则不须要,它能够利用现有的大数据集群好比HDFS作数据文件存储,而后经过Hive作数据分析,相对来讲更适合资源受限的环境sql
Hudi 提供了Hudi 表的概念,这些表支持CRUD操做。咱们能够基于这个特色,将Mysql Binlog的数据重放至Hudi表,而后基于Hive对Hudi表进行查询分析。数据流向架构以下
数据库
Hudi表的数据文件,可使用操做系统的文件系统存储,也可使用HDFS这种分布式的文件系统存储。为了后续分析性能和数据的可靠性,通常使用HDFS进行存储。以HDFS存储来看,一个Hudi表的存储文件分为两类。apache
_partition_key
相关的路径是实际的数据文件,按分区存储,固然分区的路径key是能够指定的,我这里使用的是_partition_keyHudi真实的数据文件使用Parquet文件格式存储
session
Hudi把随着时间流逝,对表的一系列CRUD操做叫作Timeline。Timeline中某一次的操做,叫作Instant。Instant包含如下信息数据结构
.hoodie文件夹中存放对应操做的状态记录
架构
hudi为了实现数据的CRUD,须要可以惟一标识一条记录。hudi将把数据集中的惟一字段(record key ) + 数据所在分区 (partitionPath) 联合起来当作数据的惟一键
基于上述基础概念之上,Hudi提供了两类表格式COW和MOR。他们会在数据的写入和查询性能上有一些不一样
简称COW。顾名思义,他是在数据写入的时候,复制一份原来的拷贝,在其基础上添加新数据。正在读数据的请求,读取的是是近的完整副本,这相似Mysql 的MVCC的思想。
上图中,每个颜色都包含了截至到其所在时间的全部数据。老的数据副本在超过必定的个数限制后,将被删除。这种类型的表,没有compact instant,由于写入时至关于已经compact了。
简称MOR。新插入的数据存储在delta log 中。按期再将delta log合并进行parquet数据文件。读取数据时,会将delta log跟老的数据文件作merge,获得完整的数据返回。固然,MOR表也能够像COW表同样,忽略delta log,只读取最近的完整数据文件。下图演示了MOR的两种数据读写方式
我在github上放置了基于Hudi的封装实现,对应的源码地址为 https://github.com/wanqiufeng/hudi-learn。
参数名 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
--base-save-path |
hudi表存放在HDFS的基础路径,好比hdfs://192.168.16.181:8020/hudi_data/ | 是 | 无 |
--mapping-mysql-db-name |
指定处理的Mysql库名 | 是 | 无 |
--mapping-mysql-table-name |
指定处理的Mysql表名 | 是 | 无 |
--store-table-name |
指定Hudi的表名 | 否 | 默认会根据--mapping-mysql-db-name和--mapping-mysql-table-name自动生成。假设--mapping-mysql-db-name 为crm,--mapping-mysql-table-name为order。那么最终的hudi表名为crm__order |
--real-save-path |
指定hudi表最终存储的hdfs路径 | 否 | 默认根据--base-save-path和--store-table-name自动生成,生成格式为'--base-save-path'+'/'+'--store-table-name' ,推荐默认 |
--primary-key |
指定同步的mysql表中能惟一标识记录的字段名 | 否 | 默认id |
--partition-key |
指定mysql表中能够用于分区的时间字段,字段必须是timestamp 或dateime类型 | 是 | 无 |
--precombine-key |
最终用于配置hudi的hoodie.datasource.write.precombine.field |
否 | 默认id |
--kafka-server |
指定Kafka 集群地址 | 是 | 无 |
--kafka-topic |
指定消费kafka的队列 | 是 | 无 |
--kafka-group |
指定消费kafka的group | 否 | 默认在存储表名前加'hudi'前缀,好比'hudi_crm__order' |
--duration-seconds |
因为本程序使用Spark streaming开发,这里指定Spark streaming微批的时长 | 否 | 默认10秒 |
一个使用的demo以下
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \ --name hudi__goods \ --master yarn \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 1 \ --num-executors 1 \ --queue hudi \ --conf spark.executor.memoryOverhead=2048 \ --conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \ --conf spark.core.connection.ack.wait.timeout=300 \ --conf spark.locality.wait=100 \ --conf spark.streaming.backpressure.enabled=true \ --conf spark.streaming.receiver.maxRate=500 \ --conf spark.streaming.kafka.maxRatePerPartition=200 \ --conf spark.ui.retainedJobs=10 \ --conf spark.ui.retainedStages=10 \ --conf spark.ui.retainedTasks=10 \ --conf spark.worker.ui.retainedExecutors=10 \ --conf spark.worker.ui.retainedDrivers=10 \ --conf spark.sql.ui.retainedExecutions=10 \ --conf spark.yarn.submit.waitAppCompletion=false \ --conf spark.yarn.maxAppAttempts=4 \ --conf spark.yarn.am.attemptFailuresValidityInterval=1h \ --conf spark.yarn.max.executor.failures=20 \ --conf spark.yarn.executor.failuresValidityInterval=1h \ --conf spark.task.maxFailures=8 \ /data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200
history_import_and_meta_sync
分支提供了将历史数据同步至hudi表,以及将hudi表数据结构同步至hive meta的操做
这里采用的思路是
HiveImport2HudiConfig提供了以下一些参数,用于配置程序执行行为
参数名 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
--base-save-path |
hudi表存放在HDFS的基础路径,好比hdfs://192.168.16.181:8020/hudi_data/ | 是 | 无 |
--mapping-mysql-db-name |
指定处理的Mysql库名 | 是 | 无 |
--mapping-mysql-table-name |
指定处理的Mysql表名 | 是 | 无 |
--store-table-name |
指定Hudi的表名 | 否 | 默认会根据--mapping-mysql-db-name和--mapping-mysql-table-name自动生成。假设--mapping-mysql-db-name 为crm,--mapping-mysql-table-name为order。那么最终的hudi表名为crm__order |
--real-save-path |
指定hudi表最终存储的hdfs路径 | 否 | 默认根据--base-save-path和--store-table-name自动生成,生成格式为'--base-save-path'+'/'+'--store-table-name' ,推荐默认 |
--primary-key |
指定同步的hive历史表中能惟一标识记录的字段名 | 否 | 默认id |
--partition-key |
指定hive历史表中能够用于分区的时间字段,字段必须是timestamp 或dateime类型 | 是 | 无 |
--precombine-key |
最终用于配置hudi的hoodie.datasource.write.precombine.field |
否 | 默认id |
--sync-hive-db-name |
全量历史数据所在hive的库名 | 是 | 无 |
--sync-hive-table-name |
全量历史数据所在hive的表名 | 是 | 无 |
--hive-base-path |
hive的全部数据文件存放地址,须要参看具体的hive配置 | 否 | /user/hive/warehouse |
--hive-site-path |
hive-site.xml配置文件所在的地址 | 是 | 无 |
--tmp-data-path |
程序执行过程当中临时文件存放路径。通常默认路径是/tmp。有可能出现/tmp所在磁盘过小,而致使历史程序执行失败的状况。当出现该状况时,能够经过该参数自定义执行路径 | 否 | 默认操做系统临时目录 |
一个程序执行demo
nohup java -jar hudi-learn-1.0-SNAPSHOT.jar --sync-hive-db-name hudi_temp --sync-hive-table-name crm__wx_user_info --base-save-path hdfs://192.168.2.2:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name "order" --primary-key "id" --partition-key created_date --hive-site-path /etc/lib/hive/conf/hive-site.xml --tmp-data-path /data/tmp > order.log &
须要将hudi的数据结构和分区,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi数据,并经过sql进行查询分析。Hudi自己在消费Binlog进行存储时,能够顺带将相关表元数据信息同步至hive。但考虑到每条写入Apache Hudi表的数据,都要读写Hive Meta ,对Hive的性能可能影响很大。因此我单独开发了HiveMetaSyncConfig工具,用于同步hudi表元数据至Hive。考虑到目前程序只支持按天分区,因此同步工具能够一天执行一次便可。参数配置以下
参数名 | 含义 | 是否必填 | 默认值 |
---|---|---|---|
--hive-db-name |
指定hudi表同步至哪一个hive数据库 | 是 | 无 |
--hive-table-name |
指定hudi表同步至哪一个hive表 | 是 | 无 |
--hive-jdbc-url |
指定hive meta的jdbc连接地址,例如jdbc:hive2://192.168.16.181:10000 | 是 | 无 |
--hive-user-name |
指定hive meta的连接用户名 | 否 | 默认hive |
--hive-pwd |
指定hive meta的连接密码 | 否 | 默认hive |
--hudi-table-path |
指定hudi表所在hdfs的文件路径 | 是 | 无 |
--hive-site-path |
指定hive的hive-site.xml路径 | 是 | 无 |
一个程序执行demo
java -jar hudi-learn-1.0-SNAPSHOT.jar --hive-db-name streaming --hive-table-name crm__order --hive-user-name hive --hive-pwd hive --hive-jdbc-url jdbc:hive2://192.168.16.181:10000 --hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order --hive-site-path /lib/hive/conf/hive-site.xml
有些hive集群的hive.input.format配置,默认是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会致使挂载Hudi数据的Hive外表读取到全部Hudi的Parquet数据,从而致使最终的读取结果重复。须要将hive的format改成org.apache.hadoop.hive.ql.io.HiveInputFormat
,为了不在整个集群层面上更改对其他离线Hive Sql形成没必要要的影响,建议只对当前hive session设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
因为binlog写入Hudi表的是基于Spark streaming实现的,这里给出了一些spark 和spark streaming层面的配置,它能使整个程序工做更稳定
配置 | 含义 |
---|---|
spark.streaming.backpressure.enabled=true | 启动背压,该配置能使Spark Streaming消费速率,基于上一次的消费状况,进行调整,避免程序崩溃 |
spark.ui.retainedJobs=10 spark.ui.retainedStages=10 spark.ui.retainedTasks=10 spark.worker.ui.retainedExecutors=10 spark.worker.ui.retainedDrivers=10 spark.sql.ui.retainedExecutions=10 |
默认状况下,spark 会在driver中存储一些spark 程序执行过程当中各stage和task的历史信息,当driver内存太小时,可能使driver崩溃,经过上述参数,调节这些历史数据存储的条数,从而减少对内层使用 |
spark.yarn.maxAppAttempts=4 | 配置当driver崩溃后,尝试重启的次数 |
spark.yarn.am.attemptFailuresValidityInterval=1h | 倘若driver执行一周才崩溃一次,那咱们更但愿每次都能重启,而上述配置在累计到重启4次后,driver就不再会被重启,该配置则用于重置maxAppAttempts的时间间隔 |
spark.yarn.max.executor.failures=20 | executor执行也可能失败,失败后集群会自动分配新的executor, 该配置用于配置容许executor失败的次数,超过次数后程序会报(reason: Max number of executor failures (400) reached),并退出 |
spark.yarn.executor.failuresValidityInterval=1h | 指定executor失败重分配次数重置的时间间隔 |
spark.task.maxFailures=8 | 容许任务执行失败的次数 |
欢迎关注个人我的公众号"西北偏北UP",记录代码人生,行业思考,科技评论