原文地址:http://mp.weixin.qq.com/s/-RZB0gCj0gCRUq09EMx1fAjava
沈辉煌 魅族数据架构师 node
2010年加入魅族,负责大数据、云服务相关设计与研发;web
专一于分布式服务、分布式存储、海量数据下rdb与nosql融合等技术。正则表达式
主要技术点:推荐算法、文本处理、ranking算法算法
本篇文章内容来自第八期魅族开放日魅族数据架构师沈辉煌的现场分享,由IT大咖说提供现场速录,由msup整理编辑。sql
导读:魅族大数据的流平台系统拥有自设计的采集SDK,自设计支持多种数据源采集的Agent组件,还结合了Flume、Spark、Metaq、Storm、Kafka、Hadoop等技术组件,本文就魅族流平台对大量数据的采集、实时计算、系统分析方法,全球多机房数据采集等问题进行介绍。数据库
流平台是魅族大数据平台的重要部分,包括数据采集、数据处理、数据存储、数据计算等模块,流平台为大数据提供了强大的支撑能力。后端
文章还介绍了魅族大数据流平台的架构、设计方式、经常使用组件、核心技术框架等方面的内容,还原魅族大数据平台的搭建过程及遇到的问题。跨域
1、魅族大数据平台架构缓存
如图所示即是魅族的大数据平台架构。
左边是多样性的数据源接入;
右上是离线数据的采集;
下面是流平台(也是今天分享的主角);
中间是集群的部署;
右边是ETL的数据挖掘、算法库和一些数据模型;
左上角是数据开发平台,好比webIDE可使得开发人员更便捷地作一些数据查询和管理;
最右边的是一个数据产品门户,包括咱们的用户画像、统计系统等,这里面包含大数据的不少组件,好比数据采集、数据处理、数据存储、数据挖掘等,最后产生大数据的雏形。
2、流平台介绍
流平台是大数据平台一个比较重要的部分,主要包括四个部分:数据采集、数据处理、数据存储、计算能力。
数据采集
“谁拥有了整个世界的数据,他就是最大的赢家”,这句话虽然有点夸张,可是却表达了数据采集的重要性。一个大数据平台数据的多样性、数据量的级别很大程度上决定了大数据的能力和丰富程度。
数据处理
这里讲的数据处理并非像末端那么专业的数据清洗,更多的是为后续入库作一些简单处理,以及实时计算。
数据存储
计算能力,包括离线计算和实时计算
流平台为大数据提供很是强大的支撑,数据统计分析、数据挖掘、神经网络的图形计算等均可以依靠计算能力进行。
实时计算是指在必定单位的时间延迟范围内,基于增量的数据推算出结果,再结合历史数据获得指望的分析结果。这个时间是根据业务需求而定。
一、流平台架构
上图是咱们的流平台架构图
左边是数据源,像NoSQL、RDB、文件类型;
最右边是集群,下面还有其余的一些Hadoop(存储);
中间的框是核心,也就是流平台;
最上面的是AS-Manager(咱们的流管理平台),承载了很是多的管理功能;
下面是Zookeeper,这是一个很是流行的集成管理中心,魅族的一些架构都会用到它,流平台也不例外,Zookeeper能够说贯穿了咱们整个流平台的架构;
最下面是AS-Protocol,咱们本身设计的流平台的数据对象协议,打通了整个流平台的数据链路;
中间四个框是核心的四个模块:采集模块、数据中转模块、缓存模块、实时计算模块,也叫合并层。
二、具体架构介绍
这是咱们的具体架构图。
业务规模:从这边采集数据到通过流平台最后通过实时计算或入库,它的数据量量级在千亿级别。
三、组件
数据源渠道
前面提到采集数据源渠道的多样性决定了大数据平台的相应能力和综合程度。咱们这边首先会有一个文件类的业务数据,包括业务日志、业务数据、数据库文件,这些都会通过采集服务采集。
下面这一块包括一些网站的js访问、手机各APP埋点、特色的应用日志文件(它会经过手机端的一些埋点上访到咱们的埋点服务)。
数据采集
数据采集分为两个部分:采集服务、独立部署的埋点服务。图中只显示了一个埋点服务,里面还会有不少的第三方业务,第三方业务经过这个红色的插件接入咱们的采集。
数据中转
经过采集模块把数据流转到中转模块,中转模块采用的是目前比较流行的flume组件,红色sink是咱们本身开发的。
Cache
sink把前面的数据转给缓存层,缓存层里有metaq和Kafka。
Streaming
实时计算模块上线了Spark和Storm,较早上线的是Spark,目前两个都在用的缘由是它会适应不一样的业务场景。
Store
最后面是咱们提供给落地的store层,像HIVE、Hbase等等。
流管理平台
最下面是流管理平台,图中有四条线连着四个核心模块,对这四个模块进行很是重要且很是丰富的逻辑管理,包括数据管理、对各节点的监控、治理、实时命令的下发等。
3、流平台设计
一、概念解读
Message,就是一条消息,是最小的数据单位。业务方给的一条数据就是一个message;咱们去采集文件的话,一行数据就是一个message。
AS-Protocol,是咱们本身设计的流平台数据的对象,它会对一批量的message进行打包,而后再加上一些必要的变量作一个封装。
Evnet,会提供一个相似的标准接口,这个地方其实更多的是为了打通采集的流平台。它最重要的一个变量是Topic,就是说我拿到了个人AS-Protocol就能够根据对应的Topic发到相应的登陆去缓存提取,由于咱们的AS-Protocol除了起始端和结束端之外,中间层是不用解析协议的。
Type,数据格式目前是Json和Hive格式,能够根据业务去扩展。
Compress,Hive格式在空间上也是很是有优点的,很是适合于网络传输压缩。当压缩数据源质量没有达到必定量的程度的时候会越压越大,因此咱们要判断是否须要压缩。咱们压缩采用的是一个全系统
Data_timestamp,数据的时间是最上面的message,每个message会携带一个数据时间.这个比较好理解,就是入库以后会用作数据统计和分析的。
Send_timestamp,发送时间会携带在咱们的AS-Protocol里,它声明了每个数据包发送的时间。
Unique Key,每个数据包都有一个惟一的标识,这个也是很是重要的,它会跟着AS-Protocol和Event走通整个平台的数据链路,在作数据定位、问题定位的时候很是有用,能够明确查到每一个数据包在哪一个链路经历了什么事情。
Topic。这个不需多言。
Data_Group,数据分组是咱们很是核心的一个设计思想,原则上咱们是一个业务对应一个数据分组。
Protobuf序列化,咱们会对Event数据作一个PT序列化,而后再往上面传,这是为了节省数据流量。
二、协议设计
如图所示为Event、As-Protocol和Message的关系。
最上层是Event,里面有一个Unique Key和Topic包括了咱们的As-Protocol,而后是数据格式、发动时间是否压缩、用什么方式压缩,还携带一些额外的变量。最后面是一个Body,Body其实就是一个message的宿主,以字节流的方式存储。这个就是咱们一个数据对象的协议设计。
接下来看数据在整个架构里是如何流转和传输的。
首先是数据源渠道,最左边的是message,任何业务方的数据过来都是一条message,通过数据采集把一批message打包封装成Event,再发给数据中转模块,也叫flume。把Event拆出来,有一个topic,最后把As-protocol放到相应位置缓存,消费对应的Topic,拿到对应的As-Protocol,并把这个数据包解析出来,获得一条一条的message,这时就能够进行处理、入库或实时计算。
须要特别注意的是message和Event。每一个Message的业务量级是不同的,有几十B、几百B、几千B的差异,打包成As-Protocol的时候要试试批量的数目有多少,原则上压缩后的数据有个建议值,这个建议值视业务而定,DataGroup打包的数量是能够配的。
三、数据分组设计
如图所示是咱们的DataGroup设计。首先看最上面,一个Topic能够定义N个DataGroup。往下是Topic和streaming Job一比一的关系,就是说一个实时的Group只须要对应一个Topic,若是两个业务不相关就对应的两个Topic,用两个Job去处理,最后获得想要的关系。
从架构图能够看到DataGroup的扭转关系。最初数据采集每个节点会声明它是属于哪个DataGroup,上传数据会处于这个DataGroup,通过数据中转发给咱们的分布式缓存也对应了Topic下面不一样的分组数据。最后Streaming交给我Topic,我能够帅选出在最上面的关系,去配置DataGroup,能够很是灵活地组合。这就是DataGroup的设计思想。
4、采集组件Agent
一、概述
如图所示,这是彻底由咱们本身设计和实现的一款组件。右边是采集组件,分为两部分:一个是基于java环境的独立工做程序;另外一个是jar插件。插件叫Agen-Stub.jar;独立层是Agent-File.zip,Agent-File有一个paresr支持不一样的文件类型,目前支持的file和Binlog,可扩展。根据须要能够增长parser,也是接入Agent-stub,拥有Agent-stub的一些特性。
如上图右侧的示意图,Agent-stub接入多个Business,前面提到的一个埋点服务就是一个Business,它把数据交给Agent-stub,Agent-stub会日后发展,与file和mysQL相对应的是file parser,出来是Agent-stub,流程是同样的。
二、Agent-Stub.jar
接下来看Agent-Stub是如何设计的。
多线程、异步。这个毫无疑问,作插件化确定是这样考虑的,不能阻塞上层业务。
内存小队列+磁盘压缩队列。这是咱们改进最大的一个地方,早期版本中咱们采用的是内存大队列,若是只有内存大队列缺点很是明显:
程序正常启动的时候大队列里的数据怎么办?要等他发完吗?仍是不发完?当大队列塞满的时候,还有对上层业务的侵入性怎么办?程序遇到问题时怎么办?大队列多是50万、100万甚至更多。
采用了内存小队列+磁盘压缩队列后能够解决正常程序的启停,保证数据没有问题,还能够解决空间的占用清空性的问题,以此同时,磁盘压缩队列还能够在程序出错的时候加速发送。
解释一下磁盘压缩队列, 此次咱们设计协议的思想很简单:压缩以后获得一个字节速度,存在磁盘的文件里,这个文件按照小时存储,这时对于二次发送带来的损耗并不大,不须要从新阻断数据也不须要解析和压缩,只须要读出来发出去。后面还有一个提高就是磁盘发送队列跟内存发送队列是单独分开的,这样更能提高二次数据的发送性能。
无损启停。正常的启动和中止,数据是不会中止不会丢失的。
Agent的版本号自动上报平台。这个很是重要,咱们早期的版本是没有的,能够想象一下当你的Agent节点是几千上万,若是没有一个平台直观地管理,那将是一个怎样恐怖的局面。如今咱们每个Agent启动的时候都会建立一个node path,把版本号放到path里,在管理平台解析这个path,而后作分类,咱们的版本就是这样上报的。
自动识别接入源,智能归类。这个其实和上面那点是同样的,在早期版本中咱们作一个Agent的标识,其实就是一个IP+一个POD,就是说你有几千个IP+POD量表须要人工管理,工做量很是大且乏味。咱们优化了一个自动识别,把DataGroup放到Agent的node path里,管理平台能够作到自动识别。
Agent的全面实时监控。包括内存队列数、磁盘队列数、运行状态、出错状态、qps等,均可以Agent上报,而且在管理平台直观地看到哪个节点是什么样子的。其作法也依赖于zookeeper的实现和承载,这里其实就是对zk node的应用,咱们有一个定时线程收集当前Agent必要的数据,而后传到node的data上去,管理平台会获取这些date,最后作一个平台化的展现。
支持实时命令。包括括限流,恢复限流、中止、调整心跳值等,大大提升了运维能力。其实现原理也是依赖于Agent,这里咱们建立一个Data Group,经过管理平台操做以后把数据放到Data Group里,而后会有一个监听者去监听获取数据的变化并做出相应的逻辑。
兼容Docker。目前魅族在用Doker,Doker对咱们这边的Agent来说是一个挑战,它的启动和中止是很是态化的,就是你可能认为相同的Docker容器不会重启第二次。
三、Agent-File.zip
接入Agent-Stub。Agent-file首先是接入Agent-stub,拥有Agent-stub的一些特性。
兼容Docker。由于启动和中止的常态,假设咱们刚刚一个业务接入了Agent-stub,那中止的时候它会通知我,Agent-stub会把小队列里的数据抓到磁盘压缩队列里去。可是这里须要注意的是:磁盘压缩队列不能放到Docker本身的文件系统里,否则它停了以后数据就没有人可以获得了。
当Agent-stub停的时候,会有一个标识说磁盘要作队列,咱们的数据有没有发完,磁盘压缩队列里有一个评级的标识文件,这时要用到Agent-file,Agent-file有一个单独的扫描线程一个个地去扫描Docker目录,扫到这个文件的时候判断其数据有没有发完,若是没发完就只能当作一个发送者。
支持重发历史数据。作大数据的可能都知道这些名词,好比昨天的数据已经采集完了,但因为某些缘由有可能数据有遗漏,须要再跑一次后端的补贴逻辑,或者上马训练,这时就要作数据重发。咱们在管理平台上就会有一个支持这种特定文件或特定时间段的选择,Agent接收到这个命令的时候会把相应的数据发上去,固然前提是数据不要被清了。
管理平台自助升级。这个能够理解成软件升级,Agent能够说是很是常见的组件,可是咱们从新设计时把自动升级考虑在内,这也是咱们为何设计本身作而不是用开源的组件。这样作带来的好处是很是大的,咱们几千个Agent在平台里只须要一键就能够完成自动升级。
文件名正则表达式匹配。文件名的扫描是用自动表达式。
源目录定时扫描 and Jnotify。重点介绍文件扫描机制。早期的版本是基于Agent-fire和KO-F二者结合作的数据采集:Agent-file是加码里对文件变动的事件鉴定,包括重命名、删除、建立都有一个事件产生;KO-F是拿到文件下的最佳数据。假设源目录里有一千个文件,KO-F现场就是一千个,Agent-file对应的文件变革赋予的追加、重命名等均可能会产生一系列事件,逻辑复杂。
因此咱们设计了源目录定时扫描的机制,首先有一个目标,就是咱们的文件队列,包括为未读文件、已读文件作区别,区别以后扫描,固然还会有像文件摘要等的存在这里不细讲,扫描以后更新未读文件、已读文件列表。
之因此加Jnotify是由于咱们发现只用定制扫描不能解决全部业务场景的问题,jootify在这里起到补充定制扫描的做用,解决文件风险和文件产程的问题。
单文件读取。早期版本中这一点依赖于文件列表,当文件很是多时程序变得很是不稳定,由于可能要开几百个或几千个线程。后来咱们改为了单文件的读取,上文提到的扫描机制会产生一个文件队列,而后从文件队列里读取,这样一个个文件、一段段图,程序就很是稳定了。
文件方式存储offset,无损启停。早期采用切入式PTE作存储,衔接很是重,后来咱们改为文件方式存储,设计很是简单就只有两个文件:一个是目录下面全部文件的offset;一个是正在读的文件的offset。这里涉及到无损启停和策略的问题,咱们定了一个5次算法:就是每读了5次就会刷盘一次,但只刷在读文件,别的文件不会变化,因此能够想象获得,当这个程序被替换走的时候,最多也就是重复5条数据,大会致使数据丢失。
四、Agent示意图
如图是Agent示意图。上面是Agent-file和数据对象。Agent启动的时候要把里面的offset文件取来,就会产生未读文件和已读文件列表,扫描文件目录,而后更新文件队列,还有一个fileJNotify是相对应的文件队列。而后有一个比较重要的fileReader,我会先从文件队列里拿到再去读实际文件,读完刷盘以后这一块就成功了,我会根据个人刷盘去刷新offset。
上图左边有一个业务加了一个Agent-stub,最后变成flume,这里有一个QueueReceiver(队列接收者),filereader和业务方的DataSender会把message发过来,QueueReceiver接受的数据就是一条条的message,而后发送到内存小队列里,当这边的小队列满了怎么办呢?中间有一个额外的固定大小的性能提高的地方用于message归类,当这个fIieReader往这个内存小队列发的时候发现塞不进去了,就会在规定大小的队列里发,当一个固定大小的队列满了以后就会打包压缩,以字节处理的方式存到磁盘压缩队列。
再来讲说咱们为何会提出二次数据的发送,其实就是多了一个countsender即压缩队列的发送者,直接的数据来源是磁盘压缩队列,与上面的并生没有任何冲突。Countsender的数据对帐功能是咱们整个平台的核心功能之一,基于这个统计的数据确保了其完整性,少一条数据咱们都知道,在采集层有一个countsender,以另一个渠道发出去,和真正的数据源渠道不同,会更加的轻量化更加可靠,且数值很是小。
最后是前文提到的监控和命令的实现,一边是Agentnode,一边是数据管理。
五、Agent的坑
丢数据。如前文提到内存大队列带来的问题。
版本管理的问题。
tailf -f的问题。
网络缘由致使zk删节点问题。网络不稳定的时候,ZK会有一个节点的心跳检测,不稳定的时候监测会觉得节点已经不存在了而把节点删掉,这会致使管理平台的节点监控、文件下发所有都失效。解决办法就是在message加一层控制检查线程,发现节点不在了再建立一遍。
乱码的问题。可能会跟一些远程访问的软件相关,原则上咱们假设第二次启动的时候没有配置咱们的编码,默认与系统一致,但当远程软件启动的时候可能会发生不同的地方,因此不要依赖于默认值,必定要在启动程序里设置但愿的编码。
日志问题,在插件化的时候确定要考虑到业务方的日志,咱们把业务方的日志刷死了,当网络出现问题的时候每发送一条就失败一条,那是否是都要打印出来?咱们的考虑是第一条不打印,后面可能十条打印一次,一百条打印一次,一千条打印一次,这个量取决于业务。补充一点,咱们有一个统计线程,能够根据统计线程观察Agent的正常与否。
5、流管理平台
如图所示,咱们的流管理平台界面比较简单,但功能很是丰富,包括:
接入业务的管理、发布、上线;
对Agent节点进行实时监测、管理、命令;
对Flume进行监测、管理;
对实时计算的job的管理;
对全链路的数据流量对账,这是咱们自检的功能;
智能监控报警,咱们有一个很是人性化的报警阀值的建议。取一个平均值,好比一周或一天,设定一个阀值,好比一天的流量访问次数多是一千次,咱们设计的报警是2000次,当连续一周都是2000次的时候就得改进。
6、数据中转
一、背景
业务发展可能从1到100再到1000,或者当公司互联网发展到必定程度的时候业务可能遍及世界各地,魅族的云服务数据分为海外服务和国内服务,咱们把业务拆分开来,大数据采集确定也要跟着走,这就面临着数据中转的问题。
如图所示是咱们两个案例的示意图。黑色的是内网的线,橙色的是跨界性的线,有公网的、云端的、专线的,各类各样的网络状况。
上面的是Agent集群,B-IDC也有一个Agent集群,直接访问咱们登陆的集群。
这里第一个问题是咱们的链接很是多,访问Agent节点的时候有几千个Agent节点就得访问几千个节点,这是不太友好的事情。另外一个问题是当咱们作升级迁移的时候,Agent要作修改和配置,必须得重启,当整个B-IDC迁移到A-IDC,咱们加了一个Flyme集群。一样是一个Agent集群,下面有一个Flume集群,这样的好处:一是里面的链接很是少,线上的Flume一个ID就三台;二是这边承载了全部的Agent,除了Agent还有其余的采集都在A-IDC里中转,当这个片区要作升级的时候上面的业务是透明的,灵活性很是高。
二、Flume介绍
Flume里有三个核心的部分:Source、Channel、Sink,Source是数据结构源;Channel至关于内存大队列,Sink是输出到不一样的目标。官方提供了不少组件:Avro、HTTP、Thrift、Memory、File、Spillable Memory、Avro、Thrift、Hdfs、Hive。
三、Flume实践
无Group,采用Zookeeper作集群
Agent采用LB作负载均衡,动态感知。结合Zookeeper能够感知到Agent列表,这时会采用负载均衡的作法找到当前的那个Flume,到后端的Flume直接变化的时候能够感知到从而下线。
硬盘缓存、无损启停。采用memory可能会带来些很差的问题,若是内存队列改为文件就没有这个问题。由于内存速度快,存储强制刷新的时候就没有数据了,因此咱们作了优化:仍是采用memory,在Flume停的时候把数据采集下来,下一次启动的时候把数据发出去,这时就能够作到无损启停,可是有一点千万要注意:磁盘实际上是固化在机器里面,当这台机器停下再也不启动的时候,别忘了把数据移走发出去。
中止顺序优化。在作优化的时候遇到源码的修改,其实就是Flume中止顺序的优化。原生里好像先中止Channel,而后提升sink,这就会致使想要作这个功能的时候作不到。咱们应该先把这个数据改掉再去中止sink最后中止Channel,这样就保证Channel里的数据能够所有固化到硬盘里。
多种转发方式。咱们如今是全球的RBC,支持公网、内网、跨域性专线,咱们提供一个很是好的功能:http sink,它也是一个安全的支持ssl的转换方式。
自定义Sink,多线程发送(channel的get只能单线程)。
四、中止顺序
如图是中止顺序的修改。这是一个sourceRunner、sink、channel。
五、Memory的capacity
选择内存以后,这个内存大小到底多少比较合适?如图所示,左边Flume是从500-1000,channel容量是5万、10万,还有Agent的个数、线程,咱们发如今10万的时候它的fullGC是很是频繁的,因此咱们最后定的大小是5万。固然不一样的机器根据不一样的测试获得本身的值,这个值不是恒定的。
包大小从10K到30K到50K有什么不同呢?很明显TPS从1万多降到了2000多,由于包越大网卡就越慢了,这里看到其实已经到了200兆(双网卡),把网卡跑满了。咱们作流平台设计的时候,不但愿链路被跑满,因此咱们给了个建议值,大小在5-10K。固然,线上咱们采用的万兆网卡。
7、实时计算
一、实时计算集群
在SparkZK里直接写HA,能够减小没必要要的MR提升IO,减小IO消耗。
Kafka+Strom (ZK)
二、Spark实践
直接写HDFS底层文件
自动建立不存在的Hive分区
相应Metaq的日志切割,这一点上如今的Kafka是没有问题的,当时的日志切割会致使网络链接超时,咱们查看源代码发现确实会堵塞,咱们的解决方法是把切割调成多色或分区调多。
不要定时的killJob。早期的Spark版本由于大批量的killJob致使一些不稳定的状况,某些job实际上是没有被彻底覆盖,假死在那里的。