flink sql使用kafka做为source和sink

你们都知道sql有着简单,直接,容易上手等优点,因此如今大有用sql去掉api的趋势。那么咱们少说废话,下面先上个sql的列子node

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000)
    env.setParallelism(1)
    //注入数据源
    var tableEnv: StreamTableEnvironment  = TableEnvironment.getTableEnvironment(env)
    tableEnv.registerExternalCatalog("kafka", new UDMExternalCatalog())
    tableEnv.sqlUpdate(
      s"""INSERT INTO `kafka.kafka-k8s.pb_sink_test`
         |select
         |fstDeptSet,
         |filedName1,
         |filedName2,
         |userId,
         |brandNames
         |from kafka.`kafka-k8s`.`pb_internal_test`
         | """.stripMargin)
    env.execute("Flink SQL Skeleton")

上面是一个查询,插入语句,在flink中会被转为一个任务进行提交sql

下面咱们大概讲一下flink内部kafka的实例化过程api

有图可知,主要分为4大步骤,先经过calcite分析sql,转为相应的relnode,在根据用户配置的schema和Java spi,过滤出须要的kafka produce和kafka consumer版本。scala

kafka consumer对应于select部分code

kafka produce对应于insert部分blog

相关文章
相关标签/搜索