Akka and Storm are both important tools for low-latency, high-throughput computation, but they are not direct competitors. If Akka is the Linux kernel, Storm is more like a distribution such as Ubuntu. Storm is not, however, a distribution of Akka, so a better analogy might be Akka as BSD and Storm as Ubuntu.
Akka provides an API and an execution engine.
Storm, in addition to an API and an execution engine, also includes metrics, a web UI, cluster management, and message-delivery guarantees.
This article discusses where Akka and Storm overlap: the similarities and differences of their APIs and execution engines.
Let's look at Storm's two main APIs:
public interface ISpout extends Serializable {
    /**
     * Called when a task for this component is initialized within a worker on the cluster.
     * It provides the spout with the environment in which the spout executes.
     *
     * @param conf The Storm configuration for this spout. This is the configuration provided
     *             to the topology merged in with cluster configuration on this machine.
     * @param context This object can be used to get information about this task's place within
     *                the topology, including the task id and component id of this task, input
     *                and output information, etc.
     * @param collector The collector is used to emit tuples from this spout. Tuples can be
     *                  emitted at any time, including the open and close methods. The collector
     *                  is thread-safe and should be saved as an instance variable of this spout object.
     */
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);

    /**
     * Called when an ISpout is going to be shutdown. There is no guarantee that close
     * will be called, because the supervisor kill -9's worker processes on the cluster.
     *
     * <p>The one context where close is guaranteed to be called is when a topology is
     * killed when running Storm in local mode.</p>
     */
    void close();

    /**
     * Called when a spout has been activated out of a deactivated mode.
     * nextTuple will be called on this spout soon. A spout can become activated
     * after having been deactivated when the topology is manipulated using the
     * `storm` client.
     */
    void activate();

    /**
     * Called when a spout has been deactivated. nextTuple will not be called while
     * a spout is deactivated. The spout may or may not be reactivated in the future.
     */
    void deactivate();

    /**
     * When this method is called, Storm is requesting that the Spout emit tuples to the
     * output collector. This method should be non-blocking, so if the Spout has no tuples
     * to emit, this method should return. nextTuple, ack, and fail are all called in a tight
     * loop in a single thread in the spout task. When there are no tuples to emit, it is courteous
     * to have nextTuple sleep for a short amount of time (like a single millisecond)
     * so as not to waste too much CPU.
     */
    void nextTuple();

    /**
     * Storm has determined that the tuple emitted by this spout with the msgId identifier
     * has been fully processed. Typically, an implementation of this method will take that
     * message off the queue and prevent it from being replayed.
     */
    void ack(Object msgId);

    /**
     * The tuple emitted by this spout with the msgId identifier has failed to be
     * fully processed. Typically, an implementation of this method will put that
     * message back on the queue to be replayed at a later time.
     */
    void fail(Object msgId);
}
and:
public interface IBasicBolt extends IComponent {
    void prepare(Map stormConf, TopologyContext context);

    /**
     * Process the input tuple and optionally emit new tuples based on the input tuple.
     *
     * All acking is managed for you. Throw a FailedException if you want to fail the tuple.
     */
    void execute(Tuple input, BasicOutputCollector collector);

    void cleanup();
}
And the Actor API in Akka:
trait Actor {
  import Actor._ // to make type Receive known in subclasses without import
  type Receive = Actor.Receive

  /**
   * Stores the context for this actor, including self, and sender.
   * It is implicit to support operations such as `forward`.
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   *
   * [[akka.actor.ActorContext]] is the Scala API. `getContext` returns a
   * [[akka.actor.UntypedActorContext]], which is the Java API of the actor
   * context.
   */
  implicit val context: ActorContext = {
    val contextStack = ActorCell.contextStack.get
    if ((contextStack.isEmpty) || (contextStack.head eq null))
      throw ActorInitializationException(
        s"You cannot create an instance of [${getClass.getName}] explicitly using the constructor (new). " +
          "You have to use one of the 'actorOf' factory methods to create a new actor. See the documentation.")
    val c = contextStack.head
    ActorCell.contextStack.set(null :: contextStack)
    c
  }

  /**
   * The 'self' field holds the ActorRef for this actor.
   * <p/>
   * Can be used to send messages to itself:
   * <pre>
   * self ! message
   * </pre>
   */
  implicit final val self = context.self //MUST BE A VAL, TRUST ME

  /**
   * The reference sender Actor of the last received message.
   * Is defined if the message was sent from another Actor,
   * else `deadLetters` in [[akka.actor.ActorSystem]].
   *
   * WARNING: Only valid within the Actor itself, so do not close over it and
   * publish it to other threads!
   */
  final def sender(): ActorRef = context.sender()

  /**
   * This defines the initial actor behavior, it must return a partial function
   * with the actor logic.
   */
  def receive: Actor.Receive

  /**
   * INTERNAL API.
   *
   * Can be overridden to intercept calls to this actor's current behavior.
   *
   * @param receive current behavior.
   * @param msg current message.
   */
  protected[akka] def aroundReceive(receive: Actor.Receive, msg: Any): Unit =
    receive.applyOrElse(msg, unhandled)

  /**
   * Can be overridden to intercept calls to `preStart`. Calls `preStart` by default.
   */
  protected[akka] def aroundPreStart(): Unit = preStart()

  /**
   * Can be overridden to intercept calls to `postStop`. Calls `postStop` by default.
   */
  protected[akka] def aroundPostStop(): Unit = postStop()

  /**
   * Can be overridden to intercept calls to `preRestart`. Calls `preRestart` by default.
   */
  protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit =
    preRestart(reason, message)

  /**
   * Can be overridden to intercept calls to `postRestart`. Calls `postRestart` by default.
   */
  protected[akka] def aroundPostRestart(reason: Throwable): Unit = postRestart(reason)

  /**
   * User overridable definition the strategy to use for supervising
   * child actors.
   */
  def supervisorStrategy: SupervisorStrategy = SupervisorStrategy.defaultStrategy

  /**
   * User overridable callback.
   * <p/>
   * Is called when an Actor is started.
   * Actors are automatically started asynchronously when created.
   * Empty default implementation.
   */
  @throws(classOf[Exception])
  def preStart(): Unit = ()

  /**
   * User overridable callback.
   * <p/>
   * Is called asynchronously after 'actor.stop()' is invoked.
   * Empty default implementation.
   */
  @throws(classOf[Exception])
  def postStop(): Unit = ()

  /**
   * User overridable callback: '''By default it disposes of all children and then calls `postStop()`.'''
   * @param reason the Throwable that caused the restart to happen
   * @param message optionally the current message the actor processed when failing, if applicable
   * <p/>
   * Is called on a crashed Actor right BEFORE it is restarted to allow clean
   * up of resources before Actor is terminated.
   */
  @throws(classOf[Exception])
  def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    context.children foreach { child ⇒
      context.unwatch(child)
      context.stop(child)
    }
    postStop()
  }

  /**
   * User overridable callback: By default it calls `preStart()`.
   * @param reason the Throwable that caused the restart to happen
   * <p/>
   * Is called right AFTER restart on the newly created Actor to allow reinitialization after an Actor crash.
   */
  @throws(classOf[Exception])
  def postRestart(reason: Throwable): Unit = {
    preStart()
  }

  /**
   * User overridable callback.
   * <p/>
   * Is called when a message isn't handled by the current behavior of the actor
   * by default it fails with either a [[akka.actor.DeathPactException]] (in
   * case of an unhandled [[akka.actor.Terminated]] message) or publishes an [[akka.actor.UnhandledMessage]]
   * to the actor's system's [[akka.event.EventStream]]
   */
  def unhandled(message: Any): Unit = {
    message match {
      case Terminated(dead) ⇒ throw new DeathPactException(dead)
      case _                ⇒ context.system.eventStream.publish(UnhandledMessage(message, sender(), self))
    }
  }
}
It is fair to say Storm's main APIs closely resemble the Actor API. Looking at the timeline, though, Storm and Akka began development at roughly the same time, so it is more likely that Storm's author was inspired by Erlang's Actor implementation. Judging from the current state of things, it seems the author wanted to write a "plain" Actor implementation in Clojure; since that plain implementation already met Storm's design goals, he never went on to turn Storm into a complete Actor implementation on Clojure.
So, judging from the APIs alone, how do Spout/Bolt differ from Actor?
At the API level Storm has two methods that Actor lacks: ack and fail. They exist because Storm targets a narrower set of use cases than Akka (essentially statistics and analytics), so fault tolerance is built in, and users in that niche get it working out of the box.
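The ISpout javadoc above spells out the typical contract: ack drops a pending message, while fail puts it back on the queue for replay. A minimal sketch of that contract in plain Java (the ReplayQueue class and its method names are invented for illustration; they are not Storm's):

```java
import java.util.*;

// Hypothetical sketch of the ack/fail contract ISpout implies:
// emitted tuples are kept in a "pending" map keyed by msgId until
// acked; a failed msgId is re-enqueued to be replayed later.
public class ReplayQueue {
    private final Deque<String> queue = new ArrayDeque<>();
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextId = 0;

    void offer(String tuple) { queue.add(tuple); }

    // like nextTuple(): emit one tuple and remember it under a msgId
    Integer emit() {
        String t = queue.poll();
        if (t == null) return null;
        int id = nextId++;
        pending.put(id, t);
        return id;
    }

    void ack(int msgId)  { pending.remove(msgId); }             // done, forget it
    void fail(int msgId) { queue.add(pending.remove(msgId)); }  // replay later

    public static void main(String[] args) {
        ReplayQueue q = new ReplayQueue();
        q.offer("a");
        q.offer("b");
        int idA = q.emit(), idB = q.emit();
        q.ack(idA);               // "a" is fully processed
        q.fail(idB);              // "b" goes back on the queue
        System.out.println(q.emit() != null); // "b" is available for replay
    }
}
```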
In addition, Storm's Tuple class carries some context information, also packaged to serve the target use cases.
context: the Spout's open method also receives a context, but in an Actor the context can be used at any time, which shows that Actor encourages using the context more than Spout does; the data in the context is also updated dynamically.
self: the Actor's reference to itself. The Actor model explicitly supports a downstream component sending data back upstream, and even an actor sending messages to itself. In Storm, data flow is one-way by default: a downstream component gives no feedback to upstream, apart from the system-defined ack and fail.
postRestart: distinguishes an Actor's first start from a restart, which is quite useful. Storm probably omitted it at first out of convenience or oversight, and later did not want to change the core API.
unhandled: handles messages the actor did not expect to receive; by default they are published to a system stream. This exists because an Actor is open to the outside world: any external application that knows the actor's address can send it messages. Storm only receives the messages you designed it to handle, so it has no such need.
What follows: a comparison of Actor and Task, the differences in thread-scheduling models, hot code deployment, and the limits Storm's ack mechanism places on asynchronous code.
Component is the collective name for Spout and Bolt, the basic unit that runs user code in Storm. What Components and Actors share: both respond to messages, can hold state, and are entered by only one thread at a time unless you explicitly spawn extra threads. The main difference is that an Actor is extremely lightweight: you can create tens of thousands of actors in one program, or put every ten lines of code in their own actor, with no problem at all. With Storm Components the situation is different: you are better off using only a handful of Components to describe the top-level abstraction.
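To see why actors can be so cheap, here is a minimal, hypothetical sketch of the idea (the TinyActor class is invented for illustration; real Akka is far more sophisticated): an "actor" is just a mailbox whose messages are drained by tasks submitted to one shared pool, so a single actor never runs on two threads at once, and tens of thousands of them cost a little memory each rather than an OS thread each.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

public class TinyActors {
    static class TinyActor {
        private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
        private final ExecutorService pool;
        private final AtomicBoolean scheduled = new AtomicBoolean(false);
        int state = 0; // actor-private state, touched by one thread at a time

        TinyActor(ExecutorService pool) { this.pool = pool; }

        void tell(Runnable msg) {
            mailbox.add(msg);
            // schedule a drain only if one is not already running
            if (scheduled.compareAndSet(false, true)) pool.execute(this::drain);
        }

        private void drain() {
            Runnable msg;
            while ((msg = mailbox.poll()) != null) msg.run();
            scheduled.set(false);
            // re-check in case a message raced in after the last poll
            if (!mailbox.isEmpty() && scheduled.compareAndSet(false, true))
                pool.execute(this::drain);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = ForkJoinPool.commonPool();
        List<TinyActor> actors = new ArrayList<>();
        for (int i = 0; i < 50_000; i++) actors.add(new TinyActor(pool));

        CountDownLatch done = new CountDownLatch(50_000);
        for (TinyActor a : actors)
            a.tell(() -> { a.state++; done.countDown(); });
        done.await();
        // 50,000 "actors" ran on a shared pool of only a few threads
        System.out.println(actors.size());
    }
}
```

A Storm Component, by contrast, is tied to dedicated executor threads, which is why it cannot be multiplied this freely.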
The APIs are so similar, so why can you spawn actors freely while Components should be kept few? The secret lies in Akka's dispatchers. All asynchronous code in an Akka program, including Actors, Futures, Runnables, and even ParIterable, can be managed by a Dispatcher; apart from the main thread that starts the ActorSystem, essentially every thread can be handed over to it. Dispatchers are customizable; by default Akka uses a fork-join-executor, which suits the Actor model particularly well and delivers excellent performance compared with an ordinary thread pool.
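As a rough illustration, the default dispatcher can be tuned in Akka's application.conf; the keys below are from Akka's standard configuration, though the numeric values here are only examples:

```
akka.actor.default-dispatcher {
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8       # lower bound on pool size
    parallelism-factor = 3.0  # threads = ceil(available cores * factor)
    parallelism-max = 64      # upper bound on pool size
  }
}
```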
By contrast, Storm's thread-scheduling model is much plainer: each Component gets its own thread, or several Components take turns sharing one. That is why you cannot create too many Components.
In real-time computation, the main hot-deployment need is replacing a single algorithm module, such as swapping out a ranking algorithm, while everything else stays untouched.
Because Storm supports programming in any language via Thrift, if your algorithm is written in a scripting language like Python and you want to replace it without a restart, you only need to swap the corresponding .py file in the right place on each machine. The downside is that this locks the program into such languages.
On the Akka side, because the Actor model presents the same messaging interface for in-process and inter-process communication, the actors responsible for the algorithm can run as a separate process; when the code is updated, you restart just that process. Although one process in the system restarts, the system as a whole keeps running without a pause.
Storm's message-guarantee mechanism is genuinely original: using bitwise XOR, it can track the success or failure of data as it moves through processing with very little memory and high performance. By default, user code only needs to supply a MessageId and the ack mechanism happily runs, so users usually need not care about it. The problem with the default interface is that as soon as you use asynchronous code, the ack mechanism breaks down: scheduled or submitted Runnables are invisible to it, so if asynchronous logic fails, the acker never knows. How to make Storm's ack mechanism coexist with asynchronous code remains an open question.
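The XOR trick can be sketched in a few lines: each emitted edge contributes a random 64-bit id that is XORed into a per-tuple tracking value once when emitted and once when acked; the value returns to zero exactly when everything has been acked. This is a simplified toy (the class names are invented, and Storm's acker bolt, anchoring, and timeouts are ignored):

```java
import java.util.Random;

// Toy sketch of Storm's XOR ack bookkeeping. Tracking a tuple tree
// costs one long, no matter how many downstream tuples it fans out to.
class AckTracker {
    private long value = 0L;
    void xor(long id) { value ^= id; }          // same call at emit and at ack
    boolean fullyAcked() { return value == 0L; } // zero once every id appeared twice
}

public class XorAckDemo {
    public static void main(String[] args) {
        Random rnd = new Random(42);            // seeded so the ids are distinct
        AckTracker t = new AckTracker();
        long a = rnd.nextLong(), b = rnd.nextLong();

        t.xor(a); t.xor(b);  // two tuples emitted
        t.xor(a);            // first tuple acked
        System.out.println(t.fullyAcked()); // false: b is still outstanding
        t.xor(b);            // second tuple acked
        System.out.println(t.fullyAcked()); // true: tree fully processed
    }
}
```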
I think Storm's API is excellent, and its reliability has been proven over years of practice, but its plain core machinery gives it a "hero past his prime" feel. Twitter, Storm's first user, recently announced Heron, a new solution compatible with Storm's interface, though it is not open source. An open-source project that "reimplements" Storm on top of Akka would be something to look forward to; gearpump is one such project I have found so far.