Introduction:
Storm is a free, open-source, distributed, highly fault-tolerant real-time computation system. What sets it apart from other big-data solutions is its processing model. Hadoop is essentially a batch-processing system: data is loaded into the Hadoop file system (HDFS) and distributed across nodes for processing, and when processing finishes the results are written back to HDFS for the originator to use. Hadoop's high throughput and capacity for massive data sets make it easy to work with huge volumes of data, but its drawbacks are as pronounced as its strengths: high latency, slow response, and complex operations. Storm was created precisely to make up for Hadoop's lack of real-time processing. Storm lets you build topologies that transform unbounded streams of data; unlike Hadoop jobs, these transformations never stop, and they keep processing data as it arrives. Storm is commonly used for real-time analytics, online machine learning, continuous computation, distributed RPC, ETL, and similar workloads. Storm is very simple to deploy and manage, and among comparable stream-processing tools its performance is outstanding.
Advantages of Storm:
Components of Storm:
Before introducing Storm, let's first compare it with Hadoop:
Storm consists mainly of two kinds of components: Nimbus and Supervisor. Both are fail-fast and stateless; task state, heartbeats, and other metadata are stored in ZooKeeper, while submitted code and resources live on each machine's local disk. Some concepts in Storm:
The diagram below shows the relationships among Nimbus, Supervisor, Worker, Task, and ZooKeeper:
In Storm, the computation of a real-time application is packaged and submitted as a Topology, much like a MapReduce job in Hadoop. One key difference: in Hadoop, a MapReduce job eventually finishes and ends, whereas in Storm a Topology, once submitted, never ends unless you explicitly kill it. A Topology is a graph of Spouts and Bolts connected by data streams (Streams). Below is a structural diagram of a Topology:
Every computing component in a Topology (Spout or Bolt) has a degree of parallelism that can be specified when the Topology is created; Storm allocates that many threads across the cluster to run the component concurrently. Since each Spout or Bolt runs as multiple task threads, how are tuples sent between two components (a Spout and a Bolt)? Storm provides several stream-grouping strategies to solve exactly this problem. When defining a Topology, you must specify for each Bolt which Streams it receives as input (note: a Spout does not receive Streams; it only emits them).
The diagram below shows the Topology submission flow:
One of the most interesting things about Storm is its focus on fault tolerance and management. Storm implements guaranteed message processing, so every tuple is fully processed through the topology; if a tuple is found to be unprocessed, it is automatically replayed from the spout. Storm also implements task-level failure detection: when a task fails, messages are automatically reassigned so processing can restart quickly. Storm includes smarter process management than Hadoop: processes are managed by supervisors to ensure resources are fully utilized.
The diagram below shows Storm's data interactions. Note that the two modules Nimbus and Supervisor never interact directly; all state is kept in ZooKeeper. Workers exchange data over ZeroMQ (newer versions replace ZeroMQ with Netty).
Storm uses ZeroMQ to transport messages, which eliminates intermediate queueing and lets messages flow directly between the tasks themselves. Behind the messages is an automated and efficient mechanism for serializing and deserializing Storm's primitive types.
Applications of Storm:
Storm is widely used for real-time analytics, online machine learning, continuous computation, distributed RPC, and similar domains. If your business scenario needs low-latency responses, you want analysis and results within seconds or milliseconds, and you want to scale as data volume grows, then Storm is worth considering. Scenarios where Storm fits:
Some real-world applications:
Reference links:
1. Taobao search technology blog: Introduction to Storm (2012-10-09)
2. UC technology blog: Storm: the hottest stream-processing framework (2013-09-23)
Takeaways: the Topology submission flow diagram and Storm's data interaction diagram
Development:
From version 0.5.0 at open-sourcing to 0.8.0+ today, with 0.9.0+ on the way, the following major features have been added:
Transactional topologies and Trident both address problems encountered in real applications: duplicate counting and usability. Clearly, real commercial use has given Storm a lot of valuable feedback.
The project has over 4,000 watchers on GitHub. Storm integrates with many libraries, supporting Kestrel, Kafka, JMS, Cassandra, Memcached, and more; as the list of supported libraries grows, Storm becomes easier to fit into existing systems.
Storm has an active community and a group of enthusiastic contributors. Over the past two years, Storm's growth has been a success.
Current real-world applications:
Summary:
To use Storm you need to add a message queue as the data entry point, think about how to keep state within a stream, and think about how to decompose a big problem for distributed processing. The cost of solving these problems may exceed the cost of adding a server. But once you commit to Storm and work through those annoying details, you get to enjoy the simplicity and scalability Storm brings.
This article mainly introduces Storm's key concepts (translation excerpted from Xu Mingming's blog).
A reading of the Tutorial in the official Storm documentation
This page lists the main concepts of Storm and links to resources where you can find more information. The concepts discussed are:
Overview:
The logic for a realtime application is packaged into a Storm topology. A Storm topology is analogous to a MapReduce job. One key difference is that a MapReduce job eventually finishes, whereas a topology runs forever (or until you kill it, of course). A topology is a graph of spouts and bolts that are connected with stream groupings. These concepts are described below.
Resources:
The stream is the core abstraction in Storm. A stream is an unbounded sequence of tuples that is processed and created in parallel in a distributed fashion. Streams are defined with a schema that names the fields in the stream's tuples (so corresponding fields across a stream's tuples share a type: every tuple's first field has one type and its second field another, though the two fields need not have the same type as each other). By default, tuples can contain integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays. You can also define your own serializers so that custom types can be used natively within tuples.
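To make the schema idea concrete, here is a minimal sketch in Python (not Storm's actual API; the `Tuple` class and its methods are hypothetical) showing how a stream-level schema lets consumers read tuple values by field name instead of by position:

```python
# Sketch only: a stream declares its field names once; every tuple on
# that stream carries values in the same order, so a field name can be
# resolved to a position.

class Tuple:
    def __init__(self, schema, values):
        assert len(schema) == len(values)
        self.schema = schema          # field names declared for the stream
        self.values = list(values)

    def get(self, field):
        # Look up a value by the name the schema gave its position.
        return self.values[self.schema.index(field)]

schema = ["word", "count"]            # declared once per stream
t = Tuple(schema, ["storm", 3])
assert t.get("word") == "storm"
assert t.get("count") == 3
```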
Every stream is given an id when declared. Since single-stream spouts and bolts are so common, OutputFieldsDeclarer has convenience methods for declaring a single stream without specifying an id. In this case, the stream is given the default id of "default".
Resources:
A spout is a source of streams in a topology. Generally spouts will read tuples from an external source and emit them into the topology (e.g. a Kestrel queue or the Twitter API). Spouts can either be reliable or unreliable. A reliable spout is capable of replaying a tuple if it failed to be processed by Storm, whereas an unreliable spout forgets about the tuple as soon as it is emitted.
Spouts can emit more than one stream. To do so, declare multiple streams using the declareStream method of OutputFieldsDeclarer and specify the stream to emit to when using the emit method on SpoutOutputCollector.
The main method on spouts is nextTuple. nextTuple either emits a new tuple into the topology or simply returns if there are no new tuples to emit. It is imperative that nextTuple does not block for any spout implementation, because Storm calls all the spout methods on the same thread.
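A minimal sketch of that contract (Python, with hypothetical names; real spouts are JVM classes): because one thread drives all spout methods, `nextTuple` must return immediately when there is nothing to emit rather than wait for data.

```python
# Sketch only: nextTuple either emits one tuple or returns at once.
# Blocking here would starve ack/fail, which run on the same thread.

from collections import deque

class QueueSpout:
    def __init__(self):
        self.queue = deque()      # stand-in for an external source
        self.emitted = []

    def nextTuple(self):
        if not self.queue:
            return                # no data: return immediately, never block
        self.emitted.append(self.queue.popleft())

spout = QueueSpout()
spout.nextTuple()                 # empty queue: returns right away
spout.queue.append(("storm",))
spout.nextTuple()
assert spout.emitted == [("storm",)]
```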
The other main methods on spouts are ack and fail. These are called when Storm detects that a tuple emitted from the spout either successfully completed through the topology or failed to be completed. ack and fail are only called for reliable spouts. See the Javadoc for more information.
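The reliable-spout behavior described above can be sketched as follows (Python, hypothetical names; not Storm's API): the spout keeps every in-flight tuple keyed by a message id, drops it on ack, and re-queues it on fail.

```python
# Sketch only: a reliable spout remembers emitted tuples until acked,
# so a failed tuple can be replayed; an unreliable spout would skip
# the `pending` bookkeeping entirely.

class ReliableSpout:
    def __init__(self, source):
        self.source = list(source)
        self.pending = {}             # msg_id -> tuple, until acked
        self.next_id = 0

    def nextTuple(self):
        if not self.source:
            return None
        msg_id, self.next_id = self.next_id, self.next_id + 1
        tup = self.source.pop(0)
        self.pending[msg_id] = tup    # remember until fully processed
        return msg_id, tup

    def ack(self, msg_id):
        del self.pending[msg_id]      # fully processed: forget it

    def fail(self, msg_id):
        self.source.append(self.pending.pop(msg_id))  # replay later

spout = ReliableSpout(["a", "b"])
msg_id, tup = spout.nextTuple()
spout.fail(msg_id)            # downstream processing failed
assert "a" in spout.source    # "a" will be emitted again
```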
Resources:
All processing in topologies is done in bolts. Bolts can do anything from filtering, functions, aggregations, and joins to talking to databases, and more.
Bolts can do simple stream transformations. Doing complex stream transformations often requires multiple steps and thus multiple bolts. For example, transforming a stream of tweets into a stream of trending images requires at least two steps: a bolt to do a rolling count of retweets for each image, and one or more bolts to stream out the top X images (you can do this particular stream transformation in a more scalable way with three bolts than with two).
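The two-step pipeline above can be sketched in Python (class and method names are illustrative, not Storm's bolt interface): one stage keeps a rolling count per item, and a second stage tracks the current top N.

```python
# Sketch only: chaining two "bolts" -- a rolling counter and a top-N
# ranker -- to mimic a multi-step stream transformation.

from collections import Counter

class RollingCountBolt:
    def __init__(self):
        self.counts = Counter()

    def execute(self, item):
        self.counts[item] += 1
        return (item, self.counts[item])   # emit (item, count) downstream

class TopNBolt:
    def __init__(self, n):
        self.n = n
        self.best = {}

    def execute(self, item, count):
        self.best[item] = count
        # Current ranking, highest count first.
        return sorted(self.best, key=self.best.get, reverse=True)[: self.n]

counter, top2 = RollingCountBolt(), TopNBolt(2)
ranking = []
for item in ["a", "b", "a", "c", "a", "b"]:
    ranking = top2.execute(*counter.execute(item))
assert ranking == ["a", "b"]   # a seen 3 times, b twice, c once
```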
Bolts can emit more than one stream. To do so, declare multiple streams using the declareStream method of OutputFieldsDeclarer and specify the stream to emit to when using the emit method on OutputCollector.
When you declare a bolt's input streams, you always subscribe to specific streams of another component. If you want to subscribe to all the streams of another component, you have to subscribe to each one individually. InputDeclarer has syntactic sugar for subscribing to streams declared on the default stream id. Saying declarer.shuffleGrouping("1") subscribes to the default stream on component "1" and is equivalent to declarer.shuffleGrouping("1", DEFAULT_STREAM_ID).
The main method in bolts is the execute method, which takes in as input a new tuple. Bolts emit new tuples using the OutputCollector object. Bolts must call the ack method on the OutputCollector for every tuple they process so that Storm knows when tuples are completed (and can eventually determine that it's safe to ack the original spout tuples). For the common case of processing an input tuple, emitting 0 or more tuples based on that tuple, and then acking the input tuple, Storm provides an IBasicBolt interface which does the acking automatically. It's perfectly fine to launch new threads in bolts that do processing asynchronously; OutputCollector is thread-safe and can be called at any time.
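The convenience that IBasicBolt provides can be sketched like this (Python, hypothetical names; not the actual interface): a wrapper runs the bolt's logic and then acks the input tuple automatically, so the common execute/emit/ack sequence needs no manual ack call.

```python
# Sketch only: auto-acking wrapper in the spirit of IBasicBolt.
# The wrapped function plays the role of the bolt's execute logic.

class BasicBoltRunner:
    def __init__(self, process):
        self.process = process       # the bolt's own logic
        self.acked = []              # stand-in for acks sent to Storm

    def execute(self, tup):
        out = self.process(tup)      # may emit 0..n tuples
        self.acked.append(tup)       # ack happens automatically here
        return out

runner = BasicBoltRunner(lambda t: [t.upper()])
assert runner.execute("storm") == ["STORM"]
assert runner.acked == ["storm"]
```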
Resources:
Part of defining a topology is specifying for each bolt which streams it should receive as input. A stream grouping defines how that stream should be partitioned among the bolt's tasks.
There are seven built-in stream groupings in Storm, and you can implement a custom stream grouping by implementing the CustomStreamGrouping interface:
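As an illustration of what a grouping decides, here is a sketch (Python; not Storm's implementation, and the function name is made up) of a fields-grouping-style partitioner: tuples with the same value for the grouping field always go to the same consumer task, while a shuffle grouping would spread tuples evenly instead.

```python
# Sketch only: routing a tuple to one of a bolt's tasks by hashing the
# grouping fields, so equal keys always reach the same task.

def fields_grouping(tuple_values, group_fields, field_names, num_tasks):
    """Choose a task index from a hash of the grouping-field values."""
    key = tuple(tuple_values[field_names.index(f)] for f in group_fields)
    return hash(key) % num_tasks

field_names = ["word", "count"]
t1 = ("storm", 1)
t2 = ("storm", 7)   # same "word" value, different "count"

task1 = fields_grouping(t1, ["word"], field_names, num_tasks=4)
task2 = fields_grouping(t2, ["word"], field_names, num_tasks=4)
assert task1 == task2   # same grouping key -> same consumer task
```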
Under a direct grouping, a bolt can get the task ids of its consumers by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).
Resources:
InputDeclarer: this object is returned whenever setBolt is called on TopologyBuilder and is used for declaring a bolt's input streams and how those streams should be grouped.

Storm guarantees that every spout tuple will be fully processed by the topology. It does this by tracking the tree of tuples triggered by every spout tuple (a bolt that processes a tuple may emit further tuples, forming a tree) and determining when that tree of tuples has been successfully completed. Every topology has a "message timeout" associated with it. If Storm fails to detect that a spout tuple has been completed within that timeout, then it fails the tuple and replays it later.
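The idea behind tracking a tuple tree can be sketched with the XOR-ledger trick Storm's acker is known for (Python sketch; class and method names are illustrative): every edge of the tree gets a random id, the tracker XORs ids in as edges are created and XORs them out as they are acked, and the ledger reaches zero exactly when every edge has been both created and completed.

```python
# Sketch only: XOR ledger for tuple-tree completion. Each random id is
# XORed in on anchor and XORed out on ack; x ^ x == 0, so the ledger is
# zero once every anchored edge has been acked.

import random

class TreeTracker:
    def __init__(self):
        self.ledger = 0

    def anchor(self):
        edge_id = random.getrandbits(64)
        self.ledger ^= edge_id       # edge created
        return edge_id

    def ack(self, edge_id):
        self.ledger ^= edge_id       # edge completed

    def done(self):
        return self.ledger == 0

tracker = TreeTracker()
edges = [tracker.anchor() for _ in range(3)]   # spout tuple fans out
assert not tracker.done()
for e in edges:
    tracker.ack(e)
assert tracker.done()     # whole tree processed: safe to ack the spout tuple
```

The appeal of this design is constant memory per spout tuple, no matter how large the tree grows.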
To take advantage of Storm's reliability capabilities, you must tell Storm when new edges in a tuple tree are being created and tell Storm whenever you've finished processing an individual tuple. Both are done using the OutputCollector object that bolts use to emit tuples: anchoring is done in the emit method, and you declare that you're finished with a tuple using the ack method.
This is all explained in much more detail in Guaranteeing message processing.
Each spout or bolt executes as many tasks across the cluster. Each task corresponds to one thread of execution, and stream groupings define how to send tuples from one set of tasks to another set of tasks. You set the parallelism for each spout or bolt in the setSpout and setBolt methods of TopologyBuilder.
Topologies execute across one or more worker processes. Each worker process is a physical JVM and executes a subset of all the tasks for the topology. For example, if the combined parallelism of the topology is 300 and 50 workers are allocated, then each worker will execute 6 tasks (as threads within the worker). Storm tries to spread the tasks evenly across all the workers.
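The even-spreading arithmetic above can be sketched as a simple round-robin assignment (Python; illustrative only, not Storm's scheduler):

```python
# Sketch only: 300 tasks over 50 workers -> 6 tasks per worker.

def assign_tasks(num_tasks, num_workers):
    assignment = [[] for _ in range(num_workers)]
    for task in range(num_tasks):
        assignment[task % num_workers].append(task)  # round-robin spread
    return assignment

workers = assign_tasks(300, 50)
assert len(workers) == 50
assert all(len(tasks) == 6 for tasks in workers)
```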
Resources:
Storm has a pile of parameters you can configure to adjust the behavior of Nimbus, the Supervisors, and running topologies. Some configurations are system-level and some are topology-level. Every configuration with a default value has that default defined in defaults.yaml; you can override these defaults by placing a storm.yaml on your classpath, and you can also set topology-specific configuration in code when submitting with StormSubmitter. Of course, the precedence of these configurations is: defaults.yaml < storm.yaml < topology-specific configuration.
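That precedence can be modeled as successive overrides, sketched here with plain dicts (the keys are real Storm config names; the merge itself is just an illustration, not how Storm loads config):

```python
# Sketch only: defaults.yaml < storm.yaml < topology-specific config,
# modeled as later dicts overriding earlier ones.

defaults = {"topology.workers": 1, "topology.debug": False}   # defaults.yaml
cluster = {"topology.workers": 4}                             # storm.yaml
topology = {"topology.debug": True}                           # per-topology

effective = {**defaults, **cluster, **topology}               # later wins
assert effective == {"topology.workers": 4, "topology.debug": True}
```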