揭秘|每秒千万级的实时数据处理是怎么实现的?

做者:闲鱼技术-靖杨redis

背景

闲鱼目前实际生产部署环境愈来愈复杂,横向依赖各类服务盘宗错节,纵向依赖的运行环境也愈来愈复杂。当服务出现问题的时候,可否及时在海量的数据中定位到问题根因,成为考验闲鱼服务能力的一个严峻挑战。数据库

线上出现问题时经常须要十多分钟,甚至更长时间才能找到问题缘由,所以一个可以快速进行自动诊断的系统需求就应用而生,而快速诊断的基础是一个高性能的实时数据处理系统。这个实时数据处理系统须要具有以下的能力:一、数据实时采集、实时分析、复杂计算、分析结果持久化。二、能够处理多种多样的数据。包含应用日志、主机性能监控指标、调用链路图。三、高可靠性。系统不出问题且数据不能丢。四、高性能,底延时。数据处理的延时不超过3秒,支持每秒千万级的数据处理。 本文不涉及问题自动诊断的具体分析模型,只讨论总体实时数据处理链路的设计。json

输入输出定义

为了便于理解系统的运转,咱们定义该系统总体输入和输出以下:数组

输入:

服务请求日志(包含traceid、时间戳、客户端ip、服务端ip、耗时、返回码、服务名、方法名)缓存

环境监控数据(指标名称、ip、时间戳、指标值)。好比cpu、 jvm gc次数、jvm gc耗时、数据库指标。安全

输出:

一段时间内的某个服务出现错误的根因,每一个服务的错误分析结果用一张有向无环图表达。(根节点便是被分析的错误节点,叶子节点便是错误根因节点。叶子节点多是一个外部依赖的服务错误也多是jvm异常等等)。性能优化

架构设计

在实际的系统运行过程当中,随着时间的推移,日志数据以及监控数据是源源不断的在产生的。每条产生的数据都有一个本身的时间戳。而实时传输这些带有时间戳的数据就像水在不一样的管道中流动同样。数据结构

若是把源源不断的实时数据比做流水,那数据处理过程和自来水生产的过程也是相似的:架构

天然地,咱们也将实时数据的处理过程分解成采集、传输、预处理、计算、存储几个阶段。并发

总体的系统架构设计以下:

采集

采用阿里自研的sls日志服务产品(包含logtail+loghub组件),logtail是采集客户端,之因此选择logtail是由于其优秀的性能、高可靠性以及其灵活插件扩展机制,闲鱼能够定制本身的采集插件实现各类各样数据的实时采集。

传输

loghub能够理解为一个数据发布订阅组件,和kafka的功能相似,做为一个数据传输通道其更稳定、更安全,详细对比文章参考:yq.aliyun.com/articles/35…

预处理

实时数据预处理部分采用blink流计算处理组件(开源版本叫作flink,blink是阿里在flink基础上的内部加强版本)。目前经常使用的实时流计算开源产品有Jstorm、SparkStream、Flink。Jstorm因为没有中间计算状态的,其计算过程当中须要的中间结果必然依赖于外部存储,这样会致使频繁的io影响其性能;SparkStream本质上是用微小的批处理来模拟实时计算,实际上仍是有必定延时;Flink因为其出色的状态管理机制保证其计算的性能以及实时性,同时提供了完备SQL表达,使得流计算更容易。

计算与持久化

数据通过预处理后最终生成调用链路聚合日志和主机监控数据,其中主机监控数据会独立存储在tsdb时序数据库中,供后续统计分析。tsdb因为其针对时间指标数据的特别存储结构设计,很是适合作时序数据的存储与查询。调用链路日志聚合数据,提供给cep/graph service作诊断模型分析。cep/graph service是闲鱼自研的一个应用,实现模型分析、复杂的数据处理以及外部服务进行交互,同时借助rdb实现图数据的实时聚合。 最后cep/graph service分析的结果做为一个图数据,实时转储在lindorm中提供在线查询。lindorm能够看做是加强版的hbase,在系统中充当持久化存储的角色。

详细设计与性能优化

采集

日志和指标数据采集使用logtail,整个数据采集过程如图:

其提供了很是灵活的插件机制,共有四种类型的插件:

  • inputs: 输入插件,获取数据。
  • processors: 处理插件,对获得的数据进行处理。
  • aggregators: 聚合插件,对数据进行聚合。
  • flushers: 输出插件,将数据输出到指定 sink。

因为指标数据(好比cpu、内存、jvm指标)的获取须要调用本地机器上的服务接口获取,所以应尽可能减小请求次数,在logtail中,一个input占用一个goroutine。闲鱼经过定制input插件和processors插件,将多个指标数据(好比cpu、内存、jvm指标)在一个input插件中经过一次服务请求获取(指标获取接口由基础监控团队提供),并将其格式化成一个json数组对象,在processors插件中再拆分红多条数据,以减小系统的io次数同时提高性能。

传输

数据传输使用LogHub,logtail写入数据后直接由blink消费其中的数据,只需设置合理的分区数量便可。分区数要大于等于blink读取任务的并发数,避免blink中的任务空转。

预处理

预处理主要采用blink实现,主要的设计和优化点:

编写高效的计算流程

blink是一个有状态的流计算框架,很是适合作实时聚合、join等操做。在咱们的应用中只须要关注出现错误的的请求上相关服务链路的调用状况,所以整个日志处理流分红两个流:一、服务的请求入口日志做为一个单独的流来处理,筛选出请求出错的数据。二、其余中间链路的调用日志做为另外一个独立的流来处理,经过和上面的流join on traceid实现出错服务依赖的请求数据塞选。

如上图所示经过双流join后,输出的就是全部发生请求错误相关链路的完整数据。

设置合理的state生命周期

blink在作join的时候本质上是经过state缓存中间数据状态,而后作数据的匹配。而若是state的生命周期太长会致使数据膨胀影响性能,若是state的生命周期过短就会没法正常关联出部分延迟到来的数据,因此须要合理的配置state生存周期,对于该应用容许最大数据延迟为1分钟。

使用niagara做为statebackend,以及设定state数据生命周期,单位毫秒
state.backend.type=niagara
state.backend.niagara.ttl.ms=60000
复制代码

开启MicroBatch/MiniBatch

MicroBatch 和 MiniBatch 都是微批处理,只是微批的触发机制上略有不一样。原理上都是缓存必定的数据后再触发处理,以减小对 state 的访问从而显著提高吞吐,以及减小输出数据量。

link.miniBatch.join.enabled=true
使用 microbatch 时须要保留如下两个 minibatch 配置
blink.miniBatch.allowLatencyMs=5000
防止OOM,每一个批次最多缓存多少条数据
blink.miniBatch.size=20000 
复制代码

动态负载使用Dynamic-Rebalance替代Rebalance

blink任务在运行是最忌讳的就是存在计算热点,为保证数据均匀使用Dynamic Rebalance,它能够根据当前各subpartition中堆积的buffer的数量,选择负载较轻的subpartition进行写入,从而实现动态的负载均衡。相比于静态的rebalance策略,在下游各任务计算能力不均衡时,可使各任务相对负载更加均衡,从而提升整个做业的性能。

开启动态负载task.dynamic.rebalance.enabled=true
复制代码

自定义输出插件

数据关联后须要将统一请求链路上的数据做为一个数据包通知下游图分析节点,传统的方式的是经过消息服务来投递数据。可是经过消息服务有两个缺点:一、其吞吐量和rdb这种内存数据库相比比仍是较大差距(大概差一个数量级)。二、在接受端还须要根据traceid作数据关联。 咱们经过自定义插件的方式将数据经过异步的方式写入RDB,同时设定数据过时时间。在RDB中以<traceid,相关链路请求数据json style="box-sizing: border-box;">数据结构存储。写入的同时只将traceid作为消息内容经过metaQ通知下游计算服务,极大的减小了metaQ的数据传输压力。</traceid,相关链路请求数据json>

图聚合计算

cep/graph计算服务节点在接收到metaQ的通知后,综合根据请求的链路数据以及依赖的环境监控数据,会实时生成诊断结果。诊断结果简化为以下形式:

说明本次请求是因为下游jvm的线程池满致使的,可是一次调用并不能说明该服务不可用的根本缘由,须要分析总体的错误状况,那就须要对图数据作实时聚合。 聚合设计以下(为了说明基本思路,作了简化处理): 一、首先利用redis的zrank能力为根据服务名或ip信息为每一个节点分配一个全局惟一排序序号。 二、为图中的每一个节点生成对应图节点编码,编码格式:-对于头节点:头节点序号|归整时间戳|节点编码-对于普通节点:|归整时间戳|节点编码 三、因为每一个节点在一个时间周期内都有惟一的key,所以能够将节点编码做为key利用redis为每一个节点作计数。同时消除了并发读写的问题。 四、利用redis中的set集合能够很方便的叠加图的边。 五、记录根节点,便可经过遍历还原聚合后的图结构。 聚合后的结果大体以下:

这样最终生成了服务不可用的总体缘由,而且经过叶子节点的计数能够实现根因的排序。

收益

系统上线后,整个实时处理数据链路的延迟不超过三秒。闲鱼服务端问题的定位时间从十多分钟甚至更长时间降低到五秒内。大大的提高了问题定位的效率。

展望

目前的系统能够支持闲鱼每秒千万的数据处理能力。后续自动定位问题的服务可能会推广到阿里内部更多的业务场景,随之而来的是数据量的成倍增长,所以对于效率和成本提出了更好的要求。

将来咱们可能作的改进:

  • 可以自动的减小或者压缩处理的数据。
  • 复杂的模型分析计算也能够在blink中完成,减小io,提高性能。
  • 支持多租户的数据隔离。
相关文章
相关标签/搜索