简介:Flink+Hologres亿级用户实时UV精确去重最佳实践
UV、PV计算,由于业务需求不一样,一般会分为两种场景:html
针对离线计算场景,Hologres基于RoaringBitmap,提供超高基数的UV计算,只需进行一次最细粒度的预聚合计算,也只生成一份最细粒度的预聚合结果表,就能达到亚秒级查询。具体详情能够参见往期文章>>Hologres如何支持超高基数UV计算(基于RoaringBitmap实现)java
对于实时计算场景,能够使用Flink+Hologres方式,并基于RoaringBitmap,实时对用户标签去重。这样的方式,能够较细粒度的实时获得用户UV、PV数据,同时便于根据需求调整最小统计窗口(如最近5分钟的UV),实现相似实时监控的效果,更好的在大屏等BI展现。相较于以天、周、月等为单位的去重,更适合在活动日期进行更细粒度的统计,而且经过简单的聚合,也能够获得较大时间单位的统计结果。git
1)建立表uid\_mapping为uid映射表,用于映射uid到32位int类型。github
BEGIN; CREATE TABLE public.uid_mapping ( uid text NOT NULL, uid_int32 serial, PRIMARY KEY (uid) ); --将uid设为clustering_key和distribution_key便于快速查找其对应的int32值 CALL set_table_property('public.uid_mapping', 'clustering_key', 'uid'); CALL set_table_property('public.uid_mapping', 'distribution_key', 'uid'); CALL set_table_property('public.uid_mapping', 'orientation', 'row'); COMMIT;
2)建立表dws\_app为基础聚合表,用于存放在基础维度上聚合后的结果。sql
CREATE EXTENSION IF NOT EXISTS roaringbitmap;
--新建shard数为16的Table Group, --由于测试数据量百万级,其中后端计算资源为100core,设置shard数为16 BEGIN; CREATE TABLE tg16 (a int); --Table Group哨兵表 call set_table_property('tg16', 'shard_count', '16'); COMMIT;
BEGIN; create table dws_app( country text, prov text, city text, ymd text NOT NULL, --日期字段 timetz TIMESTAMPTZ, --统计时间戳,能够实现以Flink窗口周期为单位的统计 uid32_bitmap roaringbitmap, -- 使用roaringbitmap记录uv primary key(country, prov, city, ymd, timetz)--查询维度和时间做为主键,防止重复插入数据 ); CALL set_table_property('public.dws_app', 'orientation', 'column'); --日期字段设为clustering_key和event_time_column,便于过滤 CALL set_table_property('public.dws_app', 'clustering_key', 'ymd'); CALL set_table_property('public.dws_app', 'event_time_column', 'ymd'); --等价于将表放在shard数为16的table group call set_table_property('public.dws_app', 'colocate_with', 'tg16'); --group by字段设为distribution_key CALL set_table_property('public.dws_app', 'distribution_key', 'country,prov,city'); COMMIT;
完整示例源码请见alibabacloud-hologres-connectors examples后端
1)Flink 流式读取数据源(DataStream),并转化为源表(Table)数组
//此处使用csv文件做为数据源,也能够是kafka等 DataStreamSource odsStream = env.createInput(csvInput, typeInfo); // 与维表join须要添加proctime字段,详见https://help.aliyun.com/document_detail/62506.html Table odsTable = tableEnv.fromDataStream( odsStream, $("uid"), $("country"), $("prov"), $("city"), $("ymd"), $("proctime").proctime()); // 注册到catalog环境 tableEnv.createTemporaryView("odsTable", odsTable);
2)将源表与Hologres维表(uid\_mapping)进行关联app
其中维表使用insertIfNotExists参数,即查询不到数据时自行插入,uid\_int32字段即可以利用Hologres的serial类型自增建立。ide
// 建立Hologres维表,其中nsertIfNotExists表示查询不到则自行插入 String createUidMappingTable = String.format( "create table uid_mapping_dim(" + " uid string," + " uid_int32 INT" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," //Hologres DB名 + " 'tablename' = '%s',"//Hologres 表名 + " 'username' = '%s'," //当前帐号access id + " 'password' = '%s'," //当前帐号access key + " 'endpoint' = '%s'," //Hologres endpoint + " 'insertifnotexists'='true'" + ")", database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable); // 源表与维表join String odsJoinDim = "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32" + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim" + " ON ods.uid = dim.uid"; Table joinRes = tableEnv.sqlQuery(odsJoinDim);
3)将关联结果转化为DataStream,经过Flink时间窗口处理,结合RoaringBitmap进行聚合函数
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource = source // 筛选须要统计的维度(country, prov, city, ymd) .keyBy(0, 1, 2, 3) // 滚动时间窗口;此处因为使用读取csv模拟输入流,采用ProcessingTime,实际使用中可以使用EventTime .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) // 触发器,能够在窗口未结束时获取聚合结果 .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))) .aggregate( // 聚合函数,根据key By筛选的维度,进行聚合 new AggregateFunction< Tuple5<String, String, String, String, Integer>, RoaringBitmap, RoaringBitmap>() { @Override public RoaringBitmap createAccumulator() { return new RoaringBitmap(); } @Override public RoaringBitmap add( Tuple5<String, String, String, String, Integer> in, RoaringBitmap acc) { // 将32位的uid添加到RoaringBitmap进行去重 acc.add(in.f4); return acc; } @Override public RoaringBitmap getResult(RoaringBitmap acc) { return acc; } @Override public RoaringBitmap merge( RoaringBitmap acc1, RoaringBitmap acc2) { return RoaringBitmap.or(acc1, acc2); } }, //窗口函数,输出聚合结果 new WindowFunction< RoaringBitmap, Tuple6<String, String, String, String, Timestamp, byte[]>, Tuple, TimeWindow>() { @Override public void apply( Tuple keys, TimeWindow timeWindow, Iterable<RoaringBitmap> iterable, Collector< Tuple6<String, String, String, String, Timestamp, byte[]>> out) throws Exception { RoaringBitmap result = iterable.iterator().next(); // 优化RoaringBitmap result.runOptimize(); // 将RoaringBitmap转化为字节数组以存入Holo中 byte[] byteArray = new byte[result.serializedSizeInBytes()]; result.serialize(ByteBuffer.wrap(byteArray)); // 其中 Tuple6.f4(Timestamp) 字段表示以窗口长度为周期进行统计,以秒为单位 out.collect( new Tuple6<>( keys.getField(0), keys.getField(1), keys.getField(2), keys.getField(3), new Timestamp( timeWindow.getEnd() / 1000 * 1000), byteArray)); } });
4)写入结果表
须要注意的是,Hologres中RoaringBitmap类型在Flink中对应Byte数组类型
// 计算结果转换为表 Table resTable = tableEnv.fromDataStream( processedSource, $("country"), $("prov"), $("city"), $("ymd"), $("timest"), $("uid32_bitmap")); // 建立Hologres结果表, 其中Hologres的RoaringBitmap类型经过Byte数组存入 String createHologresTable = String.format( "create table sink(" + " country string," + " prov string," + " city string," + " ymd string," + " timetz timestamp," + " uid32_bitmap BYTES" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," + " 'tablename' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + " 'endpoint' = '%s'," + " 'connectionSize' = '%s'," + " 'mutatetype' = 'insertOrReplace'" + ")", database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable); // 写入计算结果到dws表 tableEnv.executeSql("insert into sink select * from " + resTable);
查询时,从基础聚合表(dws\_app)中按照查询维度作聚合计算,查询bitmap基数,得出group by条件下的用户数
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好 set hg_experimental_enable_force_three_stage_agg=off SELECT country ,prov ,city ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app WHERE ymd = '20210329' GROUP BY country ,prov ,city ;
--运行下面RB_AGG运算查询,可执行参数先关闭三阶段聚合开关(默认关闭),性能更好 set hg_experimental_enable_force_three_stage_agg=off SELECT country ,prov ,RB_CARDINALITY(RB_OR_AGG(uid32_bitmap)) AS uv FROM dws_app WHERE time > '2021-04-19 18:00:00+08' and time < '2021-04-19 19:00:00+08' GROUP BY country ,prov ;
本文内容由阿里云实名注册用户自发贡献,版权归原做者全部,阿里云开发者社区不拥有其著做权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。若是您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将马上删除涉嫌侵权内容。