运营商手机视频流量包业务日志ETL及统计分析

本身作过的项目在这里作一个记录,不然就感受不是本身的了.一是由于过去时间已经很长了,二是由于当时作得有点粗糙,最后还不了了之了.html

话很少说,先大体介绍一下项目背景.之前各大手机视频 App 通常都有运营商的流量包套餐.当用户产生这样的业务行为时,运营商便获取了一系列的用户行为日志.mysql

这条日志是一条获取视频用户手机号码的日志.日志的类型不少,当时作的主要工做是对这个类型的日志作一系列的抽取,清洗,过滤,转换及转存工做.最后,对实时的日志以10分钟为一个时间窗口作简要的统计分析.linux

 

要认识这样的项目,最重要的是要厘清整个数据的流向.web

首先,原始业务日志不管是实时的仍是历史的都保存在服务器日志目录内的 log 文件中,咱们使用 Apache Flume 这个工具将原始数据抽取出来.建立一个 Flume Agent ,配置 tail source ,memory channel 以及 kafka sink ,将原始数据先导入咱们的大数据集群环境中.把数据首先弄到大数据环境中,这是全部大数据数据处理的开端.通常来讲,kafka 集群是大数据环境的门面,全部的集群外部数据通常都是先导入到kafka中.sql

这个过程当中有一个问题,运营商的服务器在运营商的机房内网,跟咱们的 hdp 集群环境网络不是互通的.因此咱们用了一台机器作网络中转,这台机器分别跟 服务器 及咱们的 hdp 集群环境网络相通.apache

因此总共配置了3个Flume 的 Agent , 第一个为服务器采集端的 agent:  tail log文件 source , memory channel , avro sink 到中起色器编程

第二个为中起色器的agent : avro source ,memory channel,avro sink 到集群环境中的一台机器缓存

第三个为集群环境的某台机器的agent : avro source, memory channel ,kafka sink 到 kafka集群的brokers.服务器

这个流程调试了一段时间,比较纠结,整个流程运行了一段时间后会偶尔挂掉,而后整个数据的传输就断了.网络

第一个缘由是我本身的缘由,对 linux 系统还不是很熟练.我ssh到hdp集群上的机器上,run 第三个 flume agent 的时候没有意识到要让进程在后台可靠的运行.这样,当ssh断了的时候,这个agent的运行也会跟着中止.第三个agent挂掉了,前面的两个agent会因为memory channel满了也依次挂掉.因此为了不ssh的问题,须要在run agent命令的时候,要注意使用 nohup 及 & 标识 .这样 ssh 断掉以后,agent也能继续运行.

第二个缘由是沟通的问题.采集端 agent 和中转端 agent 是由业务方的开发人员管理的.我负责管理hdp集群中的这个agent.整个流程3个agent , 3个source 和 3个sink.中间某一段不通畅就会致使agent的挂掉,挂掉某个agent的话,整个传输流程就断了.当时出现问题的时候,屡次沟通都没有沟通清楚,反反复复作了不少无用功,其实缘由就摆在那里.主要是网络的问题,由于第一个agent是运营商的专用网,而且仍是北京异地的机房,它到中转服务器的sink网路不稳定,致使sink没有发出去.sink中止了的话,memory channel容量逐渐变满,致使source 无法正常工做,最终这个agent挂掉了.

最后的表现是每一个agent都会抛出一个错误,中止运行.因为沟通的问题,一直没有找到引起错误的源头agent.

还有一个潜在的性能问题.中转的 agent 没有作 load balance .因为采集端的数据量很是大,并且后期针对不一样的日志类型,开了好几个不一样的agent.数据一会儿所有集中到了中起色器的这个agent上,它的memory channel 是很容易跑满的.

不过这个项目的实际状况取决于网络情况,整个流程中,第一个agent的sink 到第二个agent的source是主要瓶颈(取决于网络情况),当这个瓶颈的问题解决了以后,第二个agent的性能就成了主要瓶颈.这个时候就须要构建flume sink的负载均衡了(详见官网:http://flume.apache.org/FlumeUserGuide.html#load-balancing-sink-processor).

 

如今日志数据已经成功弄到hdp集群环境中了,能够开始下一步了.不过,在正式开始下一步以前,还要提一点,那就是kafka. 上面最终是将日志数据导入到kafka的某个topic中,.对于kafka的topic,要作的是设定好topic的partiions 和 replication-factor. 这个项目用的数值分别是6和2 ,不过这是拍脑壳肯定,没有细致研究.主要缘由是因为 kafka 的性能实在是太优越了,kafka这里的数据流程基本上不可能成为瓶颈.(kafka很强大,之后也会愈来愈重要,值得细致研究)

 

当数据采集到 kafka 集群,也即hdp集群环境中以后,就可使用大数据的集群环境及相关技术生态对数据进行进一步的处理了. 因为处理数据的起点是kafka,因此所有采用的是流式处理的方式,也即 data pipline,从 kafka 中读取记录,进行逐条的处理.  这里,就要跟你们介绍一个很是很是优秀的开源数据处理及分发系统 ,Apache Nifi .由于后续的大部分工做是使用nifi完成的.

Apache Nifi 是由美国NSA 捐给Apache基金会,最先是NSA内部使用的一个数据处理工具.nifi可以保证数据的可靠性,nifi也可以保证数据的即时性,nifi逐条处理数据的速度很是快.最厉害的是,niifi提供了一个很是简单直观的基于web的用户使用界面.nifi官网上的一句话准确概况了nifi的特色: Apache NiFi supports powerful and scalable directed graphs of data routing, transformation, and system mediation logic . 这个system mediation logic 说得很是到位,可以使用nifi画出很是复杂的数据流动逻辑.没错, 是画出来的. 

下面这张图是nifi用户界面里面画出来的, 它不只仅是流程图, 实际上它就是整个处理过程.右键点击start,日志数据的etl就开始了,很是强大.

 

上面每个方块表明对记录的处理过程,每个箭头表明记录的流向.日志记录就是一条一条的在这个有向无环图里面流动,从起点开始,通过一个个的处理器通过处理,一直流到目的地. 看起来这个图看起来比较复杂,但实际上作的事情,无外乎就是对日志数据的抽取,过滤,路由,转换,更新,分发这几个事.

图画得这么多,主要是因为业务的须要.

其一,日志文件的格式不统一,即便是同类型的 log 文件.格式也会有细微的差别.这也是不可能统一的.由于日志文件的时间跨度大,由不一样的开发人员,按照不一样的业务人员需求和当时的约定和口径来开发的.有时候,一样的字段,会有不一样格式的值,甚至还要基于记录某个字段的值去作判断.客观上,日志文件的格式是不统一的,甚至有时候不规范.这个时候,就须要很是灵活的处理流程了.nifi的易用性及扩展性就体如今这里了,针对不一样的日志文件格式,只须要更改这个流程图就行.加几个处理器,连几个箭头.所有都是在 nifi 的这个界面上经过拖拽完成的.

其二,业务需求的变更性.对于用户行为的日志文件,对于同一份日志记录,同时会有各类不一样的业务需求.要作即席查询,要作实时聚合,要作批量查询.要备份表记录.还要触发特定业务处理.因此整个流程图确定会随着业务需求的增多愈来愈复杂.可以让流程图随着业务需求的变更而跟着变更自己就是一个很是厉害的事了,不管来了怎样的业务需求,我只须要连箭头,增长新的处理流程,也就是 data flow 就能够了.

 

接下来,详细看数据的流向.起点是从 kafka 集群中读取日志数据,批量读进来若干条日志记录以后,使用nifi的处理器逐条对日志记录进行处理.让记录在这个图中流动起来.

首先作验证,而后过滤格式错误记录,而后路由不一样的日志类型. nifi能作到这些的关键在于它的 flowfile 这个概念. 每一条数据记录进入到nifi中就叫flowfile. 每个flowfile 由两部分组成,一个是content, 文件内容. 一个是 attributes ,文件属性.  在 nifi 中, 咱们能够对文件属性进行增删改查等操做,甚至咱们可使用 nifi 提供的DSL,特定领域语言 对 attributes 进行编程. 这样的设定使得能够对数据记录进行任何想要的逻辑处理. 因此,通常是先把日志记录的内容转换到 flow file 的属性值当中去,而后进行后续的不一样处理.  以下图:

 

接着继续判断,对于经过网络获取手机号码成功的日志,将原始的日志记录保存到 hbase 中,以后供业务方作即席查询. 以下图:

 

 对于获取手机号码失败的日志,手动去查用户的地址和所属运营商信息.这里是强业务相关的,由于属于其余运营商,因此是获取不到号码的.这里的处理真正体现了nifi的强大和灵活.

由于对于失败的日志,其实是缺乏必备字段的.缺乏了字段, 这在日志文件批量处理中是多么坑爹的事情.然而使用 nifi 却能很轻松的解决这个问题.直接拿这条记录的用户ip 去调 一个内部的服务化查询接口,把字段查出来.并把值赋给flow file 相应的attribute. 把这样的日志记录变成正常的日志记录后,再汇入处处理的主流程当中,接着流动下去.

 

 

 

 接下的处理流程主要就是分发及转存了.对于同一条日志记录,一份数据要提取字段,将之转成 hive表结构对应的 csv 格式 ,保存到 hdfs 中, 也就是将全部处理好的数据落地到hdp集群环境中去.这是数据清洗后一大终点,也是结果.以后能够拿来直接作其余处理了,好比作批量查询,供其余工具使用(好比 kylin 等),也能够用来作模型训练.由于这里就算是干净的数据了.

 

 

另外一份数据(假定历史数据都已经处理完成,如今都是实时的数据)要提取业务须要的字段,导入到kafka的topic 中. 供业务须要的即时统计分析使用.

 

 

至此,整个 nifi 的数据处理流程都走完了. 归功了 nifi 的强大 ,数据从起点到终点,虽然处理流程多,但流向很是清晰.用 nifi 拖拽几下,一套特定业务日志的处理系统就完成了. 右键点击 start ,系统就跑起来,你能够在界面看到数据的流动,能够监控,能够暂停,能够调试某段,能够查看中间结果.这些都是能够在界面上完成的.  用很简单的使用方式,去作很复杂的事情,最牛逼的工具莫过于此了.固然, nifi 也有不少高级的用法. 甚至能够 搭一个 nifi 的集群 ,来处理更加海量的数据,这里就不细说了.

固然,nifi中一个很长的数据处理流程是须要花时间观察,调试及验证的.从一个处理器到另外一个处理器是否通畅.数据是否阻塞在了流程图中的某一段等问题.须要调试单个处理器的并发量及run schedule等信息.以及处理器之间的缓存队列容量和大小等信息.这些须要耐心的调试,就像一个流动的人群同样,慢慢疏导.

 

最后,附上实时数据统计的代码,这是这个项目写的惟一代码了,使用的是 spark 技术栈 ,spark streaming 和 spark sql . 关键是这个group by 函数.把聚合后的数据保存mysql 里面,供业务应用查询.这里后来把时间窗口改成了一小时,由于当时聚合后的数据量也还蛮大,2天存了35万条到mysql里面,这样的量放mysql 里面不太合适.

可能一开始的方案就不正确,后来也没继续跟进了.

 

以上,就是这个项目的一个完整记录,作得有点粗糙,最后也没有继续跟进.不少细节没有考虑到 ,好比因为网络的问题,在整个流程中,是否会有数据的丢失?当因为某种缘由致使流程中断,是否会有数据重复.最后的结果,怎样去校验数据的完整性.这些问题理论上是聚焦于各个组件,好比sqoop, kafka ,nifi的机制上的,但实际跑起来以后,会有什么样的问题.当时并无过细的研究.

因此,若是本文中,有什么不当的,须要补充以及错误的地方,欢迎指正,共同窗习,一块儿进步.欢迎一块儿探讨关于nifi的使用,最近在码的是一个基于nifi的数据交换平台。

相关文章
相关标签/搜索