流式实时分布式计算的设计

https://blog.csdn.net/anzhsoft/article/details/38168025java

1. 流式计算的背景和特色算法


如今不少公司天天都会产生数以TB级的大数据,如何对这些数据进行挖掘,分析成了很重要的课题。好比:数据库

电子商务:须要处理而且挖掘用户行为产生的数据,产生推荐,从而带来更多的流量和收益。最理想的推荐就是根据兴趣推荐给用户原本不须要的东西!而天天处理海量的用户数据,须要一个低延时高可靠的实时流式分布式计算系统。
新闻聚合:新闻时效性很是重要,若是在一个重大事情发生后可以实时的推荐给用户,那么确定能增大用户粘性,带来可观的流量。
社交网站:你们天天都会去社交网站是为了看看如今发生了什么,周围人在作什么。流式计算能够把用户关注的热点聚合,实时反馈给用户,从而达到一个圈子的聚合效果。
交通监管部门:每一个城市的交通监管部门天天都要产生海量的视频数据,这些视频数据也是以流的形式源源不断的输系统中。实时流式计算系统须要以最快的速度来处理这些数据。
数据挖掘和机器学习:它们其实是互联网公司内部使用的系统,主要为线上服务提供数据支撑。它们能够说是互联网公司的最核心的平台之一。系统的效率是挖掘的关键,理想条件下就是天天产生的海量数据都能获得有效处理,对于原来的数据进行全量更新。
大型集群的监控:自动化运维很重要,集群监控的实时预警机制也很是重要,而流式系统对于日志的实时处理,每每是监控系统的关键。
等等。
流式实时分布式计算系统就是要解决上述问题的。这些系统的共同特征是什么?编程

很是方便的运行用户编写的计算逻辑:就如Hadoop定义了Map和Reduce的原语同样,这些系统也须要让用户关注与数据处理的具体逻辑上,他们不该该也不须要去了解这些usder defined codes是如何在分布式系统上运转起来的。由于他们仅仅关注与数据处理的逻辑,所以能够极大的提升效率。并且应该尽可能不要限制编程语言,毕竟不一样的公司甚至同一公司的不一样部门使用的语言多是千差万别的。支持多语言无疑能够抢占更多的用户。
Scale-out的设计:分布式系统天生就是scale-out的。
无数据丢失:系统须要保证无数据丢失,这也是系统高可用性的保证。系统为了无数据丢失,须要在数据处理失败的时候选择另外的执行路径进行replay(系统不是简单的从新提交运算,而是从新执行调度,不然按照来源的call stack有可能使得系统永远都在相同的地方出一样的错误)。
容错透明:用户不会也不须要关心容错。系统会自动处理容错,调度而且管理资源,而这些行为对于运行于其上的应用来讲都是透明的。
数据持久化:为了保证高可用性和无数据丢失,数据持久化是没法躲避的问题。的确,数据持久化可能在低延时的系统中比较影响性能,可是这没法避免。固然了,若是考虑到出错状况比较少,在出错的时候咱们可以忍受数据能够从头replay,那么中间的运算能够不进行持久化。注意,这只有在持久化的成本要比计算的replay高的状况下有效。通常来讲,计算的结果须要replica,固然了,可使用将数据replica到其余的节点的内存中去(这又会占用集群的网络带宽)。
超时设置:超时之因此在在这里被提出来,由于超时时间的大小设置须要重视,若是过短能够会误杀正常运行的计算,若是太长则不能快速的检测错误。还有就是对于错误的快速发现能够这类系统的一个设计要点,毕竟,超时了才发现错误不少时候在时效性上是不可接受的。网络


2. 原语设计架构


        Hadoop定义了Map和Reduce,使得应用者只须要实现MR就能够实现数据处理。而流式系统的特色,容许它们能够进行更加具体一些的原语设计。流式的数据的特色就是数据时源源不断进入系统的,而这些数据的处理通常都须要几个阶段。拿普通的日志处理来讲,咱们可能仅仅关注Error的日志,那么系统的第一个计算逻辑就是进行filer。接下来可能须要对这个日志进行分段,分段后可能交给不一样的规则处理器进行处理。所以,数据处理通常是分阶段的,能够说是一个有向无环图,或者说是一个拓扑。实际上,Spark抽象出的运算逻辑就是由RDD(Resilient Distributed Dataset)构成DAG(Directed Acyclic Graph),而Storm则有Spout和Blot构成Topology(拓扑)。并发

 

2.1 Spark的设计
       Spark Streaming是将流式计算分解成一系列短小的批处理做业。这里的批处理引擎是Spark,也就是把Spark Streaming的输入数据按照batch size(如1秒)分红一段一段的数据,每一段数据都转换成Spark中的RDD,而后将Spark Streaming中对DStream的Transformation操做变为针对Spark中对RDD的Transformation操做,将RDD通过操做变成中间结果保存在内存中。整个流式计算根据业务的需求能够对中间的结果进行叠加,或者存储到外部设备。下图显示了Spark Streaming的整个流程。框架

 

 

WordCount的例子:运维

// Create the context and set up a network input stream to receive from a host:port
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
val lines = ssc.socketTextStream(args(1), args(2).toInt)

// Split the lines into words, count them, and print some of the counts on the master
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()

// Start the computation
ssc.start()
这个例子使用Scala写的,一个简单优雅的函数式编程语言,同时也是基于JVM的后Java类语言。机器学习


2.2 Storm的设计
        Storm将计算逻辑成为Topology,其中Spout是Topology的数据源,这个数据源多是文件系统的某个日志,也多是MessageQueue的某个消息队列,也有多是数据库的某个表等等;Bolt负责数据的护理。Bolt有可能由另外两个Bolt的join而来。

       而Storm最核心的抽象Streaming就是链接Spout,Bolt以及Bolt与Bolt之间的数据流。而数据流的组成单位就是Tuple(元组),这个Tuple可能由多个Fields构成,每一个Field的含义都在Bolt的定义的时候制定。也就是说,对于一个Bolt来讲,Tuple的格式是定义好的。

 

 

2.3 原语设计的要点
流式系统的原语设计,要关注一下几点:

如何定义计算拓扑:要方便算法开发者开发算法与策略。最好的实现是定义一个算法与框架的交互方式,定义好算法的输入结构和算法的输出结构。而后拓扑可以组合不一样的算法来为用户提供一个统一的服务。计算平台最大的意义在于算法开发者不须要了解程序的运行,并发的处理,高可用性的实现,只须要提供算法与计算逻辑便可以快速可靠的处理海量的数据。
拓扑的加载与启动:对于每一个节点来讲,启动时须要加载拓扑,节点须要其余的信息,好比上游的数据来源与下游的数据输出。固然了下游的数据输出的拓扑信息能够存储到Tuple中,对于数据须要放到那里去拓扑自己是无状态的。这就取决于具体的设计了。
拓扑的在线更新:对于每一个算法逻辑来讲,更新是不可避免的,如何在不中止服务的状况下进行更新是必要的。因为实现了架构与算法的剥离,所以算法能够以一个单独的个体进行更新。能够操做以下:Master将算法实体保存到一个Worker可见的地方,好比HDFS或者是NFS或者ZK,而后经过心跳发送命令到拓扑,拓扑会暂时中止处理数据而加载新的算法实体,加载以后从新开始处理数据。数据通常都会放到buffer中,这个buffer多是一个queue。可是从外界看来,拓扑其实是一直处于服务状态的。
数据如何流动:流式系统最重要的抽象就是Streaming了。那么Steaming如何流动?实际上涉及到消息的传递和分发,数据如何从一个节点传递到另一个节点,这是拓扑定义的,具体实现能够参照第三小节。
计算的终点及结果处理:流式计算的特色就是计算一直在进行,流是源源不断的流入到系统中的。可是对于每一个数据单位来讲它的处理结果是肯定的,这个结果通常是须要返回调用者或者须要持久化的。好比处理一个时间段的交通违章,那么输入的数据是一段时间的视频监控,输出这是违章的信息,好比车牌,还有违章时刻的抓拍的图片。这个数据要么返回调用者,由调用者负责数据的处理,包括持久化等。或者是拓扑最后的节点将这些信息进行持久化。系统须要对这些常见的case进行指导性的说明,须要在Programmer Guide的sample中给出使用例子。


3. 消息传递和分发
       对于实现的逻辑来讲,它们都是有向无环图的一个节点,那么如何设计它们之间的消息传递呢?或者说数据如何流动的?由于对于分布式系统来讲,咱们不能假定整个运算都是在同一个节点上(事实上,对于闭源软件来讲,这是能够的,好比就是知足一个特定运算下的计算,计算平台也不须要作的那么通用,那么对于一个运算逻辑让他在一个节点完成也是能够了,毕竟节省了调度和网络传输的开销)。或者说,对于一个通用的计算平台来讲,咱们不能假定任何事情。

      消息传递和分发是取决于系统的具体实现的。经过对比Storm和Spark,你就明白我为何这么说了。

 

3.1 Spark的消息传递
对于Spark来讲,数据流是在经过将用户定义的一系列的RDD转化成DAG图,而后DAG Scheduler把这个DAG转化成一个TaskSet,而这个TaskSet就能够向集群申请计算资源,集群把这个TaskSet部署到Worker中去运算了。固然了,对于开发者来讲,他的任务是定义一些RDD,在RDD上作相应的转化动做,最后系统会将这一系列的RDD投放到Spark的集群中去运行。

 

3.2 Storm的消息传递      
对于Storm来讲,他的消息分发机制是在定义Topology的时候就显式定义好的。也就是说,应用程序的开发者须要清楚的定义各个Bolts之间的关系,下游的Bolt是以什么样的方式获取上游的Bolt发出的Tuple。Storm有六种消息分发模式:

Shuffle Grouping: 随机分组,Storm会尽可能把数据平均分发到下游Bolt中。
Fields Grouping:按字段分组, 好比按userid来分组, 具备一样userid的tuple会被分到相同的Bolt。这个对于相似于WordCount这种应用很是有帮助。
All Grouping: 广播, 对于每个Tuple, 全部的Bolts都会收到。这种分发模式要慎用,会形成资源的极大浪费。
Global Grouping: 全局分组, 这个Tuple被分配到storm中的一个bolt的其中一个task。这个对于实现事务性的Topology很是有用。
Non Grouping: 不分组, 这个分组的意思是说stream不关心到底谁会收到它的tuple。目前这种分组和Shuffle grouping是同样的效果, 有一点不一样的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
Direct Grouping: 直接分组,  这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪一个task处理这个消息。
3.3 消息传递要点
消息队列如今是模块之间通讯的很是通用的解决方案了。消息队列使得进程间的通讯能够跨越物理机,这对于分布式系统尤其重要,毕竟咱们不能假定进程到底是部署在同一台物理机上仍是部署到不一样的物理机上。RabbitMQ是应用比较普遍的MQ,关于RabbitMQ能够看个人一个专栏:RabbitMQ

提到MQ,不得不提的是ZeroMQ。ZeroMQ封装了Socket,引用官方的说法: “ZMQ (如下 ZeroMQ 简称 ZMQ)是一个简单好用的传输层,像框架同样的一个 socket library,他使得 Socket 编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ 的明确目标是“成为标准网络协议栈的一部分,以后进入 Linux 内核”。如今还未看到它们的成功。可是,它无疑是极具前景的、而且是人们更加须要的“传统”BSD 套接字之上的一层封装。ZMQ 让编写高性能网络应用程序极为简单和有趣。”

所以, ZeroMQ不是传统意义上的MQ。它比较适用于节点之间和节点与Master之间的通讯。Storm在0.8以前的Worker之间的通讯就是经过ZeroMQ。可是为何0.9就是用Netty替代了ZeroMQ呢?说替代不大合适,只是0.9的默认的Worker之间的通讯是使用了Netty,ZeroMQ仍是支持的。Storm官方认为ZeroMQ有如下缺点:

不容易部署,尤为是在云环境下:觉得ZMQ是以C写的,所以它仍是紧依赖于操做系统环境的。
没法限制其内存。经过JVM能够很容易的限制java所占用的内存。可是ZMQ对于Storm来讲是个黑盒似得存在。
Storm没法从ZMQ获取信息。好比Storm没法知道当前buffer中有多少数据为发送。
固然了还有所谓的性能问题,具体能够访问Netty做者的blog。结论就是Netty的性能比ZMQ(在默认配置下)好两倍。不知道所谓的ZMQ的默认配置是什么。反正我对这个结果挺惊讶。固然了,Netty使用Java实现的确方便了在Worker之间的通讯加上受权和认证机制。这个使用ZMQ的确是不太好作。

 

4. 高可用性
HA是分布式系统的必要属性。若是没有HA,其实系统是不可用的。那么若是实现HA?对于Storm来讲,它认为Master节点Nimbus是无状态的,无状态意味着能够快速恢复,所以Nimbus并无实现HA(不知道之后的Nimbus是否会实现HA,实际上使用ZooKeeper实现节点的HA是开源领域的通用作法)。为何说Nimbus是无状态的呢?由于集群全部的元数据都保存到了ZooKeeper(ZK)中。Nimbus定时从ZK获取信息,而且经过向ZK写信息来控制Worker。Worker也是经过从ZK中获取信息,经过这种方式,Worker执行从Nimbus传递过来的命令。

Storm的这种使用ZK的方式仍是很值得借鉴的。

Spark是如何实现HA的?个人另一篇文章分析过Spark的Master是怎么实现HA的:Spark技术内幕:Master基于ZooKeeper的High Availability(HA)源码实现 。

也是经过ZK的leader 选举实现的。Spark使用了百行代码的级别实现了Master的HA,因而可知ZK的功力。

 

除了这些Master的HA,还有每一个Worker的HA。或者说Worker的HA说法不太准确,所以对于集群里的工做节点来讲,它能够很是容易失败的。这里的HA能够说是如何让Worker失败后快速重启,从新提供服务。实现方式也能够由不少种。一个简单的方法就是使用一个容器(Container)启动Worker而且监控Worker的状态,若是Worker异常退出,那么就从新启动它。这个方法很简单也颇有效。

若是是节点宕机呢?上述方法确定是不能用的。这种状况下Master会检测到Worker的心跳超时,那么就会从资源池中把这个节点删除。回到正题,宕机后的节点重启涉及到了运维方面的知识。对于一个集群来讲,硬件宕机这种状况应该须要统一的管理,也就是集群也能够由一个Master,维持每一个节点的心跳来肯定硬件的状态。若是节点宕机,那么集群首先是重启它。若是启动失败可能会经过电话或者短信或者邮件通知运维人员。所以运维人员为了保证集群的高可用性付出了不少的努力,尤为是大型互联网公司的运维人员,很是值得点赞。固然了这个已经不是Storm或者Spark所能涵盖的了。

 

5. 存储模型与数据不丢失
其实,数据不丢失有时候和处理速度是矛盾的。为了数据不丢失就要进行数据持久化,数据持久化意味着要写硬盘,在固态硬盘尚未成为标配的今天,硬盘的IO速度永远是系统的痛点。固然了能够在另外节点的内存上进行备份,可是这涉及到了集群的两个稀缺资源:内存和网络。若是由于备份而占用了大量的网络带宽的话,那必将影响系统的性能,吞吐量。

固然了,可使用日志的方式。可是日志的话对于错误恢复的时间又是不太能接受的。流式计算系统的特色就是要快,若是错误恢复时间太长,那么可能不如直接replay来的快,并且系统设计还更为简单。

其实若是不是为了追求100%的数据丢失,可使用checkpoint的机制,容许一个时间窗口内的数据丢失。

回到系统设计自己,实际上流式计算系统主要是为了离线和近线的机器学习和数据挖掘,所以确定要保证数据的处理速度:至少系统能够处理一天的新增数据,不然数据堆积愈来愈大。所以即便有的数据处理丢失了数据,可让源头从新发送数据。

 

还有另一个话题,就是系统的元数据信心如何保存,由于系统的路由信息等须要是全局可见的,须要保存相似的这些数据以供集群查询。固然了Master节点保持了和全部节点的心跳,它彻底能够保存这些数据,而且在心跳中能够返回这些数据。实际上HDFS的NameNode就是这么作的。HDFS的NN这种设计很是合理,为何这么说?HDFS的元数据包含了很是多的数据:

目录文件树结构和文件与数据块的对应关系:会持久化到物理存储中,文件名叫作fsimage。
DN与数据块的对应关系,即数据块存储在哪些DN中:在DN启动时会上报到NN它所维护的数据块。这个是动态创建的,不会持久化。所以,集群的启动可能须要比较长的时间。

那么对于流式计算系统这种算得上轻量级的元数据来讲,Master处理这些元数据实际上要简单的多,固然了,Master须要实现服务的HA和数据的HA。这些不是一个轻松的事情。实际上,能够采用ZooKeeper来保存系统的元数据。ZooKeeper使用一个目录树的结构来保存集群的元数据。节点能够监控感兴趣的数据,若是数据有变化,那么节点会收到通知,而后就保证了系统级别的数据一致性。这点对于系统比较重要,由于节点都是不稳定的,所以系统的其余服务可能都会由于节点失效而发生变化,这些都须要通知相关的节点更新器服务列表,保证了部分节点的失效并不会影响系统的总体的服务,从而也就实现了故障对于用户的透明性。--------------------- 做者:anzhsoft 来源:CSDN 原文:https://blog.csdn.net/anzhsoft/article/details/38168025 版权声明:本文为博主原创文章,转载请附上博文连接!

相关文章
相关标签/搜索