实时海量日志分析系统的架构设计、实现以及思考

1 序

对ETL系统中数据转换和存储操做的相关日志进行记录以及实时分析有助于咱们更好的观察和监控ETL系统的相关指标(如单位时间某些操做的处理时间),发现系统中出现的缺陷和性能瓶颈。html

因为须要对日志进行实时分析,因此Storm是咱们想到的首个框架。Storm是一个分布式实时计算系统,它能够很好的处理流式数据。利用storm咱们几乎能够直接实现一个日志分析系统,可是将日志分析系统进行模块化设计能够收到更好的效果。模块化的设计至少有两方面的优势:java

  1. 模块化设计可使功能更加清晰。整个日志分析系统能够分为“数据采集-数据缓冲-数据处理-数据存储”四个步骤。Apache项目下的flumeng框架能够很好的从多源目标收集数据,因此咱们用它来从ETL系统中收集日志信息;因为采集数据与处理数据的速度可能会出现不一致,因此咱们须要一个消息中间件来做为缓冲,kafka是一个极好的选择;而后对流式数据的处理,咱们将选择大名鼎鼎的storm了,同时为了更好的对数据进行处理,咱们把drools与storm进行了整合,分离出了数据处理规则,这样更有利于管理规则;最后,咱们选择redis做为咱们处理数据的存储工具,redis是一个内存数据库,能够基于健值进行快速的存取。
  2. 模块化设计以后,storm和前两个步骤之间就得到了很好的解耦,storm集群若是出现问题,数据采集以及数据缓冲的操做还能够继续运行,数据不会丢失。

2 相关框架的介绍和安装

2.1 flumeng

2.1.1 原理介绍

Flume是一个高可用、高可靠、分布式的海量日志采集、聚合和传输系统。Flume支持在日志系统中定制日志发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各类数据接收方的能力。它拥有一个简单的、可扩展的流式数据流架构,以下图所示:mysql

flume

日志收集系统就是由一个或者多个agent(代理)组成,每一个agent由source、channel、sink三部分组成,source是数据的来源,channel是数据进行传输的通道,sink用于将数据传输到指定的地方。咱们能够把agent看作一段水管,source是水管的入口,sink是水管的出口,数据流就是水流。 Agent本质上是一个jvm进程,agent各个组件之间是经过event来进行触发和协调的。linux

2.1.2 flumeng的安装

  1. 从官方网站下载apache-flume-1.4.0-bin.tar.gz压缩包
  2. 解压缩,并在conf目录下面新建一个文件flume-conf.properties,内容以下:git

  3. 启动代理。flume-ng agent –n a1 –f flume-conf.propertiesgithub

2.2 kafka

2.2.1 原理介绍

Kafka是linkedin用于日志处理的分布式消息队列。Kafka的架构以下图所示:web

kafka架构

Kafka的存储策略有一下几点:redis

  1. kafka以topic来进行消息管理,每一个topic包括多个partition,每一个partition包括一个逻辑log,由多个segment组成。
  2. 每一个segment中存储多条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。
  3. 每一个partition在内存中对应一个index,记录每一个segment中的第一条消息的偏移。
  4. 发布者发到某个topic的消息会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到必定的大小后将不会再往该segment写数据,broker会建立新的segment。

2.2.2 kafka集群的搭建

Kafka集群的搭建须要依赖zookeeper来进行负载均衡,因此咱们须要在安装kafka以前搭建zookeeper集群。算法

  1. zookeeper集群的搭建,本系统用到了两台机器。具体搭建过程见http://blog.csdn.net/itleochen/article/details/17453881
  2. 分别下载kafka_2.9.2-0.8.1的安装包到两台机器,并解压该安装包。
  3. 打开conf/server.properties文件,修改配置项broker.id、zookeeper.connect、partitions以及host.name为相应的值。
  4. 分别启动kafka即完成了集群的搭建。

2.3 storm

2.3.1 原理介绍

Storm是一个分布式的、高容错的实时计算系统。Storm对于实时计算的的意义至关于Hadoop对于批处理的意义。hadoop为咱们提供了Map和Reduce原语,使咱们对数据进行批处理变的很是的简单和优美。一样,Storm也对数据的实时计算提供了简单Spout和Bolt原语。sql

Strom集群里面有两种节点,控制节点和工做节点,控制节点上面运行一个nimbus(相似于hadoop中的JobTracker)后台程序,Nimbus负责在集群里面分布代码,分配工做给机器, 而且监控状态。每个工做节点上面运行一个叫作Supervisor(相似Hadoop中的TaskTracker)的节点。Supervisor会监听分配给它那台机器的工做,根据须要启动/关闭工做进程。每个工做进程执行一个Topology(相似hadoop中的Job)的一个子集;一个运行的Topology由运行在不少机器上的不少工做进程 Worker(相似Hadoop中的Child)组成。结构以下图所示:

storm架构

Stream是storm里面的关键抽象。一个stream是一个没有边界的tuple序列。storm提供一些原语来分布式地、可靠地把一个stream传输进一个新的stream。好比: 你能够把一个tweets流传输到热门话题的流。

storm提供的最基本的处理stream的原语是spout和bolt。你能够实现Spout和Bolt对应的接口以处理你的应用的逻辑。

Spout是流的源头。好比一个spout可能从Kestrel队列里面读取消息而且把这些消息发射成一个流。一般Spout会从外部数据源(队列、数据库等)读取数据,而后封装成Tuple形式,以后发送到Stream中。Spout是一个主动的角色,在接口内部有个nextTuple函数,Storm框架会不停的调用该函数。

Bolt能够接收任意多个输入stream。Bolt处理输入的Stream,并产生新的输出Stream。Bolt能够执行过滤、函数操做、Join、操做数据库等任何操做。Bolt是一个被动的角色,其接口中有一个execute(Tuple input)方法,在接收到消息以后会调用此函数,用户能够在此方法中执行本身的处理逻辑。

spout和bolt所组成一个网络会被打包成topology, topology是storm里面最高一级的抽象(相似 Job), 你能够把topology提交给storm的集群来运行。Topology的结构以下图所示:

topology

2.3.2 storm集群的搭建

Storm集群的搭建也要依赖于zookeeper,本系统中storm与kafka共用一样一个zookeeper集群。

  1. 下载安装包storm-0.9.0.1.tar.gz,并对该包进行解压。
  2. 配置nimbus。 修改storm的conf/storm.yaml文件以下:

    注意:在每一个配置项前面必须留有空格,不然会没法识别。storm.messaging.* 部分是Netty的配置。若是没有该部分。那么Storm默认仍是使用ZeroMQ。

  3. 配置supervisor 修改storm的conf/storm.yaml文件以下:

注意

  1. nimbus.host是nimbus的IP或hostname
  2. supervisor.slots.ports 是配置slot的ip地址。配了几个地址,就有几个slot,即几个worker。若是尝试提交的topology所声明的worker数超过当前可用的slot,该topology提交会失败。
  3. storm.messaging 部分是Netty的配置。

2.4 drools

Drools是一个基于Java的、开源的规则引擎,能够将复杂多变的规则从硬编码中解放出来,以规则脚本的形式存放在文件中,使得规则的变动不须要修正代码重启机器就能够当即在线上环境生效。 日志分析系统中,drools的做用是利用不一样的规则对日志信息进行处理,以得到咱们想要的数据。可是,Drools自己不是一个分布式框架,因此规则引擎对log的处理没法作到分布式。咱们的策略是将drools整合到storm的bolt中去,这就就解决了drools没法分布式的问题。这是由于bolt能够做为task分发给多个worker来处理,这样drools中的规则也天然被多个worker处理了。

2.5 redis

Redis是key-value存储系统,它支持较为丰富的数据结构,有String,list,set,hash以及zset。与memcached同样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操做写入追加的记录文件,而且在此基础上实现了master-slave(主从)同步。 Redis是内存数据库,因此有很是快速的存取效率。日志分析系统数据量并非特别大,可是对存取的速度要求较高,因此选择redis有很大的优点。

3 各个框架的整合

3.1 ETL系统整合flumeng

Flume如何收集ETL系统中的日志是我须要考虑的第一个问题。log4j2提供了专门的Appender-FlumeAppender用于将log信息发送到flume系统,并不须要咱们来实现。咱们在log4j2的配置文件中配置了ETL系统将log信息发送到的目的地,即avro服务器端。该服务器端咱们在flume的配置文件中进行了配置。配置信息以下所示:

3.2 flumeng与kafka的整合

咱们从ETL系统中得到了日志信息,将该信息不做任何处理传递到sink端,sink端发送数据到kafka。这个发送过程须要咱们编写代码来实现,咱们的实现代码为KafkaSink类。主要代码以下所示:

该类中,咱们读取了一些配置信息,这些配置信息咱们在flumeng的flume-conf.properties文件中进行了定义,定义内容以下:

将上面的KafkaSink类打包成flumeng-kafka.jar,并将该jar包以及kafka_2.9.2-0.8.1.jar、metrics-annotation-2.2.0.jar、metrics-core-2.2.0.jar、Scala-compiler.jar、scala-library.jar、zkclient-0.3.jar放到flume的lib目录下,启动flume,咱们就能够将ETL系统中产生的日志信息发送到kafka中的fks1这个topic中去了。

3.3 kafka与storm的整合

Storm中的spout如何主动消费kafka中的消息须要咱们编写代码来实现,httpsgithub.comwurstmeisterstorm-kafka-0.8-plus实现了一个kafka与storm整合的插件,下载该插件,将插件中的jar包以及metrics-core-2.2.0.jar、scala-compiler2.9.2.jar放到storm的lib目录下。利用插件中的StormSpout类,咱们就能够消费kafka中的消息了。主要代码以下所示:

3.4

storm中bolt与drools的整合 Drools能够将storm中处理数据的规则提取到一个drl文件中,该文件就成了惟一处理规则的文件。任什么时候候规则出现变化,咱们只须要修改该drl文件,而不会改变其它的代码。Bolt与drools的整合代码以下所示:

相关文章
相关标签/搜索