Storm技术加强
注:学习本课程,请先学习Storm基础
课程目标:
经过本模块的学习,可以掌握Storm底层的通讯机制、消息容错机制、storm目录树及任务提交流程。
课程大纲:
一、 Storm程序的并发机制
二、 Storm框架通讯机制(worker内部通讯与外部通讯)
三、 Storm组件本地目录树
四、 Storm zookeeper目录树
五、 Storm 任务提交的过程
一、Storm程序的并发机制
1.一、概念
Workers (JVMs): 在一个物理节点上能够运行一个或多个独立的JVM 进程。一个Topology能够包含一个或多个worker(并行的跑在不一样的物理机上), 因此worker process就是执行一个topology的子集, 而且worker只能对应于一个topology
Executors (threads): 在一个worker JVM进程中运行着多个Java线程。一个executor线程能够执行一个或多个tasks。但通常默认每一个executor只执行一个task。一个worker能够包含一个或多个executor, 每一个component (spout或bolt)至少对应于一个executor, 因此能够说executor执行一个compenent的子集, 同时一个executor只能对应于一个component。
Tasks(bolt/spout instances):Task就是具体的处理逻辑对象,每个Spout和Bolt会被看成不少task在整个集群里面执行。每个task对应到一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另一堆task。你能够调用TopologyBuilder.setSpout和TopBuilder.setBolt来设置并行度 — 也就是有多少个task。
1.二、配置并行度
对于并发度的配置, 在storm里面能够在多个地方进行配置, 优先级为:
defaults.yaml < storm.yaml < topology-specific configuration
< internal component-specific configuration < external component-specific configuration
worker processes的数目, 能够经过配置文件和代码中配置, worker就是执行进程, 因此考虑并发的效果, 数目至少应该大亍machines的数目
executor的数目, component的并发线程数,只能在代码中配置(经过setBolt和setSpout的参数), 例如, setBolt(“green-bolt”, new GreenBolt(), 2)
tasks的数目, 能够不配置, 默认和executor1:1, 也能够经过setNumTasks()配置
Topology的worker数经过config设置,即执行该topology的worker(java)进程数。它能够经过 storm rebalance 命令任意调整。
Config conf = newConfig();
conf.setNumWorkers(2); //用2个worker
topologyBuilder.setSpout(“blue-spout”, newBlueSpout(), 2); //设置2个并发度
topologyBuilder.setBolt(“green-bolt”, newGreenBolt(), 2).setNumTasks(4).shuffleGrouping(“blue-spout”); //设置2个并发度,4个任务
topologyBuilder.setBolt(“yellow-bolt”, newYellowBolt(), 6).shuffleGrouping(“green-bolt”); //设置6个并发度
StormSubmitter.submitTopology(“mytopology”, conf, topologyBuilder.createTopology());java
3个组件的并发度加起来是10,就是说拓扑一共有10个executor,一共有2个worker,每一个worker产生10 / 2 = 5条线程。
绿色的bolt配置成2个executor和4个task。为此每一个executor为这个bolt运行2个task。node
动态的改变并行度
Storm支持在不 restart topology 的状况下, 动态的改变(增减) worker processes 的数目和 executors 的数目, 称为rebalancing. 经过Storm web UI,或者经过storm rebalance命令实现:
storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10web
二、Storm通讯机制
Worker间的通讯常常须要经过网络跨节点进行,Storm使用ZeroMQ或Netty(0.9之后默认使用)做为进程间通讯的消息框架。
Worker进程内部通讯:不一样worker的thread通讯使用LMAX Disruptor来完成。
不一样topologey之间的通讯,Storm不负责,须要本身想办法实现,例如使用kafka等;
2.一、Worker进程间通讯
worker进程间消息传递机制,消息的接收和处理的大概流程见下图编程
对于worker进程来讲,为了管理流入和传出的消息,每一个worker进程有一个独立的接收线程(对配置的TCP端口supervisor.slots.ports进行监听);
对应Worker接收线程,每一个worker存在一个独立的发送线程,它负责从worker的transfer-queue中读取消息,并经过网络发送给其余worker
每一个executor有本身的incoming-queue和outgoing-queue。
Worker接收线程将收到的消息经过task编号传递给对应的executor(一个或多个)的incoming-queues;
每一个executor有单独的线程分别来处理spout/bolt的业务逻辑,业务逻辑输出的中间数据会存放在outgoing-queue中,当executor的outgoing-queue中的tuple达到必定的阀值,executor的发送线程将批量获取outgoing-queue中的tuple,并发送到transfer-queue中。
每一个worker进程控制一个或多个executor线程,用户可在代码中进行配置。其实就是咱们在代码中设置的并发度个数。
2.二、Worker进程间通讯分析api
一、 Worker接受线程经过网络接受数据,并根据Tuple中包含的taskId,匹配到对应的executor;而后根据executor找到对应的incoming-queue,将数据存发送到incoming-queue队列中。
二、 业务逻辑执行现成消费incoming-queue的数据,经过调用Bolt的execute(xxxx)方法,将Tuple做为参数传输给用户自定义的方法
三、 业务逻辑执行完毕以后,将计算的中间数据发送给outgoing-queue队列,当outgoing-queue中的tuple达到必定的阀值,executor的发送线程将批量获取outgoing-queue中的tuple,并发送到Worker的transfer-queue中
四、 Worker发送线程消费transfer-queue中数据,计算Tuple的目的地,链接不一样的node+port将数据经过网络传输的方式传送给另外一个的Worker。
五、 另外一个worker执行以上步骤1的操做。
2.三、Worker进程间技术(Netty、ZeroMQ)
2.3.一、Netty
Netty是一个NIO client-server(客户端服务器)框架,使用Netty能够快速开发网络应用,例如服务器和客户端协议。Netty提供了一种新的方式来使开发网络应用程序,这种新的方式使得它很容易使用和有很强的扩展性。Netty的内部实现时很复杂的,可是Netty提供了简单易用的api从网络处理代码中解耦业务逻辑。Netty是彻底基于NIO实现的,因此整个Netty都是异步的。
书籍:Netty权威指南
2.3.二、ZeroMQ
ZeroMQ是一种基于消息队列的多线程网络库,其对套接字类型、链接处理、帧、甚至路由的底层细节进行抽象,提供跨越多种传输协议的套接字。ZeroMQ是网络通讯中新的一层,介于应用层和传输层之间(按照TCP/IP划分),其是一个可伸缩层,可并行运行,分散在分布式系统间。
ZeroMQ定位为:一个简单好用的传输层,像框架同样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,以后进入Linux内核”。
2.四、Worker 内部通讯技术(Disruptor)
2.4.一、 Disruptor的来历
一个公司的业务与技术的关系,通常能够分为三个阶段。第一个阶段就是跟着业务跑。第二个阶段是经历了几年的时间,才达到的驱动业务阶段。第三个阶段,技术引领业务的发展乃至企业的发展。因此咱们在学习Disruptor这个技术时,不得不提LMAX这个机构,由于Disruptor这门技术就是由LMAX公司开发并开源的。
LMAX是在英国注册并受到FSA监管(监管号码为509778)的外汇黄金交易所。LMAX也是欧洲第一家也是惟一一家采用多边交易设施Multilateral Trading Facility(MTF)拥有交易所牌照和经纪商牌照的欧洲顶级金融公司
LAMX拥有最迅捷的交易平台,顶级技术支持。LMAX交易所使用“(MTF)分裂器Disruptor”技术,能够在极短期内(通常在3百万秒之一内)处理订单,在一个线程里每秒处理6百万订单。全部订单均为撮合成交形式,无一例外。多边交易设施(MTF)曾经用来设计伦敦证券交易 所(london Stock Exchange)、德国证券及衍生工具交易所(Deutsche Borse)和欧洲证券交易所(Euronext)。
2011年LMAX凭借该技术得到了金融行业技术评选大赛的最佳交易系统奖和甲骨文“公爵杯”创新编程框架奖。
2.4.二、Disruptor是什么
一、 简单理解:Disruptor是一个Queue。Disruptor是实现了“队列”的功能,并且是一个有界队列。而队列的应用场景天然就是“生产者-消费者”模型。
二、 在JDK中Queue有不少实现类,包括不限于ArrayBlockingQueue、LinkBlockingQueue,这两个底层的数据结构分别是数组和链表。数组查询快,链表增删快,可以适应大多数应用场景。
三、 可是ArrayBlockingQueue、LinkBlockingQueue都是线程安全的。涉及到线程安全,就会有synchronized、lock等关键字,这就意味着CPU会打架。
四、 Disruptor一种线程之间信息无锁的交换方式(使用CAS(Compare And Swap/Set)操做)。
2.4.二、Disruptor主要特色
一、 没有竞争=没有锁=很是快。
二、 全部访问者都记录本身的序号的实现方式,容许多个生产者与多个消费者共享相同的数据结构。
三、 在每一个对象中都能跟踪序列号(ring buffer,claim Strategy,生产者和消费者),加上神奇的cache line padding,就意味着没有为伪共享和非预期的竞争。
2.4.二、 Disruptor 核心技术点
Disruptor能够当作一个事件监听或消息机制,在队列中一边生产者放入消息,另一边消费者并行取出处理.
底层是单个数据结构:一个ring buffer。
每一个生产者和消费者都有一个次序计算器,以显示当前缓冲工做方式。
每一个生产者消费者可以操做本身的次序计数器的可以读取对方的计数器,生产者可以读取消费者的计算器确保其在没有锁的状况下是可写的。数组
核心组件
Ring Buffer 环形的缓冲区,负责对经过 Disruptor 进行交换的数据(事件)进行存储和更新。
Sequence 经过顺序递增的序号来编号管理经过其进行交换的数据(事件),对数据(事件)的处理过程老是沿着序号逐个递增处理。
RingBuffer底层是个数组,次序计算器是一个64bit long 整数型,平滑增加。缓存
一、 接受数据并写入到脚标31的位置,以后会沿着序号一直写入,可是不会绕过消费者所在的脚标。
二、 Joumaler和replicator同时读到24的位置,他们能够批量读取数据到30
三、消费逻辑线程读到了14的位置,可是无法继续读下去,由于他的sequence暂停在15的位置上,须要等到他的sequence给他序号。若是sequence能正常工做,就能读取到30的数据。安全
三、Storm组件本地目录树服务器
四、Storm zookeeper目录树网络
五、Storm 任务提交的过程
TopologyMetricsRunnable.TaskStartEvent[oldAssignment=,newAssignment=Assignment[masterCodeDir=C:\Users\MAOXIA~1\AppData\Local\Temp\e73862a8-f7e7-41f3-883d-af494618bc9f\nimbus\stormdist\double11-1-1458909887,nodeHost={61ce10a7-1e78-4c47-9fb3-c21f43a331ba=192.168.1.106},taskStartTimeSecs={1=1458909910, 2=1458909910, 3=1458909910, 4=1458909910, 5=1458909910, 6=1458909910, 7=1458909910, 8=1458909910},workers=[ResourceWorkerSlot[hostname=192.168.1.106,memSize=0,cpu=0,tasks=[1, 2, 3, 4, 5, 6, 7, 8],jvm=,nodeId=61ce10a7-1e78-4c47-9fb3-c21f43a331ba,port=6900]],timeStamp=1458909910633,type=Assign],task2Component=,clusterName=,topologyId=double11-1-1458909887,timestamp=0]
六、Storm 消息容错机制
6.一、整体介绍
在storm中,可靠的信息处理机制是从spout开始的。
一个提供了可靠的处理机制的spout须要记录他发射出去的tuple,当下游bolt处理tuple或者子tuple失败时spout可以从新发射。
Storm经过调用Spout的nextTuple()发送一个tuple。为实现可靠的消息处理,首先要给每一个发出的tuple带上惟一的ID,而且将ID做为参数传递给SoputOutputCollector的emit()方法:collector.emit(new Values(“value1”,”value2”), msgId); messageid就是用来标示惟一的tupke的,而rootid是随机生成的
给每一个tuple指定ID告诉Storm系统,不管处理成功仍是失败,spout都要接收tuple树上全部节点返回的通知。若是处理成功,spout的ack()方法将会对编号是msgId的消息应答确认;若是处理失败或者超时,会调用fail()方法。
6.二、基本实现
Storm 系统中有一组叫作”acker”的特殊的任务,它们负责跟踪DAG(有向无环图)中的每一个消息。
acker任务保存了spout id到一对值的映射。第一个值就是spout的任务id,经过这个id,acker就知道消息处理完成时该通知哪一个spout任务。第二个值是一个64bit的数字,咱们称之为”ack val”, 它是树中全部消息的随机id的异或计算结果。
ack val表示了整棵树的的状态,不管这棵树多大,只须要这个固定大小的数字就能够跟踪整棵树。当消息被建立和被应答的时候都会有相同的消息id发送过来作异或。 每当acker发现一棵树的ack val值为0的时候,它就知道这棵树已经被彻底处理了
总结:
Storm为了保证每条数据成功被处理,实现至少一次语义,经过Storm的ACK机制能够对spout产生的每个tuple进行跟踪;
tuple处理成功是指这个Tuple以及这个Tuple产生的全部子Tuple都被成功处理, 由每个处理bolt经过OutputCollector的方法ack(tuple)来告知storm当前bolt处理成功,最终调用spout的ack方法;
处理失败是指这个Tuple或这个Tuple产生的全部Tuple中的任意一个tuple处理失败或者超时(超时时间由Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS指定), 处理失败bolt调用OutputCollector的方法fail(tuple),来告知storm当前bolt处理失败,最终调用spout的fail方法从新发送失败的tuple,失败时storm不会自动重发失败的tuple,须要咱们在spout中从新获取发送失败数据,手动从新发送一次。
Ack原理
Storm中有个特殊的task名叫acker,他们负责跟踪spout发出的每个Tuple的Tuple树(由于一个tuple经过spout发出了,通过每个bolt处理后,会生成一个新的tuple发送出去)。当acker(框架自启动的task)发现一个Tuple树已经处理完成了,它会发送一个消息给产生这个Tuple的那个task。对任意大的一个Tuple树,storm只须要恒定的20字节就能够进行跟踪。acker对于每一个spout-tuple保存一个ack-val的校验值,它的初始值是0,而后每发射一个Tuple或Ack一个Tuple时,这个Tuple的id就要跟这个校验值异或一下,
而且把获得的值更新为ack-val的新值。那么假设每一个发射出去的Tuple都被ack了,那么最后ack-val的值就必定是0。Acker就根据ack-val是否为0来判断是否彻底处理,若是为0则认为已彻底处理。
例以下图是一个简单的Topology:
开启Act机制 1.spout发射tuple的时候指定messageId 2.spout对发射的tuple进行缓存,不然spout没法获取发送失败的数据进行重发, (这里到底系统里有没有缓存没有成功处理的tuple,好比接口conf.setMaxSpoutPending()是否只缓存了条数仍是原始数据还要去查证一下) 3.spout要重写BaseRichSpout的fail和ack方法,spout根据messageId对于成功处理的tuple从缓存队列中删除,对于失败的tuple选择重发或作其它处理; 4.若是使用BasicBolt,BasicOutputCollector在emit新的tuple时自动与源tuple锚定,execute方法结束时源tuple会被自动ack或fail; 使用RichBolt在emit数据的时需显示指定该数据的源tuple,以保持tracker链路,即collector.emit(oldTuple, newTuple); 而且须要在execute执行成功后调用OutputCollector.ack(tuple), 当失败处理时,执行OutputCollector.fail(tuple); 5.设置acker数大于0,conf.setNumAckers(>0); 关闭Ack机制 1.在Tuple层面去掉可靠性。在发射Tuple的时候不指定MessageID来达到不不跟踪这个Tuple的目的 2.若是对于一个Tuple树里面的某一部分到底成不成功不是很关心,那么能够在Bolt发射这些Tuple的时候不锚定它们。 这样这些Tuple就不在Tuple树里面,也就不会被跟踪了。 3.把Config.TOPOLOGY_ACKERS设置成0。在这种状况下,Storm会在Spout发射一个Tuple以后立刻调用Spout的ack方法, 也就是说这个Tuple树不会被跟踪。 例子程序: public class RandomSentenceSpout extends BaseRichSpout { private SpoutOutputCollector _collector; private Random _rand; private ConcurrentHashMap