flink 有状态udf 引发血案一

版权声明:本文为博主原创文章,未经博主赞成不得转载。 https://blog.csdn.net/rlnLo2pNEfx9c/article/details/83422587

640

场景css

近期在作一个画像的任务,sql实现的,当中有一个udf,会作很是多事情,包含将从redis读出历史值加权,并将中间结果和加权后的结果更新到redis。html

你们都知道,flink 是可以支持事件处理的。也就是可以没有时间的概念,那么在聚合,join等操做的时候,flink内部会维护一个状态,假如此时你也用redis维护了历史状态,也便是类似 result = currentState(flink)+lastState(redis)。且此时要针对计算的结果用where进行筛选.redis

SQL例如如下sql

 
   

CREATE VIEW view_count AS
select
 `time`,
 gid,
 cid,
 count(feed_id) * 1 as strength
FROM
 view_cid
GROUP BY
 gid,
 cid,`time`;

CREATE VIEW view_strength AS select
 `time`,
 gid,
 cid ,
 Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result`
FROM
 view_count
;

insert into
 hx_app_server_sink_common
SELECT
 gid,
 cid,
 `result`
FROM
 view_strength
where `result` <> '0.0'
GROUP BY
 gid,
 cid,
 `result`;缓存

业务分析app

第一个sql视图完毕的是首先分组,而后统计某一个字段并乘以权重;函数

第二个sql视图。udf :Get_Strength_Weaken完毕当前值和历史值叠加工做,历史值存储在redis。同一时候将结果返回并更新redis,返回值做为result字段。post

第三个sql在输出的时候,result字段做为了where的条件和group by里的字段。优化

这时候生成的flink概图例如如下:spa

640

观察中间的结构图可以发现。Get_Strength_Weaken被调用两次:

1. where条件。这个的生成是由于第三条sql

 
   

where `result` <> '0.0'

产生的运行计划,是否是看起来很是懵逼。。

2. select里面另外一次调用Get_Strength_Weaken。这个很是明显。

固然。可以打印一下flink udf里eval函数的调用细节日志,很是easy发现反复调用的问题。浪院长这个也是经过分析日志。对照输出结果来得出的论。

综合上面分析和udf调用日志,结论就是udf被调用了两次。

对于这个flink的udf被屡次调用引发的结果偏大。整整调试了一下午。

由于上面分析可以得出结论,flink将where条件下推了,where 条件推断会先运行,而select里后运行,那么可以调整SQL。例如如下:

 
   

CREATE VIEW view_count AS
select
`time`,
gid,
cid,
count(feed_id) * 1 as strength
FROM
view_cid
GROUP BY
gid,
cid,`time`;

CREATE VIEW view_strength AS select
`time`,
gid,
cid ,
getResult(gid,cid) as `result`
FROM
view_count
where Get_Strength_Weaken(gid, cid, cast(strength as double), `time`, 0.95)  as `result` <> '0.0'
;

insert into
hx_app_server_sink_common
SELECT
gid,
cid,
`result`
FROM
view_strength
GROUP BY
gid,
cid,
`result`;

那么实际上。select里的udf主要目的是取出来计算结果。那么这个时候可以写个简单的udf--getResult,仅仅让他从redis获取 where条件里更新到redis里的结果,由于该udf是无状态的即便屡次调用。也无所谓。

因此。总结一下,对于flink 来讲,由于基于事件的处理,聚合、join等操做会有状态缓存,那么此时再用到含有外部存储状态的udf,必定要谨慎,结合运行计划,来合理放置udf的位置,避免出错。

固然。调试阶段最好是有具体的日志。便于分析和定位问题。

flink 状态删除

事实上。flink聚合等内部状态有配置可以使其本身主动删除的,具体配置使用例如如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);

[完]

推荐阅读:

推荐两个不错的flink项目

Spark SQL从入门到精通

重要 : 优化flink的四种方式

flink超越Spark的Checkpoint机制

640

相关文章
相关标签/搜索