Multi-table joins and dimension-table joins with Flink 1.11.2

1. Business scenario: an existing offline (batch) job that we want to reimplement in real time. The offline job joins several tables. Because the input is MySQL binlog data, records can be updated and deleted; this is the biggest difference from log analysis, where data is append-only and there is far less to worry about.

 

2. First we need the multi-table join itself, which is relatively straightforward, for example the SQL below:
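A minimal sketch of such a regular (unbounded) join, using the three Kafka tables and the illustrative join keys that appear in the full code further down:

SELECT a.order_id, a.order_name, b.company_name, c.address_name
FROM assure_orders a
JOIN order_company b ON a.order_id = b.company_id
JOIN order_address c ON a.order_id = c.address_id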

 

3. However, a three-table join like this is a full, unbounded join: the state for every table is kept forever, so sooner or later the job will blow up with an OOM. What can we do about that?

When we use windows, state is cleaned up for us once a window closes. Outside of windows, Flink lets us expire state through the Table API configuration, although it is a global setting; I have not looked into finer-grained control yet.

Since the business only cares about the current day's data, and data can arrive in Kafka late but never a whole day late, I set the idle-state retention (TTL) to between 1 day and 3 days.

Of course the TTL has to be verified: read two topics, register them as tables and join them; about 30 seconds later the join stops producing matches because the state has expired. You can reproduce this test by hand.
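For reference, a minimal sketch of that configuration (stenv is the StreamTableEnvironment from the full code below; the 30s/6min pair is what the expiry test uses, the 1-day/3-day pair is what the production job would use):

import org.apache.flink.api.common.time.Time

// state idle for more than 30 s becomes eligible for cleanup and is removed at the latest after 6 min
stenv.getConfig.setIdleStateRetentionTime(Time.seconds(30), Time.minutes(6))
// production setting: keep idle state for at least 1 day, at most 3 days
// stenv.getConfig.setIdleStateRetentionTime(Time.days(1), Time.days(3))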

 

4. With the multi-table join in place and the TTL configured, we still need to join in dimension-table data. Before starting I kept wondering whether the dimension table would also be affected by the TTL, and it made my head spin.

A reminder from 鸡哥 brought it back to me: the dimension-table lookup code has its own cache, a Guava cache, so the join-state TTL does not touch it. Much obliged.
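Concretely, that cache is configured on the JDBC dimension table itself and is independent of setIdleStateRetentionTime; the two options below are the ones used in the dim_base_province DDL further down:

'lookup.cache.max-rows' = '1000',   -- at most 1000 rows kept in the per-task lookup cache
'lookup.cache.ttl' = '5s'           -- cached rows expire after 5 s and are re-read from MySQL on the next lookup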

 

5. Dimension-table join: the syntax for joining a dimension (lookup) table is laid out clearly in the official documentation:
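In short it is a processing-time temporal join using FOR SYSTEM_TIME AS OF. With the tables from this post, the single-table form (the same statement that is left commented out in the full code) looks like this:

SELECT assure_orders.order_id, assure_orders.order_name, dim_base_province.name
FROM assure_orders
LEFT JOIN dim_base_province FOR SYSTEM_TIME AS OF assure_orders.proctime
ON assure_orders.order_id = dim_base_province.id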

 

We need to add a processing-time attribute to the main stream's table:
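In the DDL this is just a computed column on the source table (see the assure_orders table in the code below):

proctime AS PROCTIME()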

The complete code is as follows:

import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/*
  todo: verify the join and state-TTL behaviour here
 */
object practice_join_mysqlJoin {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // note: the temporal join below uses processing time (PROCTIME()), so the event-time characteristic set here is not actually exercised
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val stenv = StreamTableEnvironment.create(env, bsSettings)
    val statement = stenv.createStatementSet()   // statement set created but not used in this example

    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(30L, TimeUnit.SECONDS)))
    env.enableCheckpointing(1 * 60 * 1000L)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000L)
    env.getCheckpointConfig.setCheckpointTimeout(2 * 60000L)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.setStateBackend(new RocksDBStateBackend("hdfs://dev-ct6-dc-master01:8020/flink/checkpoints/rocksDBStateBackend2", true))
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)
    //todo set the idle-state retention time (state TTL) for the regular joins
    // production setting: keep idle state for at least 1 day, at most 3 days
//    stenv.getConfig.setIdleStateRetentionTime(Time.days(1), Time.days(3))
    // short values used here to verify that expired state really stops joining
    stenv.getConfig.setIdleStateRetentionTime(Time.seconds(30), Time.minutes(6))

    //todo 1: data is synced from MySQL to Kafka via binlog (canal); omitted here

    //todo 2: read the DWD-layer topics directly and register them as tables

    val assure_orders =
      s"""
         |CREATE TABLE assure_orders (
         | order_id INT,
         | order_name STRING,
         | proctime as PROCTIME()
         |) WITH (
         | 'connector' = 'kafka',
         | 'topic' = 'doris_test.assure_orders',
         | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         | 'properties.group.id' = 'test1',
         | 'format' = 'canal-json',
         | -- 'format' = 'json',
         | 'scan.startup.mode' = 'earliest-offset'
         |)
       """.stripMargin
    stenv.executeSql(assure_orders)

    val order_company =
      s"""
         |CREATE TABLE order_company (
         | company_id INT,
         | company_name STRING
         |) WITH (
         | 'connector' = 'kafka',
         | 'topic' = 'doris_test.order_company',
         | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         | 'properties.group.id' = 'test1',
         | 'format' = 'canal-json',
         | -- 'format' = 'json',
         | 'scan.startup.mode' = 'earliest-offset'
         |)
       """.stripMargin
    stenv.executeSql(order_company)


    val order_address =
      s"""
         |CREATE TABLE order_address (
         | address_id INT,
         | address_name STRING
         |) WITH (
         | 'connector' = 'kafka',
         | 'topic' = 'doris_test.order_address',
         | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         | 'properties.group.id' = 'test1',
         | 'format' = 'canal-json',
         | -- 'format' = 'json',
         | 'scan.startup.mode' = 'earliest-offset'
         |)
       """.stripMargin
    stenv.executeSql(order_address)

    //todo ==================== dimension-table (lookup) join ====================
    val dim_base_province_create =
      """
        | CREATE TABLE `dim_base_province` (
        |      `id` INT,
        |      `name` STRING,
        |      `region_id` INT ,
        |      `area_code`STRING,
        |      PRIMARY KEY (id) NOT ENFORCED
        |    ) WITH (
        |     'connector' = 'jdbc',
        |    'url' = 'jdbc:mysql://192.168.5.24:3306/doris_test?useUnicode=true&characterEncoding=UTF-8',
        |    'table-name' = 'base_province_copy_copy', -- the dimension table in MySQL
        |      'driver' = 'com.mysql.jdbc.Driver',
        |    'username' = 'root',
        |    'password' = '123456',
        |   'lookup.cache.max-rows'='1000',
        |   'lookup.cache.ttl' = '5s'
        |    )
      """.stripMargin

    stenv.executeSql(dim_base_province_create)

    //todo ==================== print sink for local testing ====================
    val sink_print =
      s"""
         |create table sink_print (
         |a INT,
         |b STRING,
         |c STRING,
         |d STRING,
         |e STRING
         |) with ('connector' = 'print')
        """.stripMargin
//    stenv.executeSql(sink_print)   // enable this (and disable the Kafka sink below, which reuses the table name) to print results locally

    //todo ====================== Kafka sink ======================
    val insert_kafka =
      s"""
         |CREATE TABLE sink_print (
         |a INT,
         |b STRING,
         |c STRING,
         |d STRING,
         |e STRING
         |) WITH (
         | 'connector' = 'kafka',
         | 'topic' = 'cccc',
         | 'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         | 'properties.group.id' = 'test1',
         | 'format' = 'changelog-json',
         | -- 'format' = 'json',
         | 'scan.startup.mode' = 'earliest-offset'
         |)
       """.stripMargin

    stenv.executeSql(insert_kafka)

    val insert_sql  =
      """
        |INSERT INTO sink_print
        | select d.order_id,d.company_name,d.address_name,d.order_name,dim_base_province.name from
        | (
        | SELECT a.*,b.*,c.*
        | FROM assure_orders a
        | JOIN
        | order_company b
        |  ON a.order_id = b.company_id
        | JOIN
        | order_address c
        | ON a.order_id = c.address_id
        |  ) d
        | LEFT JOIN
        | dim_base_province FOR SYSTEM_TIME AS OF d.proctime
        | ON d.order_id = dim_base_province.id
      """.stripMargin
  /* val join_sql =
     """
       |INSERT INTO sink_print
       |SELECT assure_orders.order_id,assure_orders.order_name,dim_base_province.name FROM assure_orders
       |LEFT JOIN dim_base_province FOR SYSTEM_TIME AS OF assure_orders.proctime
       |ON assure_orders.order_id = dim_base_province.id
     """.stripMargin*/
    stenv.executeSql(insert_sql)

  }
}

 

 

Summary:

The code looks simple, essentially just a few SQL statements, and the job was also submitted to the test cluster and ran there.

My plan from here is to go back to DataStream (Streaming) development. SQL-based development is easy, but it leaves you short on tuning and on understanding what happens underneath. Going back to the Streaming API may not be as friendly for newcomers, but it is better for deep understanding and for tuning.