对ETL系统中数据转换和存储操做的相关日志进行记录以及实时分析有助于咱们更好的观察和监控ETL系统的相关指标(如单位时间某些操做的处理时间),发现系统中出现的缺陷和性能瓶颈。html
因为须要对日志进行实时分析,因此Storm是咱们想到的首个框架。Storm是一个分布式实时计算系统,它能够很好的处理流式数据。利用storm咱们几乎能够直接实现一个日志分析系统,可是将日志分析系统进行模块化设计能够收到更好的效果。模块化的设计至少有两方面的优势:java
Flume是一个高可用、高可靠、分布式的海量日志采集、聚合和传输系统。Flume支持在日志系统中定制日志发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各类数据接收方的能力。它拥有一个简单的、可扩展的流式数据流架构,以下图所示:mysql
日志收集系统就是由一个或者多个agent(代理)组成,每一个agent由source、channel、sink三部分组成,source是数据的来源,channel是数据进行传输的通道,sink用于将数据传输到指定的地方。咱们能够把agent看作一段水管,source是水管的入口,sink是水管的出口,数据流就是水流。 Agent本质上是一个jvm进程,agent各个组件之间是经过event来进行触发和协调的。linux
解压缩,并在conf目录下面新建一个文件flume-conf.properties,内容以下:git
启动代理。flume-ng agent –n a1 –f flume-conf.propertiesgithub
Kafka是linkedin用于日志处理的分布式消息队列。Kafka的架构以下图所示:web
Kafka的存储策略有一下几点:redis
Kafka集群的搭建须要依赖zookeeper来进行负载均衡,因此咱们须要在安装kafka以前搭建zookeeper集群。算法
Storm是一个分布式的、高容错的实时计算系统。Storm对于实时计算的的意义至关于Hadoop对于批处理的意义。hadoop为咱们提供了Map和Reduce原语,使咱们对数据进行批处理变的很是的简单和优美。一样,Storm也对数据的实时计算提供了简单Spout和Bolt原语。sql
Strom集群里面有两种节点,控制节点和工做节点,控制节点上面运行一个nimbus(相似于hadoop中的JobTracker)后台程序,Nimbus负责在集群里面分布代码,分配工做给机器, 而且监控状态。每个工做节点上面运行一个叫作Supervisor(相似Hadoop中的TaskTracker)的节点。Supervisor会监听分配给它那台机器的工做,根据须要启动/关闭工做进程。每个工做进程执行一个Topology(相似hadoop中的Job)的一个子集;一个运行的Topology由运行在不少机器上的不少工做进程 Worker(相似Hadoop中的Child)组成。结构以下图所示:
Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。好比: 你能够把一个tweets流传输到热门话题的流。
storm提供的最基本的处理stream的原语是spout和bolt。你能够实现Spout和Bolt对应的接口以处理你的应用的逻辑。
Spout是流的源头。好比一个spout可能从Kestrel队列里面读取消息而且把这些消息发射成一个流。一般Spout会从外部数据源(队列、数据库等)读取数据,而后封装成Tuple形式,以后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。
Bolt能够接收任意多个输入stream。Bolt处理输入的Stream,并产生新的输出Stream。Bolt能够执行过滤、函数操做、Join、操做数据库等任何操做。Bolt是一个被动的角色,其接口中有一个execute(Tuple input)方法,在接收到消息以后会调用此函数,用户能够在此方法中执行本身的处理逻辑。
spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(相似 Job), 你能够把topology提交给storm的集群来运行。Topology的结构以下图所示:
Storm集群的搭建也要依赖于zookeeper,本系统中storm与kafka共用一样一个zookeeper集群。
配置nimbus。 修改storm的conf/storm.yaml文件以下:
注意:在每一个配置项前面必须留有空格,不然会没法识别。storm.messaging.* 部分是Netty的配置。若是没有该部分。那么Storm默认仍是使用ZeroMQ。
配置supervisor 修改storm的conf/storm.yaml文件以下:
注意
Drools是一个基于Java的、开源的规则引擎,能够将复杂多变的规则从硬编码中解放出来,以规则脚本的形式存放在文件中,使得规则的变动不须要修正代码重启机器就能够当即在线上环境生效。 日志分析系统中,drools的做用是利用不一样的规则对日志信息进行处理,以得到咱们想要的数据。可是,Drools自己不是一个分布式框架,因此规则引擎对log的处理没法作到分布式。咱们的策略是将drools整合到storm的bolt中去,这就就解决了drools没法分布式的问题。这是由于bolt能够做为task分发给多个worker来处理,这样drools中的规则也天然被多个worker处理了。
Redis是key-value存储系统,它支持较为丰富的数据结构,有String,list,set,hash以及zset。与memcached同样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操做写入追加的记录文件,而且在此基础上实现了master-slave(主从)同步。 Redis是内存数据库,因此有很是快速的存取效率。日志分析系统数据量并非特别大,可是对存取的速度要求较高,因此选择redis有很大的优点。
Flume如何收集ETL系统中的日志是我须要考虑的第一个问题。log4j2提供了专门的Appender-FlumeAppender用于将log信息发送到flume系统,并不须要咱们来实现。咱们在log4j2的配置文件中配置了ETL系统将log信息发送到的目的地,即avro服务器端。该服务器端咱们在flume的配置文件中进行了配置。配置信息以下所示:
咱们从ETL系统中得到了日志信息,将该信息不做任何处理传递到sink端,sink端发送数据到kafka。这个发送过程须要咱们编写代码来实现,咱们的实现代码为KafkaSink类。主要代码以下所示:
该类中,咱们读取了一些配置信息,这些配置信息咱们在flumeng的flume-conf.properties文件中进行了定义,定义内容以下:
将上面的KafkaSink类打包成flumeng-kafka.jar,并将该jar包以及kafka_2.9.2-0.8.1.jar、metrics-annotation-2.2.0.jar、metrics-core-2.2.0.jar、Scala-compiler.jar、scala-library.jar、zkclient-0.3.jar放到flume的lib目录下,启动flume,咱们就能够将ETL系统中产生的日志信息发送到kafka中的fks1这个topic中去了。
Storm中的spout如何主动消费kafka中的消息须要咱们编写代码来实现,httpsgithub.comwurstmeisterstorm-kafka-0.8-plus实现了一个kafka与storm整合的插件,下载该插件,将插件中的jar包以及metrics-core-2.2.0.jar、scala-compiler2.9.2.jar放到storm的lib目录下。利用插件中的StormSpout类,咱们就能够消费kafka中的消息了。主要代码以下所示:
storm中bolt与drools的整合 Drools能够将storm中处理数据的规则提取到一个drl文件中,该文件就成了惟一处理规则的文件。任什么时候候规则出现变化,咱们只须要修改该drl文件,而不会改变其它的代码。Bolt与drools的整合代码以下所示: