Strom的结构
Storm与传统关系型数据库
传统关系型数据库是先存后计算,而storm则是先算后存,甚至不存
传统关系型数据库很难部署实时计算,只能部署定时任务统计分析窗口数据
关系型数据库重视事务,并发控制,相对来讲Storm比较简陋
Storm与Hadoop,Spark等是流行的大数据方案
与Storm关系密切的语言:核心代码用clojure书写,实用程序用python开发,使用java开发拓扑
topology
Storm集群中有两种节点,一种是控制节点(Nimbus节点),另外一种是工做节点(Supervisor节点)。全部Topology任务的 提交必须在Storm客户端节点上进行(须要配置 storm.yaml文件),由Nimbus节点分配给其余Supervisor节点进行处理。 Nimbus节点首先将提交的Topology进行分片,分红一个个的Task,并将Task和Supervisor相关的信息提交到 zookeeper集群上,Supervisor会去zookeeper集群上认领本身的Task,通知本身的Worker进程进行Task的处理。
和一样是计算框架的MapReduce相比,MapReduce集群上运行的是Job,而Storm集群上运行的是Topology。可是Job在运行结束以后会自行结束,Topology却只能被手动的kill掉,不然会一直运行下去
Storm不处理计算结果的保存,这是应用代码须要负责的事情,若是数据不大,你能够简单地保存在内存里,也能够每次都更新数据库,也能够采用NoSQL存储。这部分事情彻底交给用户。
数据存储以后的展示,也是你须要本身处理的,storm UI 只提供对topology的监控和统计。
整体的Topology处理流程图为:
zookeeper集群
storm使用zookeeper来协调整个集群, 可是要注意的是storm并不用zookeeper来传递消息。因此zookeeper上的负载是很是低的,单个节点的zookeeper在大多数状况下 都已经足够了, 可是若是你要部署大一点的storm集群, 那么你须要的zookeeper也要大一点。关于如何部署zookeeper,能够看http://zookeeper.apache.org/doc /r3.3.3/zookeeperAdmin.html
部署zookeeper有些须要注意的地方:
一、对zookeeper作好监控很是重要, zookeeper是fail-fast的系统,只要出现什么错误就会退出, 因此实际场景中要监控,更多细节看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_supervision
二、实际场景中要配置一个cron job来压缩zookeeper的数据和业务日志。zookeeper本身是不会去压缩这些的,因此你若是不设置一个cron job, 那么你很快就会发现磁盘不够用了,更多细节能够查看http://zookeeper.apache.org/doc/r3.3.3 /zookeeperAdmin.html#sc_maintenance
Component
Storm中,Spout和Bolt都是Component。因此,Storm定义了一个名叫IComponent的总接口
全家普以下:绿色部分是咱们最经常使用、比较简单的部分。红色部分是与事务相关的
Spout
Spout是Stream的消息产生源, Spout组件的实现能够经过继承BaseRichSpout类或者其余Spout类来完成,也能够经过实现IRichSpout接口来实现
public interface ISpout extends Serializable {
void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
void close();
void nextTuple();
void ack(Object msgId);
void fail(Object msgId);
}
open()方法 -- 初始化方法
close() -- 在该spout将要关闭时调用。可是不保证其必定被调用,由于在集群中supervisor节点,可使用kill -9来杀死worker进程。只有当Storm是在本地模式下运行,若是是发送中止命令,能够保证close的执行
ack(Object msgId) -- 成功处理tuple时回调的方法,一般状况下,此方法的实现是将消息队列中的消息移除,防止消息重放
fail(Object msgId) -- 处理tuple失败时回调的方法,一般状况下,此方法的实现是将消息放回消息队列中而后在稍后时间里重放
nextTuple() -- 这是Spout类中最重要的一个方法。发射一个Tuple到Topology都是经过这个方法来实现的。调用此方法时,storm向spout发出请求,让spout发出元组(tuple)到输出器(ouput collector)。这种方法应该是非阻塞的,因此spout若是没有元组发出,这个方法应该返回。nextTuple、ack 和fail 都在spout任务的同一个线程中被循环调用。 当没有元组的发射时,应该让nextTuple睡眠一个很短的时间(如一毫秒),以避免浪费太多的CPU。
继承了BaseRichSpout后,不用实现close、 activate、 deactivate、 ack、 fail 和 getComponentConfiguration 方法,只关心最基本核心的部分。
一般状况下(Shell和事务型的除外),实现一个Spout,能够直接实现接口IRichSpout,若是不想写多余的代码,能够直接继承BaseRichSpout
Bolt
Bolt类接收由Spout或者其余上游Bolt类发来的Tuple,对其进行处理。Bolt组件的实现能够经过继承BasicRichBolt类或者IRichBolt接口等来完成
prepare方法 -- 此方法和Spout中的open方法相似,在集群中一个worker中的task初始化时调用。 它提供了bolt执行的环境
declareOutputFields方法 -- 用于声明当前Bolt发送的Tuple中包含的字段(field),和Spout中相似
cleanup方法 -- 同ISpout的close方法,在关闭前调用。一样不保证其必定执行。
execute方法 -- 这是Bolt中最关键的一个方法,对于Tuple的处理均可以放到此方法中进行。具体的发送是经过emit方法来完成的。execute接受一个tuple进行处理,并用prepare方法传入的OutputCollector的ack方法(表示成功)或fail(表示失败)来反馈处理结果。
Storm提供了IBasicBolt接口,其目的就是实现该接口的Bolt不用在代码中提供反馈结果了,Storm内部会自动反馈成功。若是你确实要反馈失败,能够抛出FailedException
一般状况下,实现一个Bolt,能够实现IRichBolt接口或继承BaseRichBolt,若是不想本身处理结果反馈,能够实现 IBasicBolt接口或继承BaseBasicBolt,它实际上至关于自动实现了collector.emit.ack(inputTuple)
Topology运行流程
(1)Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,以后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的stormdist目录中,在此目录中同时还有序列化以后的Topology代码文件
(2) 在设定Topology所关联的Spouts和Bolts时,能够同时设置当前Spout和Bolt的executor数目和task数目,默认状况下,一个Topology的task的总和是和executor的总和一致的。以后,系统根据worker的数目,尽可能平均的分配这些task的执行。worker在哪一个supervisor节点上运行是由storm自己决定的
(3)任务分配好以后,Nimbus节点会将任务的信息提交到zookeeper集群,同时在zookeeper集群中会有workerbeats节点,这里存储了当前Topology的全部worker进程的心跳信息
(4)Supervisor 节点会不断的轮询zookeeper集群,在zookeeper的assignments节点中保存了全部Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor经过轮询此节点的内容,来领取本身的任务,启动worker进程运行
(5)一个Topology运行以后,就会不断的经过Spouts来发送Stream流,经过Bolts来不断的处理接收到的Stream流,Stream流是无界的。
最后一步会不间断的执行,除非手动结束Topology。
Topology运行方式
在开始建立项目以前,了解Storm的操做模式(operation modes)是很重要的。 Storm有两种运行方式
本地运行的提交方式,例:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
Thread.sleep(2000);
cluster.shutdown();
分布式提交方式,例:
StormSubmitter.submitTopology(TOPOLOGY_NAME, conf, builder.createTopology());
须要注意的是,在Storm代码编写完成以后,须要打包成jar包放到Nimbus中运行,打包的时候,不须要把依赖的jar都打迚去,不然若是把依赖的storm.jar包打进去的话,运行时会出现重复的配置文件错误致使Topology没法运行。由于Topology运行以前,会加载本地的 storm.yaml 配置文件。
运行的命令以下: storm jar StormTopology.jar mainclass [args]
storm守护进程的命令
Nimbus: storm nimbus 启动nimbus守护进程
Supervisor: storm supervisor 启动supervisor守护迚程
UI:storm ui 这将启动stormUI的守护进程,为监测storm集群提供一个基于web的用户界面。
DRPC: storm drpc 启动DRPC的守护进程
storm管理命令
JAR:storm jar topology_jar topology_class [arguments...]
jar命令是用于提交一个集群拓扑.它运行指定参数的topology_class中的main()方法,上传topology_jar到nimbus,由nimbus发布到集群中。一旦提交,storm将激活拓扑并开始处理topology_class 中的main()方法,main()方法负责调用StormSubmitter.submitTopology()方法,并提供一个惟一的拓扑(集群)的名。若是一个拥有该名称的拓扑已经存在于集群中,jar命令将会失败。常见的作法是在使用命令行参数来指定拓扑名称,以便拓扑在提交的时候被命名。
KILL:storm kill topology_name [-w wait_time]
杀死一个拓扑,可使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内容许拓扑完成当前的数据流。执行kill命令时能够经过-w [等待秒数]指定拓扑停用之后的等待时间。也能够在Storm UI 界面上实现一样的功能
Deactivate:storm deactivate topology_name
停用拓扑时,全部已分发的元组都会获得处理,spouts的nextTuple方法将不会被调用。也能够在Storm UI 界面上实现一样的功能
Activate:storm activate topology_name
启动一个停用的拓扑。也能够在Storm UI 界面上实现一样的功能
Rebalance:storm rebalance topology_name [-w wait_time] [-n worker_count] [-e component_name=executer_count]...
rebalance使你从新分配集群任务。这是个很强大的命令。好比,你向一个运行中的集群增长了节点。rebalance命令将会停用拓扑,而后在相应超时时间以后重分配worker,并重启拓扑
例:storm rebalance wordcount-topology -w 15 -n 5 -e sentence-spout=4 -e split-bolt=8
还有其余管理命令,如:Remoteconfvalue、REPL、Classpath等
新建storm项目注意事项
为了开发storm项目,你的classpath里面须要有storm的jar包。最推荐的方式是使用Maven,不使用maven的话你能够手动把storm发行版里面的全部的jar包添加到classpath storm-starter项目使用Leiningen做为build和依赖管理工具,你能够下载这个脚本(https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein)来安装Leiningen, 把它加入到你的PATH, 使它可执行。要拉取storm的全部依赖包,简单地在项目的根目录执行 lein deps 就能够了