1. Business scenario: we want to reimplement an existing offline pipeline in real time. The offline job joins multiple tables, and because the input is MySQL binlog data, rows can be updated and deleted. That is the biggest difference from log analysis: log data is append-only, so there is far less to worry about.
2. The first thing to implement is the multi-table join itself, which is relatively simple, for example the SQL below.
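The SQL from the original screenshot is not reproduced here; this is a minimal sketch of the kind of regular (unwindowed) join meant, using the three table names that appear in the complete code further down:

SELECT a.*, b.*, c.*
FROM assure_orders a
JOIN order_company b ON a.order_id = b.company_id
JOIN order_address c ON a.order_id = c.address_id;

Because this join is unbounded, each side's rows must be kept in state so that late matches, binlog updates, and deletes can still be applied, which is exactly the problem discussed next.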
3. But a three-table join like this is a full-history join: the state for every table is kept forever, so sooner or later it will blow up with an OOM. What can we do about that?
When we use windows, Flink clears the state for us once a window ends. For non-windowed queries, Flink supports expiring state through an API, but that setting is global; I have not yet looked into anything more fine-grained.
Since the business only cares about the current day's data, and data arriving in Kafka can be delayed (but never by a whole day), I set the TTL here to between 1 day and 3 days.
Of course the TTL had to be tested: I read two topics, registered them as tables, and joined them; after 30 seconds the rows stopped joining because the state had expired. You can try this by hand yourself; the exact settings are shown below.
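For reference, these are the two retention settings from the complete code below; the day-based line is what the business actually needs, while the 30-second line is what makes the expiry observable in a quick test:

// production: keep idle join state for at least 1 day, at most 3 days
// stenv.getConfig.setIdleStateRetentionTime(Time.days(1), Time.days(3))
// test: state may be cleaned 30s after it was last touched, so the join stops matching;
// Flink requires the max retention to exceed the min by at least 5 minutes, hence minutes(6)
stenv.getConfig.setIdleStateRetentionTime(Time.seconds(30), Time.minutes(6))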
4. After joining the tables and setting the TTL, we still need to join in dimension-table data. Before starting I kept wondering whether the dimension table would also be affected by the TTL, and it made my head spin.
A reminder from 鸡哥 made me remember: the dimension-table lookup code has its own cache, backed by Guava, so the state TTL does not apply to it. I kowtow in apology.
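That cache is configured on the JDBC connector itself, not through the table config; here is a minimal sketch of the relevant DDL options (the same ones used in the complete code below):

CREATE TABLE dim_base_province (
  id INT,
  name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.5.24:3306/doris_test',
  'table-name' = 'base_province_copy_copy',
  'username' = 'root',
  'password' = '123456',
  -- the lookup cache is local to each task and independent of the join state TTL
  'lookup.cache.max-rows' = '1000',  -- evict once 1000 rows are cached
  'lookup.cache.ttl' = '5s'          -- a cached row is re-read from MySQL after 5s
);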
5. Dimension-table join: the join syntax is laid out clearly on the official docs:
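The docs screenshot is not reproduced here; the documented pattern is a processing-time temporal join, roughly like this (orders stands in for whatever the main stream table is):

SELECT o.order_id, dim.name
FROM orders AS o
LEFT JOIN dim_base_province FOR SYSTEM_TIME AS OF o.proctime AS dim
  ON o.order_id = dim.id;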
We need to add a processing-time attribute (processTime) to the main stream:
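The missing snippet here is just a computed column inside the main table's CREATE TABLE statement (you can see it in context in the assure_orders DDL in the complete code right below):

proctime AS PROCTIME()  -- computed column that gives the stream a processing-time attribute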
The complete code is as follows:
import java.util.concurrent.TimeUnit

import org.apache.flink.api.common.restartstrategy.RestartStrategies
import org.apache.flink.api.common.time.Time
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.environment.CheckpointConfig
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

// todo: do the multi-table join here and verify the state TTL
object practice_join_mysqlJoin {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val bsSettings = EnvironmentSettings.newInstance.useBlinkPlanner.inStreamingMode.build
    val stenv = StreamTableEnvironment.create(env, bsSettings)

    // restart strategy, checkpointing, and the RocksDB state backend
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.of(30L, TimeUnit.SECONDS)))
    env.enableCheckpointing(1 * 60 * 1000L)
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(10000L)
    env.getCheckpointConfig.setCheckpointTimeout(2 * 60000L)
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
    env.setStateBackend(new RocksDBStateBackend("hdfs://dev-ct6-dc-master01:8020/flink/checkpoints/rocksDBStateBackend2", true))
    env.getCheckpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

    // todo: set the state TTL (idle state retention)
    // production values: stenv.getConfig.setIdleStateRetentionTime(Time.days(1), Time.days(3))
    // test values, so expiry can be observed within 30 seconds:
    stenv.getConfig.setIdleStateRetentionTime(Time.seconds(30), Time.minutes(6))

    // todo 1: sync the data from MySQL into Kafka (omitted here)
    // todo 2: read the dwd-layer topics and register them as tables
    val assure_orders =
      s"""
         |CREATE TABLE assure_orders (
         |  order_id INT,
         |  order_name STRING,
         |  proctime AS PROCTIME()
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'doris_test.assure_orders',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'properties.group.id' = 'test1',
         |  'format' = 'canal-json',
         |  -- 'format' = 'json',
         |  'scan.startup.mode' = 'earliest-offset'
         |)
      """.stripMargin
    stenv.executeSql(assure_orders)

    val order_company =
      s"""
         |CREATE TABLE order_company (
         |  company_id INT,
         |  company_name STRING
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'doris_test.order_company',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'properties.group.id' = 'test1',
         |  'format' = 'canal-json',
         |  -- 'format' = 'json',
         |  'scan.startup.mode' = 'earliest-offset'
         |)
      """.stripMargin
    stenv.executeSql(order_company)

    val order_address =
      s"""
         |CREATE TABLE order_address (
         |  address_id INT,
         |  address_name STRING
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'doris_test.order_address',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  'properties.group.id' = 'test1',
         |  'format' = 'canal-json',
         |  -- 'format' = 'json',
         |  'scan.startup.mode' = 'earliest-offset'
         |)
      """.stripMargin
    stenv.executeSql(order_address)

    // todo ==================== dimension table ====================
    val dim_base_province_create =
      """
        |CREATE TABLE `dim_base_province` (
        |  `id` INT,
        |  `name` STRING,
        |  `region_id` INT,
        |  `area_code` STRING,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://192.168.5.24:3306/doris_test?useUnicode=true&characterEncoding=UTF-8',
        |  'table-name' = 'base_province_copy_copy',  -- the MySQL dimension table
        |  'driver' = 'com.mysql.jdbc.Driver',
        |  'username' = 'root',
        |  'password' = '123456',
        |  'lookup.cache.max-rows' = '1000',
        |  'lookup.cache.ttl' = '5s'
        |)
      """.stripMargin
    stenv.executeSql(dim_base_province_create)

    // todo ==================== print sink for local testing ====================
    val sink_print =
      s"""
         |CREATE TABLE sink_print (
         |  a INT,
         |  b STRING,
         |  c STRING,
         |  d STRING,
         |  e STRING
         |) WITH ('connector' = 'print')
      """.stripMargin
    // stenv.executeSql(sink_print)

    // todo ==================== kafka sink ====================
    val insert_kafka =
      s"""
         |CREATE TABLE sink_print (
         |  a INT,
         |  b STRING,
         |  c STRING,
         |  d STRING,
         |  e STRING
         |) WITH (
         |  'connector' = 'kafka',
         |  'topic' = 'cccc',
         |  'properties.bootstrap.servers' = 'dev-ct6-dc-worker01:9092,dev-ct6-dc-worker02:9092,dev-ct6-dc-worker03:9092',
         |  -- 'format' = 'json',
         |  'format' = 'changelog-json'
         |)
      """.stripMargin
    stenv.executeSql(insert_kafka)

    // three-table join, then a temporal join against the dimension table
    val insert_sql =
      """
        |INSERT INTO sink_print
        |SELECT d.order_id, d.company_name, d.address_name, d.order_name, dim_base_province.name
        |FROM (
        |  SELECT a.*, b.*, c.*
        |  FROM assure_orders a
        |  JOIN order_company b ON a.order_id = b.company_id
        |  JOIN order_address c ON a.order_id = c.address_id
        |) d
        |LEFT JOIN dim_base_province FOR SYSTEM_TIME AS OF d.proctime
        |  ON d.order_id = dim_base_province.id
      """.stripMargin

    /*
    val join_sql =
      """
        |INSERT INTO sink_print
        |SELECT assure_orders.order_id, assure_orders.order_name, dim_base_province.name
        |FROM assure_orders
        |LEFT JOIN dim_base_province FOR SYSTEM_TIME AS OF assure_orders.proctime
        |ON assure_orders.order_id = dim_base_province.id
      """.stripMargin
    */

    stenv.executeSql(insert_sql)
  }
}
Summary:
The code looks simple, and it really is just some straightforward SQL; it was later submitted to the test cluster and ran there.
As for what comes next, I plan to go back to Streaming (DataStream) development. SQL-based development is easy, but it leaves you short on tuning and on understanding the internals. Going back to Streaming development may be less friendly to our newcomers, but it has the edge for deep understanding and tuning.