storm原理剖析

为何用Storm

storm是一个免费、开源的分布式实时计算框架。它让你更方便、可靠的处理实时发送的消息。若是你以前了解过hadoop,应该知道hadoop能很快速、方便的帮你完成批量数据处理,而storm能够认为是实时数据处理领域的hadoop。storm简单,虽然他是用jvm之上的clojure编写的,可是一样支持非jvm语言。html

若是你不知道是否该使用storm,你能够先看看你有没有过这些需求:java

  1. 实时数据分析
  2. 在线机器学习
  3. 实时计算
  4. 分布式rpc框架

若是你有其中某项需求,那么恭喜你,storm能够帮到你。storm性能好、可伸缩性强、容错能力好,而且能保证消息的可靠性。这些特色足以你拥有使用storm的理由。git

介绍

要了解storm,首先须要了解这些概念:github

  1. Topologies
  2. Streams
  3. Spouts
  4. Bolts
  5. Stream groupings
  6. Reliability
  7. Tasks
  8. Workers

Topologies

Topology

storm中的实时处理的应用会被打包成topology,这个topology由一系列stream(数据流)、spout(数据流生产者)、bolt(数据处理逻辑)组成。相似hadoop中作mapreduce的job,有个区别就是mapreduce job会结束,而topology只要你不手动kill掉,它永远也不会结束。apache

Streams

Stream是数据流,有无穷无尽的tuple组成,而tuple则包含了用户发送的具体数据,好比整数、小数、字符串等,也能够包含自定义的数据类型,前提是你要为它实现序列化。api

Spouts

Spout是数据流Stream的生产者。一般spout会从外部数据源(kafka等)读取数据tuple,并将它emit(发送)到topology中。网络

Spout中最主要的方法是nextTuple。nextTuple一般会生成一个新的tuple,而后emit到topology。因为storm会在一个线程中调用全部spout的nextTuple方法,因此千万不要让这个方法阻塞掉。尽可能保持spout只处理数据的发送,不要让它处理业务逻辑。数据结构

Bolts

Bolt处理topology中全部的运算、业务逻辑,若是逻辑复杂,一般使用多个bolt也能很好的解决。bolt会订阅spout或者其余bolt发送的tuple,而整个应用可能会有多个spout和bolt,他们组成一块儿就会造成一个图装结构,也就是topology。并发

bolt中最主要的方法是execute,它会从订阅的spout或者bolt获取tuple,从tuple从取出数据,作响应的逻辑处理,而后生成新的tuple给emit出去。若是这个bolt是topology中最后一个bolt节点,就没有必要继续emit,而是本身来处理数据的归属。负载均衡

Stream groupings

Stream groupings就是数据流分组,它定义了tuple该如何分发给bolt中不一样的task。好比,一个topology中有ASpout和订阅了Apout的Abolt,为了保持并发量,给Abolt设置了4个task。数据流分组会决定Aspout发送出来的tuple,会怎样分配到4个task中

目前storm定义了八种不一样的分组方式:

  1. Shuffle grouping:随机分组。随机分配给不一样的task,保证最后每一个task接受到的tuple数量均等
  2. Fields grouping:按字段分组。好比tuple中存在名为user-id的字段,那么全部该字段所在的tuple都会被分配到同一个task上。
  3. Partial Key grouping:部分key分组。同字段分组,惟一的区别是,它会在不一样的task之间作负载均衡,保证tuple均匀分配。
  4. All grouping:全复制分组。将tuple复制后发给全部订阅的bolt,这种会致使网络传输量较大,当心使用。
  5. Global grouping:全局分组。将tuple发送给id最小的task。
  6. None grouping:不分组。目前实现上等同随机分组。
  7. Direct grouping:指向型分组。经过emitDirect(id,tuple)发给指定id的task。
  8. Local or shuffle grouping:本地或者随机分组。若是同一个work内有目标bolt的task,会在这几个task中作随机分发。其余状况下,采用随机分组方式。这种分组实现的目的是减小网络传输,尽可能选择本地的task作随机分发,若是没有再选择远程task。

Reliability

storm提供了可靠的和不可靠的实时处理方式,须要本身经过api指定。经过追踪tuple树中的消息传递,spout能够保证一旦消息丢失或者传送超时,就会重发。具体能够参见后面描述。

Tasks

每一个spout和bolt均可以被分解成多个task,运行在不一样的线程中,经过并发执行保持高效。对应的api为TopologyBuilder的setSpout和setBolt方法。

Workers

topology能够运行在多个worker进程中,每一个worker进程都是一个独立的jvm,每一个进程里面运行着不少task

storm如何序列化

Storm采用Kryo做为序列化框架。默认状况下,Storm支持基本数据类型, strings, byte arrays, ArrayList, HashMap, HashSet,以及Clojure的集合类型。若是你但愿在tuple中存储自定义数据类型,保证它能在topology中传递,你就须要注册自定义数据类型。

官网提供了两种注册方式。

  1. 配置文件
  2. Config对象的registerSerialization方法

任选一种方式,将自定义数据结构注册进去,就能使用FieldsSerializer来序列化(kryo提供),不然就要本身提供序列化方式。

若是tuple中存储的数据没有注册过,就会采用默认java序列化方案,若是它没法没java序列化方案处理,storm会抛出异常。为了性能考虑,若是存在自定义数据,最好使用storm提供的方案注册,采用kryo等优秀的序列化方案。不然,java序列化的性能开销很是大。

storm并发机制

要理解storm的并发机制,首先得理解下面几个概念:

  1. Workers:每一个woker是topology中独立的jvm进程
  2. Executors:executors是woker中运行的线程,执行具体task
  3. Tasks:每一个task至关于spout或者bolt实例

并发topology

如图所示,该topology设置了2个wokers,10个executors,12个task。这些资源会被平均分配。

其中,executors数量不能多于task数量,这样就保证了每一个executor至少会分配到一个task。默认状况executors数量等于task数量。

若是用户但愿改变task并行能力,能够经过改变executors数量来实现。之因此没有仅仅使用task来表明线程,而是引入executor,主要是考虑到在一个运行的topology中,task数量没法改变,由于一旦改变,可能致使Fields grouping这种分组方式出现bug。

举个例子,用户但愿消息A分配到某个task,而且之后都由这个task接收,那采用Fields grouping能够将获取消息id,取n = hash(id) % task数量,n就是A要去的task编号。若是运行期间能够修改task数量,那么n可能会发生变化,带来的影响就是A会跑去另一个task。很明显,这是不容许的。

随着executor的引入,用户能够根据本身的需求,在topology运行时调整task的并行能力,更加自由灵活。(不过jstorm取消了executor这个语义,转而采用task来表明任务和线程,主要考虑到storm这种模型的实现复杂性与收益微小性,每每大部分人采用默认配置)

storm可靠性保证

storm提供了三个级别的消息处理保障机制:

  1. 尽可能保障消息发送
  2. 保障消息至少发送一次
  3. 保障消息发送而且仅发一次

尽可能保障消息发送

这是最简单的模式,就是发送消息,丢了就不作处理。

保障消息至少发送一次

storm提供了一种api保证每一个tuple都会被完整的处理。要保证storm的消息可靠性,就得保证spout和bolt两个角色的可靠性。

  • spout可靠性:在nextTuple中经过SpoutOutputCollector来emit消息的时候加上消息id,如
_collector.emit(new Values("field1", "field2", 3) , msgId);

加上msgId就将emit出去的tuple打上了标识,一旦tuple在timeout(默认为30s)时间范围内被彻底处理,系统就会调用ack(msgId),表示表示已经被完整的处理过了,不然就调用fail(msgId)作重发处理。(ack和fail方法都得本身实现,一般ack回复数据来源消息已经被处理,fail作重发操做

  • bolt可靠性:一般bolt都会读入tuple,接着取出数据,最后emit新的tuple。bolt要保证可靠性,首先 须要在emit时锚定读入的tuple和新生成的tuple。

    _collector.emit(tuple, new Values(word));

    接着根据消息处理成功或者失败的状况分别作ack或者fail调用。

    //_collector是SpoutOutputCollector的对象
    	if(success){
    		_collector.ack(tuple);
    	}else{
    		_collector.ack(tuple);
    	}

那么,storm是怎样保障消息的可靠性的呢?

要了解实现原理,首先得有tuple树的背景知识,经过下面一幅图来看看tuple树的处理流程。

tuple树

假设图中A是Aspout发送的tuple,BC是Bbolt发送的tuple,DE是Cbolt发送的tuple,Bbolt订阅Aspout,Cbolt订阅Bbolt。

  1. 当Aspout emit A时,会将A加入tuple树,当Bbolt接收到A,锚定新生成的tuple B和C时,会将B和C加入tuple树,最后ack(A),因而A在tuple树中标记为已处理。目前的状态就是上图左边所示。
  2. 当Cbolt接收到C之后,通过处理,会锚定新生成的D和E,接着ack(C),因而C也被标记为已处理。目前的状态就是上图右边所示。

storm的topology中会运行名为acker的任务,acker会监控这个tuple树,当发现tuple树中全部的tuple都被完整的处理过了,而且没有新的tuple生成,就会调用spout的ack方法,表示消息被成功处理。不然就调用fail方法。

当topology越大,tuple树也就越大,完整的在内存生成这个tuple树并跟踪它是不现实的。storm经过一个很巧妙的方式,来实现对tuple树的监控。 acker存储了一个64位的数字,名为ack val。当新加入一个tuple时,会生成一个64位随机数字做为id,让tuple被emit时,会将更新ack val为ack val xor id。当ack(tuple)时,也会按一样的方法更新ack val。根据xor的原理,若是acker发现最后ack val的值变为0,则说明全部生成的tuple都被ack,也就是tuple树中全部的tuple都已经被处理了。经过这种方式,storm保证了acker能高效的识别消息是否被完整的处理。

保障消息发送而且仅发一次

由storm的高级api Trident来保证消息不会丢失,而且不会多发。具体信息本章不会描述,会在后面继续补充。

storm高可用性(HA)

  • 若是woker挂了,supervisor会从新建立
  • 若是机器节点挂了,nimbus会把该节点上的task转移到其余节点
  • 若是nimbus或者supervisor挂了,重启就好了。nimbus和supervisor被设计成无状态,状态都被存到zookeeper里面了
  • 为防止nimbus挂掉,worker节点也挂掉,致使任务没法被nimbus转移到其余机器。nimbus也被设计成HA的,利用主从结构保证主节点挂了以后从节点同样能服务

学习资料

相关文章
相关标签/搜索