摘要:随着数据体积的愈来愈大,实时处理成为了许多机构须要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为咱们演示了使用Storm进行实时大数据分析。CSDN在此编译、整理。html
简单和明了,Storm让大数据分析变得轻松加愉快。java
当今世界,公司的平常运营常常会生成TB级别的数据。数据来源囊括了互联网装置能够捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中建立的数据。考虑到数据的生成量,实时处理成为了许多机构须要面对的首要挑战。咱们常常用的一个很是有效的开源实时计算工具就是Storm —— Twitter开发,一般被比做“实时的Hadoop”。然而Storm远比Hadoop来的简单,由于用它处理大数据不会带来新老技术的交替。node
Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分别从事技术分析和研发工做。本文详述了Storm的使用方法,例子中的项目名称为“超速报警系统(Speeding Alert System)”。咱们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 —— 便触发一个trigger并把相关的数据存入数据库。mysql
1. Storm是什么git
全量数据处理使用的大可能是鼎鼎大名的hadoop或者hive,做为一个批处理系统,hadoop以其吞吐量大、自动容错等优势,在海量数据处理上获得了普遍的使用。github
Hadoop下的Map/Reduce框架对于数据的处理流程是:redis
一、 将要处理的数据上传到Hadoop的文件系统HDFS中。sql
二、 Map阶段数据库
a) Master对Map的预处理:对于大量的数据进行切分,划分为M个16~64M的数据分片(可经过参数自定义分片大小)设计模式
b) 调用Mapper函数:Master为Worker分配Map任务,每一个分片都对应一个Worker进行处理。各个Worker读取并调用用户定义的Mapper函数 处理数据,并将结果存入HDFS,返回存储位置给Master。
一个Worker在Map阶段完成时,在HDFS中,生成一个排好序的Key-values组成的文件。并将位置信息汇报给Master。
三、 Reduce阶段
a) Master对Reduce的预处理:Master为Worker分配Reduce任务,他会将全部Mapper产生的数据进行映射,将相同key的任务分配给某个Worker。
b) 调用Reduce函数:各个Worker将分配到的数据集进行排序(使用工具类Merg),并调用用户自定义的Reduce函数,并将结果写入HDFS。
每一个Worker的Reduce任务完成后,都会在HDFS中生成一个输出文件。Hadoop并不将这些文件合并,由于这些文件每每会做为另外一个Map/reduce程序的输入。
以上的流程,粗略归纳,就是从HDFS中获取数据,将其按照大小分片,进行分布式处理,最终输出结果。从流程来看,Hadoop框架进行数据处理有如下要求:
一、 数据已经存在在HDFS当中。
二、 数据间是少关联的。各个任务执行器在执行负责的数据时,无需考虑对其余数据的影响,数据之间应尽量是无联系、不会影响的。
使用Hadoop,适合大批量的数据处理,这是他所擅长的。因为基于Map/Reduce这种单级的数据处理模型进行,所以,若是数据间的关联系较大,须要进行数据的多级交互处理(某个阶段的处理数据依赖于上一个阶段),须要进行屡次map/reduce。又因为map/reduce每次执行都须要遍历整个数据集,对于数据的实时计算并不合适,因而有了storm。
对比Hadoop的批处理,Storm是个实时的、分布式以及具有高容错的计算系统。同Hadoop同样Storm也能够处理大批量的数据,然而Storm在保证高可靠性的前提下还可让处理进行的更加实时;也就是说,全部的信息都会被处理。Storm一样还具有容错和分布计算这些特性,这就让Storm能够扩展到不一样的机器上进行大批量的数据处理。他一样还有如下的这些特性:
在线实时流处理模型
对于处理大批量数据的Map/reduce程序,在任务完成以后就中止了,但Storm是用于实时计算的,因此,相应的处理程序会一直执行(等待任务,有任务则执行)直至手动中止。
对于Storm,他是实时处理模型,与hadoop的不一样是,他是针对在线业务而存在的计算平台,如统计某用户的交易量、生成为某个用户的推荐列表等实时性高的需求。他是一个“流处理”框架。何谓流处理?storm将数据以Stream的方式,并按照Topology的顺序,依次处理并最终生成结果。
固然为了更好的理解文章,你首先须要安装和设置Storm。须要经过如下几个简单的步骤:
Storm集群和Hadoop集群表面上看很相似。可是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这二者之间是很是不同的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。
Storm集群主要由一个主节点(Nimbus后台程序)和一群工做节点(worker node)Supervisor的节点组成,经过 Zookeeper进行协调。Nimbus相似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器, 而且监控状态。
每个工做节点上面运行一个叫作Supervisor的节点。Supervisor会监听分配给它那台机器的工做,根据须要启动/关闭工做进程。每个工做进程执行一个topology的一个子集;一个运行的topology由运行在不少机器上的不少工做进程组成。
一、 Nimbus主节点:
主节点一般运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很相似于Hadoop中的Job Tracker。
二、Supervisor工做节点:
工做节点一样会运行一个后台程序 —— Supervisor,用于收听工做指派并基于要求运行工做进程。每一个工做节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则经过Zookeeper系统或者集群。
三、Zookeeper
Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操做)经过Stream Groupings进行链接的图。下面对出现的术语进行更深入的解析。
四、Worker:
运行具体处理组件逻辑的进程。
五、Task:
worker中每个spout/bolt的线程称为一个task. 在storm0.8以后,task再也不与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。
六、Topology(拓扑):
storm中运行的一个实时应用程序,由于各个组件间的消息流动造成逻辑上的一个拓扑结构。一个topology是spouts和bolts组成的图, 经过stream groupings将图中的spouts和bolts链接起来,以下图:
一个topology会一直运行直到你手动kill掉,Storm自动从新分配执行失败的任务, 而且Storm能够保证你不会有数据丢失(若是开启了高可靠性的话)。若是一些机器意外停机它上面的全部任务会被转移到其余机器上。
运行一个topology很简单。首先,把你全部的代码以及所依赖的jar打进一个jar包。而后运行相似下面的这个命令:
storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2
这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology而且把它提交给Nimbus。storm jar负责链接到Nimbus而且上传jar包。
Topology的定义是一个Thrift结构,而且Nimbus就是一个Thrift服务, 你能够提交由任何语言建立的topology。上面的方面是用JVM-based语言提交的最简单的方法。
七、Spout:
消息源spout是Storm里面一个topology里面的消息生产者。简而言之,Spout历来源处读取数据并放入topology。Spout分红可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。
消息源能够发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,而后使用SpoutOutputCollector来发射指定的stream。
而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,若是没有新tuple发射则会简单的返回。
要注意的是nextTuple方法不能阻塞,由于storm在同一个线程上面调用全部消息源spout的方法。
另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。storm只对可靠的spout调用ack和fail。
八、Bolt:
Topology中全部的处理都由Bolt完成。即全部的消息处理逻辑被封装在bolts里面。Bolt能够完成任何事,好比:链接的过滤、聚合、访问文件/数据库、等等。
Bolt从Spout中接收数据并进行处理,若是遇到复杂流的处理也可能将tuple发送给另外一个Bolt进行处理。即须要通过不少blots。好比算出一堆图片里面被转发最多的图片就至少须要两步:第一步算出每一个图片的转发数量。第二步找出转发最多的前10个图片。(若是要把这个过程作得更具备扩展性那么可能须要更多的步骤)。
Bolts能够发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
而Bolt中最重要的方法是execute(),以新的tuple做为参数接收。无论是Spout仍是Bolt,若是将tuple发射成多个流,这些流均可以经过declareStream()来声明。
bolts使用OutputCollector来发射tuple,bolts必需要为它处理的每个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 通常的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 而后调用ack通知storm本身已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。
九、Tuple:
一次消息传递的基本单元。原本应该是一个key-value的map,可是因为各个组件间传递的tuple的字段名称已经事先定义好,因此tuple中只要按序填入各个value就好了,因此就是一个value list.
十、Stream:
源源不断传递的tuple就组成了stream。消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列, 而这些tuple序列会以一种分布式的方式并行地建立和处理。经过对stream中tuple序列中每一个字段命名来定义stream。在默认的状况下,tuple的字段类型能够是:integer,long,short, byte,string,double,float,boolean和byte array。你也能够自定义类型(只要实现相应的序列化器)。
每一个消息流在定义的时候会被分配给一个id,由于单向消息流使用的至关广泛, OutputFieldsDeclarer定义了一些方法让你能够定义一个stream而不用指定这个id。在这种状况下这个stream会分配个值为‘default’默认的id 。
Storm提供的最基本的处理stream的原语是spout和bolt。你能够实现spout和bolt提供的接口来处理你的业务逻辑。
十一、Stream Groupings:
Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型:
1). 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每一个任务得到相等数量的tuple。
2). 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组老是分发到同一个任务,不一样“user-id”的元组可能分发到不一样的任务。
3). 所有分组(All grouping):tuple被复制到bolt的全部任务。这种类型须要谨慎使用。
4). 全局分组(Global grouping):所有流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。
5). 无分组(None grouping):你不须要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(若是可能)。
6). 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪一个元组处理者任务接收。
固然还能够实现CustomStreamGroupimg接口来定制本身须要的分组。
storm 和hadoop的对比来了解storm中的基本概念。
Hadoop | Storm | |
系统角色 | JobTracker | Nimbus |
TaskTracker | Supervisor | |
Child | Worker | |
应用名称 | Job | Topology |
组件接口 | Mapper/Reducer | Spout/Bolt |
3. Storm应用场景
Storm 与其余大数据解决方案的不一样之处在于它的处理方式。Hadoop 在本质上是一个批处理系统。数据被引入 Hadoop 文件系统 (HDFS) 并分发到各个节点进行处理。当处理完成时,结果数据返回到 HDFS 供始发者使用。Storm 支持建立拓扑结构来转换没有终点的数据流。不一样于 Hadoop 做业,这些转换从不中止,它们会持续处理到达的数据。
Twitter列举了Storm的三大类应用:
1. 信息流处理{Stream processing}
Storm可用来实时处理新数据和更新数据库,兼具容错性和可扩展性。即Storm能够用来处理源源不断流进来的消息,处理以后将结果写入到某个存储中去。
2. 连续计算{Continuous computation}
Storm可进行连续查询并把结果即时反馈给客户端。好比把Twitter上的热门话题发送到浏览器中。
3. 分布式远程程序调用{Distributed RPC}
Storm可用来并行处理密集查询。Storm的拓扑结构是一个等待调用信息的分布函数,当它收到一条调用信息后,会对查询进行计算,并返回查询结果。举个例子Distributed RPC能够作并行搜索或者处理大集合的数据。
经过配置drpc服务器,将storm的topology发布为drpc服务。客户端程序能够调用drpc服务将数据发送到storm集群中,并接收处理结果的反馈。这种方式须要drpc服务器进行转发,其中drpc服务器底层经过thrift实现。适合的业务场景主要是实时计算。而且扩展性良好,能够增长每一个节点的工做worker数量来动态扩展。
4. 项目实施,构建Topology
当下状况咱们须要给Spout和Bolt设计一种可以处理大量数据(日志文件)的topology,当一个特定数据值超过预设的临界值时促发警报。使用Storm的topology,逐行读入日志文件而且监视输入数据。在Storm组件方面,Spout负责读入输入数据。它不只从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改Spout会读入新的版本而且覆盖以前的tuple(能够被Bolt读入的格式),将tuple发射给Bolt进行临界分析,这样就能够发现全部可能超临界的记录。
下一节将对用例进行详细介绍。
临界分析
这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold)。
Listing One显示了咱们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。
AB 123 | 60 | North city |
BC 123 | 70 | South city |
CD 234 | 40 | South city |
DE 123 | 40 | East city |
EF 123 | 90 | South city |
GH 123 | 50 | West city |
这里将建立一个对应的XML文件,这将包含引入数据的模式。这个XML将用于日志文件的解析。XML的设计模式和对应的说明请见下表。
XML文件和日志文件都存放在Spout能够随时监测的目录下,用以关注文件的实时更新。而这个用例中的topology请见下图。
Figure 1:Storm中创建的topology,用以实现数据实时处理
如图所示:FilelistenerSpout接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给DBWriterBolt,而后由DBWriterBolt存入给数据库。下面将对这个过程的实现进行详细的解析。
Spout的实现
Spout以日志文件和XML描述文件做为接收对象。XML文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕获位置。(看下图)
Figure2:数据从日志文件到Spout的流程图
Listing Two显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。
Listing Two:用以描述日志文件的XML文件。
经过构造函数及它的参数Directory、PathSpout和TupleInfo对象建立Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象经过XSTream序列化XML时创建。
Spout的实现步骤:
Spout的具体编码在Listing Three中显示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。
declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就能够用相似的方法将tuple译码。Spout持续对日志文件的数据的变动进行监听,一旦有添加Spout就会进行读入而且发送给Bolt进行处理。
Bolt的实现
Spout的输出结果将给予Bolt进行更深一步的处理。通过对用例的思考,咱们的topology中须要如Figure 3中的两个Bolt。
Figure 3:Spout到Bolt的数据流程。
ThresholdCalculatorBolt
Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:
临界值检查
Listing Four中的类,定义用来保存这些值。
Listing Four:ThresholdInfo类
Listing Five:临界值检测代码段
经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在咱们的用例中是DBWriterBolt。
DBWriterBolt
通过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt作了这个持久化的工做并把tuple存入了数据库。表的创建由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。
Listing Six:建表编码。
数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不一样类型输入的解析。
Listing Seven:数据插入的代码部分。
一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会创建topology并准备执行。下面就来看一下执行步骤。
在本地集群上运行和测试topology
Listing Eight:创建和执行topology。
topology被创建后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不须要作任何的修改。这也是Storm的另外一大特点之一。
这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将能够轻松的使用Storm进行实时处理。若是你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择。
5. storm常见问题解答
1、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm作计算?
你须要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算。怎么实现spout能够参考官方的kestrel spout实现:
https://github.com/nathanmarz/storm-kestrel
若是你的数据源不支持事务性消费,那么就没法获得storm提供的可靠处理的保证,也不必实现ISpout接口中的ack和fail方法。
2、Storm为了保证tuple的可靠处理,须要保存tuple信息,这会不会致使内存OOM?
Storm为了保证tuple的可靠处理,acker会保存该节点建立的tuple id的xor值,这称为ack value,那么每ack一次,就将tuple id和ack value作异或(xor)。当全部产生的tuple都被ack的时候, ack value必定为0。这是个很简单的策略,对于每个tuple也只要占用约20个字节的内存。对于100万tuple,也才20M左右。关于可靠处理看这个:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing
3、Storm计算后的结果保存在哪里?能够保存在外部存储吗?
Storm不处理计算结果的保存,这是应用代码须要负责的事情,若是数据不大,你能够简单地保存在内存里,也能够每次都更新数据库,也能够采用NoSQL存储。storm并无像s4那样提供一个Persist API,根据时间或者容量来作存储输出。这部分事情彻底交给用户。
数据存储以后的展示,也是你须要本身处理的,storm UI只提供对topology的监控和统计。
4、Storm怎么处理重复的tuple?
由于Storm要保证tuple的可靠处理,当tuple处理失败或者超时的时候,spout会fail并从新发送该tuple,那么就会有tuple重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。一些可行的策略:
(1)不处理,这也算是种策略。由于实时计算一般并不要求很高的精确度,后续的批处理计算会更正实时计算的偏差。
(2)使用第三方集中存储来过滤,好比利用mysql,memcached或者redis根据逻辑主键来去重。
(3)使用bloom filter作过滤,简单高效。
5、Storm的动态增删节点
我在storm和s4里比较里谈到的动态增删节点,是指storm能够动态地添加和减小supervisor节点。对于减小节点来讲,被移除的supervisor上的worker会被nimbus从新负载均衡到其余supervisor节点上。在storm 0.6.1之前的版本,增长supervisor节点不会影响现有的topology,也就是现有的topology不会从新负载均衡到新的节点上,在扩展集群的时候很不方便,须要从新提交topology。所以我在storm的邮件列表里提了这个问题,storm的开发者nathanmarz建立了一个issue 54并在0.6.1提供了rebalance命令来让正在运行的topology从新负载均衡,具体见:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的变动:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246
storm并不提供机制来动态调整worker和task数目。
6、Storm UI里spout统计的complete latency的具体含义是什么?为何emit的数目会是acked的两倍?
这个事实上是storm邮件列表里的一个问题。Storm做者marz的解答: