使用Storm实现实时大数据分析(转)

原文连接:http://blog.csdn.net/hguisu/article/details/8454368html

 

简单和明了,Storm让大数据分析变得轻松加愉快。java

当今世界,公司的平常运营常常会生成TB级别的数据。数据来源囊括了互联网装置能够捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中建立的数据。考虑到数据的生成量,实时处理成为了许多机构须要面对的首要挑战。咱们常常用的一个很是有效的开源实时计算工具就是Storm —— Twitter开发,一般被比做“实时的Hadoop”。然而Storm远比Hadoop来的简单,由于用它处理大数据不会带来新老技术的交替。node

Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分别从事技术分析和研发工做。本文详述了Storm的使用方法,例子中的项目名称为“超速报警系统(Speeding Alert System)”。咱们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 —— 便触发一个trigger并把相关的数据存入数据库。mysql

 

1.  Storm是什么git

 

     全量数据处理使用的大可能是鼎鼎大名的hadoop或者hive,做为一个批处理系统,hadoop以其吞吐量大、自动容错等优势,在海量数据处理上获得了普遍的使用。github

      Hadoop下的Map/Reduce框架对于数据的处理流程是:web

 

      一、 将要处理的数据上传到Hadoop的文件系统HDFS中。redis

      二、 Map阶段sql

             a)   Master对Map的预处理:对于大量的数据进行切分,划分为M个16~64M的数据分片(可经过参数自定义分片大小)数据库

             b)   调用Mapper函数:Master为Worker分配Map任务,每一个分片都对应一个Worker进行处理。各个Worker读取并调用用户定义的Mapper函数    处理数据,并将结果存入HDFS,返回存储位置给Master。

一个Worker在Map阶段完成时,在HDFS中,生成一个排好序的Key-values组成的文件。并将位置信息汇报给Master。

      三、 Reduce阶段

             a)   Master对Reduce的预处理:Master为Worker分配Reduce任务,他会将全部Mapper产生的数据进行映射,将相同key的任务分配给某个Worker。

             b)   调用Reduce函数:各个Worker将分配到的数据集进行排序(使用工具类Merg),并调用用户自定义的Reduce函数,并将结果写入HDFS。

每一个Worker的Reduce任务完成后,都会在HDFS中生成一个输出文件。Hadoop并不将这些文件合并,由于这些文件每每会做为另外一个Map/reduce程序的输入。

         以上的流程,粗略归纳,就是从HDFS中获取数据,将其按照大小分片,进行分布式处理,最终输出结果。从流程来看,Hadoop框架进行数据处理有如下要求:

一、 数据已经存在在HDFS当中。

二、 数据间是少关联的。各个任务执行器在执行负责的数据时,无需考虑对其余数据的影响,数据之间应尽量是无联系、不会影响的。

使用Hadoop,适合大批量的数据处理,这是他所擅长的。因为基于Map/Reduce这种单级的数据处理模型进行,所以,若是数据间的关联系较大,须要进行数据的多级交互处理(某个阶段的处理数据依赖于上一个阶段),须要进行屡次map/reduce。又因为map/reduce每次执行都须要遍历整个数据集,对于数据的实时计算并不合适,因而有了storm。

      对比Hadoop的批处理,Storm是个实时的、分布式以及具有高容错的计算系统。同Hadoop同样Storm也能够处理大批量的数据,然而Storm在保证高可靠性的前提下还可让处理进行的更加实时;也就是说,全部的信息都会被处理。Storm一样还具有容错和分布计算这些特性,这就让Storm能够扩展到不一样的机器上进行大批量的数据处理。他一样还有如下的这些特性:

  • 易于扩展:对于扩展,伴随着业务的发展,咱们的数据量、计算量可能会愈来愈大,因此但愿这个系统是可扩展的。你只须要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样能够充分的保证大型集群的良好运行。
  • 每条信息的处理均可以获得保证。
  • Storm集群管理简易。
  • Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm从新分配任务。这是分布式系统中通用问题。一个节点挂了不能影响个人应用。
  • 低延迟。都说了是实时计算系统了,延迟是必定要低的。
  • 尽管一般使用Java,Storm中的topology能够用任何语言设计。

       在线实时流处理模型

       对于处理大批量数据的Map/reduce程序,在任务完成以后就中止了,但Storm是用于实时计算的,因此,相应的处理程序会一直执行(等待任务,有任务则执行)直至手动中止。

       对于Storm,他是实时处理模型,与hadoop的不一样是,他是针对在线业务而存在的计算平台,如统计某用户的交易量、生成为某个用户的推荐列表等实时性高的需求。他是一个“流处理”框架。何谓流处理?storm将数据以Stream的方式,并按照Topology的顺序,依次处理并最终生成结果。

固然为了更好的理解文章,你首先须要安装和设置Storm。须要经过如下几个简单的步骤:

  • 从Storm官方下载Storm安装文件
  • 将bin/directory解压到你的PATH上,并保证bin/storm脚本是可执行的。
      尽管 Storm 是使用 Clojure 语言开发的,您仍然能够在 Storm 中使用几乎任何语言编写应用程序。所需的只是一个链接到 Storm 的架构的适配器。已存在针对 Scala、JRuby、Perl 和 PHP 的适配器,可是还有支持流式传输到 Storm 拓扑结构中的结构化查询语言适配器。

2.  Storm的组件

 

       Storm集群和Hadoop集群表面上看很相似。可是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这二者之间是很是不同的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。

       Storm集群主要由一个主节点(Nimbus后台程序)和一群工做节点(worker node)Supervisor的节点组成,经过 Zookeeper进行协调。Nimbus相似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器, 而且监控状态。

      每个工做节点上面运行一个叫作Supervisor的节点。Supervisor会监听分配给它那台机器的工做,根据须要启动/关闭工做进程。每个工做进程执行一个topology的一个子集;一个运行的topology由运行在不少机器上的不少工做进程组成。

 

一、 Nimbus主节点:

     主节点一般运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很相似于Hadoop中的Job Tracker。

二、Supervisor工做节点:

      工做节点一样会运行一个后台程序 —— Supervisor,用于收听工做指派并基于要求运行工做进程。每一个工做节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则经过Zookeeper系统或者集群。

三、Zookeeper

     Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操做)经过Stream Groupings进行链接的图。下面对出现的术语进行更深入的解析。

四、Worker:

       运行具体处理组件逻辑的进程。

五、Task:

       worker中每个spout/bolt的线程称为一个task. 在storm0.8以后,task再也不与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。

六、Topology(拓扑):

       storm中运行的一个实时应用程序,由于各个组件间的消息流动造成逻辑上的一个拓扑结构。一个topology是spouts和bolts组成的图, 经过stream groupings将图中的spouts和bolts链接起来,以下图:

      

 

     一个topology会一直运行直到你手动kill掉,Storm自动从新分配执行失败的任务, 而且Storm能够保证你不会有数据丢失(若是开启了高可靠性的话)。若是一些机器意外停机它上面的全部任务会被转移到其余机器上。

运行一个topology很简单。首先,把你全部的代码以及所依赖的jar打进一个jar包。而后运行相似下面的这个命令:

      storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1, arg2。这个类的main函数定义这个topology而且把它提交给Nimbus。storm jar负责链接到Nimbus而且上传jar包。

Topology的定义是一个Thrift结构,而且Nimbus就是一个Thrift服务, 你能够提交由任何语言建立的topology。上面的方面是用JVM-based语言提交的最简单的方法。

 

七、Spout:

       消息源spout是Storm里面一个topology里面的消息生产者。简而言之,Spout历来源处读取数据并放入topology。Spout分红可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。

       消息源能够发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,而后使用SpoutOutputCollector来发射指定的stream。

      而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,若是没有新tuple发射则会简单的返回。

       要注意的是nextTuple方法不能阻塞,由于storm在同一个线程上面调用全部消息源spout的方法。

 

另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。storm只对可靠的spout调用ack和fail。

八、Bolt:

     Topology中全部的处理都由Bolt完成。即全部的消息处理逻辑被封装在bolts里面。Bolt能够完成任何事,好比:链接的过滤、聚合、访问文件/数据库、等等。

        Bolt从Spout中接收数据并进行处理,若是遇到复杂流的处理也可能将tuple发送给另外一个Bolt进行处理。即须要通过不少blots。好比算出一堆图片里面被转发最多的图片就至少须要两步:第一步算出每一个图片的转发数量。第二步找出转发最多的前10个图片。(若是要把这个过程作得更具备扩展性那么可能须要更多的步骤)。

        Bolts能够发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。

      而Bolt中最重要的方法是execute(),以新的tuple做为参数接收。无论是Spout仍是Bolt,若是将tuple发射成多个流,这些流均可以经过declareStream()来声明。

     bolts使用OutputCollector来发射tuple,bolts必需要为它处理的每个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 通常的流程是: bolts处理一个输入tuple,  发射0个或者多个tuple, 而后调用ack通知storm本身已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

九、Tuple:

       一次消息传递的基本单元。原本应该是一个key-value的map,可是因为各个组件间传递的tuple的字段名称已经事先定义好,因此tuple中只要按序填入各个value就好了,因此就是一个value list.

十、Stream:

        源源不断传递的tuple就组成了stream。消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列, 而这些tuple序列会以一种分布式的方式并行地建立和处理。经过对stream中tuple序列中每一个字段命名来定义stream。在默认的状况下,tuple的字段类型能够是:integer,long,short, byte,string,double,float,boolean和byte array。你也能够自定义类型(只要实现相应的序列化器)。

     每一个消息流在定义的时候会被分配给一个id,由于单向消息流使用的至关广泛, OutputFieldsDeclarer定义了一些方法让你能够定义一个stream而不用指定这个id。在这种状况下这个stream会分配个值为‘default’默认的id 。

      Storm提供的最基本的处理stream的原语是spout和bolt。你能够实现spout和bolt提供的接口来处理你的业务逻辑。

      

十一、Stream Groupings:

Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型:

1). 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每一个任务得到相等数量的tuple。

2). 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组老是分发到同一个任务,不一样“user-id”的元组可能分发到不一样的任务。

3). 所有分组(All grouping):tuple被复制到bolt的全部任务。这种类型须要谨慎使用。

4). 全局分组(Global grouping):所有流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。

5). 无分组(None grouping):你不须要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(若是可能)。

6). 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪一个元组处理者任务接收。

固然还能够实现CustomStreamGroupimg接口来定制本身须要的分组。

 

storm 和hadoop的对比来了解storm中的基本概念。

  Hadoop Storm
系统角色 JobTracker Nimbus
TaskTracker Supervisor
Child Worker
应用名称 Job Topology
组件接口 Mapper/Reducer Spout/Bolt

 

3.  Storm应用场景

       Storm 与其余大数据解决方案的不一样之处在于它的处理方式。Hadoop 在本质上是一个批处理系统。数据被引入 Hadoop 文件系统 (HDFS) 并分发到各个节点进行处理。当处理完成时,结果数据返回到 HDFS 供始发者使用。Storm 支持建立拓扑结构来转换没有终点的数据流。不一样于 Hadoop 做业,这些转换从不中止,它们会持续处理到达的数据。

Twitter列举了Storm的三大类应用:

1. 信息流处理{Stream processing} Storm可用来实时处理新数据和更新数据库,兼具容错性和可扩展性。即Storm能够用来处理源源不断流进来的消息,处理以后将结果写入到某个存储中去。

2. 连续计算{Continuous computation} Storm可进行连续查询并把结果即时反馈给客户端。好比把Twitter上的热门话题发送到浏览器中。

3. 分布式远程程序调用{Distributed RPC}        Storm可用来并行处理密集查询。Storm的拓扑结构是一个等待调用信息的分布函数,当它收到一条调用信息后,会对查询进行计算,并返回查询结果。举个例子Distributed RPC能够作并行搜索或者处理大集合的数据。

        经过配置drpc服务器,将storm的topology发布为drpc服务。客户端程序能够调用drpc服务将数据发送到storm集群中,并接收处理结果的反馈。这种方式须要drpc服务器进行转发,其中drpc服务器底层经过thrift实现。适合的业务场景主要是实时计算。而且扩展性良好,能够增长每一个节点的工做worker数量来动态扩展。

 

 

4.  项目实施,构建Topology

 

      当下状况咱们须要给Spout和Bolt设计一种可以处理大量数据(日志文件)的topology,当一个特定数据值超过预设的临界值时促发警报。使用Storm的topology,逐行读入日志文件而且监视输入数据。在Storm组件方面,Spout负责读入输入数据。它不只从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改Spout会读入新的版本而且覆盖以前的tuple(能够被Bolt读入的格式),将tuple发射给Bolt进行临界分析,这样就能够发现全部可能超临界的记录。

下一节将对用例进行详细介绍。

临界分析

这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold)。

  • 瞬间临界值监测:一个字段的值在那个瞬间超过了预设的临界值,若是条件符合的话则触发一个trigger。举个例子当车辆超越80千米每小时,则触发trigger。
  • 时间序列临界监测:字段的值在一个给定的时间段内超过了预设的临界值,若是条件符合则触发一个触发器。好比:在5分钟类,时速超过80KM两次及以上的车辆。

Listing One显示了咱们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。

AB 123 60 North city
BC 123 70 South city
CD 234 40 South city
DE 123 40 East  city
EF 123 90 South city
GH 123 50 West  city

这里将建立一个对应的XML文件,这将包含引入数据的模式。这个XML将用于日志文件的解析。XML的设计模式和对应的说明请见下表。

XML文件和日志文件都存放在Spout能够随时监测的目录下,用以关注文件的实时更新。而这个用例中的topology请见下图。

Figure 1:Storm中创建的topology,用以实现数据实时处理

如图所示:FilelistenerSpout接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给DBWriterBolt,而后由DBWriterBolt存入给数据库。下面将对这个过程的实现进行详细的解析。

Spout的实现

Spout以日志文件和XML描述文件做为接收对象。XML文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕获位置。(看下图)

Figure2:数据从日志文件到Spout的流程图

Listing Two显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。

Listing Two:用以描述日志文件的XML文件。

 

  1. <TUPLEINFO>   
  2. <FIELDLIST>   
  3. <FIELD>   
  4. <COLUMNNAME>vehicle_number</COLUMNNAME>   
  5. <COLUMNTYPE>string</COLUMNTYPE>   
  6. </FIELD>   
  7.    
  8. <FIELD>  
  9. <COLUMNNAME>speed</COLUMNNAME>   
  10. <COLUMNTYPE>int</COLUMNTYPE>   
  11. </FIELD>   
  12.    
  13. <FIELD>   
  14. <COLUMNNAME>location</COLUMNNAME>   
  15. <COLUMNTYPE>string</COLUMNTYPE>   
  16. </FIELD>   
  17. </FIELDLIST>   
  18. <DELIMITER>,</DELIMITER>   
  19. </TUPLEINFO>     

 

经过构造函数及它的参数Directory、PathSpout和TupleInfo对象建立Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象经过XSTream序列化XML时创建。

Spout的实现步骤:

  • 对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。
  • 在数据获得了字段的说明后,将其转换成tuple。
  • 声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。

Spout的具体编码在Listing Three中显示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。

 

  1. public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )     
  2. {     
  3.            _collector = collector;     
  4.          try     
  5.          {     
  6.          fileReader  =  new BufferedReader(new FileReader(new File(file)));    
  7.          }    
  8.          catch (FileNotFoundException e)    
  9.          {    
  10.          System.exit(1);     
  11.          }    
  12. }                                                            
  13.    
  14. public void nextTuple()    
  15. {    
  16.          protected void ListenFile(File file)    
  17.          {    
  18.          Utils.sleep(2000);    
  19.          RandomAccessFile access = null;    
  20.          String line = null;     
  21.             try     
  22.             {    
  23.                 while ((line = access.readLine()) != null)    
  24.                 {    
  25.                     if (line !=null)    
  26.                     {     
  27.                          String[] fields=null;    
  28.                           if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());     
  29.                           else     
  30.                           fields = line.split  (tupleInfo.getDelimiter());     
  31.                           if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));    
  32.                     }    
  33.                }    
  34.             }    
  35.             catch (IOException ex){ }    
  36.             }    
  37. }    
  38.    
  39. public void declareOutputFields(OutputFieldsDeclarer declarer)    
  40. {    
  41.       String[] fieldsArr = new String [tupleInfo.getFieldList().size()];    
  42.       for(int i=0; i<tupleInfo.getFieldList().size(); i++)    
  43.       {    
  44.               fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();    
  45.       }    
  46. declarer.declare(new Fields(fieldsArr));    
  47. }        

 

declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就能够用相似的方法将tuple译码。Spout持续对日志文件的数据的变动进行监听,一旦有添加Spout就会进行读入而且发送给Bolt进行处理。

Bolt的实现

Spout的输出结果将给予Bolt进行更深一步的处理。通过对用例的思考,咱们的topology中须要如Figure 3中的两个Bolt。

Figure 3:Spout到Bolt的数据流程。

ThresholdCalculatorBolt

Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:

临界值检查

  • 临界值栏数检查(拆分红字段的数目)
  • 临界值数据类型(拆分后字段的类型)
  • 临界值出现的频数
  • 临界值时间段检查

Listing Four中的类,定义用来保存这些值。

Listing Four:ThresholdInfo类

 

  1. public class ThresholdInfo implementsSerializable    
  2.    
  3. {      
  4.         private String action;     
  5.         private String rule;     
  6.         private Object thresholdValue;    
  7.         private int thresholdColNumber;     
  8.         private Integer timeWindow;     
  9.         private int frequencyOfOccurence;     
  10. }     

基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。

 

Listing Five:临界值检测代码段

 

  1. public void execute(Tuple tuple, BasicOutputCollector collector)     
  2. {    
  3.     if(tuple!=null)     
  4.     {    
  5.         List<Object> inputTupleList = (List<Object>) tuple.getValues();    
  6.         int thresholdColNum = thresholdInfo.getThresholdColNumber();     
  7.         Object thresholdValue = thresholdInfo.getThresholdValue();     
  8.         String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();     
  9.         Integer timeWindow = thresholdInfo.getTimeWindow();    
  10.          int frequency = thresholdInfo.getFrequencyOfOccurence();    
  11.          if(thresholdDataType.equalsIgnoreCase("string"))    
  12.          {    
  13.              String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();    
  14.              String frequencyChkOp = thresholdInfo.getAction();    
  15.              if(timeWindow!=null)    
  16.              {    
  17.                  long curTime = System.currentTimeMillis();    
  18.                  long diffInMinutes = (curTime-startTime)/(1000);    
  19.                  if(diffInMinutes>=timeWindow)    
  20.                  {    
  21.                      if(frequencyChkOp.equals("=="))    
  22.                      {    
  23.                           if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  24.                           {    
  25.                               count.incrementAndGet();    
  26.                               if(count.get() > frequency)    
  27.                                   splitAndEmit(inputTupleList,collector);    
  28.                           }    
  29.                      }    
  30.                      else if(frequencyChkOp.equals("!="))    
  31.                      {    
  32.                          if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  33.                          {    
  34.                               count.incrementAndGet();    
  35.                               if(count.get() > frequency)    
  36.                                   splitAndEmit(inputTupleList,collector);    
  37.                           }    
  38.                       }    
  39.                       else                         System.out.println("Operator not supported");     
  40.                   }    
  41.               }    
  42.               else   
  43.               {    
  44.                   if(frequencyChkOp.equals("=="))    
  45.                   {    
  46.                       if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  47.                       {    
  48.                           count.incrementAndGet();    
  49.                           if(count.get() > frequency)    
  50.                               splitAndEmit(inputTupleList,collector);    
  51.                           }    
  52.                   }    
  53.                   else if(frequencyChkOp.equals("!="))    
  54.                   {    
  55.                        if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))    
  56.                        {    
  57.                            count.incrementAndGet();    
  58.                            if(count.get() > frequency)    
  59.                                splitAndEmit(inputTupleList,collector);    
  60.                           }    
  61.                    }    
  62.                }    
  63.             }    
  64.             else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))    
  65.             {    
  66.                 String frequencyChkOp = thresholdInfo.getAction();    
  67.                 if(timeWindow!=null)    
  68.                 {    
  69.                      long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());    
  70.                      long curTime = System.currentTimeMillis();    
  71.                      long diffInMinutes = (curTime-startTime)/(1000);    
  72.                      System.out.println("Difference in minutes="+diffInMinutes);    
  73.                      if(diffInMinutes>=timeWindow)    
  74.                      {    
  75.                           if(frequencyChkOp.equals("<"))    
  76.                           {    
  77.                               if(valueToCheck < Double.parseDouble(thresholdValue.toString()))    
  78.                               {    
  79.                                    count.incrementAndGet();    
  80.                                    if(count.get() > frequency)    
  81.                                        splitAndEmit(inputTupleList,collector);    
  82.                               }    
  83.                           }    
  84.                           else if(frequencyChkOp.equals(">"))    
  85.                           {    
  86.                                if(valueToCheck > Double.parseDouble(thresholdValue.toString()))    
  87.                                 {    
  88.                                    count.incrementAndGet();    
  89.                                    if(count.get() > frequency)    
  90.                                        splitAndEmit(inputTupleList,collector);    
  91.                                }    
  92.                            }    
  93.                            else if(frequencyChkOp.equals("=="))    
  94.                            {    
  95.                               if(valueToCheck == Double.parseDouble(thresholdValue.toString()))    
  96.                               {    
  97.                                   count.incrementAndGet();    
  98.                                   if(count.get() > frequency)    
  99.                                       splitAndEmit(inputTupleList,collector);    
  100.                                }    
  101.                            }    
  102.                            else if(frequencyChkOp.equals("!="))    
  103.                            {    
  104.     . . .    
  105.                             }    
  106.                        }    
  107.              }    
  108.       else   
  109.           splitAndEmit(null,collector);    
  110.       }    
  111.       else   
  112.      {    
  113.            System.err.println("Emitting null in bolt");    
  114.            splitAndEmit(null,collector);    
  115.     }    
  116. }   



 

经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在咱们的用例中是DBWriterBolt。

DBWriterBolt

通过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt作了这个持久化的工做并把tuple存入了数据库。表的创建由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。

Listing Six:建表编码。

 

  1. public void prepare( Map StormConf, TopologyContext context )     
  2. {           
  3.     try     
  4.     {    
  5.         Class.forName(dbClass);    
  6.     }     
  7.     catch (ClassNotFoundException e)     
  8.     {    
  9.         System.out.println("Driver not found");    
  10.         e.printStackTrace();    
  11.     }    
  12.      
  13.     try     
  14.     {    
  15.        connection driverManager.getConnection(     
  16.            "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);    
  17.        connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();    
  18.      
  19.        StringBuilder createQuery = new StringBuilder(    
  20.            "CREATE TABLE IF NOT EXISTS "+tableName+"(");    
  21.        for(Field fields : tupleInfo.getFieldList())    
  22.        {    
  23.            if(fields.getColumnType().equalsIgnoreCase("String"))    
  24.                createQuery.append(fields.getColumnName()+" VARCHAR(500),");    
  25.            else   
  26.                createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");    
  27.        }    
  28.        createQuery.append("thresholdTimeStamp timestamp)");    
  29.        connection.prepareStatement(createQuery.toString()).execute();    
  30.      
  31.        // Insert Query    
  32.        StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");    
  33.        String tempCreateQuery = new String();    
  34.        for(Field fields : tupleInfo.getFieldList())    
  35.        {    
  36.             insertQuery.append(fields.getColumnName()+",");    
  37.        }    
  38.        insertQuery.append("thresholdTimeStamp").append(") values (");    
  39.        for(Field fields : tupleInfo.getFieldList())    
  40.        {    
  41.            insertQuery.append("?,");    
  42.        }    
  43.      
  44.        insertQuery.append("?)");    
  45.        prepStatement = connection.prepareStatement(insertQuery.toString());    
  46.     }    
  47.     catch (SQLException e)     
  48.     {           
  49.         e.printStackTrace();    
  50.     }           
  51. }    



 

数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不一样类型输入的解析。

Listing Seven:数据插入的代码部分。

 

  1. public void execute(Tuple tuple, BasicOutputCollector collector)     
  2. {    
  3.     batchExecuted=false;    
  4.     if(tuple!=null)    
  5.     {    
  6.        List<Object> inputTupleList = (List<Object>) tuple.getValues();    
  7.        int dbIndex=0;    
  8.        for(int i=0;i<tupleInfo.getFieldList().size();i++)    
  9.        {    
  10.            Field field = tupleInfo.getFieldList().get(i);    
  11.            try {    
  12.                dbIndex = i+1;    
  13.                if(field.getColumnType().equalsIgnoreCase("String"))                 
  14.                    prepStatement.setString(dbIndex, inputTupleList.get(i).toString());    
  15.                else if(field.getColumnType().equalsIgnoreCase("int"))    
  16.                    prepStatement.setInt(dbIndex,    
  17.                        Integer.parseInt(inputTupleList.get(i).toString()));    
  18.                else if(field.getColumnType().equalsIgnoreCase("long"))    
  19.                    prepStatement.setLong(dbIndex,     
  20.                        Long.parseLong(inputTupleList.get(i).toString()));    
  21.                else if(field.getColumnType().equalsIgnoreCase("float"))    
  22.                    prepStatement.setFloat(dbIndex,     
  23.                        Float.parseFloat(inputTupleList.get(i).toString()));    
  24.                else if(field.getColumnType().equalsIgnoreCase("double"))    
  25.                    prepStatement.setDouble(dbIndex,     
  26.                        Double.parseDouble(inputTupleList.get(i).toString()));    
  27.                else if(field.getColumnType().equalsIgnoreCase("short"))    
  28.                    prepStatement.setShort(dbIndex,     
  29.                        Short.parseShort(inputTupleList.get(i).toString()));    
  30.                else if(field.getColumnType().equalsIgnoreCase("boolean"))    
  31.                    prepStatement.setBoolean(dbIndex,     
  32.                        Boolean.parseBoolean(inputTupleList.get(i).toString()));    
  33.                else if(field.getColumnType().equalsIgnoreCase("byte"))    
  34.                    prepStatement.setByte(dbIndex,     
  35.                        Byte.parseByte(inputTupleList.get(i).toString()));    
  36.                else if(field.getColumnType().equalsIgnoreCase("Date"))    
  37.                {    
  38.                   Date dateToAdd=null;    
  39.                   if (!(inputTupleList.get(i) instanceof Date))      
  40.                   {      
  41.                        DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");    
  42.                        try     
  43.                        {    
  44.                            dateToAdd = df.parse(inputTupleList.get(i).toString());    
  45.                        }    
  46.                        catch (ParseException e)     
  47.                        {    
  48.                            System.err.println("Data type not valid");    
  49.                        }    
  50.                    }      
  51.                    else   
  52.                    {    
  53.             dateToAdd = (Date)inputTupleList.get(i);    
  54.             java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());    
  55.             prepStatement.setDate(dbIndex, sqlDate);    
  56.             }       
  57.             }     
  58.         catch (SQLException e)     
  59.         {    
  60.              e.printStackTrace();    
  61.         }    
  62.     }    
  63.     Date now = new Date();              
  64.     try   
  65.     {    
  66.         prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));    
  67.         prepStatement.addBatch();    
  68.         counter.incrementAndGet();    
  69.         if (counter.get()== batchSize)     
  70.         executeBatch();    
  71.     }     
  72.     catch (SQLException e1)     
  73.     {    
  74.         e1.printStackTrace();    
  75.     }               
  76.    }    
  77.    else   
  78.    {    
  79.         long curTime = System.currentTimeMillis();    
  80.        long diffInSeconds = (curTime-startTime)/(60*1000);    
  81.        if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)    
  82.        {    
  83.             try {    
  84.                 executeBatch();    
  85.                 startTime = System.currentTimeMillis();    
  86.             }    
  87.             catch (SQLException e) {    
  88.                  e.printStackTrace();    
  89.             }    
  90.        }    
  91.    }    
  92. }    
  93.      
  94. public void executeBatch() throws SQLException    
  95. {    
  96.     batchExecuted=true;    
  97.     prepStatement.executeBatch();    
  98.     counter = new AtomicInteger(0);    
  99. }   



 

一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会创建topology并准备执行。下面就来看一下执行步骤。

在本地集群上运行和测试topology

  • 经过TopologyBuilder创建topology。
  • 使用Storm Submitter,将topology递交给集群。以topology的名字、配置和topology的对象做为参数。
  • 提交topology。

Listing Eight:创建和执行topology。

 

  1. public class StormMain    
  2. {    
  3.      public static void main(String[] args) throws AlreadyAliveException,     
  4.                                                    InvalidTopologyException,     
  5.                                                    InterruptedException     
  6.      {    
  7.           ParallelFileSpout parallelFileSpout = new ParallelFileSpout();    
  8.           ThresholdBolt thresholdBolt = new ThresholdBolt();    
  9.           DBWriterBolt dbWriterBolt = new DBWriterBolt();    
  10.           TopologyBuilder builder = new TopologyBuilder();    
  11.           builder.setSpout("spout", parallelFileSpout, 1);    
  12.           builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");    
  13.           builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");    
  14.           if(this.argsMain!=null && this.argsMain.length > 0)     
  15.           {    
  16.               conf.setNumWorkers(1);    
  17.               StormSubmitter.submitTopology(     
  18.                    this.argsMain[0], conf, builder.createTopology());    
  19.           }    
  20.           else   
  21.           {        
  22.               Config conf = new Config();    
  23.               conf.setDebug(true);    
  24.               conf.setMaxTaskParallelism(3);    
  25.               LocalCluster cluster = new LocalCluster();    
  26.               cluster.submitTopology(    
  27.               "Threshold_Test", conf, builder.createTopology());    
  28.           }    
  29.      }    
  30. }   

 

topology被创建后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不须要作任何的修改。这也是Storm的另外一大特点之一。

这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将能够轻松的使用Storm进行实时处理。若是你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择。

 

 

5.  storm常见问题解答

 

1、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm作计算?

你须要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算。怎么实现spout能够参考官方的kestrel spout实现:
https://github.com/nathanmarz/storm-kestrel

若是你的数据源不支持事务性消费,那么就没法获得storm提供的可靠处理的保证,也不必实现ISpout接口中的ack和fail方法。

2、Storm为了保证tuple的可靠处理,须要保存tuple信息,这会不会致使内存OOM?

Storm为了保证tuple的可靠处理,acker会保存该节点建立的tuple id的xor值,这称为ack value,那么每ack一次,就将tuple id和ack value作异或(xor)。当全部产生的tuple都被ack的时候, ack value必定为0。这是个很简单的策略,对于每个tuple也只要占用约20个字节的内存。对于100万tuple,也才20M左右。关于可靠处理看这个:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

3、Storm计算后的结果保存在哪里?能够保存在外部存储吗?

Storm不处理计算结果的保存,这是应用代码须要负责的事情,若是数据不大,你能够简单地保存在内存里,也能够每次都更新数据库,也能够采用NoSQL存储。storm并无像s4那样提供一个Persist API,根据时间或者容量来作存储输出。这部分事情彻底交给用户。

数据存储以后的展示,也是你须要本身处理的,storm UI只提供对topology的监控和统计。

4、Storm怎么处理重复的tuple?

由于Storm要保证tuple的可靠处理,当tuple处理失败或者超时的时候,spout会fail并从新发送该tuple,那么就会有tuple重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。一些可行的策略:
(1)不处理,这也算是种策略。由于实时计算一般并不要求很高的精确度,后续的批处理计算会更正实时计算的偏差。
(2)使用第三方集中存储来过滤,好比利用mysql,memcached或者redis根据逻辑主键来去重。
(3)使用bloom filter作过滤,简单高效。

5、Storm的动态增删节点

我在storm和s4里比较里谈到的动态增删节点,是指storm能够动态地添加和减小supervisor节点。对于减小节点来讲,被移除的supervisor上的worker会被nimbus从新负载均衡到其余supervisor节点上。在storm 0.6.1之前的版本,增长supervisor节点不会影响现有的topology,也就是现有的topology不会从新负载均衡到新的节点上,在扩展集群的时候很不方便,须要从新提交topology。所以我在storm的邮件列表里提了这个问题,storm的开发者nathanmarz建立了一个issue 54并在0.6.1提供了rebalance命令来让正在运行的topology从新负载均衡,具体见:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的变动:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246

storm并不提供机制来动态调整worker和task数目。

6、Storm UI里spout统计的complete latency的具体含义是什么?为何emit的数目会是acked的两倍?
这个事实上是storm邮件列表里的一个问题。Storm做者marz的解答:

The complete latency is the time  from the spout emitting a tuple to that tuple being acked on the spout. So it tracks the time for the whole tuple tree to be processed.
If you dive into the spout component in the UI, you'll see that a lot of the emitted/transferred is on the __ack* stream.  This is the spout communicating with the ackers which take care of tracking the tuple trees. 


简单地说,complete latency表示了tuple从emit到被acked通过的时间,能够认为是tuple以及该tuple的后续子孙(造成一棵树)整个处理时间。其次spout的emit和transfered还统计了spout和acker之间内部的通讯信息,好比对于可靠处理的spout来讲,会在emit的时候同时发送一个_ack_init给acker,记录tuple id到task id的映射,以便ack的时候能找到正确的acker task。

 

 

 

 

6.  其余开源的大数据解决方案

 

自 Google 在 2004 年推出 MapReduce 范式以来,已诞生了多个使用原始 MapReduce 范式(或拥有该范式的质量)的解决方案。Google 对 MapReduce 的最初应用是创建万维网的索引。尽管此应用程序仍然很流行,但这个简单模型解决的问题也正在增多。

表 1 提供了一个可用开源大数据解决方案的列表,包括传统的批处理和流式处理应用程序。在将 Storm 引入开源以前将近一年的时间里,Yahoo! 的 S4 分布式流计算平台已向 Apache 开源。S4 于 2010 年 10 月发布,它提供了一个高性能计算 (HPC) 平台,向应用程序开发人员隐藏了并行处理的复杂性。S4 实现了一个可扩展的、分散化的集群架构,并归入了部分容错功能。


表 1. 开源大数据解决方案

解决方案 开发商 类型 描述
Storm Twitter 流式处理 Twitter 的新流式大数据分析解决方案
S4 Yahoo! 流式处理 来自 Yahoo! 的分布式流计算平台
Hadoop Apache 批处理 MapReduce 范式的第一个开源实现
Spark UC Berkeley AMPLab 批处理 支持内存中数据集和恢复能力的最新分析平台
Disco Nokia 批处理 Nokia 的分布式 MapReduce 框架
HPCC LexisNexis 批处理 HPC 大数据集群
相关文章
相关标签/搜索