看着图 1 你们可能会感到熟悉,又或者会以为部分有些陌生,这是一张聚集了目前大数据生态下大多数成熟组件的架构图。众所周知,大数据生态很复杂,对于我的来讲,要所有学会可能要花费好几年时间。而对于企业来讲,要最大程度发挥其价值,构建一个成熟稳定、功能多样的大数据平台,期间花费的时间以及人力成本着实难以估量,更况且还须要考虑持续维护的问题。这就是七牛的Pandora大数据平台灵感的来源,咱们构建一个大数据平台,做为产品提供给用户,快速帮助用户挖掘数据价值。android
七牛是以云存储起家的公司,平台上有着大量的数据、业务日志以及运维监控数据,基于对这些数据的管理以及分析的需求,Pandora诞生了。咱们搭建了一个可靠的大数据平台,将大数据生态中的各个组件配套成一个体系发挥做用,用来解决实际业务中碰到的繁琐、复杂、多样化的问题。这个大数据平台的工做从数据的采集便已开始,经过一条数据总线,再根据业务需求分流到不一样的下游产品,从存储到数据可视化,从实时的数据变换到离线的算法分析,咱们构建一个全栈的大数据分析产品。ios
与此同时,咱们在大数据平台之上构建了业务工做流的概念,让用户只需关心构建符合他们业务模型的工做流,而无需具有大数据技术背景。不只大大下降了用户挖掘大数据价值的成本,更为重要的是去除了大数据技术门槛,使得各行各业的专家能够更好的施展本身对业务的深度理解。git
在工做流上,用户不只能够清晰的监控本身的数据流动,轻松构建各种实时、离线的数据变化与自定义计算,还能够按需弹性扩容、快速调度云端资源,下降了运维的成本。与此同时,咱们集成了社区中大量的优秀开源软件,对其进行优化及定制,一方面以便发挥其更强大的功能,另外一方面,也让熟悉使用这类开源软件的用户能够作到快速迁移,无缝切换使用。github
那么,Pandora究竟是一个怎样的平台呢?工做流又是怎样的呢?让咱们首先来直观的看一下工做流的使用形态,如图 2 所示。
算法
最左边的数据源是工做流的起点,数据源能够是一个,也能够是多个。在实时计算的工做流中,咱们只能有一个数据源,这个数据源就是数据收集汇聚的中心,也能够理解为数据总线,全部不一样终端的数据打向数据源,再经过数据源根据业务需求分发到不一样下游;在离线工做流中,咱们能够有多个数据源,不一样的数据源表明的是存储在不一样地方的离线数据,能够是七牛云存储中的不一样文件,又或是HDFS等不一样类型的数据仓库。数据库
无论是实时仍是离线,从数据源开始,你就能够根据须要作不一样类型的处理。后端
最基本的处理是对数据进行一些定制化的计算,好比你可能须要对天天海量的数据进行一个定时分析汇聚的功能,计算每分钟有多少条数据、每小时有多少条数据,从而缩减数据规模节约存储成本,或者从中生成一份数据日报、周报等等;又好比在这个信息爆炸的时代,你从网上抓取了海量的数据,须要对数据进行一些清洗、过滤、删选,以此分析社会热点或其余有价值的信息;又好比你想对数据作一个延伸或扩展,最多见的就是对一个IP获取它的运营商、所在区域等信息。那么你就能够建立一个计算任务,最简单的编写一些SQL语句就能够作数据变换;进阶一些的使用方式就是编写一些UDF(用户自定义的函数),作一些较为复杂的变化;更高阶的就是直接编写一类插件,直接使用大量Java的类库对数据进行操做。固然,在离线计算中,除了单个数据源的计算任务之外,你还能够对两个数据源,亦或是两个计算任务的结果进行聚合,而后再进行计算等等。计算任务能够知足你对于整个工做流的完整数据处理需求。api
在进行过一个基本的计算之后,可能最多见的一个需求就是对这些计算后的数据进行检索,直白的说就能够查询你的数据。那么你能够建立一个导出到日志检索,在这里你就能够搜索你的计算结果。固然,你的数据在数据源中也彻底能够不通过任何计算任务,直接导向日志检索。又或者你但愿对数据进行更完善的实时监控和数据可视化处理,那么就能够导出到时序数据库,针对带有时间戳的数据作高性能数据写入和查询优化,知足你针对实时海量数据的即席查询需求。七牛云存储
另外一方面,你工做流计算后的结果,能够直接再次导出到七牛云存储进行永久保存,或者与后续的数据结合进行分析。你也能够理解为经过大数据服务,七牛的云存储变成了一个数据仓库为客户提供服务。以前已经存储在七牛云上的数据(如CDN日志等),彻底能够直接使用咱们的大数据平台进行计算,无需任何接入过程。缓存
为了方便用户充分利用本身的数据,咱们甚至提供了导出到 HTTP 服务,用户能够构建本身的 HTTP 服务器来接受通过Pandora大数据平台计算后的数据。
图 3 是 Pandora的产品架构图,基本的功能在第 2 节中均已介绍,在此再也不赘述,在讲解系统架构以前,让咱们对照产品架构进行一些基本的系统组件技术名称的对照说明,以便下文描述简洁便于理解。数据经过咱们提供的数据上报工具logkit、各种SDK或者用户直接调用开放API接入,数据进入后不管是数据源自己仍是通过计算任务后的临时数据存储节点,咱们都一称做消息队列,技术上称之为Pipeline,像不一样下游提供导出服务的组件咱们称之为Export,在Pipeline中承担各种计算任务处理的组件咱们称之为Transform,下游的时序数据库服务咱们称之为TSDB,下游的日志检索服务咱们称之为LogDB。
有了这些基本概念后,让咱们对照图 4 Panora系统架构图,开启咱们的Pandora架构演进之旅。
最左侧的组件是数据收集的部分,数据来源于客户各类各样的系统。相信大部分用户在接入大数据平台时,都会面临数据收集这一难题,一方面但愿数据不重不漏所有收集上来,一方面又要数据收集尽量高效且不太消耗机器的各种资源,同时还要知足场景各异的不一样状况下的数据收集需求。熟悉这块的朋友可能也早已了解,社区已经有不少不一样类型的开源数据收集工具,知名的包括flume、logstash、fluentd、telegraf等等,他们各有利弊,功能上大都知足需求,可是在性能上和一些非通用需求的场景上不尽如人意。为了更好的知足用户不一样类型的需求,咱们自主研发了一个能够收集各类各样数据源数据的工具logkit,图 5 是logkit的功能架构示意图。logkit使用go语言编写,以插件的形式定制不一样的数据收集解析需求,处理高效、性能损耗低,同时也已经开源,咱们很是欢迎你们一块儿参与到logkit的使用和代码开发定制中来,为logkit 提提PR,固然,也很是乐意接受您关于logkit的任何意见或建议,只需在github提issues便可。
有了这样一款数据收集工具,几乎 90% 的数据收集场景咱们已经解决了,可是还会有诸如ios、android客户端数据直接上报、页面请求点击数据直接上报等需求,为此咱们提供了各种语言的SDK方便用户使用,以弥补logkit功能上没法知足的需求。
数据收集上来后,就正式进入咱们的Pandora大数据平台了。全部上报的数据不管最终是否计算或存储,都会统一暂存进入咱们的大数据总线Pipeline。相信通过上面的介绍,不少读者早已发现,Pandora帮助用户根据不一样场景选择最适合的大数据分析方式。而这套模式的核心,毋庸置疑,就是处理数据在不一样大数据产品间的流转。
Pipeline就是这样一条数据总线,在数据总线的基础上咱们打通一条条管,根据所需的场景导出到后端相应的存储服务上。同时据此来进行资源分配和任务管理。这样一来,就能够避免用户技术选型及技术架构与使用姿式和业务场景不匹配的状况,同时也能够利用云计算的优点,按需分配、快速扩容。
如图 6 所示是咱们的初版架构,实现上咱们经过定制开源版本的confluent,并把它做为咱们这个架构系统的核心。数据要流入系统,咱们首先构建了一个 Points Gate(API 服务器),Points Gate 解析校验用户的数据格式并调用confluent中kafka-Rest提供的rest API 将数据写入到kafka,利用schema-registry完成数据格式的校验以及数据解析,经过kafka得到数据管道的能力。
在进行元数据建立时,咱们的调度器在元数据服务器上建立一个用户元数据存储在MongoDB当中。对于MongoDB的元数据访问,咱们构建了一个二级缓存系统(即图中qconf),数据在进入或者导出时都会经过二级缓存访问元数据,这样数据就能够快速获得校验,扛住海量的数据吞吐。Kafka自己包含了Zookeeper组件,咱们也借此来保证总体系统组件的服务发现以及数据一致性这两个问题。
然而,随着应用的增长,数据量愈来愈大,这样,单个定制版的 Confluent 并不能知足这些数据量增加的业务压力,以及用户不断增长的场景需求。kafka topic(partition)不断增加致使总体响应变慢,没法快速切换灾备等待问题日益凸显。在这个基础上,咱们对本来的系统架构进行了调整。
3.2.2 Pipeline的升级
如图 7 所示,咱们对Pipeline的第一次总体升级包含了大量的组件基础架构的调整。首先咱们构建了Confluent的多集群系统,将单个Confluent集群规模控制在100台机器之内,分区数量控制在1万之内,根据需求对集群进行路由。
可见经过多集群系统,咱们能够合理控制每一个confluent集群的规模,同时根据咱们的调度器按照须要切换用户到不一样的集群上,实现快速切换、扩容、容灾、隔离等调度需求。
其次咱们对Points Gate、Transform、Export中无状态组件进行了容器化处理,利用咱们七牛内部的容器云平台,实现了无状态服务的快速部署、扩容以及灰度发布等需求。
此次架构的调整效果显著,咱们成功抗住了天天上百TB,千亿级数据点的数据增量。
不止于此,为了更高的性能提高以及节约成本,咱们在上述升级以后进行了第二次的架构升级。此次升级主要体如今对Confluent的进一步定制(或者说替换),咱们再也不使用kafka-rest,同时对打点的数据格式进一步优化,又一次节约了近一半的机器成本。
在解决了数据总线问题之后,问题的重中之重天然是如何处理数据导出的问题。众所周知,数据导出其实就是从一个上游系统拉取数据,而后将数据再发送到下游系统的过程。但这里面涉及的难点和调整可能大多数都是鲜为人知的了。在介绍咱们的导出服务架构以前,很是有必要描述一下针对海量数据导出,到底有哪些挑战?
3.3.1 数据导出的挑战
首先面对的第一大挑战天然是高吞吐量的问题,海量数据不断涌入带来的必然问题就是网卡和CPU分配的问题,一旦流量分配不均,就会出现大量因网卡、CPU负载太高带来的延迟,严重影响服务可用性。
显然,保证低延迟就是第二个挑战,一旦各个链路有一个环节配合不均衡,就会带来大量延迟,如何保证导出的上下游始终保持较高的吞吐量,从而保证较低的延迟,是一个很是大的调整。
为了保证低延迟,就要更好地适配多种下游,使其始终保证高吞吐,了解下游不一样服务的特性,并针对这些特性动态的调整资源,亦是一个很大的挑战。
除此以外还有分布式系统的常见问题,须要保证服务高可用,以及水平扩展的能力。保证任务单元标准化,任务粒度能够切分扩展;保证调度任务节点故障不影响其余节点的正常导出等等。
最为重要的是自动化运维,当导出的任务涵盖数十上百台机器后,人力已经没法精细化处理每台机器的任务分配,资源必须能够自动调度、调整以及构建统一的任务监控。
3.3.2 导出服务功能介绍及架构演进
让咱们来看一下导出服务的功能架构图,如图 8 所示。咱们的导出服务主要涉及三个层级,一个是元数据管理,在这一层保证任务的分配以及监控展现;第二层则是任务管理层,除了基本的任务切分、并发管理以及通讯协议以外,还包含了压力预估模块,根据以前的数据量预估下一阶段的数据流量,从而调整资源分配;再下一层则是数据处理层,在这一层完成诸如数据预取、数据校验、压缩以及推送到下游等任务。
在最初的版本中,咱们会在 zookeeper 上面建立一个任务(task) ,Export 经过分布式锁对task进行争抢,被抢到的任务则开始直接导出,如图 9 所示。
在这样一个初步架构中,咱们基本完成了水平扩展以及高可用的需求,同时作了诸如数据预取,延迟报警、数据监控等多种功能和优化。可是流量上来之后,很容易出现某个机器争取的任务流量变大,致使大量数据打到同一台机器上,致使网卡和CPU负载太高,延迟急剧升高。本质上就是流量分布不均匀,致使导出性能低下,机器资源的平均利用率也低。
此时,咱们对该方案进行第一次架构升级,如图 10 所示。咱们将原来topic级别的任务按照parition进行分布式消费。为了使得每一个partition粒度的任务大致是均等的,咱们将partition承载的数据量按照标准化处理,并根据历史流量进行预测,预测结果超过当前咱们定制的标准符合的对应容量即触发扩容,这样的标准化有效简化了调度的难度。
同时咱们将原来纯粹的export改成master/worker结构,Master对收集到的任务进行主动权衡分配,根据任务的历史流量进行流量预测、对任务的partition数量、每一个export worker的机器资源剩余状况,进行综合调度。对于一些特殊任务作机器黑白名单绑定等功能。
在作了上述工做之后,咱们机器的总体利用率有了很大的提高,可是因为下游系统的不一样,写入吞吐量始终良莠不齐,没法始终保持在一个较高的水平。为了解决该问题,咱们再次对架构进行小范围升级,如图 11 所示,咱们在导出的export worker端增长了一套对下游系统的适配加速模块。其核心思路就是按照下游的吞吐能力进行自动调节请求体大小以及并发度。这个主要是为了解决上下游传输数据速度不匹配,以及下游吞吐量不稳定的问题。
相似于Flume的思想,咱们构建了一个内存队列,以事务的形式从队列中获取数据(或者失败回滚),根据下游的状况调整单次数据请求的大小和并发度,以及调整出错等待时间等。这样一来,整个导出的吞吐量就能够颇有效的进行控制,去除了毛刺,极大的提升了机器资源的使用率以及导出效率。
解决了数据的导出问题,基本上绝大部分数据流转的问题也都解决了。下面咱们开始关注Pandora下游的一系列服务。
TSDB是七牛彻底自主研发的分布式时序数据库服务。TSDB针对时序数据定制存储引擎,根据时序数据带有时间戳的特性,咱们针对时间戳作特殊的索引,实现数据高速汇入和实时查询获取的能力;同时构建了简单且高性能的HTTP写点和查询接口,为查询聚合数据量身定制了类SQL语言,彻底兼容开源社区InfluxDB的API,支持无缝对接到Grafana,对数据进行高比例压缩,实现低成本存储。除此以外,TSDB拥有开源社区版本的InfluxDB所没有的分布式、多集群、高可用,水平扩容、以及分库分表能力。
如图 12 所示,TSDB是咱们基于tsm构建的分布式时序数据库,拥有每秒60万条记录的写入性能以及实时查询聚合分析的能力。在分布式方面,除了基本的多集群、多租户隔离的概念之外,咱们还针对性的作了两个强大的扩容功能,一个是根据时序进行纵向集群切割,解决海量数据写入时磁盘的快速扩容问题;另外一个则是根据用户的标签进行数据库表横向切割,相似传统数据的分库分表能力。在进行了这两大扩展能力变换后,数据的写入和查询依旧高效,甚至查询速度在分库分表后性能有所提高。
为了实现这样的扩容功能,咱们基于此构建了一个分布式计算引擎,解析用户的SQL并变成一个个执行计划,将执行计划下推至不一样的TSM计算引擎实例中进行计算,最后再进行数据reduce计算反馈给用户。
除了数据写入性能高之外,还支持数据即时查询,数据写入成功便可查询,数据零延迟;同时支持InfluxDB的持续聚合功能,相似于定时任务同样将持续写入的数据不断进行聚合计算;当单个用户数据量过大时,拥有横向拓展能力,集群扩展后写入性能不打折,查询效率更高。针对时序数据的特性,咱们将数据进行冷热分离, 对数据按照时间分片,使最近的数据查询性能更高。
在了解完咱们的时序数据库之后,让咱们来看一下下游的另外一大服务,日志检索服务,又称LogDB。日志搜索实际上是几乎全部技术开发人员都会须要的服务,传统解决方案(ELK,Elasticsearch、Logstash、Kibana) 针对小数据量不会出现任何问题。可是当数据量过于庞大时,这些方案也就不那么适用了。
咱们LogDB的底层能够经过插件的形式接入不一样类型的搜索引擎,包括Solr、Elasticsearch(简称ES)等,目前承载海量数据搜索任务的底层引擎主要使用的是ES。与单纯的使用ES不一样,LogDB自己是一套分布式系统,管理的单元能够是一个ES节点,也能够是一个ES集群,因此咱们构建了大量的ES集群,不一样的集群用以适配不一样的用户以及不一样的搜索需求。
大致上咱们将搜索的需求分为两类,一类是ELK类需求,针对如程序运行日志、业务访问日志等收集索引,这类需求的广泛特色是数据量大,时效性高,带有时间戳,无需存储太长时间,无需更新;另外一类需求相似于搜索引擎,数据存在更新须要,且强依赖于不一样类型的分词器,数据冷热不明显,不带有明显的时间属性,咱们称之为通用检索需求。这两类需求,LogDB都是彻底支持的,可是针对这两类需求,咱们作的优化不一样。
在咱们讨论具体的优化以前,让咱们先来看一下LogDB的架构图, 如图 13 所示。首先是数据的写入,LogDB是Pandora平台下游服务,上游主要是以前提到的Pipeline以及Export。Export导出的数据经过apisever将数据导入到不一样的ES集群当中,根据不一样用户的需求给他们提供不一样的集群服务,集群之间也能够相互进行切换。
那么如何确认数据到底数据哪一个集群呢?为了使得海量的数据快速确认,咱们对元数据进行了多级缓存,包括MongoDB的实际存储、memcached的二级缓存,以及本地的缓存,极大提升了数据校验的速度。除此以外,LogDB自己也是Pandora的用户,使用了TSDB对自身数据进行监控,使用七牛云存储进行数据快照与备份。同时将计费数据导出到云存储,利用咱们的XSpark机器进行离线计算。
架构中剩下的部分都是咱们针对不一样索引作的优化。简而言之,咱们首先构建了一个高性能队列,可让外部的数据持续高吞吐写入;同时咱们还会根据历史流量进行动态索引平衡、不一样集群的索引跨集群平衡、索引定时清理、VIP集群隔离等功能;而且会对 ES 的搜索进行分步搜索控制,缓存历史搜索,优化用户搜索的效率和体验等等.
最后有读者看到这里,也许会忍不住想问,若是只是纯粹的想使用一个高度灵活的Spark集群,不但愿通过Pandora各种复杂的计算、导出,甚至数据都没存储在七牛,可不能够享受七牛的Spark大数据服务呢?是的,彻底能够,这就是咱们的XSpark!
XSpark不只与Pandora总体彻底打通,能够把七牛云存储当成一个数据仓库使用,又彻底能够独立使用。即用户除了在Pipeline里面作离线计算以外,你还能够选择直接一键生成一个基于容器云的我的专属Spark集群,直接读取你本身的数据源,只要Spark支持的数据格式,XSpark都支持。若是你的数据已经存储在七牛云存储上,XSpark能够直接高效读取并计算,XSpark是Pandora提供给大数据高端用户的一个高度灵活的离线计算产品。
显然,容器云所具备的优点XSpark全都具有,你彻底能够根据须要动态伸缩的XSpark资源数量与规格,按需启停等等。
图 14 是 XSpark 的架构图。咱们将Spark的master和worker分为不一样的容器,首先启动Spark的master容器,获取master地址,而后再根据用户的配置,启动相应数量的worker容器,worker容器自动向master注册。同时容器云的支撑使得咱们的XSpark能够在运行过程当中进行缩容扩容。
同时XSpark也开放了完整的Spark监控以及管理控制页面,彻底兼容开源社区的Zepplin使用方式。