storm是一个免费、开源的分布式实时计算框架。它让你更方便、可靠的处理实时发送的消息。若是你以前了解过hadoop,应该知道hadoop能很快速、方便的帮你完成批量数据处理,而storm能够认为是实时数据处理领域的hadoop。storm简单,虽然他是用jvm之上的clojure编写的,可是一样支持非jvm语言。html
若是你不知道是否该使用storm,你能够先看看你有没有过这些需求:java
若是你有其中某项需求,那么恭喜你,storm能够帮到你。storm性能好、可伸缩性强、容错能力好,而且能保证消息的可靠性。这些特色足以你拥有使用storm的理由。git
要了解storm,首先须要了解这些概念:github
storm中的实时处理的应用会被打包成topology,这个topology由一系列stream(数据流)、spout(数据流生产者)、bolt(数据处理逻辑)组成。相似hadoop中作mapreduce的job,有个区别就是mapreduce job会结束,而topology只要你不手动kill掉,它永远也不会结束。apache
Stream是数据流,有无穷无尽的tuple组成,而tuple则包含了用户发送的具体数据,好比整数、小数、字符串等,也能够包含自定义的数据类型,前提是你要为它实现序列化。api
Spout是数据流Stream的生产者。一般spout会从外部数据源(kafka等)读取数据tuple,并将它emit(发送)到topology中。网络
Spout中最主要的方法是nextTuple。nextTuple一般会生成一个新的tuple,而后emit到topology。因为storm会在一个线程中调用全部spout的nextTuple方法,因此千万不要让这个方法阻塞掉。尽可能保持spout只处理数据的发送,不要让它处理业务逻辑。数据结构
Bolt处理topology中全部的运算、业务逻辑,若是逻辑复杂,一般使用多个bolt也能很好的解决。bolt会订阅spout或者其余bolt发送的tuple,而整个应用可能会有多个spout和bolt,他们组成一块儿就会造成一个图装结构,也就是topology。并发
bolt中最主要的方法是execute,它会从订阅的spout或者bolt获取tuple,从tuple从取出数据,作响应的逻辑处理,而后生成新的tuple给emit出去。若是这个bolt是topology中最后一个bolt节点,就没有必要继续emit,而是本身来处理数据的归属。负载均衡
Stream groupings就是数据流分组,它定义了tuple该如何分发给bolt中不一样的task。好比,一个topology中有ASpout和订阅了Apout的Abolt,为了保持并发量,给Abolt设置了4个task。数据流分组会决定Aspout发送出来的tuple,会怎样分配到4个task中
目前storm定义了八种不一样的分组方式:
storm提供了可靠的和不可靠的实时处理方式,须要本身经过api指定。经过追踪tuple树中的消息传递,spout能够保证一旦消息丢失或者传送超时,就会重发。具体能够参见后面描述。
每一个spout和bolt均可以被分解成多个task,运行在不一样的线程中,经过并发执行保持高效。对应的api为TopologyBuilder的setSpout和setBolt方法。
topology能够运行在多个worker进程中,每一个worker进程都是一个独立的jvm,每一个进程里面运行着不少task
Storm采用Kryo做为序列化框架。默认状况下,Storm支持基本数据类型, strings, byte arrays, ArrayList, HashMap, HashSet,以及Clojure的集合类型。若是你但愿在tuple中存储自定义数据类型,保证它能在topology中传递,你就须要注册自定义数据类型。
官网提供了两种注册方式。
任选一种方式,将自定义数据结构注册进去,就能使用FieldsSerializer来序列化(kryo提供),不然就要本身提供序列化方式。
若是tuple中存储的数据没有注册过,就会采用默认java序列化方案,若是它没法没java序列化方案处理,storm会抛出异常。为了性能考虑,若是存在自定义数据,最好使用storm提供的方案注册,采用kryo等优秀的序列化方案。不然,java序列化的性能开销很是大。
要理解storm的并发机制,首先得理解下面几个概念:
如图所示,该topology设置了2个wokers,10个executors,12个task。这些资源会被平均分配。
其中,executors数量不能多于task数量,这样就保证了每一个executor至少会分配到一个task。默认状况executors数量等于task数量。
若是用户但愿改变task并行能力,能够经过改变executors数量来实现。之因此没有仅仅使用task来表明线程,而是引入executor,主要是考虑到在一个运行的topology中,task数量没法改变,由于一旦改变,可能致使Fields grouping这种分组方式出现bug。
举个例子,用户但愿消息A分配到某个task,而且之后都由这个task接收,那采用Fields grouping能够将获取消息id,取n = hash(id) % task数量
,n就是A要去的task编号。若是运行期间能够修改task数量,那么n可能会发生变化,带来的影响就是A会跑去另一个task。很明显,这是不容许的。
随着executor的引入,用户能够根据本身的需求,在topology运行时调整task的并行能力,更加自由灵活。(不过jstorm取消了executor这个语义,转而采用task来表明任务和线程,主要考虑到storm这种模型的实现复杂性与收益微小性,每每大部分人采用默认配置)
storm提供了三个级别的消息处理保障机制:
这是最简单的模式,就是发送消息,丢了就不作处理。
storm提供了一种api保证每一个tuple都会被完整的处理。要保证storm的消息可靠性,就得保证spout和bolt两个角色的可靠性。
_collector.emit(new Values("field1", "field2", 3) , msgId);
加上msgId就将emit出去的tuple打上了标识,一旦tuple在timeout(默认为30s)时间范围内被彻底处理,系统就会调用ack(msgId),表示表示已经被完整的处理过了,不然就调用fail(msgId)作重发处理。(ack和fail方法都得本身实现,一般ack回复数据来源消息已经被处理,fail作重发操做)
bolt可靠性:一般bolt都会读入tuple,接着取出数据,最后emit新的tuple。bolt要保证可靠性,首先 须要在emit时锚定读入的tuple和新生成的tuple。
_collector.emit(tuple, new Values(word));
接着根据消息处理成功或者失败的状况分别作ack或者fail调用。
//_collector是SpoutOutputCollector的对象 if(success){ _collector.ack(tuple); }else{ _collector.ack(tuple); }
那么,storm是怎样保障消息的可靠性的呢?
要了解实现原理,首先得有tuple树的背景知识,经过下面一幅图来看看tuple树的处理流程。
假设图中A是Aspout发送的tuple,BC是Bbolt发送的tuple,DE是Cbolt发送的tuple,Bbolt订阅Aspout,Cbolt订阅Bbolt。
storm的topology中会运行名为acker的任务,acker会监控这个tuple树,当发现tuple树中全部的tuple都被完整的处理过了,而且没有新的tuple生成,就会调用spout的ack方法,表示消息被成功处理。不然就调用fail方法。
当topology越大,tuple树也就越大,完整的在内存生成这个tuple树并跟踪它是不现实的。storm经过一个很巧妙的方式,来实现对tuple树的监控。 acker存储了一个64位的数字,名为ack val。当新加入一个tuple时,会生成一个64位随机数字做为id,让tuple被emit时,会将更新ack val为ack val xor id。当ack(tuple)时,也会按一样的方法更新ack val。根据xor的原理,若是acker发现最后ack val的值变为0,则说明全部生成的tuple都被ack,也就是tuple树中全部的tuple都已经被处理了。经过这种方式,storm保证了acker能高效的识别消息是否被完整的处理。
由storm的高级api Trident来保证消息不会丢失,而且不会多发。具体信息本章不会描述,会在后面继续补充。