Storm(三)Storm的原理机制

一.Storm的数据分发策略

1. Shuffle Grouping 

随机分组,随机派发stream里面的tuple,保证每一个bolt task接收到的tuple数目大体相同。 轮询,平均分配 node

2. Fields Grouping

按字段分组,好比,按"user-id"这个字段来分组,那么具备一样"user-id"的 tuple 会被分到相同的Bolt里的一个task, 而不一样的"user-id"则可能会被分配到不一样的task。 数据库

3. All Grouping

广播发送,对于每个tuple,全部的bolts都会收到 服务器

4. Global Grouping

全局分组,把tuple分配给task id最低的task 。网络

5. None Grouping

不分组,这个分组的意思是说stream不关心到底怎样分组。目前这种分组和Shuffle grouping是同样的效果。 有一点不一样的是storm会把使用none grouping的这个bolt放到这个bolt的订阅者同一个线程里面去执行(将来Storm若是可能的话会这样设计)。 并发

6. Direct Grouping

指向型分组, 这是一种比较特别的分组方法,用这种分组意味着消息(tuple)的发送者指定由消息接收者的哪一个task处理这个消息。只有被声明为 Direct Stream 的消息流能够声明这种分组方法。并且这种消息tuple必须使用 emitDirect 方法来发射。消息处理者能够经过 TopologyContext 来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)  框架

7. Local or shuffle grouping

本地或随机分组。若是目标bolt有一个或者多个task与源bolt的task在同一个工做进程中,tuple将会被随机发送给这些同进程中的tasks。不然,和普通的Shuffle Grouping行为一致分布式

8.customGrouping

自定义,至关于mapreduce那里本身去实现一个partition同样。函数

二.Storm的并发机制

Worker – 进程ui

一个Topology拓扑会包含一个或多个Worker(每一个Worker进程只能从属于一个特定的Topology) 这些Worker进程会并行跑在集群中不一样的服务器上,即一个Topology拓扑实际上是由并行运行在Storm集群中多台服务器上的进程所组成线程

Executor – 线程

Executor是由Worker进程中生成的一个线程 每一个Worker进程中会运行拓扑当中的一个或多个Executor线程 一个Executor线程中能够执行一个或多个Task任务(默认每一个Executor只执行一个Task任务),可是这些Task任务都是对应着同一个组件(Spout、Bolt)。

Task

实际执行数据处理的最小单元 每一个task即为一个Spout或者一个Bolt Task数量在整个Topology生命周期中保持不变,Executor数量能够变化或手动调整 (默认状况下,Task数量和Executor是相同的,即每一个Executor线程中默认运行一个Task任务)

设置Worker进程数

Config.setNumWorkers(int workers)

设置Executor线程数

TopologyBuilder.setSpout(String id, IRichSpout spout, Number parallelism_hint) ,TopologyBuilder.setBolt(String id, IRichBolt bolt, Number parallelism_hint) :其中, parallelism_hint即为executor线程数

设置Task数量

ComponentConfigurationDeclarer.setNumTasks(Number val)

例:

Rebalance – 再平衡

即,动态调整Topology拓扑的Worker进程数量、以及Executor线程数量

支持两种调整方式: 一、经过Storm UI 二、经过Storm CLI

经过Storm CLI动态调整: storm help rebalance

例:storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 将mytopology拓扑worker进程数量调整为5个, “ blue-spout ” 所使用的线程数量调整为3个 ,“ yellow-bolt ”所使用的线程数量调整为10个。

三.Storm的通讯机制

Worker进程间的数据通讯

ZMQ ZeroMQ 开源的消息传递框架,并非一个MessageQueue Netty Netty是基于NIO的网络框架,更加高效。(之因此Storm 0.9版本以后使用Netty,是由于ZMQ的license和Storm的license不兼容。)

Worker内部的数据通讯

Disruptor 实现了“队列”的功能。 能够理解为一种事件监听或者消息处理机制,即在队列当中一边由生产者放入消息数据,另外一边消费者并行取出消息数据处理。

Worker内部的消息传递机制

四.Storm的容错机制

一、集群节点宕机

Nimbus服务器 单点故障? 非Nimbus服务器 故障时,该节点上全部Task任务都会超时,Nimbus会将这些Task任务从新分配到其余服务器上运行

二、进程挂掉

Worker 挂掉时,Supervisor会从新启动这个进程。若是启动过程当中仍然一直失败,而且没法向Nimbus发送心跳,Nimbus会将该Worker从新分配到其余服务器上 Supervisor 无状态(全部的状态信息都存放在Zookeeper中来管理) 快速失败(每当遇到任何异常状况,都会自动毁灭) Nimbus 无状态(全部的状态信息都存放在Zookeeper中来管理) 快速失败(每当遇到任何异常状况,都会自动毁灭)

三、消息的完整性

从Spout中发出的Tuple,以及基于他所产生Tuple(例如上个例子当中Spout发出的句子,以及句子当中单词的tuple等) 由这些消息就构成了一棵tuple树 当这棵tuple树发送完成,而且树当中每一条消息都被正确处理,就代表spout发送消息被“完整处理”,即消息的完整性

Acker -- 消息完整性的实现机制 Storm的拓扑当中特殊的一些任务 负责跟踪每一个Spout发出的Tuple的DAG(有向无环图)

五.Storm的DRPC

DRPC (Distributed RPC) 分布式远程过程调用

DRPC 是经过一个 DRPC 服务端(DRPC server)来实现分布式 RPC 功能的。 DRPC Server 负责接收 RPC 请求,并将该请求发送到 Storm中运行的 Topology,等待接收 Topology 发送的处理结果,并将该结果返回给发送请求的客户端。 (其实,从客户端的角度来讲,DPRC 与普通的 RPC 调用并无什么区别。)

DRPC设计目的: 为了充分利用Storm的计算能力实现高密度的并行实时计算。 (Storm接收若干个数据流输入,数据在Topology当中运行完成,而后经过DRPC将结果进行输出。)

客户端经过向 DRPC 服务器发送待执行函数的名称以及该函数的参数来获取处理结果。实现该函数的拓扑使用一个DRPCSpout 从 DRPC 服务器中接收一个函数调用流。DRPC 服务器会为每一个函数调用都标记了一个惟一的 id。随后拓扑会执行函数来计算结果,并在拓扑的最后使用一个名为 ReturnResults 的 bolt 链接到 DRPC 服务器,根据函数调用的 id 来将函数调用的结果返回。

定义DRPC拓扑:

方法1: 经过LinearDRPCTopologyBuilder (该方法也过时,不建议使用) 该方法会自动为咱们设定Spout、将结果返回给DRPC Server等,咱们只须要将Topology实现

方法2: 直接经过普通的拓扑构造方法TopologyBuilder来建立DRPC拓扑 须要手动设定好开始的DRPCSpout以及结束的ReturnResults

运行模式:

一、本地模式

2.远程模式(集群模式)

修改配置文件conf/storm.yaml drpc.servers: - "node21“ 启动DRPC Server bin/storm drpc & 经过StormSubmitter.submitTopology提交拓扑

六.Storm的事务

事务性拓扑(Transactional Topologies)

保证消息(tuple)被且仅被处理一次

Design 1

强顺序流(强有序) 引入事务(transaction)的概念,每一个transaction(即每一个tuple)关联一个transaction id。 Transaction id从1开始,每一个tuple会按照顺序+1。 在处理tuple时,将处理成功的tuple结果以及transaction id同时写入数据库中进行存储。

两种状况:

一、当前transaction id与数据库中的transaction id不一致

二、两个transaction id相同

缺点: 一次只能处理一个tuple,没法实现分布式计算

Design 2

强顺序的Batch流

 

 

事务(transaction)以batch为单位,即把一批tuple称为一个batch,每次处理一个batch。 每一个batch(一批tuple)关联一个transaction id ,每一个batch内部能够并行计算

缺点

Design 3

Storm's design

将Topology拆分为两个阶段:

一、Processing phase 容许并行处理多个batch

二、Commit phase 保证batch的强有序,一次只能处理一个batch

Design details

Manages state - 状态管理

Storm经过Zookeeper存储全部transaction相关信息(包含了:当前transaction id 以及batch的元数据信息)

Coordinates the transactions - 协调事务

Storm会管理决定transaction应该处理什么阶段(processing、committing)

Fault detection - 故障检测

Storm内部经过Acker机制保障消息被正常处理(用户不须要手动去维护)

First class batch processing API

Storm提供batch bolt接口

三种事务:

一、普通事务

二、Partitioned Transaction - 分区事务

三、Opaque Transaction - 不透明分区事务

相关文章
相关标签/搜索