如何快速作一个山寨的实时“大数据”处理

 

前言

为啥写这篇文章?由于我如今作的这套实时计算系统在公司里很难玩下去了。去年年初来到ctrip,主要就是作两个实时应用,一个是实时报警,功能是作出来了,但应用效果很差;一个是XXX(敏感应用,不敢写出来,以XXX代替),也是实现了功能需求,但想继续按本身的思路往下走是不可能了,我捉急的表达能力很难让上头去理解实时计算和传统的request-response方式的应用不一样点在哪里,为啥要这么干,也很难明白上头喜欢的系统是什么样的,真是挠破头。个人方式看来是作不下去了,但总以为仍是有点价值,因此写下来,看看有没有也想山寨的同窗,能够做为参考,下面一段是扯淡,想看实际内容的请跳到系统结构。

至于为何起这个标题:html

  • 如何:
    会介绍我目前的系统是怎么作到如今这个程度的;以及原本想尝试的一些开发方向。
  • 快速:
    • 这套系统基本上是我去年一我的搭起来的,最初彻底不知道如何去作实时数据处理,因此架构翻了好几次(感谢ctrip给的机会),所以到最近的一个整个版本的话,核心代码也就小几千行(这个要感谢开源社区),找一个熟练的java工程师来作的话,最多在几个星期内就能搭起来,应该说建设的成本不会太大。
    • 由于起手就有两个项目,因此一开始就瞄着通用的实时计算框架/平台,过去一年还零散开发了两三个额外的应用,这几个应用基本上都是共享一套代码,差异只存在于配置。所以当底层设施建好后,能够很快的开发一个应用,咱们曾经有个应用大概花了半天,若是能作的完善的话,新应用的开发和部署能够作到分钟级,由于现实场景中大部分应用的数据获取和计算本质上差异很小,很是类似。
  • 山寨:
    介绍的系统和那些大公司的系统确定不在一个层次上,高手们能够绕道了。本文适合于像我同样的三流程序员,若是不想花费太大的代价,而关注点在于如何应用现有的开源产品(咱们是storm+esper)来作能够cover一部分状况的实时数据处理,能够参考下本文。
  • 实时
    上面说了开发和部署的目标是分钟级。而整个系统的响应速度和新规则的上线速度是在秒级,但实在是无法作到毫秒级,毕竟是山寨的。用简单的方式作到实时计算,牺牲了必定的可靠性,若是真有需求的话,须要另外的手段去弥补。
    另外最近被人用实时操做系统的定义challenge了对实时系统的理解。我仍是想表达一个观点,互联网里的实时系统应该还具有一个特征,便是关注latest的数据,实时系统不能彻底等于响应快,还对应于数据新。若是是历史时刻的某一数据特征,那是offline的系统去考虑的,混到实时系统中只会对系统设计拖后腿

  • 大是相对的,跟巨头们的数据量无法比,只是ctrip这样的流量仍是没有压力的。老实点,放在引号里。
  • 数据
    业内大数据讲的太多,太空,连“数据”是什么都没有明确的定义。我本身心中的数据要有以下的特征(我的观点,因此在引号里):
    • 可描述。各个公司应该都差很少,基本数据来源于各类各样的log,但最好仍是能用统一且简单的方式去描述这种数据,而不是光秃秃的没有任何schema的text,否则在其上就比较难作文章(若是只是简单使用的话),后面几点特征也很难保证。固然这种格式会比较简单,以防止带来太多限制
    • 可计算。提供一些通用的操做能够对数据进行变换和处理,由数据产生数据,并能支持递归的迭代。这样才能在raw data上获得一些更有意义的数据,从而去作更有价值的事情。
    • 多条记录Over单条记录。在计算中,对于某一个实体(例如一个ip或是一个uid)而言,其单条记录每每用处不大,更应该关注其一组记录的特征值。因此须要大量的聚合、统计操做。目前咱们作的这套系统重点就在这,本文大部份内容会与此相关。
    • 群体记录Over单个实体的记录。在计算中,单个实体也没有太大的价值,更重要的是看整个群体的特征,用群体的特征再去比对个体。找到人民大众的特征,再去找里面的遵纪守法者(正常值,可用在推荐)和离经叛道者(异常点,多用于报警)。我一直想作到这个,也有大概的思路,但无法继续尝试了,后文会简要介绍一些。
    • 可度量和监控。要对数据有统一的度量和监控,从而对数据有个掌控,也方便找出异常点
    • 数据Over规则。估计大多数公司的应用和ctrip的同样,业务方只会提出一系列规则和几个原始的数据源,指望一个万能的规则引擎去把全部数据采进来并按照规则的逻辑去实现。大部分业务方会是自然以规则为中心的,根据具体的case,人肉找出必定的规律,而后再看过程当中用到了什么数据源,从而给出需求给pd。然而在大数据量的条件下,这是不成立的,一是在大量数据下作复杂的逻辑计算,从开发和运行的效率都很难保证高效;二是大数据量下,人肉很难去发现规则了,并且老的规则也很难维护,由于规则这东西难以度量和演化,也难作自动化生成。因此必须转向数据为中心,抽出不一样的特征维度,而后才可能用高大上的机器学习等方法去自动化生成规则或模型,这个才是王道;如今很成熟的hadoop,storm,也都是以数据为中心,尽可能简化计算模式。
    • 数据特征Over业务特征。数据处理系统只应该跟数据相关,考虑的是数据类型(double,string等少许类型)和数据特征(单记录大小、吞吐量、流速等),这样才能让系统的适用性最大化。虽然主流的说法是设计要按照业务来,但我的仍是比较顽固,认为其本质是业务里的数据特征,业务自己太繁琐,对底层系统也没多大意义,应该把业务底下的数据特征提取抽象出来。目前我所作的系统只适合于较短期内的数据的大量的、实时的计算,不考虑大时间跨度的数据,也暂时忍受短暂的失效。如今的几个应用基本拥有这个特征,因此能够号称开发应用只要配置就行了,虽然你们就是不相信。
  • 不是数据大
    如今不少同窗认为跟H*的系统挂上钩,或是用个nosql,就吹是大数据了。其本质仍是之前基于数据库的业务驱动的应用换了个dao而已,顶多称为数据大。实际上“数据”和“大”都值得商榷。虽然是山寨,也要跟他们坚定划清界限。
  • 处理
    火热的大数据一应俱全,这里只涉及到实时环境下的数据处理,不怎么涉及到存储、展示等其余方面

后续系统设计都是以这些为出发点的前端

背景知识

storm

storm 是目前比较火热的实时处理系统,虽然不能和H系的比,但资料也仍是很多,我这就默认你们已经知道storm的概况了,具体的资料就不举了!java

国内而言,阿里系对storm的应用比较多,网上有不少的文章;在ctrip,也有另一个team在用storm作前端的用户行为分析,感受是蛮成功的,应该算公司里拿得出手的项目之一了。还有不少公司也是用storm在作一些实时的业务开发。mysql

storm自己只是提供了一个实时处理的框架,帮咱们解决了大部分的可靠性,数据流向组织,并发性等基本问题,可是实际的数据处理仍是要根据业务需求去开发代码适配。所以它只是解决了实时计算的组织和管理,而对计算自己是没有支持的,直接用是达不到我想要的“不写代码只配置”的效果,因此我把重心放在esper上,storm只做为外部的容器,帮我作数据的简单处理和sharding,节点的自动分配和重启,数据源的组织和数据结果的分配等等外围的功能,计算就交给esper了。git

esper

esper 绝壁是个好东西,功能强大,但门槛过高。资料的话官网上看下例子,也有一篇很详尽的文档,这是html版,有兴趣的同窗能够上官网下pdf版。
简单的介绍的话,esper是一个用于针对数据流进行流式处理的lib库(java和.net都有),他跟传统的数据库不一样之处在于:数据库是数据先写进来,再定义一个sql,而后去拉取数据并计算一次获得结果;esper是定义一个计算规则,并做为一个计算节点,而后再把数据不停的推给他,由计算节点不停的作增量计算,并出来一系列的结果。
Alt text
上图是传统的数据库(也有用nosql的)的方式,目前不少项目应该都是采起这种设计,好比我要作一个5分钟的数据统计,就须要不停的跑sql去拉数据作统计和计算(固然能够提早作一些预聚合,数据库也应该有触发器之类的功能,不过只能做为优化,也很难去掌控)。这种pull的方式容易理解,也有已经成熟到烂街的数据库技术支持,pull的时候靠分库分表来sharding,只是要本身写点聚合的代码,对单个查询来讲很快,由于只要一次数据库查询,一跳就完了,数据的查询和存储都由数据库来保证高效。但对于数据量大,而后又要实时更新的场景来讲,这种低效的方式是走不通的,图上就能够看到由于数据和计算节点的分离,势必形成冗余的数据被屡次拉取(或者要写复杂的代码去优化),并且实时度靠轮询来达到。这种设计要么只能适合于数据量和计算量比较小的场景,要么只能适合于人傻钱多的场景。但它好理解。程序员

也有说拿redis作counter的,但这种方式仍是解决不了数据和计算分离的问题;在者,对于功能稍复杂的计算就力不从心了,并且redis的逻辑抽象程度远不如sql,开发工做比sql都要大不少。具体的不展开了
Alt text
此图中下方每一个方框表明一个用esper实现的实时计算引擎,配置好运算规则后,咱们主动去把数据喂过去,引擎不停的作增量计算,每次有新结果就经过回调通知咱们。这个图是公司内部写ppt时临时画的,不大恰当。由于esper是个lib,单实例的可靠性和性能无法scale,因此咱们是架在storm上,由storm去自动部署和分配多个esper进程,并在前端作sharding来达到高扩展性。github

select avg(price) from StockTickEvent.win:time(30 sec) 

select a.id, count(*) from pattern [
        every a=Status -> (timer:interval(10 sec) and not Status(id=a.id)
] group by id

esper声称本身是SEP(stream event processing)和cep(complex event processing)。上面从官网抄了两个比较有表明性的例子来分别说明。redis

  • 第一个很直观,是从StockTickEvent中计算最近30秒的平均price,当你配置好规则和StockTickEvent事件的schema,并把数据组装成具体的StockTickEvent事件源源不断push给引擎后,引擎会根据新数据的到来和时间的流逝(意味着老数据的expire)不断作计算,每当值发生变化了就会经过回调来通知调用方。从这个例子能够看出,esper采用了相似sql的写法,其称之为epl,基本包含了sql的大部分用法,还算比较亲切,只是用的时候思惟要转换一下,这是一个持续的计算过程,而不是sql那种一锤子买卖
  • 第二个例子稍微复杂些,展现了esper描述事件之间关系的能力,这个例子是说当某个Status事件发生10秒以内,没有相同id的另外一个Status事件发生,即通知调用方。

这两个例子还只是揭露了esper能力和复杂度冰山一角。对于流式的数据处理和事件间pattern的描述,它提供了不少的底层支持和选项。基本咱们的需求都能获得知足,一种功能还能用好多种写法来实现,越用就越是以为它强。
然而,强大的东西不容易掌控。我是在前公司paypal时据说这个开源软件的,当时老板的另一个team花了很大的力气和资源在上面,但愿作实时大数据,我离开不久据说这个team就散了,大概由于没出好的成果,员工图谱上,我和cto之间那条线上的老板们都陆续黯然离开了;而我本身,是在来到ctrip才接触,由于大概知道前同事的不顺利,因此是带着敬畏心在作,当心翼翼,尽可能让它更易用,结果虽然功能是出来了,奖也拿了,仍是被骂,结局也离屎差很少了。因此想用它的要小心啊,不祥之物啊:)
不过失败不能白失败,经验教训要总结下的,但愿能成为它人的成功之母:算法

  • 要在esper上封装一层给使用。esper过于复杂,一应俱全,是个大杂烩,学习曲线开始时比较陡峭;并且从上面两张图比较,它的思惟是跟传统的数据库正好相反,因此要想说服用户能理解并直接去用就很困难。前同事推销给paypal的analyst,结果人家不感冒,我在ctrip时也向用户推销过,但愿用户能直接配epl规则来完成功能,结局是一年多下来仍是只有我一我的能写一点。因此我花了很大力气去尝试在这上层去封装,但愿能封装出更简单,更好用的一层。如今我大概有四个应用,已经近千条epl,每一个都比上面的例子要长好多,若是直接在epl上操做是不可想象的事情。
  • 既然要作封装,不是作全部功能的封装。刚开始用esper,会以为很好玩,好比我一个报警的cooldown功能就有好多种写法。后来逐渐简单化,尝试只用sep那套sql like的语法(不包含pattern那种扩展),一是已经知足目前遇到所需功能;二是容易驾驭,在上面作封装;三是正如我以前所说,想作数据为中心而不是规则为中心,因此只用了它简单而有效的聚合功能,摈弃了复杂和强大的CEP功能。
  • 必定要作好监控。前公司用的商业版,带一个称为“dashboard”的功能,据说没什么用;我本身用的开源版,没有这方面的支持,因此很受困扰。由于当那么大的数据量部署上去而后源源不断的跑时,你只能看到你的输入(源数据)和输出(esper最后经过回调传出来的数据),里面一大坨计算过程是黑盒的,彻底搞不清情况,调试什么的基本不可能。仍是须要想办法能让其输出一些中间信息,作到必定的监控,方便使用。最近用户提了点监控和报警的需求,已经想到了简单的solution,争取能在挂掉前实现上。
  • 界面是很重要的。以前都是一我的瞎玩,重点放在后端,一方面是前端基本没经验,另外一方面是后端还在摸石头,因此忽视了前段展现。结局是我觉得实现了功能就完了,因此只作到了restful api一层。但没有界面就向人展现理念,不讨喜,虽然本身以为仍是有独到之处的,结果仍是留下了很烂的印象。这是一个很好的教训。
  • 说服别人采用异步批处理的思惟。不管是storm仍是esper,都不是request-response的同步调用方式。storm的调用方通常经过queue去push数据,计算节点是本身不断运行的,由源源不断到来的数据trigger,多个物理上可能分离的节点处理后,产生最终结果,最后一个节点可能执行些落地的操做,但不会往回传,虽然有drpc,但也只是模拟了一个rpc的表壳,内部仍是经过不一样的queue去链接各个计算节点的,并且是分布式系统:queue+分布式节点+分布式数据来源的归并同步,单个数据的处理很难作到毫秒级,但它本质是一种批处理的方式,是跑量的,能够作到海量数据的秒级处理;esper也是,它的操纵对象是数据流,内部是事件驱动,因此也是一种异步批处理的方式,若是用同步调的话很难达到如此高效。其实从硬件到软件(网络),再到现有的大数据处理系统,基本都是异步批处理的思路。因此当最外层使用的时候,最好是去适应这种新环境下的新特征,而不是说老的数据库能作到100ms,你这咋不行,太差了。这个思惟方式不同,不转换很难理解。

dashboard

我同事在opentsdb(后端是hbase)的思路上开发了一个叫dashboard的系统,能够对海量metrics进行实时的存储和查询。,同事对整个先后端都进行了重写和改进,细节上有蛮多独到之处,我全部程序的监控,包括storm的监控和想要作的计算平台的监控,都是基于此。这应该是ctrip里很是好的一个系统了。sql

系统结构

下面分别从逻辑和物理上描述整个系统的结构。

逻辑结构

目前的系统设计,对一个应用,包含了四大部分
Alt text

  1. data source system. Data sources负责管理系统的输入数据,全部输入源经过配置的形式给出,系统尽量自动化的提取数据,并转化为内部的数据流
  2. variable engine. variable部分负责对原始数据流进行实时计算,以封装的esper引擎对原始数据流进行拆分/合并/过滤/聚合操做,加工后获得新的,更有价值的数据流。这一部分focus在计算,具体详见后文。
  3. rule engine. 针对variable部分处理后的数据流,咱们须要过滤出符合用户需求的部分,须要进行阈值比较,数据cooldown等工做,最后产生可直接供用户使用的数据(称为Alert)。
  4. dispatch engine
    对每个rule的Alert,可能有不一样的action(邮件、DB、调用特定url、mq),这一部分管理如何输出数据。

因此整套系统是一个典型的输入(part 1) --> 处理(part 2&3) --> 输出(part 4)的结构,每一个应用只需给出四大部分的配置,就能够获得一个实时事件处理应用。
这里须要补充说明的是:

  1. data source、variable、rule本质上都是数据流,每一个元素的config信息会描述它是什么(shema,有哪些字段,字段是什么类型,仅限于string,double,long,object等简单类型),它从何来(它的source是什么,datasource来自于外部,variable来至于datasource或自己,rule来自于variable),它如何获得(对source的计算方式)
  2. data source这块应该仅局限于外部数据的集成和简单处理。这点以前没想清楚,也把一些计算功能混进去了,结果挖了个坑。
  3. rule单列开来,是由于给外部使用时须要一些额外的配置信息。咱们的系统中基本往外的数据都是以alert/alarm为形式的,为了方便辨识,增长了一些名称、标示符等属性,方便数据的外部集成。
  4. variable目前在公司内部称为counter,但我的仍是倾向于叫variable,主要是由于:
    • 大多数analyst的的规则和模型都是创建在变量上。我设计的初衷是为了能摆脱肉眼对原始数据进行判断的规则分析模式,但愿能从多个数据源中抽象出供analyst去作挖掘分析的可精确描述、独立性较强的变量。固然这种变量在实时系统中是一个随时间变化的数据流(永远是latest值,不停变化)。虽然失败了,但理由还在。
    • counter这个名字只反映了聚合这一种计算类型,当时妥协改了这个名字,后面后悔了,由于要描述这个系统更难了
    • 对于整个计算过程的配置来讲,都是OPERATION(input1, input2)--> output1的最简单的范式,input和output都是系统里的数据流,用variable的叫法更贴切。

Data Source System

对于data source来讲,框架部分开发出不一样类型数据源的数据抽取驱动,能够对如下数据类型数据源进行数据拉取:

  1. db(mysql/sqlserver)
  2. hbase
  3. dashboard api拉取
  4. dashboard数据直连
  5. mq
  6. url拉取
  7. other

对于每一种数据源,大体只须要定义元数据信息,就能够完成外部的数据拉取并到系统内部数据(通常称为event,包括name、key、timestamp、value四大固有属性和其余属性)格式的转换:

  1. 链接信息(dashboard url,db connection & sql, mq connection & queue name)
  2. event转换信息。对每一种通用的数据源,能够配置一些参数来自动完成源数据-->event的转换。

目前大部分数据源都是根据应用写死,但长期但愿抽象出特征来,能够经过配置自动完成,只剩下少数特别的经过开发完成。
这一份的结果是咱们能够从每一个datasource源源不断的获得数据,所以就是一条条数据流,通过alignment后(时间对齐),要求全部事件都以相同的时钟下汇聚成一个逻辑上的总的数据流,这是系统最大的limitation)

Variable Engine

对于进入系统的数据流(stream),咱们能够对此进行一些操做(包括但不限于split/join/aggreation/filtering),实时造成新的数据流,获得一系列variable(能够对应为BI中的维度,风控模型中的variable),以下图所示

Alt text

进入这部分系统时有两个数据流DS1, DS2;通过处理后,获得6个Variable(每一个Variable主要包括name,key,timestamp,value几个固有属性和其余用户定义的属性),每一个variable其实也是随时间变化的数据流,经过加工后的数据会更有利于做进一步的决策;最后在某一个时间点,对全部variable进行切片,能够获得一系列的latest值,这样就能够作为决策规则(模型)的输入依据,这一部分由rule management部分完成。

实际上,从数学的角度看,这部分工做但愿以variable(数据流/维度,counter只是其中一种)这种数据流做为基本单元,完成一个带少许操做符的代数系统,从而整个计算过程能够由这样一些基本的操做符去搭建一个DAG,而不是从头至尾所有由程序员编码实现功能。

操做部分基本由esper完成,经过封装,将esper实现相关的部分封装掉,只提供逻辑上的运算符给用户/admin,减轻使用负担。目前只提供少许基本的操做类型:
a. (已实现)单线聚合/过滤。提供基于单个数据流(variable或data source)的按时间聚合、按条件过滤。经过此操做能够实现split/aggregate/filter等逻辑操做
b. (已实现)双线merge。对两条数据流(variable)进行合并操做,实现简单的join(根据key和timestamp严格匹配)。
c. (未实现)多线merge。将多条数据流(variable)根据key去全部的latest值,进行合并计算。
以上是系统中如今和将要实现的操做符,目前看效果不是很好,好比操做符a带的功能太多,但愿一个操做符就能解决多个问题,对用户并不贴切,应该拆分为aggregate,filter(split用多个filter来实现)。

对于每一个应用来讲,直接拿底层框架的操做符进行配置能够基本知足需求。在资源充足的状况下,每一个应用能够在框架的variable体系开发一套更高一级抽象层的操做符,来方便应用的使用。参见实时报警的实现

同时,variable部分会提供dump操做,按期的将variable值dump到hbase中,供后续查询。这个功能主要是为了过后分析和离线查询,目前尚未在生产启用。最近打算先简单写一路进dashboard,以利用其实时查询和展示的能力。至因而否之后要扩展,要看最后项目的发展了

Rule Engine

Rule这块会直接对上一部的variable进行操做,经过用户提供的阈值来得出有价值的信息(暂且称为alert),而且根据后续用户配置的action操做分发到外部处理的地方。
Alt text

目前,规则管理准备实现如下三种:

  1. (已实现)单variable固定阈值。阈值能够写死在规则里,这种方式简单,但不够灵活,适合那种比较稳定的规则
  2. (已实现)单variable浮动阈值。阈值经过api由用户管理,对每一个variable[key],以分钟为精度,进行阈值报警。这种方式使用比较麻烦,单能够提供必定的灵活性。
  3. (未实现)多variable组合报警。多个variable经过给定的公式来报警。这个操做能够做为后续更复杂的规则引擎的基础。目前是实现了两个variable的组合报警,方便用户使用。对于BI提供的规则和模型来讲,使用多variable的组合是自然的手段,目前还没达到这个阶段,因此也没加上。

这里须要注明的事,rule和variable有部分重合,rule的一些功能后续也能扩展到variable里实现,二者的区别在于:

  • variable是一种较为稳定的逻辑对象,对它有很好的管理:它能够做为计算单元在整个variable引擎中做为计算输入;经过variable dump功能能够有过后分析和查询的好处。所以,当须要对结果数据进行更细致的分析后后续处理时,能够创建一个variable
  • 当只须要对结果数据进行分发到后续的handler时,能够用rule,逻辑上更易懂一些。

Dispatch Engine

Dispatch引擎会将rule engine产生的alerts进行分发,供用户进行进一步访问。这一快将与data source一块造成相对应的关系,目标是经过配置将alerts推送到制定的地方:

  • db
  • mq
  • url gateway
  • hbase

App Management

App 管理做为后续计划(nice to have),应该是没可能实现了,本打算实现如下功能。

  1. 根据data source system, variable engine, rule engine, dispatch engine,再加上一些额外的配置,自动生成storm topology
  2. 提供app(其实就是topology)的界面化部署、启动、中止
  3. 整合各个模块的监控,并开发console模块
    写的简单,仍是有蛮多细节要考虑的,有精力的能够去尝试

这一部分主要设计了逻辑上数据流的定义,以及其整个生命周期,下一段讲一下咱们简单的物理结构是如何去分别实现各个功能的。

物理结构

以前已经提到过,咱们是storm+esper的形式,esper负责内部绝大部分的计算,storm负责外围的组织。整个系统实现起来很是简单,如图
Alt text

外部数据过来有两种方式:

  1. 外部数据主动push。然而因为storm更适合用主动向外拉的方式,因此咱们中间用了一个mq 中转,目前是一个activemq,打算往分布式queue迁移。
  2. 外部数据被pull。最初专门写了一个puller的程序,去拉取各类数据,再导入到storm中。后来发现画蛇添足了,storm自己就提供了高并发和故障恢复,因而逐渐把数据采集转移到storm上,组织好的话,代码仍然能够保持简洁性,并轻松得到高并发的特性。如今基本上只要不向外提供接口服务的程序都已经往storm集群上迁了

storm的多个spout/bolt节点获取外部数据后,成为统一的格式event(包含name,key,timestamp和其它特定属性),并sharding(须要key)到不一样esper 节点,做进一步计算。

Alt text

每个esper节点,都已是运行在单个机器的单个进程上了,因此在这里将不一样来源的数据流对齐(须要timestamp),并做一些优化处理(好比去除冗余的对象)。最后这些event就分配到esper引擎了。
variable计算引擎一方面将这些外部数据转化为简单变量(一种我本身约定格式的esper数据流),这些简单变量在一些操做符下能够生成其它变量,有最简单的操做符

  1. filtering. insert into outputVar select * from inputVar where $condition
  2. aggregation.insert into outputVar select count/sum/avg(*) as value,... from inputVar:time:win(5 min)
  3. split. 跟filter相似,用两个或多个filter来实现便可insert into outputVar1 select * from inputVar where $condition;insert into outputVar2 select * from inputVar where not $condition
  4. join(根据实际须要有多种join方式,这里列出一种)。insert into outputVar select inputVar1.*, inputVar2.* from inputVar1.latest(key), inputVar2.latest(key) where inputVar1.key = inputVar2.key,这个伪epl表示将两个原始数据流,以其key做为 单位,将最新值拼接起来,这里还可做一些运算。举个例子,有一个变量源数据来源于pc访问,另外一个变量数据来源于移动app(可能用户再家里同时用手机和pc访问),按ip/uid去作join,就能够获得这个ip的完整视图

这些基本的操做符都很是简单,不会太多,开发起来也比较容易。只是要定义一下配置项,以及运算到底层esper语句的映射。咱们目前新加一个基本variable操做的话,后端只用新增一个文件便可,但前端比较难作,尚未想到很好的解决方法。
固然,对于一些特殊的应用,简单的操做符可能抽象度较低,难以直接使用,能够在简单操做之上进一步封装,详细的参考case study里面实时报警的就能够了

变量出来以后,就能够经过rule来过滤出咱们感兴趣的内容了:

  1. 单变量规则:insert into rule1 select * from inputVar where value > 5
  2. 多变量规则:insert into rule1 select ... from inputVar1.latest(key), inputVar2.latest(key) where inputVar1.key = inputVar2.key and (inputVar1.value 5 or inputVar2.value <10)

最后产生的结果,经过各类方式送到系统下游的各个handler。因为storm这种灵活的代码运行框架,这一点很容易作到,不详细叙述了。

case study

实时报警

ctrip内部用dashboard系统对各个应用的内部状态进行监控,包括物理的/逻辑的,利用hbase的特性,得到了实时聚合和展现的能力。咱们的实时报警项目,就但愿能自动化的去拉取数据,找出异常点。
公司内部也有其余报警系统,会对订单这些做一些同环比,阈值报警,系统跑的很好颇有效。咱们这边的报警系统更多的在于系统监控方面,会有一些更大的挑战,一是数量更大,我可能要对每一个hostip,每一个url,甚至是他们的组合做为单位去检查并报警;二是简单的阈值规则对上层业务来讲有价值,对底层来讲没太大指导做用,超出定义的阈值是比较常见的状况,结局就是大量的报警邮件发出来了,但没人关心,本身反倒成了公司内部最大的垃圾邮件制造商,因此须要各类更加复杂类型的报警:

  1. 连续上升超过50%的报警。原始数据-->简单变量-->(单变量聚合,取相邻两个值的比例合成一个新变量)-->比例变量-->(阈值为大于1.5即报警)-->done
  2. 同环比报警。咱们的数据源拉取配了时间参数,能够拉取当前时间的数据,也能拉取一段时间前这个时段的数据,因而两条原始数据流-->两个简单变量-->(双变量join后,新数据/老数据)-->同环比变量-->阈值报警-->done
  3. 多个metrics值组合报警,好比一个metrics描述了>10s延时的统计,另外一个metrics描述了全部延时的统计,我想知道占比的状况。两个metrics对应的原始数据流-->简单变量-->(双变量join,分子/分母)-->比例变量-->阈值报警-->done
  4. 多阈值报警。单阈值的话,规则过于刚性,容易误报。因此添加了阈值管理功能,经过开放api接口,用户能够以时间、key为单位定义不一样的阈值,这样能够达到分时段报警(多个time的阈值),差别报警(不一样key对应于不一样阈值),基线报警(天天的每一分钟预先算好阈值并写入)。这得益于esper能够轻易的调用java的方法,只作简单的开发就能够扩充esper的功能。
  5. 。。。

这里能够看出,尽管有各类复杂的规则类型,但基本的操做是相同的,因此只要在基本操做上封装一层便可。下面是个人一条规则配置,因为没有太多资源专门管理这些五花八门的规则,我将全部配置项揉到了一块儿,看着复杂些,但管理成本稍微低些(由于各个部分的配置相互间能够排列组合,分开来成本过高)。

{
    "namespace": "ns",
    "group": "group,
    "name": "rulename", // 这三项只是名字标识,方便rule管理
    "config": {
      "dashboardUrl": "http://xxxxx",  // dashboard url,返回json数据
      "timeAdjustment": 300,   // 表示取多久以前的数据,同环比就在这个配置项有不一样
      "dataType": "Single", // 是否拉取到的值直接报警,仍是要作同环比,或者是多值计算
      "period": 0,  // 如下四项配置同环比的参数
      "ops": "-",
      "oldCondition": "",
      "newCondition": "",
      "secondUrl": "",  // 如下四项配置配置双metrics计算 
      "secondTimeAdjustment": 0,
      "dualOps": "-",
      "firstCondition": "",
      "secondCondition": "",
      "valueType": "", // 如下三项配置是对以前的原始值或计算值直接检测,仍是要看变化率(差或比值)
      "formerPointCondition": "",
      "latterPointCondition": "",
      "triggerType": "fixed",  // 如下四项配置阈值类型,要么直接给阈值,要么给个标识符,用户本身去写复杂的阈值
      "lower": 0,
      "upper": 30000,
      "thresholdName": "",
      "conditionWindow": 0, // 如下两项控制规则屡次命中才报警,减小误报
      "conditionCount": 0,
      "cooldown": 600, // 报警cooldown,防止短时间内重复报警
      "to": "wenlu@ctrip.com", // 报警邮件配置
      "cc": "",
      "bcc": "",
      "mailInterval": 60,
      "cats": "",
      "catsEnv": "",
      "catsLevel": "info",
      "catsMessage": "",
      "catsDevid": "",
      "catsName": "",
      "catsPriority": "info",
      "type": "MetricsAlertingRule", // 表示规则类型,不一样值会触发不一样操做
      "desc": ""
    },
    "type": "MetricsAlertingRule",
    "desc": "",
    "status": "on",
    "app_subid": "auto@hotel_product_common_utility_logging_responsetime_30s" // 自动生成底层数据时用给的标识符
  }
View Code

 

这算是一条顶层规则,系统会自动生成一系列底层的data source/variable/rule/action,对顶层的CRUD操做也会自动映射为底层的操做。
这样开发一条新规则只用考虑如何去运用底层的计算平台,能够在更高的抽象层次上去开发,而不是从头至尾从新开发一遍。
整体而言,实时报警的功能是实现了,但运用的很差,并且是给底层人民用的,可视度不高。

XXX应用

XXX应用是敏感项目,会讲的含糊些。
这个项目数据量比较大,基本上对ctrip的每次访问都要促发一系列的运算和规则检验。过滤后,每秒k级数据,目前有几十个变量和规则,意味着每秒上w次的聚合操做和检测。生产上用了三台机器跑storm集群,实时上借助于storm+esper的高效,单台测试机(16g内存+6核cpu),已经能基本扛得住如此量的运行(不过最近随着变量的增长和流量的上升已经比较勉强,须要考虑可能的优化了)。
公司另一个项目,用的是我以前提的数据库的方式。他们的数据跨度比较大,须要很长时间段的数据,而不局限于当前,总数据量是xxx项目当前时间段数据的几倍,但计算触发频次较低,每秒几十次,若是不算db的话,用了10台服务器,听说cpu使用率20%不到。虽然我很不喜欢拿这两个系统去比较,由于解决的问题和适用的场景不同,就好像拿乔丹和贝利去比较,但由此得出XXX项目效率比较低这种结论是很难让人心服口服的。esper这种基于本地内存的效率远不是基于db或者是redis这种异地内存的系统能够比拟的,他的滑动窗口的计算效率也不是简单算法就能超过的。若是说可靠性和成熟度却是可让人服帖的。

这个项目一方面展现了当前架构能扛得住中等流量的冲击,另外一方面本意是尝试让用户能自主的动态的建立变量:
Alt text
上图的拓扑是根据用户的需求演化而来,其实具备必定的普适性,经过必定的过滤找到具体的感兴趣的数据,进行聚合获得统计特征,在此之上才能建出灵活的规则来。
图中最下方有个join变量,目前还没用到,是为了多数据源(多设备或者多数据中心的数据)的数据整合。有种说法是直接从数据源上进行合并,但这样会增长数据源的复杂度,破坏总体的结构,还不能彻底覆盖;另外一方面,join过滤后的数据远比join原始数据高效的多得多
这一套以前一半用代码,一半靠实时计算系统的规则,最近才抽象出来,原本打算彻底迁移到实时计算系统中,把整个图的掌控交给用户,由用户去控制整个DAG的结构构造和节点配置,这样灵活性比较高,经过统一的监控功能可让用户掌控每一环节的具体信息。这让我想起去年有家国内公司来推销大数据的,他们学术背景是可视计算。如今想来若是这套能实现的话,是否是也有点可视计算的味道了。
不过现实是这条路已经断掉了,一是咱们team,尤为是个人前端能力还比较差,另外一方面老板以为这个太复杂,易用性比较差,不能让人家去管理树(内部介绍还停留在树一层),因而改为了与业务适配,拓扑定死的结构,只让用户配底层的聚合变量,内部称为counter。如今只但愿能知足用户需求,能多多的建出变量来,这样才有往下一步走的可能。

一些实现细节和演进方向

如下基本是未实现,只是思考过的部分

  1. 要增强监控。以前对variable的监控力度不够,整个系统彻底变成黑盒。其实能够经过诸如简单的epl如insert into varInfo select count(*),.... from inputVar.win:time_length(1 min)就能够得到每一个变量的运行时统计,直接导入ctrip的dashboard系统,就能够得到监控和展现能力,这样还能导入到实时报警应用里面去,得到实时报警能力(用户提的需求,因此说多让用户参与讨论是颇有用的,不要把用户当傻瓜)。对XXX项目而言,这些统计信息甚至能做为业务指标使用,例如若是我须要公司某一业务线的访问量,只要配个变量+监控就ok了。没什么难度,但应该颇有用
  2. 提升sharding的灵活性。目前只在前端作一次sharding,都是事先决定的(根据event里的key字段),能够考虑改进这块,根据后续变量的sharding来自动选择、复制和分发数据。
  3. 多迭代。目前esper节点只有一层,若是有更复杂的功能,能够考虑用mq做中转,或是创建多层esper节点,从而实现多道处理。
  4. 多集群配合。几个集群配合起来工做,好比在多数据中心环境下,每一个数据中心部署一套算各自的集群,只要把处理过的数据再统一的聚合一遍,就能够得到统一的结果,简单又不失效率;另外第一点也提到了XXX项目的监控数据能够由实时报警去检测异常,但二者核心代码是一致的,实时报警自己的统计也能用来监控本身,这种带点递归的特性仍是比较好玩的。
  5. 2-4都描述了复杂化整个计算流程,其实甚至能够经过对整个DAG图进行分析去自动分配到各个节点,自动去作同步的操做。不过这些都过于复杂了,如今还只能想一想
  6. 这套实时系统有可靠性的弱点。由于基于内存的计算,若是某一节点不幸挂掉了,内存里的数据就丢了。这个目前没有太好的办法,要么作主从备份(一样的计算分配到不一样的机器,只取其中一个的计算结果),要么用storm的可靠性方式,挂了后恢复到某一个时间段开始重放,不过这就要看业务是否能容忍至少一次的一致性。要作到通用的彻底的可靠度很是具备挑战性,仍是得根据业务对数据可靠性的不一样需求才能作出合适的设计。
  7. 因为可靠性的因素,目前这套系统比较推荐用于短时数据的分析,这样即便挂了,storm重启后也不会有很大的影响,不少业务都能容忍。对于时间跨度比较长的数据分析和聚合,按个人想法,须要摒弃掉esper,但数据库那种方式仍是太慢,必定要作好流式计算(或者说是online的计算)。我目前的思路是只采用storm,将每一个步骤分摊到各个节点:
    • 对于过滤和join比较简单,直接来就能够了
    • 对于基于滑动窗口的聚合,稍微麻烦些,但像count和sum这类都是能够流式计算的,对于每一个进了变量窗口的事件,咱们只要添加一个进入事件和窗口事件后的退出事件便可,计算节点只要顺序处理这些事件便可。
    • count distinct的比较麻烦些。能够用一些hashmap的技术去优化,不过我更倾向于采用分段做基数估计的方法去取近似值。
      这样子的话项目规模会大很多,效率确定比单机版的esper差,但可控性加强,方即可靠性和一致性方面的工做。还只是想法,没有详细思考过
  8. 对于历史性的数据,若是跨度很是大,能够仍旧采用老的db的方式,但能够有优化,能够隔一段时间用hadoop跑一下,作些预处理工做,尽可能减小在线的计算量。事实上,咱们的实时系统自己能够也改形成后台job的形式,计算写基于老数据的变量,若是和实时版的变量相结合,会很大扩展适用范围。
  9. 续上一点,最理想的情况就是把实时变量提供接口暴露出来(下一步有时间会尝试,这下要测下整个系统的延时度了),与其余系统产生的历史数据相集成,能有个通用的rule引擎去统一运用这些数据。这样的话,每一个系统各司其职,有的系统跑得慢,但稳定,能够利用较多的数据;有的系统虽然可靠性很难保证,但跑得快,能够提供掌控当前情况的能力。

更大的视角

其实为啥要这么折腾,除了人懒想写点一本万利的代码外,主要是从前公司那里产生了完整的生态系统才是最重要的这一个想法。因此力图往这个方向靠。
Alt text
此图是我在paypal待了一年后最大的感觉:别看技术烂,只要生态系统建成了,整个系统能有机的跑起来,就能源源不断的带来收益。
整个系统是一个循环往复的过程:

  1. ops在目前的技术条件下是最重要的,任何的规则系统,总得有人去给你作标注,作出大量的样本数据来,这是作任何分析系统的基础。paypal ops的director就自豪的认为google没作过paypal就是由于没“人”的参与;而在ctrip,目前感受几个应用最缺的就是这块,没有样本数据很难去作任何自动化,规则只能是出过后每次人工去猜想,后续新规则的制定和老规则的维护都很难跟上。即便是公司技术能力突增,会用机器学习了,也很难发力,你只能用用无监督学习,但找到的特征估计十有八九不是想要的。这一点是致命伤,公司也很难向这个方向投入资源,一是还没发展到这步,二是得考虑划不划算
  2. ops产生的样本数据经由analyst分析,概括出一些变量,就能够训练处一部分规则和模型,只要有足够的数据,就能够获得还行的成果。再拿paypal举例,基本上看家本领就靠teradata数据仓库,当时听一个pd的team leader说他们最复杂的方法就是logstic regression了,由于这样出来的东西好理解(听说如今也开始高大上,采用神经网络了,帮他们打个广告)。别看不是很复杂,但确实颇有效,支撑公司了好久(固然他们的准确率要求不高,由于背后有千把个ops撑腰呢)。切换到ctrip,没有ops做支撑,因此个人用户们定规则都当心翼翼的,每次都只有些raw log去看,而后去定规则,这些规则都没法度量和跟踪维护。这就是我要作这套变量系统的初衷,其实原本我只要把人家的规则实现就行了,但这永远是打补丁的方式,不是可持续发展的道路。本来是但愿能有套灵活建变量的系统,这样能够很轻松的让用户去见上百个变量,这样对每一个ip/uid,我都能有成百上千个更有意义的数据(或者叫维度),理论上说,造成了一个risk profile。若是有样本的话,能够与BI对接,自动化都能产生规则出来;即便没有样本,经过监控统计,也能比raw log更好的揭示系统情况。
  3. ops产生的规则和模型最终由pd的在线系统去run,从而抓出感兴趣的数据来。这一块纯粹是执行了。不过对我本身实现的系统而言,还指望达到能灵活的自动化的创建变量,而不是每一个变量都要去考编码来实现
  4. 整个系统周而复始的运行,不断调整,不断演进。

这这个一套系统才是完整的有机体,本文所述的内容都只是描述了其中最不重要的在线那一块,而创建整个体系和完善离线系统才是更重要的,这个是真正作一个完善有用的实时系统须要解决的。我本身还没作到,就很少说了

总结

想说的都说了,没想到这么长。最后只想说句作实时计算真不容易,越作越以为能力和经验上的不足益发明显了,已经不是简单搭个开源软件就能搞定的。但愿后面能看到别人的作法,有机会能跟风。

什么,你竟然看到结束了,请你喝酸梅汤。

相关文章
相关标签/搜索