用了一段时间Storm后的笔记。发现能够记的东西很少,证实Storm挺简单的,你只要遵循一些简单的接口与原则,就能写出大规模实时消息处理的程序。git
不断更新中,请尽可能访问博客原文。github
为何用Storm 没接触前把Storm想象得很强大,接触后以为它就那样无关紧要,再后来又以为没有了所有本身作也麻烦。算法
集群管理:支持应用的部署,工做节点的管理(任务分配、HA、Scalable等),Metrics的收集。数据库
数据流的传输与路由:支持多种数据在各处理节点间自由流动(是同类方案里DAG拓扑最灵活的),基于Netty的高效传输机制,支持轮询、广播、按值分组的路由。apache
数据高可靠性的保证:支持数据流动了多个节点后,在某个节点的处理失败,能够引起数据从源头开始重传。数组
按Storm的官方说法,你也能够本身搭建许多消息队列和worker组成的网络来实现实时处理,可是:网络
乏味:你大部份开发时间花费在部署worker,部署中间队列,配置消息发送到哪里。你所关心的实时处理逻辑只占了你的代码不多的比例 。架构
脆弱:你要本身负责保持每一个worker和队列正常工做。并发
伸缩时痛苦:当worker或队列的消息吞吐量过高时,你须要从新分区,从新配置其它worker,让它们发送消息到新位置。框架
缺点 核心代码是用Clojure写成,翻看代码很是不便。其实,它如今不少新的外部模块都用Java来写了,另外阿里同窗翻写了一个JStorm。
其余流处理方案 Spark-Streaming:老是有人问为何不用Spark Stream,首先它是Micro-Batch风格的准实时方案,间隔通常设到500ms。另外,它的消息流拓扑好像没Storm那样能够随便乱入,有时候必须弄个DB或MQ来作中间传输。
Samza: Linkedin的产品,与Storm比,传输基于Apache Kafka,集群管理基于YARN,它只作处理的那块,但多了基于RocksDB的状态管理。但据《大数据日知录》说,受限于Kafka和YARN,它的拓扑也不够灵活。
自定义Spout Storm对可靠消息传输的支持程度,很大程度上依赖于Spout的实现。
并不默认就支持高可靠性的,collector emit的时候要传输msgId,要本身处理ack(msgId)和fail(msgId)函数。而不少spout其实没有这样作,只有Kafka Spout作的比较正规。
默认的,若是三十秒,消息流经的全部下游节点没有都ack完毕,或者有一个节点报fail,则触发fail(msgId)函数。
由于ack/fail的参数只有msgId,这就要Spout想在ack/fail时对上有消息源如Kafka/JMS进行ack/fail,又或者fail时想重发消息,若是须要完整的消息体而不仅是msgId才能完成时,要本身把msgId对应的消息存起来(会撑爆内存么,好在Kafka不须要)。
另外,由于每一个Spout 是单线程的,会循环的调用nextTuple()的同时,调用ack()或fail()处理结果。因此nextTuple()若是没消息时不要长期阻塞,避免把ack()也阻塞了。同时,Storm本身有个机制在nextTuple老是没消息时Sleep一下避免空循环耗CPU,但参考storm-starter里的spout,仍是直接内部等个50ms好了。在JStorm里,就改成了两条分开的线程。
另外,spout有时是每次被调用nextTuple()时主动去pull消息的,有时是被动接收push消息后存放在LinkedBlockingQueue里,netxtTuple()时从Queue里取消息的。Spout忽然crash的话,存在Queue里的消息也会丢失,除非上游消息源有ack机制来保障。
Spout还有个Max Pending的配置,若是有太多消息没有ack,它就不会再调nextTuple()。但若是上游消息源是主动Push的,消息仍是会源源不断的来,累积在queue里。
RichBolt vs BasicBolt 直接用BasicBolt,会在execute()后自动ack/fail Tuple,而RichBolt则须要自行调用ack/fail。
那何时使用RichBolt? Bolt不是在每次execute()时马上产生新消息,须要异步的发送新消息(好比聚合一段时间的数据再发送)时,又或者想异步的ack/fail原消息时就须要。
BasicBolt的prepare()里并无collector参数,只在每次execute()时传入collector。而RichBolt恰好相反,你能够在初始化时就把collector保存起来,用它在任意时候发送消息。
另外,若是用RichBolt的collector,还要考虑在发送消息时是否带上传入的Tuple,若是不带,则下游的处理节点出错也不会回溯到Spout重发。用BasicBolt则已默认带上。
Ack机制 做者是一拍脑壳想到了用20个字节来追踪每条Spout出来的消息被处理的状况,原理是XOR的时候,N XOR N=0,同一个值以任意次序XOR两次会归0,如A XOR B XOR B XOR C XOR A XOR C =0, 在发出Tuple时,就用随机产生的Tuple Id XOR一下。等接收的Bolt ack时,再XOR一下,就会归0。因此当消息以任意的顺序会流经不少节点,产生不少新Tuple,若是都被成功处理,即全部Tuple id都被以任意顺序执行了两次XOR,则这20个字节最后应该从新归0,就可判断所有ack完毕。
另外,重发是从最上游的Spout开始,若是某个bolt的操做是非幂等的,还要想一想怎么本身去实现去重。
异常处理 若是但愿上游的Spout重发消息,则在BasicBolt中抛出FailedException 或在RichBolt中直接fail掉Tuple。 其余状况下,execute()方法不该该抛出任何异常,或者你是故意抛出异常使得Topology停转。
状态管理 状态数据分两种,一种是本地历史数据,不如使用路由规则,使相同用户的数据老是路由到同一个Bolt。 一种是全局的数据。Storm彻底无论数据的持久化(Trident那块没用到不算), 《Storm 并不是彻底适合全部实时应用》 就是吐槽Storm的状态数据管理的。
不像Linkedin的Samza,Bolt若是须要历史数据,通常本身在内存里管理数据(Crash掉或节点的变化致使路由变化就没了哈),或者在本地起一个Redis/Memcached(不能与Bolt一块儿管理,路由变化的数据迁移,性能也会削弱)
对于全局数据,一样须要Cassandra之类高可扩展的NOSQL来帮忙,但此时延时会更厉害,性能瓶颈也极可能压到了Cassandra上。
定时任务 定时聚合数据之类的需求,除了本身在bolt里开定时器,还能够用以下设置,全部Bolt都定时收到一条Tick消息:
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 300);
以下函数用于判断是Tick仍是正常业务消息:
protected static boolean isTickTuple(Tuple tuple) { return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); }
拓扑的定义 除了使用Java代码,还可使用Yaml来动态定义拓扑,见 https://github.com/ptgoetz/flux
并发度的定义及基于命令行的动态扩容见官方文档,另对于worker进程数的建议是Use one worker per topology per machine。
序列化 Tuple除了传基本类型与数组,AraayList,HashMap外,也能够传一下Java对象的。Storm使用Kyro做为序列化框架,据测比Hessian什么的都要快和小。但必定注册这些额外的Java对象的类型,不然就会使用Java默认的序列化。
参看官方文档,有两种方式注册类型,一个是storm.yaml文件,一个是Config类的registerSerialization方法。如无特殊需求,直接注册须要序列化的类就能够了,不须要本身实现一个Serializer,Kryo默认的按fields序列化的Serializer已足够。
Spout和Bolt的构造函数只会在submit Topology时调一次,而后序列化起来,直接发给工做节点,工做节点里实例化时不会被调用里,因此复杂的成员变量记得都定义成transient,在open(),prepare()里初始化及链接数据库等资源。
另外,须要实现close()函数清理资源,但该函数不承诺在worker进程被杀时保证被调用。
fields grouping的算法 按名称提取fields的值,叠加其hash值,再按当前的可选Task数量取模。因此,动态扩展Task数量,或某Task失效被重建的话,均可能让原来的分配彻底乱掉。
与其余开源技术的集成 好比External目录里的一堆,storm-contrib 里也有一堆,目前支持Jdbc,Redis,HBase,HDFS,Hive,甚至还有Esper,目标都是经过配置(好比SQL及Input/Output fields),而非代码,或尽可能少的代码,实现交互。有时也能够不必定要直接用它们,当成Example Code来看就行了。
另外,与传统的Java应用思路相比,Bolt/Spout与资源链接时,比较难实现共享链接池的概念,链接池通常都是每一个Bolt/Spout实例自用的,要正确处理其链接数量。
HA的实现 若是Worker进程失效,Supervisor进程会检查 Worker的心跳信息,从新建立。 Supervisor进程,须要用Daemon程序如monit来启动,失效时自动从新启动。 若是整个机器节点失效,Nimbus会在其余节点上从新建立。
Nimbus进程,须要用Daemon程序如monit来启动,失效时自动从新启动。 若是Nimbus进程所在的机器直接倒了,须要在其余机器上从新启动,Storm目前没有内建支持,须要本身写脚本实现。
由于Supervisor和Nimbus在进程内都不保存状态,状态都保存在本地文件和ZooKeeper,所以进程能够随便杀。 即便Nimbus进程不在了,也只是不能部署新任务,有节点失效时不能从新分配而已,不影响已有的线程。 一样,若是Supervisor进程失效,不影响已存在的Worker进程。
Zookeeper自己已是按至少三台部署的HA架构了。
运维管理 Storm UI也是用Clojure写的,比较难改,好在它提供了Restful API,能够与其余系统集成,或基于API重写一个UI。
Metrics的采样率是1/20(topology.stats.sample.rate=0.05),即Storm随机从20个事件里取出一个事件来进行统计,命中的话,counter 直接+20。
在旧版本的Storm使用旧版的ZooKeeper要启动数据清理的脚本,在新版上只要修改ZooKeeper的配置文件zoo.cfg, 默认是24小时清理一次 autopurge.purgeInterval=24。
日志的配置在logback/cluster.xml文件里,Storm的日志,自然的须要Logstash + ElasticSearch的集中式日志方案。
storm.local.dir 要本身建,并且不支持~/ 表明用户根目录。
storm.yaml的默认值在 https://github.com/apache/storm/blob/master/conf/defaults.yaml
Tunning
内部传输机制的各类配置,见文档
屏蔽ack机制,当可靠传输并非最重要时。能够把Acker数量设为0,可让Spout不要发出msgId,或者bolt发送消息时不传以前的Tuple。