伴随着信息科技突飞猛进的发展,信息呈现出爆发式的膨胀,人们获取信息的途径也更加多样、更加便捷,同时对于信息的时效性要求也愈来愈高。举个搜索场景中的例子,当一个卖家发布了一条宝贝信息时,他但愿的固然是这个宝贝立刻就能够被卖家搜索出来、点击、购买啦,相反,若是这个宝贝要等到次日或者更久才能够被搜出来,估计这个大哥就要骂娘了。再举一个推荐的例子,若是用户昨天在淘宝上买了一双袜子,今天想买一副泳镜去游泳,可是却发现系统在竭尽全力地给他推荐袜子、鞋子,根本对他今天寻找泳镜的行为视而不见,估计这哥们内心就会想推荐你妹呀。其实稍微了解点背景知识的码农们都知道,这是由于后台系统作的是天天一次的全量处理,并且大可能是在夜深人静之时作的,那么你今天白天作的事情固然要明天才能反映出来啦。 java
全量数据处理使用的大可能是鼎鼎大名的hadoop或者hive,做为一个批处理系统,hadoop以其吞吐量大、自动容错等优势,在海量数据处理上获得了普遍的使用。可是,hadoop不擅长实时计算,由于它自然就是为批处理而生的,这也是业界一致的共识。不然最近这两年也不会有s4,storm,puma这些实时计算系统如雨后春笋般冒出来啦。先抛开s4,storm,puma这些系统不谈,咱们首先来看一下,若是让咱们本身设计一个实时计算系统,咱们要解决哪些问题。 数据库
好,若是仅仅须要解决这5个问题,可能会有无数种方案,并且各有千秋,随便举一种方案,使用消息队列+分布在各个机器上的工做进程就ok啦。咱们再继续往下看。 编程
不知道你们对这些问题是否都有了本身的答案,下面让咱们带着这些问题,一块儿来看一看storm的解决方案吧。 架构
若是只用一句话来描述storm的话,可能会是这样:分布式实时计算系统。按照storm做者的说法,storm对于实时计算的意义相似于hadoop对于批处理的意义。咱们都知道,根据google mapreduce来实现的hadoop为咱们提供了map, reduce原语,使咱们的批处理程序变得很是地简单和优美。一样,storm也为实时计算提供了一些简单优美的原语。咱们会在第三节中详细介绍。 app
咱们来看一下storm的适用场景。 框架
说了半天,好像都是很玄乎的东西,下面咱们开始具体讲解storm的基本概念和它内部的一些实现原理吧。 分布式
首先咱们经过一个 storm 和hadoop的对比来了解storm中的基本概念。 ide
Hadoop | Storm | |
系统角色 | JobTracker | Nimbus |
TaskTracker | Supervisor | |
Child | Worker | |
应用名称 | Job | Topology |
组件接口 | Mapper/Reducer | Spout/Bolt |
表3-1 函数
接下来咱们再来具体看一下这些概念。 oop
10. stream grouping:即消息的partition方法。Storm中提供若干种实用的grouping方式,包括shuffle, fields hash, all, global, none, direct和localOrShuffle等
相比于s4, puma等其余实时计算系统,storm最大的亮点在于其记录级容错和可以保证消息精确处理的事务功能。下面就重点来看一下这两个亮点的实现原理。
首先来看一下什么叫作记录级容错?storm容许用户在spout中发射一个新的源tuple时为其指定一个message id, 这个message id能够是任意的object对象。多个源tuple能够共用一个message id,表示这多个源 tuple对用户来讲是同一个消息单元。storm中记录级容错的意思是说,storm会告知用户每个消息单元是否在指定时间内被彻底处理了。那什么叫作彻底处理呢,就是该message id绑定的源tuple及由该源tuple后续生成的tuple通过了topology中每个应该到达的bolt的处理。举个例子。在图4-1中,在spout由message 1绑定的tuple1和tuple2通过了bolt1和bolt2的处理生成两个新的tuple,并最终都流向了bolt3。当这个过程完成处理完时,称message 1被彻底处理了。
图4-1
在storm的topology中有一个系统级组件,叫作acker。这个acker的任务就是追踪从spout中流出来的每个message id绑定的若干tuple的处理路径,若是在用户设置的最大超时时间内这些tuple没有被彻底处理,那么acker就会告知spout该消息处理失败了,相反则会告知spout该消息处理成功了。在刚才的描述中,咱们提到了”记录tuple的处理路径”,若是曾经尝试过这么作的同窗能够仔细地思考一下这件事的复杂程度。可是storm中倒是使用了一种很是巧妙的方法作到了。在说明这个方法以前,咱们来复习一个数学定理。
A xor A = 0.
A xor B…xor B xor A = 0,其中每个操做数出现且仅出现两次。
storm中使用的巧妙方法就是基于这个定理。具体过程是这样的:在spout中系统会为用户指定的message id生成一个对应的64位整数,做为一个root id。root id会传递给acker及后续的bolt做为该消息单元的惟一标识。同时不管是spout仍是bolt每次新生成一个tuple的时候,都会赋予该tuple一个64位的整数的id。Spout发射完某个message id对应的源tuple以后,会告知acker本身发射的root id及生成的那些源tuple的id。而bolt呢,每次接受到一个输入tuple处理完以后,也会告知acker本身处理的输入tuple的id及新生成的那些tuple的id。Acker只须要对这些id作一个简单的异或运算,就能判断出该root id对应的消息单元是否处理完成了。下面经过一个图示来讲明这个过程。
图4-1 spout中绑定message 1生成了两个源tuple,id分别是0010和1011.
图4-2 bolt1处理tuple 0010时生成了一个新的tuple,id为0110.
图4-3 bolt2处理tuple 1011时生成了一个新的tuple,id为0111.
图4-4 bolt3中接收到tuple 0110和tuple 0111,没有生成新的tuple.
可能有些细心的同窗会发现,容错过程存在一个可能出错的地方,那就是,若是生成的tuple id并非彻底各异的,acker可能会在消息单元彻底处理完成以前就错误的计算为0。这个错误在理论上的确是存在的,可是在实际中其几率是极低极低的,彻底能够忽略。
事务拓扑(transactional topology)是storm0.7引入的特性,在最近发布的0.8版本中已经被封装为Trident,提供了更加便利和直观的接口。由于篇幅所限,在此对事务拓扑作一个简单的介绍。
事务拓扑的目的是为了知足对消息处理有着极其严格要求的场景,例如实时计算某个用户的成交笔数,要求结果彻底精确,不能多也不能少。Storm的事务拓扑是彻底基于它底层的spout/bolt/acker原语实现的,经过一层巧妙的封装得出一个优雅的实现。我的以为这也是storm最大的魅力之一。
事务拓扑简单来讲就是将消息分为一个个的批(batch),同一批内的消息以及批与批之间的消息能够并行处理,另外一方面,用户能够设置某些bolt为committer,storm能够保证committer的finishBatch()操做是按严格不降序的顺序执行的。用户能够利用这个特性经过简单的编程技巧实现消息处理的精确。
因为storm的内核是clojure编写的(不过大部分的拓展工做都是java编写的),为咱们理解它的实现带来了必定的困难,好在大部分状况下storm都比较稳定,固然咱们也在尽力熟悉clojure的世界。咱们在使用storm时一般都是选择java语言开发应用程序。
在淘宝,storm被普遍用来进行实时日志处理,出如今实时统计、实时风控、实时推荐等场景中。通常来讲,咱们从类kafka的metaQ或者基于hbase的timetunnel中读取实时日志消息,通过一系列处理,最终将处理结果写入到一个分布式存储中,提供给应用程序访问。咱们天天的实时消息量从几百万到几十亿不等,数据总量达到TB级。对于咱们来讲,storm每每会配合分布式存储服务一块儿使用。在咱们正在进行的个性化搜索实时分析项目中,就使用了timetunnel + hbase + storm + ups的架构,天天处理几十亿的用户日志信息,从用户行为发生到完成分析延迟在秒级。
Storm0.7系列的版本已经在各大公司获得了普遍使用,最近发布的0.8版本中引入了State,使得其从一个纯计算框架演变成了一个包含存储和计算的实时计算新利器,还有刚才提到的Trident,提供更加友好的接口,同时可定制scheduler的特性也为其针对不一样的应用场景作优化提供了更便利的手段,也有人已经在基于storm的实时ql(query language)上迈出了脚本。在服务化方面,storm一直在朝着融入mesos框架的方向努力。同时,storm也在实现细节上不断地优化,使用不少优秀的开源产品,包括kryo, Disruptor, curator等等。能够想象,当storm发展到1.0版本时,必定是一款无比杰出的产品,让咱们拭目以待,固然,最好仍是参与到其中去吧,同窗们。