原文apache
几个月以前咱们在这里讨论过[](http://www.cakesolutions.net/teamblogs/introduction-into-distributed-real-time-stream-processing)目前对于这种日渐增长的分布式流计算的需求的缘由。固然,目前也有不少的各式各样的框架被用于处理这一些问题。如今咱们会在这篇文章中进行回顾,来讨论下各类框架之间的类似点以及区别在哪里,还有就是从个人角度分析的,推荐的适用的用户场景。编程
如你所想,分布式的流处理也就是一般意义上的持续处理、数据富集以及对于无界数据的分析过程的组合。它是一个相似于MapReduce这样的通用计算模型,可是咱们但愿它可以在毫秒级别或者秒级别完成响应。这些系统常常被有向非循环图(Directed ACyclic Graphs,DAGs)来表示。安全
DAG主要功能便是用图来表示链式的任务组合,而在流处理系统中,咱们便经常用DAG来描述一个流工做的拓扑。笔者本身是从Akka的Stream中的术语获得了启发。以下图所示,数据流通过一系列的处理器从源点流动到了终点,也就是用来描述这流工做。谈到Akka的Streams,我以为要着重强调下分布式这个概念,由于即便也有一些单机的解决方案能够建立而且运行DAG,可是咱们仍然着眼于那些能够运行在多机上的解决方案。网络
在不一样的系统之间进行选择的时候,咱们主要关注到如下几点。架构
Runtime and Programming model(运行与编程模型)负载均衡
一个平台提供的编程模型每每会决定不少它的特性,而且这个编程模型应该足够处理全部可能的用户案例。这是一个决定性的因素,我也会在下文中屡次讨论。框架
Functional Primitives(函数式单元)运维
一个合格的处理平台应该可以提供丰富的可以在独立信息级别进行处理的函数,像map、filter这样易于实现与扩展的一些函数。一样也应提供像aggregation这样的跨信息处理函数以及像join这样的跨流进行操做的函数,虽然这样的操做会难以扩展。dom
State Management(状态管理)分布式
大部分这些应用都有状态性的逻辑处理过程,所以,框架自己应该容许开发者去维护、访问以及更新这些状态信息。
Message Delivery Guarantees(消息投递的可达性保证)
通常来讲,对于消息投递而言,咱们有至多一次(at most once)、至少一次(at least once)以及刚好一次(exactly once)这三种方案。
at most once
At most once投递保证每一个消息会被投递0次或者1次,在这种机制下消息颇有可能会丢失。
at least once
At least once投递保证了每一个消息会被默认投递屡次,至少保证有一次被成功接收,信息可能有重复,可是不会丢失。
exactly once
exactly once意味着每一个消息对于接收者而言正好被接收一次,保证即不会丢失也不会重复。
Failures Handling
在一个流处理系统中,错误可能常常在不一样的层级发生,譬如网络分割、磁盘错误或者某个节点莫名其妙挂掉了。平台要可以从这些故障中顺利恢复,而且可以从最后一个正常的状态继续处理而不会损害结果。
除此以外,咱们也应该考虑到平台的生态系统、社区的完备程度,以及是否易于开发或者是否易于运维等等。
运行环境与编程模型多是某个系统的最重要的特性,由于它定义了整个系统的呈现特性、可能支持的操做以及将来的一些限制等等。所以,运行环境与编程模型就肯定了系统的能力与适用的用户案例。目前,主要有两种不一样的方法来构建流处理系统,其中一个叫Native Streaming,意味着全部输入的记录或者事件都会根据它们进入的顺序一个接着一个的处理。
另外一种方法叫作Micro-Batching。大量短的Batches会从输入的记录中建立出而后通过整个系统的处理,这些Batches会根据预设好的时间常量进行建立,一般是每隔几秒建立一批。
两种方法都有一些内在的优点与不足,首先来谈谈Native Streaming。好的一方面呢是Native Streaming的表现性会更好一点,由于它是直接处理输入的流自己的,并无被一些不天然的抽象方法所限制住。同时,由于全部的记录都是在输入以后立马被处理,这样对于请求方而言响应的延迟就会优于那种Micro-Batching系统。处理这些,有状态的操做符也会更容易被实现,咱们在下文中也会描述这个特色。不过Native Streaming系统每每吞吐量会比较低,而且由于它须要去持久化或者重放几乎每一条请求,它的容错的代价也会更高一些。而且负载均衡也是一个不可忽视的问题,举例而言,咱们根据键对数据进行了分割而且想作进一步地处理。若是某些键对应的分区由于某些缘由须要更多地资源去处理,那么这个分区每每就会变成整个系统的瓶颈。
而对于Micro-Batching而言,将流切分为小的Batches不可避免地会下降整个系统的变现性,也就是可读性。而一些相似于状态管理的或者joins、splits这些操做也会更加难以实现,由于系统必须去处理整个Batch。另外,每一个Batch自己也将架构属性与逻辑这两个原本不该该被糅合在一块儿的部分相链接了起来。而Micro-Batching的优点在于它的容错与负载均衡会更加易于实现,它只要简单地在某个节点上处理失败以后转发给另外一个节点便可。最后,值得一提的是,咱们能够在Native Streaming的基础上快速地构建Micro-Batching的系统。
而对于编程模型而言,又能够分为Compositional(组合式)与Declarative(声明式)。组合式会提供一系列的基础构件,相似于源读取与操做符等等,开发人员须要将这些基础构件组合在一块儿而后造成一个指望的拓扑结构。新的构件每每能够经过继承与实现某个接口来建立。另外一方面,声明式API中的操做符每每会被定义为高阶函数。声明式编程模型容许咱们利用抽象类型和全部其余的精选的材料来编写函数式的代码以及优化整个拓扑图。同时,声明式API也提供了一些开箱即用的高等级的相似于窗口管理、状态管理这样的操做符。下文中咱们也会提供一些代码示例。
目前已经有了各类各样的流处理框架,天然也没法在本文中所有攘括。因此我必须将讨论限定在某些范围内,本文中是选择了全部Apache旗下的流处理的框架进行讨论,而且这些框架都已经提供了Scala的语法接口。主要的话就是Storm以及它的一个改进Trident Storm,还有就是当下正火的Spark。最后还会讨论下来自LinkedIn的Samza以及比较有但愿的Apache Flink。笔者我的以为这是一个很是不错的选择,由于虽然这些框架都是出于流处理的范畴,可是他们的实现手段千差万别。
Apache Storm 最初由Nathan Marz以及他的BackType的团队在2010年建立。后来它被Twitter收购而且开源出来,而且在2014年变成了Apache的顶层项目。毫无疑问,Storm是大规模流处理中的先行者而且逐渐成为了行业标准。Storm是一个典型的Native Streaming系统而且提供了大量底层的操做接口。另外,Storm使用了Thrift来进行拓扑的定义,而且提供了大量其余语言的接口。
Trident 是一个基于Storm构建的上层的Micro-Batching系统,它简化了Storm的拓扑构建过程而且提供了相似于窗口、聚合以及状态管理等等没有被Storm原生支持的功能。另外,Storm是实现了至多一次的投递原则,而Trident实现了恰巧一次的投递原则。Trident 提供了 Java, Clojure 以及 Scala 接口。
众所周知,Spark是一个很是流行的提供了相似于SparkSQL、Mlib这样内建的批处理框架的库,而且它也提供了Spark Streaming这样优秀地流处理框架。Spark的运行环境提供了批处理功能,所以,Spark Streaming毫无疑问是实现了Micro-Batching机制。输入的数据流会被接收者分割建立为Micro-Batches,而后像其余Spark任务同样进行处理。Spark 提供了 Java, Python 以及 Scala 接口。
Samza最先是由LinkedIn提出的与Kafka协同工做的优秀地流解决方案,Samza已是LinkedIn内部关键的基础设施之一。Samza重负依赖于Kafaka的基于日志的机制,两者结合地很是好。Samza提供了Compositional接口,而且也支持Scala。
最后聊聊Flink. Flink可谓一个很是老的项目了,最先在2008年就启动了,不过目前正在吸引愈来愈多的关注。Flink也是一个Native Streaming的系统,而且提供了大量高级别的API。Flink也像Spark同样提供了批处理的功能,能够做为流处理的一个特殊案例来看。Flink强调万物皆流,这是一个绝对的更好地抽象,毕竟确实是这样。
下表就简单列举了上述几个框架之间的特性:
Wordcount就比如流处理领域的HelloWorld,它可以很好地描述不一样框架间的差别性。首先看看Storm是如何编写WordCount程序的:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new Split(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); ... Map<String, Integer> counts = new HashMap<String, Integer>(); public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.containsKey(word) ? counts.get(word) + 1 : 1; counts.put(word, count); collector.emit(new Values(word, count)); }
首先来看看它的拓扑定义,在第2行那边是定义了一个Spout,也就是一个输入源。而后定义了一个Bold,也就是一个处理的组件,用于将某个句子分割成词序列。而后还定义了另外一个Bolt用来负责真实的词计算。5,8到12行省略的过程用于定义集群中使用了多少个线程来供每个组件使用。如你所见,全部的定义都是比较底层的与手动的。接下来继续看看这个8-15行,也就是真正用于WordCount的部分代码。由于Storm没有内建的状态处理的支持,因此我必须自定义这样一个本地状态,和理想的相差甚远啊。下面咱们继续看看Trident。
正如我上文中说起的,Trident是一个基于Storm的Micro-Batching的扩展,它提供了状态管理等等功能。
public static StormTopology buildTopology(LocalDRPC drpc) { FixedBatchSpout spout = ... TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"),new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); ... }
从代码中就能够看出,在Trident中就可使用一些上层的譬如each
、groupBy
这样的操做符,而且能够在Trident中内建的进行状态管理了。接下来咱们再看看Spark提供的声明式的接口,要记住,与前几个例子不一样的是,基于Spark的代码已经至关简化了,下面基本上就是要用到的所有的代码了:
val conf = new SparkConf().setAppName("wordcount") val ssc = new StreamingContext(conf, Seconds(1)) val text = ... val counts = text.flatMap(line => line.split(" ")) .map(word => (word, 1)) .reduceByKey(_ + _) counts.print() ssc.start() ssc.awaitTermination()
每一个Spark的流任务都须要一个StreamingContext
用来指定整个流处理的入口。StreamingContext
定义了Batch的间隔,上面是设置到了1秒。在6-8行便是所有的词统计的计算过程,很是不同啊。下面再看看Apache Samza,另外一个表明性的组合式的API:
class WordCountTask extends StreamTask { override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { val text = envelope.getMessage.asInstanceOf[String] val counts = text.split(" ").foldLeft(Map.empty[String, Int]) { (count, word) => count + (word -> (count.getOrElse(word, 0) + 1)) } collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), counts)) }
Topology定义在了Samza的属性配置文件里,为了明晰起见,这里没有列出来。下面再看看Fink,能够看出它的接口风格很是相似于Spark Streaming,不过咱们没有设置时间间隔:
val env = ExecutionEnvironment.getExecutionEnvironment val text = env.fromElements(...) val counts = text.flatMap ( _.split(" ") ) .map ( (_, 1) ) .groupBy(0) .sum(1) counts.print() env.execute("wordcount")
与批处理系统相比,流处理系统中的容错机制当然的会比批处理中的要难一点。在批处理系统中,若是碰到了什么错误,只要将计算中与该部分错误关联的从新启动就行了。不过在流计算的场景下,容错处理会更加困难,由于会不断地有数据进来,而且有些任务可能须要7*24地运行着。另外一个咱们碰到的挑战就是如何保证状态的一致性,在天天结束的时候咱们会开始事件重放,固然不可能全部的状态操做都会保证幂等性。下面咱们就看看其余的系统是怎么处理的:
Storm使用了所谓的逆流备份与记录确认的机制来保证消息会在某个错误以后被从新处理。记录确认这一个操做工做以下:一个操做器会在处理完成一个记录以后向它的上游发送一个确认消息。而一个拓扑的源会保存有全部其建立好的记录的备份。一旦受到了从Sinks发来的包含有全部记录的确认消息,就会把这些确认消息安全地删除掉。当发生错误时,若是尚未接收到所有的确认消息,就会从拓扑的源开始重放这些记录。这就确保了没有数据丢失,不过会致使重复的Records处理过程,这就属于At-Least投送原则。
Storm用一套很是巧妙的机制来保证了只用不多的字节就能保存而且追踪确认消息,可是并无太多关注于这套机制的性能,从而使得Storm有较低地吞吐量,而且在流控制上存在一些问题,譬如这种确认机制每每在存在背压的时候错误地认为发生了故障。
Spark Streaming以及它的Micro-Batching机制则使用了另外一套方案,道理很简单,Spark将Micro-Batches分配到多个节点运行,每一个Micro-Batch能够成功运行或者发生故障,当发生故障时,那个对应的Micro-Batch只要简单地从新计算便可,由于它是持久化而且无状态的,因此要保证Exactly-Once这种投递方式也是很简单的。
Samza的实现手段又不同了,它利用了一套可靠地、基于Offset的消息系统,在不少状况下指的就是Kafka。Samza会监控每一个任务的偏移量,而后在接收到消息的时候修正这些偏移量。Offset能够是存储在持久化介质中的一个检查点,而后在发生故障时能够进行恢复。不过问题在于你并不知道恢复到上一个CheckPoint以后到底哪一个消息是处理过的,有时候会致使某些消息屡次处理,这也是At-Least的投递原则。
Flink主要是基于分布式快照,每一个快照会保存流任务的状态。链路中运送着大量的CheckPoint Barrier(检查点障碍,就是分隔符、标识器之类的),当这些Barrier到达某个Operator的时候,Operator将自身的检查点与流相关联。与Storm相比,这种方式会更加高效,毕竟不用对每一个Record进行确认操做。不过要注意的是,Flink仍是Native Streaming,概念上和Spark仍是相去甚远的。Flink也是达成了Exactly-Once投递原则。
大部分重要的流处理应用都会保有状态,与无状态的操做符相比,这些应用中须要一个输入和一个状态变量,而后进行处理最终输出一个改变了的状态。咱们须要去管理、存储这些状态,要保证在发生故障的时候可以重现这些状态。状态的重造可能会比较困难,毕竟上面提到的很多框架都不能保证Exactly-Once,有些Record可能被重放屡次。
Storm是实践了At-Least投递原则,而怎么利用Trident来保证Exactly-Once呢?概念上仍是很简单的,只须要使用事务进行提交Records,不过很明显这种方式及其低效。因此呢,仍是能够构建一些小的Batches,而且进行一些优化。Trident是提供了一些抽象的接口来保证明现Exactly-Once,以下图所示,还有不少东西等着你去挖掘。
当想要在流处理系统中实现有状态的操做时,咱们每每想到的是一个长时间运行的Operator,而后输入一个状态以及一系列的Records。不过Spark Streaming是以另一种方式进行处理的,Spark Streaming将状态做为一个单独地Micro-Batching流进行处理,因此在对每一个小的Micro-Spark任务进行处理时会输入一个当前的状态和一个表明当前操做的函数,最后输出一个通过处理的Micro-Batch以及一个更新好的状态。
Samza的处理方式更加简单明了,就是把它们放到Kafka中,而后问题就解决了。Samza提供了真正意义上的有状态的Operators,这样每一个任务都能保有状态,而后全部状态的变化都会被提交到Kafka中。在有须要的状况下某个状态能够很方便地从Kafka的Topic中完成重造。为了提升效率,Samza容许使用插件化的键值本地存储来避免全部的消息所有提交到Kafka。这种思路以下图所示,不过Samza只是提升了At-Least这种机制,将来可能会提供Exactly-Once。
Flink提供了相似于Samza的有状态的Operator的概念,在Flink中,咱们可使用两种不一样的状态。第一种是本地的或者叫作任务状态,它是某个特定的Operator实例的当前状态,而且这种状态不会与其余进行交互。另外一种呢就是维护了整个分区的状态。
public static StormTopology buildTopology(LocalDRPC drpc) { FixedBatchSpout spout = ... TridentTopology topology = new TridentTopology(); TridentState wordCounts = topology.newStream("spout1", spout) .each(new Fields("sentence"),new Split(), new Fields("word")) .groupBy(new Fields("word")) .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); ... }
在第9行中,咱们能够经过调用一个持久化的聚合函数来建立一个状态。
// Initial RDD input to updateStateByKey val initialRDD = ssc.sparkContext.parallelize(List.empty[(String, Int)]) val lines = ... val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) Some(output) } val stateDstream = wordDstream.trackStateByKey( StateSpec.function(trackStateFunc).initialState(initialRDD))
在第2行中,咱们建立了一个RDD用来保存初始状态。而后在5,6行中进行一些转换,接下来能够看出,在8-14行中,咱们定义了具体的转换方程,即输入时一个单词、它的统计数量和它的当前状态。函数用来计算、更新状态以及返回结果,最后咱们将全部的Bits一块儿聚合。
class WordCountTask extends StreamTask with InitableTask { private var store: CountStore = _ def init(config: Config, context: TaskContext) { this.store = context.getStore("wordcount-store") .asInstanceOf[KeyValueStore[String, Integer]] } override def process(envelope: IncomingMessageEnvelope, collector: MessageCollector, coordinator: TaskCoordinator) { val words = envelope.getMessage.asInstanceOf[String].split(" ") words.foreach { key => val count: Integer = Option(store.get(key)).getOrElse(0) store.put(key, count + 1) collector.send(new OutgoingMessageEnvelope(new SystemStream("kafka", "wordcount"), (key, count))) } }
在上述代码中第3行定义了全局的状态,这里是使用了键值存储方式,而且在5~6行中定义了如何初始化。而后,在整个计算过程当中咱们都使用了该状态。
val env = ExecutionEnvironment.getExecutionEnvironment val text = env.fromElements(...) val words = text.flatMap ( _.split(" ") ) words.keyBy(x => x).mapWithState { (word, count: Option[Int]) => { val newCount = count.getOrElse(0) + 1 val output = (word, newCount) (output, Some(newCount)) } }
在第6行中使用了mapWithState
函数,第一个参数是即将须要处理的单次,第二个参数是一个全局的状态。
合理的性能比较也是本文的一个重要主题之一。不一样的系统的解决方案差别很大,所以也是很难设置一个无偏的测试。一般而言,在一个流处理系统中,咱们常说的性能就是指延迟与吞吐量。这取决于不少的变量,可是整体而言标准为若是单节点每秒能处理500K的Records就是个合格的,若是能达到100万次以上就已经不错了。每一个节点通常就是指24核附带上24或者48GB的内存。
对于延迟而言,若是是Micro-Batch的话每每但愿能在秒级别处理。若是是Native Streaming的话,但愿能有百倍的减小,调优以后的Storm能够很轻易达到几十毫秒。
另外一方面,消息的可达性保证、容错以及状态管理都是须要考虑进去的。譬如若是你开启了容错机制,那么会增长10%到15%的额外消耗。除此以外,以文章中两个WordCount为例,第一个是无状态的WordCount,第二个是有状态的WordCount,后者在Flink中可能会有25%额外的消耗,而在Spark中可能有50%的额外消耗。固然,咱们确定能够经过调优来减小这种损耗,而且不一样的系统都提供了不少的可调优的选项。
还有就是必定要记住,在分布式环境下进行大数据传输也是一件很是昂贵的消耗,所以咱们要利用好数据本地化以及整个应用的序列化的调优。
在为你的应用选择一个合适的框架的时候,框架自己的成熟度与社区的完备度也是一个不可忽略的部分。Storm是第一个正式提出的流处理框架,它已经成为了业界的标准而且被应用到了像Twitter、Yahoo、Spotify等等不少公司的生产环境下。Spark则是目前最流行的Scala的库之一,而且Spark正逐步被更多的人采纳,它已经成功应用在了像Netflix、Cisco、DataStax、Indel、IBM等等不少公司内。而Samza最先由LinkedIn提出,而且正在运行在几十个公司内。Flink则是一个正在开发中的项目,不过我相信它发展的会很是迅速。
在咱们进最后的框架推荐以前,咱们再看一下上面那张图:
这个问题的回答呢,也很俗套,具体状况具体分析。总的来讲,你首先呢要仔细评估下你应用的需求而且彻底理解各个框架之间的优劣比较。同时我建议是使用一个提供了上层接口的框架,这样会更加的开发友好,而且可以更快地投入生产环境。不过别忘了,绝大部分流应用都是有状态的,所以状态管理也是不可忽略地一个部分。同时,我也是推荐那些遵循Exactly-Once原则的框架,这样也会让开发和维护更加简单。不过不能教条主义,毕竟仍是有不少应用会须要At-Least-Once与At-Most-Once这些投递模式的。最后,必定要保证你的系统能够在故障状况下很快恢复,可使用Chaos Monkey或者其余相似的工具进行测试。在咱们以前的讨论中也发现这个快速恢复的能力相当重要。
对于小型与须要快速响应地项目,Storm依旧是一个很是好的选择,特别是在你很是关注延迟度的状况下。不过仍是要谨记容错机制和Trident的状态管理会严重影响性能。Twitter目前正在设计新的流计算系统Heron用来替代Storm,它能够在单个项目中有很好地表现。不过Twitter可不必定会开源它。
对于Spark Streaming而言,若是你的系统的基础架构中已经使用了Spark,那仍是很推荐你试试的。另外一方面,若是你想使用Lambda架构,那Spark也是个不错的选择。不过你必定要记住,Micro-Batching自己的限制和延迟对于你而言不是一个关键因素。
若是你想用Samza的话,那最好Kafka已是你的基础设施的一员了。虽然在Samza中Kafka只是个可插拔的组件,不过基本上全部人都会使用Kafka。正如上文所说,Samza提供了强大的本地存储功能,可以轻松管理数十G的状态数据。不过它的At-Least-Once的投递限制也是很大一个瓶颈。
Flink目前在概念上是一个很是优秀的流处理系统,它可以知足大部分的用户场景而且提供了不少先进的功能,譬如窗口管理或者时间控制。因此当你发现你须要的功能在Spark当中没法很好地实现的时候,你能够考虑下Flink。另外,Flink也提供了很好地通用的批处理的接口,只不过你须要很大的勇气来将你的项目结合到Flink中,而且别忘了多关注关注它的路线图。
我最后一个要提到的就是Dataflow和它的开源计划。Dataflow是Google云平台的一个组成部分,是目前在Google内部提供了统一的用于批处理与流计算的服务接口。譬如用于批处理的MapReduce,用于编程模型定义的FlumeJava以及用于流计算的MillWheel。Google最近打算开源这货的SDK了,Spark与Flink均可以成为它的一个运行驱动。
本文咱们过了一遍经常使用的流计算框架,它们的特性与优劣对比,但愿能对你有用吧。