网易云信做为一个 PaaS 服务,须要对线上业务进行实时监控,实时感知服务的“心跳”、“脉搏”、“血压”等健康情况。经过采集服务拿到 SDK、服务器等端的心跳埋点日志,是一个很是庞大且杂乱无序的数据集,而如何才能有效利用这些数据?服务监控平台要作的事情就是对海量数据进行实时分析,聚合出表征服务的“心跳”、“脉搏”、“血压”的核心指标,并将其直观的展现给相关同窗。这其中核心的能力即是 :实时分析和实时聚合。程序员
在以前的《网易云信服务监控平台实践》一文中,咱们围绕数据采集、数据处理、监控告警、数据应用 4 个环节,介绍了网易云信服务监控平台的总体框架。本文是对网易云信在聚合指标计算逻辑上的进一步详述。算法
基于明细数据集进行实时聚合,生产一个聚合指标,业界经常使用的实现方式是 Spark Streaming、Flink SQL / Stream API。不管是何种方式,咱们都须要经过写代码来指定数据来源、数据清洗逻辑、聚合维度、聚合窗口大小、聚合算子等。如此繁杂的逻辑和代码,不管是开发、测试,仍是后续任务的维护,都须要投入大量的人力/物力成本。而咱们程序员要作的即是化繁为简、实现大巧不工。数据库
本文将阐述网易云信是如何基于 Flink 的 Stream API,实现一套通用的聚合指标计算框架。segmentfault
如上图所示,是咱们基于 Flink 自研的聚合指标完整加工链路,其中涉及到的模块包括:服务器
下文将详细介绍这几个模块的设计和实现思路。微信
为了便于聚合指标的生产和维护,咱们将指标计算过程当中涉及到的关键参数进行了抽象提炼,提供了可视化配置页面,以下图所示。下文会结合具体场景介绍各个参数的用途。架构
在聚合任务运行过程当中,咱们会按期加载配置。若是检测到有新增的 Topic,咱们会建立 kafka-consumer 线程,接收上游实时数据流。同理,对于已经失效的配置,咱们会关闭消费线程,并清理相关的 reporter。并发
对于数据源相同的聚合指标,咱们共用一个 kafka-consumer,拉取到记录并解析后,对每一个聚合指标分别调用 collect() 进行数据分发。若是指标的数据筛选规则(配置项⑤)非空,在数据分发前须要进行数据过滤,不知足条件的数据直接丢弃。app
基于 Flink 的 Stream API 实现聚合计算的核心代码以下所示:框架
SingleOutputStreamOperator<MetricContext> aggResult = src .assignTimestampsAndWatermarks(new MetricWatermark()) .keyBy(new MetricKeyBy()) .window(new MetricTimeWindow()) .aggregate(new MetricAggFuction());
对于大数据量的聚合计算,数据倾斜是不得不考虑的问题,数据倾斜意味着规则中配置的分组字段(配置项⑥)指定的聚合 key 存在热点。咱们的计算框架在设计之初就考虑了如何解决数据倾斜问题,就是将聚合过程拆分红2阶段:
具体实现:判断并发度参数 parallelism(配置项⑦) 是否大于1,若是 parallelism 大于1,生成一个 [0, parallelism) 之间的随机数做为 randomKey,在第1阶段聚合 keyBy() 中,将依据分组字段(配置项⑥)获取的 key 与 randomKey 拼接,生成最终的聚合 key,从而实现了数据随机打散。
做为一个平台型的产品,咱们提供了以下常见的聚合算子。因为采用了二次聚合逻辑,各个算子在第1阶段和第2阶段采用了相应的计算策略。
算子 | 第1阶段聚合 | 第2阶段聚合 |
---|---|---|
min/max/sum/count | 直接对输入数据进行预聚合计算,输出预聚合结果 | 对第1阶段预聚合结果进行二次聚合计算,输出最终结果 |
first/last | 对输入数据的 timestamp 进行比较,记录最小/最大的 timestamp 以及对应的 value 值,输出 <timestamp,value> 数据对 | 对 <timestamp,value> 数据对进行二次计算,输出最终的 first/last |
avg | 计算该分组的和值和记录数,输出 <sum,cnt> 数据对 | 对 <sum,cnt> 数据对分别求和,而后输出:总 sum / 总 cntcount |
median/tp90/tp95 | 统计输入数据的分布,输出 NumericHistogram | 对输入的 NumericHistogram 作 merge 操做,最终输出中位数/tp90/tp95 |
count-distinct | 输出记录桶信息和位图的 RoaringArray | 对 RoaringArray 进行 merge 操做,最终输出精确的去重计数结果 |
count-distinct(近似) | 输出基数计数对象 HyperLoglog | 对 HyperLoglog 进行 merge 操做,最终输出近似的去重计数结果 |
对于计算结果受所有数据影响的算子,如 count-distinct(去重计数),常规思路是利用 set 的去重特性,将全部统计数据放在一个 Set 中,最终在聚合函数的 getResult 中输出 Set 的 size。若是统计数据量很是大,这个 Set 对象就会很是大,对这个 Set 的 I/O 操做所消耗的时间将不能接受。
对于类 MapReduce 的大数据计算框架,性能的瓶颈每每出如今 shuffle 阶段大对象的 I/O 上,由于数据须要序列化 / 传输 / 反序列化,Flink 也不例外。相似的算子还有 median 和 tp95。
为此,须要对这些算子作专门的优化,优化的思路就是尽可能减小计算过程当中使用的数据对象的大小,其中:
后处理模块,是对第2阶段聚合计算输出数据进行再加工,主要有2个功能:
这里所说的异常数据,分为两类:迟到的数据和提早到的数据。
迟到数据:
聚合计算获得的指标,默认输出到 Kafka 和时序数据库 InfluxDB。
为了实时监控各个数据源和聚合指标的运行状况,咱们经过 InfluxDB+Grafana 组合,实现了聚合计算全链路监控:如各环节的输入/输出 QPS、计算耗时、消费堆积、迟到数据量等。
目前,经过该通用聚合框架,承载了网易云信 100+ 个不一样维度的指标计算,带来的收益也是比较可观的:
圣少友,网易云信数据平台资深开发工程师,从事数据平台相关工做,负责服务监控平台、数据应用平台、质量服务平台的设计开发工做。
更多技术干货,欢迎关注【网易智企技术+】微信公众号