Flink使用HiveCatalog能够经过批或者流的方式来处理Hive中的表。这就意味着Flink既能够做为Hive的一个批处理引擎,也能够经过流处理的方式来读写Hive中的表,从而为实时数仓的应用和流批一体的落地实践奠基了坚实的基础。本文将以Flink1.12为例,介绍Flink集成Hive的另一个很是重要的方面——Hive维表JOIN(Temporal Table Join)与Flink读写Hive表的方式。如下是全文,但愿本文对你有所帮助。java
Flink支持以批处理(Batch)和流处理(Streaming)的方式写入Hive表。当以批处理的方式写入Hive表时,只有当写入做业结束时,才能够看到写入的数据。批处理的方式写入支持append模式和overwrite模式。正则表达式
Flink SQL> use catalog myhive; -- 使用catalog Flink SQL> INSERT INTO users SELECT 2,'tom'; Flink SQL> set execution.type=batch; -- 使用批处理模式 Flink SQL> INSERT OVERWRITE users SELECT 2,'tom';
-- 向静态分区表写入数据 Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25; -- 向动态分区表写入数据 Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
流式写入Hive表,不支持Insert overwrite 方式,不然报以下错误:sql
[ERROR] Could not execute SQL statement. Reason: java.lang.IllegalStateException: Streaming mode not support overwrite.
下面的示例是将kafka的数据流式写入Hive的分区表json
-- 使用流处理模式 Flink SQL> set execution.type=streaming; -- 使用Hive方言 Flink SQL> SET table.sql-dialect=hive; -- 建立一张Hive分区表 CREATE TABLE user_behavior_hive_tbl ( `user_id` BIGINT, -- 用户id `item_id` BIGINT, -- 商品id `cat_id` BIGINT, -- 品类id `action` STRING, -- 用户行为 `province` INT, -- 用户所在的省份 `ts` BIGINT -- 用户行为发生的时间戳 ) PARTITIONED BY (dt STRING,hr STRING,mi STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:$mi:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='0S', 'sink.partition-commit.policy.kind'='metastore,success-file' ); -- 使用默认SQL方言 Flink SQL> SET table.sql-dialect=default; -- 建立一张kafka数据源表 CREATE TABLE user_behavior ( `user_id` BIGINT, -- 用户id `item_id` BIGINT, -- 商品id `cat_id` BIGINT, -- 品类id `action` STRING, -- 用户行为 `province` INT, -- 用户所在的省份 `ts` BIGINT, -- 用户行为发生的时间戳 `proctime` AS PROCTIME(), -- 经过计算列产生一个处理时间列 `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间 WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark ) WITH ( 'connector' = 'kafka', -- 使用 kafka connector 'topic' = 'user_behaviors', -- kafka主题 'scan.startup.mode' = 'earliest-offset', -- 偏移量 'properties.group.id' = 'group1', -- 消费者组 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 'format' = 'json', -- 数据源格式为json 'json.fail-on-missing-field' = 'true', 'json.ignore-parse-errors' = 'false' );
关于Hive表的一些属性解释:bootstrap
partition.time-extractor.timestamp-pattern缓存
$year-$month-$day $hour:00:00
,若是是按天时进行分区,则该属性值为:$dt $hour:00:00
;sink.partition-commit.triggerbash
解释:分区触发器类型,可选 process-time 或partition-time。app
sink.partition-commit.delay性能
sink.partition-commit.policy.kind大数据
解释:提交分区的策略,用于通知下游的应用该分区已经完成了写入,也就是说该分区的数据能够被访问读取。可选的值以下:
_SUCCESS
文件能够同时配置上面的两个值,好比metastore,success-file
执行流式写入Hive表
-- streaming sql,将数据写入Hive表 INSERT INTO user_behavior_hive_tbl SELECT user_id, item_id, cat_id, action, province, ts, FROM_UNIXTIME(ts, 'yyyy-MM-dd'), FROM_UNIXTIME(ts, 'HH'), FROM_UNIXTIME(ts, 'mm') FROM user_behavior; -- batch sql,查询Hive表的分区数据 SELECT * FROM user_behavior_hive_tbl WHERE dt='2021-01-04' AND hr='16' AND mi = '46';
同时查看Hive表的分区数据:
尖叫提示:1.Flink读取Hive表默认使用的是batch模式,若是要使用流式读取Hive表,须要而外指定一些参数,见下文。
2.只有在完成 Checkpoint 以后,文件才会从 In-progress 状态变成 Finish 状态,同时生成
_SUCCESS
文件,因此,Flink流式写入Hive表须要开启并配置 Checkpoint。对于Flink SQL Client而言,须要在flink-conf.yaml中开启CheckPoint,配置内容为:state.backend: filesystem
execution.checkpointing.externalized-checkpoint-retention:RETAIN_ON_CANCELLATION
execution.checkpointing.interval: 60s
execution.checkpointing.mode: EXACTLY_ONCE
state.savepoints.dir: hdfs://kms-1:8020/flink-savepoints
Flink支持以批处理(Batch)和流处理(Streaming)的方式读取Hive中的表。批处理的方式与Hive的自己查询相似,即只在提交查询的时刻查询一次Hive表。流处理的方式将会持续地监控Hive表,而且会增量地提取新的数据。默认状况下,Flink是以批处理的方式读取Hive表。
关于流式读取Hive表,Flink既支持分区表又支持非分区表。对于分区表而言,Flink将会监控新产生的分区数据,并以增量的方式读取这些数据。对于非分区表,Flink会监控Hive表存储路径文件夹里面的新文件,并以增量的方式读取新的数据。
Flink读取Hive表能够配置一下参数:
streaming-source.enable
streaming-source.partition.include
streaming-source.monitor-interval
streaming-source.partition-order
streaming-source.consume-start-offset
partition.time-extractor.timestamp-pattern
配置时间戳提取的正则表达式。在 SQL Client 中须要显示地开启 SQL Hint 功能
Flink SQL> set table.dynamic-table-options.enabled= true;
使用SQLHint流式查询Hive表
SELECT * FROM user_behavior_hive_tbl /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2021-01-03') */;
Flink 1.12 支持了 Hive 最新的分区做为时态表的功能,能够经过 SQL 的方式直接关联 Hive 分区表的最新分区,而且会自动监听最新的 Hive 分区,当监控到新的分区后,会自动地作维表数据的全量替换。
Flink支持的是processing-time的temporal join,也就是说老是与最新版本的时态表进行JOIN。另外,Flink既支持非分区表的temporal join,又支持分区表的temporal join。对于分区表而言,Flink会监听Hive表的最新分区数据。值得注意的是,Flink尚不支持 event-time temporal join。
对于一张随着时间变化的Hive分区表,Flink能够读取该表的数据做为一个无界流。若是Hive分区表的每一个分区都包含全量的数据,那么每一个分区将作为一个时态表的版本数据,即将最新的分区数据做为一个全量维表数据。值得注意的是,该功能特色仅支持Flink的STREAMING模式。
使用 Hive 最新分区做为 Tempmoral table 以前,须要设置必要的两个参数:
'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest'
除此以外还有一些其余的参数,关于参数的解释见上面的分析。咱们在使用Hive维表的时候,既能够在建立Hive表时指定具体的参数,也可使用SQL Hint的方式动态指定参数。一个Hive维表的建立模板以下:
-- 使用Hive的sql方言 SET table.sql-dialect=hive; CREATE TABLE dimension_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP(3), update_user STRING, ... ) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES ( -- 方式1:按照分区名排序来识别最新分区(推荐使用该种方式) 'streaming-source.enable' = 'true', -- 开启Streaming source 'streaming-source.partition.include' = 'latest',-- 选择最新分区 'streaming-source.monitor-interval' = '12 h',-- 每12小时加载一次最新分区数据 'streaming-source.partition-order' = 'partition-name', -- 按照分区名排序 -- 方式2:分区文件的建立时间排序来识别最新分区 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.partition-order' = 'create-time',-- 分区文件的建立时间排序 'streaming-source.monitor-interval' = '12 h' -- 方式3:按照分区时间排序来识别最新分区 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '12 h', 'streaming-source.partition-order' = 'partition-time', -- 按照分区时间排序 'partition.time-extractor.kind' = 'default', 'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00' );
有了上面的Hive维表,咱们就可使用该维表与Kafka的实时流数据进行JOIN,获得相应的宽表数据。
-- 使用default sql方言 SET table.sql-dialect=default; -- kafka实时流数据表 CREATE TABLE orders_table ( order_id STRING, order_amount DOUBLE, product_id STRING, log_ts TIMESTAMP(3), proctime as PROCTIME() ) WITH (...); -- 将流表与hive最新分区数据关联 SELECT * FROM orders_table AS orders JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim ON orders.product_id = dim.product_id;
除了在定义Hive维表时指定相关的参数,咱们还能够经过SQL Hint的方式动态指定相关的参数,具体方式以下:
SELECT * FROM orders_table AS orders JOIN dimension_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '1 h', 'streaming-source.partition-order' = 'partition-name') */ FOR SYSTEM_TIME AS OF orders.proctime AS dim -- 时态表(维表) ON orders.product_id = dim.product_id;
对于Hive的非分区表,当使用temporal join时,整个Hive表会被缓存到Slot内存中,而后根据流中的数据对应的key与其进行匹配。使用最新的Hive表进行temporal join不须要进行额外的配置,咱们只须要配置一个Hive表缓存的TTL时间,该时间的做用是:当缓存过时时,就会从新扫描Hive表并加载最新的数据。
lookup.join.cache.ttl
尖叫提示:当使用此种方式时,Hive表必须是有界的lookup表,即非Streaming Source的时态表,换句话说,该表的属性streaming-source.enable = false。
若是要使用Streaming Source的时态表,记得配置streaming-source.monitor-interval的值,即数据更新的时间间隔。
-- Hive维表数据使用批处理的方式按天装载 SET table.sql-dialect=hive; CREATE TABLE dimension_table ( product_id STRING, product_name STRING, unit_price DECIMAL(10, 4), pv_count BIGINT, like_count BIGINT, comment_count BIGINT, update_time TIMESTAMP(3), update_user STRING, ... ) TBLPROPERTIES ( 'streaming-source.enable' = 'false', -- 关闭streaming source 'streaming-source.partition.include' = 'all', -- 读取全部数据 'lookup.join.cache.ttl' = '12 h' ); -- kafka事实表 SET table.sql-dialect=default; CREATE TABLE orders_table ( order_id STRING, order_amount DOUBLE, product_id STRING, log_ts TIMESTAMP(3), proctime as PROCTIME() ) WITH (...); -- Hive维表join,Flink会加载该维表的全部数据到内存中 SELECT * FROM orders_table AS orders JOIN dimension_table FOR SYSTEM_TIME AS OF orders.proctime AS dim ON orders.product_id = dim.product_id;
尖叫提示:1.每个子任务都须要缓存一份维表的全量数据,必定要确保TM的task Slot 大小可以容纳维表的数据量;
2.推荐将streaming-source.monitor-interval和lookup.join.cache.ttl的值设为一个较大的数,由于频繁的更新和加载数据会影响性能。
3.当缓存的维表数据须要从新刷新时,目前的作法是将整个表进行加载,所以不可以将新数据与旧数据区分开来。
假设维表的数据是经过批处理的方式(好比天天)装载至Hive中,而Kafka中的事实流数据须要与该维表进行JOIN,从而构建一个宽表数据,这个时候就可使用Hive的维表JOIN。
SET table.sql-dialect=default; CREATE TABLE fact_user_behavior ( `user_id` BIGINT, -- 用户id `item_id` BIGINT, -- 商品id `action` STRING, -- 用户行为 `province` INT, -- 用户所在的省份 `ts` BIGINT, -- 用户行为发生的时间戳 `proctime` AS PROCTIME(), -- 经过计算列产生一个处理时间列 `eventTime` AS TO_TIMESTAMP(FROM_UNIXTIME(ts, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间 WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND -- 定义watermark ) WITH ( 'connector' = 'kafka', -- 使用 kafka connector 'topic' = 'user_behaviors', -- kafka主题 'scan.startup.mode' = 'earliest-offset', -- 偏移量 'properties.group.id' = 'group1', -- 消费者组 'properties.bootstrap.servers' = 'kms-2:9092,kms-3:9092,kms-4:9092', 'format' = 'json', -- 数据源格式为json 'json.fail-on-missing-field' = 'true', 'json.ignore-parse-errors' = 'false' );
SET table.sql-dialect=hive; CREATE TABLE dim_item ( item_id BIGINT, item_name STRING, unit_price DECIMAL(10, 4) ) PARTITIONED BY (dt STRING) TBLPROPERTIES ( 'streaming-source.enable' = 'true', 'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '12 h', 'streaming-source.partition-order' = 'partition-name' );
SELECT fact.item_id, dim.item_name, count(*) AS buy_cnt FROM fact_user_behavior AS fact LEFT JOIN dim_item FOR SYSTEM_TIME AS OF fact.proctime AS dim ON fact.item_id = dim.item_id WHERE fact.action = 'buy' GROUP BY fact.item_id,dim.item_name;
使用SQL Hint方式,关联非分区的Hive维表:
set table.dynamic-table-options.enabled= true; SELECT fact.item_id, dim.item_name, count(*) AS buy_cnt FROM fact_user_behavior AS fact LEFT JOIN dim_item1 /*+ OPTIONS('streaming-source.enable'='false', 'streaming-source.partition.include' = 'all', 'lookup.join.cache.ttl' = '12 h') */ FOR SYSTEM_TIME AS OF fact.proctime AS dim ON fact.item_id = dim.item_id WHERE fact.action = 'buy' GROUP BY fact.item_id,dim.item_name;
本文以最新版本的Flink1.12为例,介绍了Flink读写Hive的不一样方式,并对每种方式给出了相应的使用示例。在实际应用中,一般有将实时数据流与 Hive 维表 join 来构造宽表的需求,Flink提供了Hive维表JOIN,能够简化用户使用的复杂度。本文在最后详细说明了Flink进行Hive维表JOIN的基本步骤以及使用示例,但愿对你有所帮助。
公众号『大数据技术与数仓』,回复『资料』领取大数据资料包