Flink SQL: A HiveCatalog Pitfall Diary

First, the code:

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.catalog.hive.HiveCatalog

val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(5)
streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val tableEnvSettings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inStreamingMode()
  .build()
val tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings)

val hiveCatalog = new HiveCatalog(
  "flink",                 // catalog name
  "default",               // default database
  "G:\\Flink SQL开发文件",  // directory containing hive-site.xml
  "1.1.0"                  // Hive version
)

// Register the catalog and make it the current one, so the table
// created below ends up inside the HiveCatalog.
tableEnv.registerCatalog("flink", hiveCatalog)
tableEnv.useCatalog("flink")
// TODO: create the source table over the Kafka topic
val createSourceTableSql =
  """CREATE TABLE flink_test_01 (
    |  user_id STRING,
    |  item_id STRING,
    |  category_id STRING,
    |  behavior STRING,
    |  ts TIMESTAMP(3),
    |  proctime AS PROCTIME(),
    |  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
    |) WITH (
    |  'connector.type' = 'kafka',
    |  'connector.version' = '0.11',
    |  'connector.topic' = 'ng_log_par_extracted',
    |  'connector.startup-mode' = 'earliest-offset',
    |  'connector.properties.zookeeper.connect' = 'dev-ct6-dc-worker01:2181,dev-ct6-dc-worker02:2181,dev-ct6-dc-worker03:2181',
    |  'connector.properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
    |  'connector.properties.group.id' = 'test_group',
    |  'format.type' = 'json',
    |  'format.derive-schema' = 'true',
    |  'update-mode' = 'append'
    |)""".stripMargin

print(createSourceTableSql)
tableEnv.sqlUpdate(createSourceTableSql)

We have turned the Kafka stream into a table through the API. When we query this table, we would normally expect to be able to select the 'proctime' field, but here the query fails with an error.
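
For example (the exact SELECT below is my own illustration, not taken from the original job), a query as simple as this is enough to hit the error:

// Any query that touches the processing-time column fails while the
// table lives in the HiveCatalog.
val result = tableEnv.sqlQuery("SELECT user_id, behavior, proctime FROM flink_test_01")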


If we drop the HiveCatalog, the same field can be queried without any problem.
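
As a workaround sketch (reusing the createSourceTableSql string from above), creating the table in Flink's built-in in-memory catalog instead of the HiveCatalog avoids the problem:

// Switch back to the built-in in-memory catalog before running the DDL;
// default_catalog / default_database are Flink's built-in names.
tableEnv.useCatalog("default_catalog")
tableEnv.useDatabase("default_database")
tableEnv.sqlUpdate(createSourceTableSql)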

Later we discovered that this is actually a bug in Flink itself:

It is a known bug, https://issues.apache.org/jira/browse/FLINK-17189, and it has already been fixed upstream (according to the issue, the fix landed in 1.10.1 and 1.11.0), so upgrading resolves it.

It seems there is still a long way to go...