本案例根据某电网公司的真实业务需求,经过Blink SQL+UDAF实现实时流上的差值聚合计算,经过本案例,让读者熟悉UDAF编写,并理解UDAF中的方法调用关系和顺序。
感谢@军长在实现过程当中的指导。笔者水平有限,如有纰漏,请批评指出。
html
电网公司天天采集各个用户的电表数据(格式以下表),其中data_date为电表数据上报时间,cons_id为电表id,r1为电表度数,其余字段与计算逻辑无关,可忽略。为了后续演示方便,仅输入cons_id=100000002的数据。java
no(string) | data_date(string) | cons_id(string) | org_no(string) | r1(double) |
---|---|---|---|---|
101 | 20190716 | 100000002 | 35401 | 13.76 |
101 | 20190717 | 100000002 | 35401 | 14.12 |
101 | 20190718 | 100000002 | 35401 | 16.59 |
101 | 20190719 | 100000002 | 35401 | 18.89 |
表1:输入数据
电网公司但愿经过实时计算(Blink)对电表数据处理后,天天获得每一个电表最近两天(当天和前一天)的差值数据,结果相似以下表:sql
cons_id(string) | data_date(string) | subDegreeR1(double) |
---|---|---|
100000002 | 20190717 | 0.36 |
100000002 | 20190718 | 2.47 |
100000002 | 20190719 | 2.3 |
根据客户的需求,比较容易获得两种解决方案:一、经过over窗口(2 rows over window)开窗进行差值聚合;二、经过hop窗口(sliding=1天,size=2天)进行差值聚合。
over窗口和hop窗口均是Blink支持的标准窗口,使用起来很是简单。本需求的最大难点在于差值聚合,Blink支持SUM、MAX、MIN、AVG等内置的聚合函数,但没有知足业务需求的差值聚合函数,所以须要经过自定义聚合函数(UDAF)来实现。
api
实时计算自定义函数开发搭建环境请参考UDX概述(https://help.aliyun.com/docum...,在此再也不赘述。本案例使用Blink2.2.7版本,下面简要描述关键代码的编写。
完整代码(为了方便上传,使用了txt格式):SubtractionUdaf.txt
一、在com.alibaba.blink.sql.udx.SubtractionUdaf包中建立一个继承AggregateFunction类的SubtractionUdaf类。缓存
public class SubtractionUdaf extends AggregateFunction<Double, SubtractionUdaf.Accum>
其中Double是UDAF输出的类型,在本案例中为相邻两天的电表差值度数。SubtractionUdaf.Accum是内部自定义的accumulator数据结构。
二、定义accumulator数据结构,用户保存UDAF的状态。网络
public static class Accum { private long currentTime;//最新度数的上报时间 private double oldDegree;//前一次度数 private double newDegree;//当前最新度数 private long num; //accumulator中已经计算的record数量,主要用于merge private List<Tuple2<Double, Long>> listInput;//缓存全部的输入,主要用于retract }
三、实现createAccumulator方法,初始化UDAF的accumulator数据结构
//初始化udaf的accumulator public SubtractionUdaf.Accum createAccumulator() { SubtractionUdaf.Accum acc = new SubtractionUdaf.Accum(); acc.currentTime = 0; acc.oldDegree = 0.0; acc.newDegree = 0.0; acc.num = 0; acc.listInput = new ArrayList<Tuple2<Double, Long>>(); return acc; }
四、实现getValue方法,用于经过存放状态的accumulator计算UDAF的结果,本案例需求是计算新旧数据二者的差值。函数
public Double getValue(SubtractionUdaf.Accum accumulator) { return accumulator.newDegree - accumulator.oldDegree; }
五、实现accumulate方法,用于根据输入数据更新UDAF存放状态的accumulator。考虑到数据可能乱序以及可能的retract,数据数据包括了对应的度数iValue,还包括上报度数的时间(构造的事件时间ts)。性能
public void accumulate(SubtractionUdaf.Accum accumulator, double iValue, long ts) { System.out.println("method : accumulate" ); accumulator.listInput.add(Tuple2.of(Double.valueOf(iValue),Long.valueOf(ts))); Collections.sort(accumulator.listInput,this.comparator);//按照时间排序 accumulator.num ++; if(accumulator.listInput.size() == 1){ accumulator.newDegree = iValue; accumulator.oldDegree = 0.0; accumulator.currentTime = ts; }else {//处理可能存在的数据乱序问题 accumulator.newDegree = accumulator.listInput.get(0).f0; accumulator.currentTime = accumulator.listInput.get(0).f1; accumulator.oldDegree = accumulator.listInput.get(1).f0; } }
其中accumulator为UDAF的状态,iValue和ts为实际的输入数据。
注意须要处理可能存在的输入数据乱序问题。
六、实现retract方法,用于在某些优化场景下(如使用over窗口)对retract的数据进行处理。
public void retract(SubtractionUdaf.Accum accumulator, double iValue, long ts) throws Exception{ if(accumulator.listInput.contains(Tuple2.of(iValue, ts))){ if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 0){//retract的是最新值 accumulator.listInput.remove(0); accumulator.num--; if(accumulator.listInput.isEmpty()){ accumulator.currentTime = 0; accumulator.oldDegree = 0.0; accumulator.newDegree = 0.0; }else if(accumulator.listInput.size() == 1) { accumulator.currentTime = accumulator.listInput.get(0).f1; accumulator.newDegree = accumulator.listInput.get(0).f0; accumulator.oldDegree = 0.0; }else{ accumulator.currentTime = accumulator.listInput.get(0).f1; accumulator.newDegree = accumulator.listInput.get(0).f0; accumulator.oldDegree = accumulator.listInput.get(1).f0; } } else if(accumulator.listInput.indexOf(Tuple2.of(iValue, ts)) == 1){//retract的是次新值 accumulator.listInput.remove(1); accumulator.num--; if(accumulator.listInput.size() == 1){ accumulator.oldDegree = 0.0; }else { accumulator.oldDegree = accumulator.listInput.get(1).f0; } }else {//retract的是其余值 accumulator.listInput.remove(Tuple2.of(iValue, ts)); accumulator.num--; } }else { throw new Exception("Cannot retract a unexist record : iValue = "+ iValue + "timestamp = "+ ts); } }
须要考虑retract的是最新的数据仍是次新的数据,须要不一样的逻辑处理。
七、实现merge方法,用于某些优化场景(如使用hop窗口)。
public void merge(SubtractionUdaf.Accum accumulator, Iterable<SubtractionUdaf.Accum> its) { int i = 0; System.out.println("method : merge" ); System.out.println("accumulator : "+ accumulator.newDegree); System.out.println("accumulator : "+ accumulator.currentTime); for (SubtractionUdaf.Accum entry : its) { if(accumulator.currentTime < entry.currentTime){ if(entry.num > 1){ accumulator.currentTime = entry.currentTime; accumulator.oldDegree = entry.oldDegree; accumulator.newDegree = entry.newDegree; accumulator.num += entry.num; accumulator.listInput.addAll(entry.listInput); }else if(entry.num == 1){ accumulator.currentTime = entry.currentTime; accumulator.oldDegree = accumulator.newDegree; accumulator.newDegree = entry.newDegree; accumulator.num ++; accumulator.listInput.addAll(entry.listInput); } }else{ if(accumulator.num > 1){ accumulator.num += entry.num; accumulator.listInput.addAll(entry.listInput); }else if(accumulator.num == 1){ accumulator.oldDegree = entry.newDegree; accumulator.num += entry.num; accumulator.listInput.addAll(entry.listInput); }else if(accumulator.num == 0){ accumulator.currentTime = entry.currentTime; accumulator.oldDegree = entry.oldDegree; accumulator.newDegree = entry.newDegree; accumulator.num = entry.num; accumulator.listInput.addAll(entry.listInput); } } Collections.sort(accumulator.listInput,this.comparator); System.out.println("merge : "+i); System.out.println("newDegree : "+entry.newDegree); System.out.println("oldDegree = "+entry.oldDegree); System.out.println("currentTime : "+entry.currentTime); } }
须要考虑merge的是不是比当前新的数据,须要不一样的处理逻辑。
八、其余方面,考虑到须要对输入度数按照事件时间排序,在open方法中实例化了自定义的Comparator类,对accumulator数据结构中的inputList按事件时间的降序排序。
public void open(FunctionContext context) throws Exception { //定义record的前后顺序,用于listInput的排序,时间越新的record在list中越前面 this.comparator = new Comparator<Tuple2<Double, Long>>() { public int compare( Tuple2<Double, Long> o1, Tuple2<Double, Long> o2) { if (Long.valueOf(o1.f1) < Long.valueOf(o2.f1)) { return 1; } else if (Long.valueOf(o1.f1) > Long.valueOf(o2.f1)) { return -1; }else { return 0; } } }; }
请参考[使用IntelliJ IDEA开发自定义函数]()完成UDAF编译、打包,并参考UDX概述完成资源的上传和引用。
SQL代码以下,语法检查、上线、启动做业(选择当前启动位点)。并将表1数据上传至datahub。
CREATE FUNCTION OverWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf'; CREATE TABLE input_dh_e_mp_read_curve ( `no` VARCHAR, data_date VARCHAR, cons_id VARCHAR, org_no VARCHAR, r1 DOUBLE, ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss') ,WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH ( type = 'datahub', endPoint = 'http://dh-cn-shanghai.aliyun-inc.com', roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole', project = 'jszc_datahub', topic = 'input_dh_e_mp_read_curve' ); CREATE TABLE data_out( cons_id varchar ,data_date varchar ,subDegreeR1 DOUBLE )with( type = 'print' ); INSERT into data_out SELECT cons_id ,last_value(data_date) OVER ( PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date ,OverWindowSubtractionUdaf(r1,unix_timestamp(ts)) OVER ( PARTITION BY cons_id ORDER BY ts ROWS BETWEEN 1 preceding AND CURRENT ROW) as data_date FROM input_dh_e_mp_read_curve
因为使用了print connector,从对应的sink的taskmanager.out日志中能够查看到输出以下(已忽略其余debug日志):
task-1> (+)100000002,20190716,13.76 task-1> (+)100000002,20190717,0.35999999999999943 task-1> (+)100000002,20190718,2.4700000000000006
对比指望输出(表2),20190717和20190718两个窗口的数据均正确,代表业务逻辑正确,但此输出与指望输出有少量差别:
(1)20190716输出为13.76,这是由于第一个over窗口只有一条数据致使的,这种数据能够在业务层过滤掉;
(2)20190719的数据没有输出,这是由于咱们设置了watermark,测试环境下20190719以后没有数据进来触发20190719对应的窗口的结束。
SQL代码以下:语法检查、上线、启动做业(选择当前启动位点)。并将表1数据上传至datahub。
CREATE FUNCTION HopWindowSubtractionUdaf as 'com.alibaba.blink.sql.udx.SubtractionUdaf'; CREATE TABLE input_dh_e_mp_read_curve ( `no` VARCHAR, data_date VARCHAR, cons_id VARCHAR, org_no VARCHAR, r1 DOUBLE, ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss') ,WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH ( type = 'datahub', endPoint = 'http://dh-cn-shanghai.aliyun-inc.com', roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole', project = 'jszc_datahub', topic = 'input_dh_e_mp_read_curve' ); CREATE TABLE data_out( cons_id varchar ,data_date varchar ,subDegreeR1 DOUBLE )with( type = 'print' ); INSERT into data_out SELECT cons_id ,DATE_FORMAT(HOP_end(ts, INTERVAL '1' day,INTERVAL '2' day), 'yyyyMMdd') ,HopWindowSubtractionUdaf(r1,unix_timestamp(ts)) FROM input_dh_e_mp_read_curve group by hop(ts, INTERVAL '1' day,INTERVAL '2' day),cons_id;
因为使用了print connector,从对应的sink的taskmanager.out日志中能够查看到输出以下(已忽略其余debug日志):
task-1> (+)100000002,20190716,13.76 task-1> (+)100000002,20190717,0.35999999999999943 task-1> (+)100000002,20190718,2.4700000000000006
对比指望输出(表2),20190717和20190718两个窗口的数据均正确,代表业务逻辑正确,但此输出与指望输出有少量差别:
(1)20190716输出为13.76,这是由于第一个hop窗口只有一条数据致使的,这种数据能够在业务层过滤掉;
(2)20190719的数据没有输出,这是由于咱们设置了watermark,测试环境下20190719以后没有数据进来触发20190719对应的窗口的结束。
UDAF中主要有createAccumulator、getValue、accumulate、retract和merge方法,其调用关系和顺序并非彻底肯定,而是与Blink底层优化、Blink版本、开窗类型(如hop仍是over窗口)等相关。
比较肯定的是一次正常(没有failover)的做业,createAccumulator方法只在做业启动时调用一次,accumulate方法在每条数据输入时调用一次,在触发数据输出时会调用一次getValue(并不表明只调用一次)。
而retract方法和merge方法则跟具体的优化方式或开窗类型有关,本案例中over窗口调用retract方法而不调用merge方法,hop窗口调用merge方法而不调用retract方法。
你们能够增长日志,观察这几个方法的调用顺序,仍是蛮有意思的。
UDAF中必须实现createAccumulator、getValue、accumulate方法,可选择实现retract和merge方法。
通常状况下,可先实现createAccumulator、getValue、accumulate三个方法,而后编写SQL后进行语法检查,SQL编译器会提示是否须要retract或merge方法。
好比,若是没有实现retract方法,在使用over窗口时,语法检查会报相似以下错误:
org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'retract' which is public, not abstract and (in case of table functions) not static.
好比,若是没有实现merge方法,在使用over窗口时,语法检查会报相似以下错误:
org.apache.flink.table.api.ValidationException: Function class 'com.alibaba.blink.sql.udx.SubtractionUdaf' does not implement at least one method named 'merge' which is public, not abstract and (in case of table functions) not static.
(1)本案例没有考虑数据缺失的问题,好比由于某种缘由(网络问题、数据采集问题等)缺乏20190717的数据。这种状况下会是什么样的结果?你们能够自行测试下;(2)本案例使用了一个List,而后经过Collections.sort方法进行排序,这不是很优的方法,若是用优先级队列(priority queue)性能应该会更好;