来源于flink社区sql
咨询一个flink问题。flinsql,能写入数据到hive表。可是hive表中的数据,都是基于 ".part,,,,inprogress,,,,"
相似的文件。
flink1.12.0 是基于cdh6.2.0编译的,hive版本是2.1.一、hadoop-3.0.0. 问题截图以下:
建立hive表:ide
SET table.sql-dialect=hive; CREATE TABLE hive_table ( user_id STRING, order_amount DOUBLE ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES ( 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00', 'sink.partition-commit.trigger'='partition-time', 'sink.partition-commit.delay'='1 min', 'sink.partition-commit.policy.kind'='metastore,success-file' );
插入数据:oop
INSERT INTO TABLE hive_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
分区的提交须要开启checkpoint ,若是是jar包发布,直接在代码里写就能够。
若是用的sql-client提交sql ,能够在配置文件: sql-client-defaults.yaml 中加入以下配置:code
configuration: execution.checkpointing.interval: 1000
配置完如上后,hive表成功写入数据,可是为啥flink-sql及hive却读取不到hive表数据呢,hadoop
SELECT * FROM hive_table WHERE dt='2021-06-21' and hr='18';
第一步:资源
CREATE CATALOG myhive WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/home/admin/hive/conf' );
第二部部署
USE CATALOG myhive;
第三步kafka
select * from hive_table;
猜想可能的问题,咱们本地部署设置的slot都是1,你多是在跑着写入任务,没有资源跑读取任务?
你能够设置把写入任务停了,或者设置方言问 :
SET table.sql-dialect=hive;
而后再查询试试。。。。。it
很是感谢,slot我设置成了4,按照你的方法我排查了下个人问题,应该是我在个人集群配置文件sql-client-defaults.yaml文件中设置的原始值不对:
我多加了个“hive-version: 2.1.1”,后来把这一行注释掉,能够了;并且按照你的方式注册临时catalog也能够了。
这个问题致使我一直卡在这io
sql-client-defaults.yaml文件中设置的错误原始值以下:
catalogs: # [] # empty list # A typical catalog definition looks like: - name: myhive type: hive # hive-conf-dir: /opt/hive_conf/ hive-conf-dir: /etc/hive/conf # default-database: ... hive-version: 2.1.1 default-database: myhive
sql-client-defaults.yaml文件中设置的修改后值以下:
catalogs: # [] # empty list # A typical catalog definition looks like: - name: myhive type: hive # hive-conf-dir: /opt/hive_conf/ hive-conf-dir: /etc/hive/conf # default-database: ... # hive-version: 2.1.1 default-database: myhive