storm从不懂到有点懂

目录

框架介绍

Storm应用场景总结:

Storm体系结构

Storm组件

关系简介

举例说明:

故障容忍

概念

三种消息保证机制

At Most Once语义

At Least Once语义

Exactly Once语义

数据流分组

参考文章

Related Posts:



框架介绍

storm是一个分布式,高容错的实时计算系统,对数据实时计算提供了简单的spout和bolt原语。

Storm应用场景总结

  1. 数据流处理: 与其它流处理系统不同,storm不需要中间队列媒介

  2. 实时计算: 可连续不断的进行实时数据处理,把处理的结果实时更新展示到客户端

  3. 分布式远程过程调用: 可充分利用集群中CPU资源,进行CPU密集型计算。

Storm体系结构

 

  • 主控节点和工作节点

    Storm 将每个节点分为主控节点和工作节点两种,其中主控节点只有一个,工作节点可以有很多个。

  • Nimbus:

    负责资源分配和任务调度。主控节点运行 Nimbus 守护进程,负责在集群中分发代码,对节点分配任务,并监视主机故障。

  • Zookeeper:

    负责Nimbus和多个Supervisor之间的所有协调工作。

  • Supervisor:

    负责接受nimbus分配的任务,启动和停止属于自己管理的worker进程。 每个工作节点运行 Supervisor 守护进程,负责监听工作节点上已经分配的主机作业,启动和停止 Nimbus 已经分配的工作进程。supervisor 会定时从 zookeeper 获取拓扑信息 topologies、任务分配信息 assignments 及各类心跳信息,以此为依据进行任务分配。在 supervisor 同步时,会根据新的任务分配情况来启动新的 worker 或者关闭旧的 worker 并进行负载均衡。

  • Worker:

    工作进程,一个工作进程中可以含有一个或者多个Executor线程。

  • Executor:

    线程,里面运行着多个Task。

  • Task:

    worker中每一个spout/bolt的线程称为一个task. 一个task中一定是运行的是相同组件。

 

Storm组件

 


Topology: 用于封装一个实时计算应用程序的逻辑,类似于Hadoop的MapReduce Job;
Stream: 消息流,是一个没有边界的tuple序列,这些tuples会被以一种分布式的方式并行地创建和处理;
Spouts: 数据源,是消息生产者,他会从一个外部源读取数据并向topology里面面发出消息:tuple;
Bolts: 处理消息,所有消息处理逻辑被封装在bolts里面,处理输入的数据流并产生新的输出数据流,可执行过滤,聚合,查询数据库等操作;
Stream groupings: 消息分发策略,定义一个Topology的其中一步是定义每个tuple接受什么样的流作为输入,stream grouping就是用来定义一个stream应该如何分配给Bolts们。

 

关系简介

  1. Worker进程存在于每个工作节点Supervisor中,一个Worker进程中可以含有一个或者多个Executor线程,每个Executor线程都会启动一个消息循环线程,用于接收、处理和发送消息,当Executor收到其下某一task的消息后,就会调用该Task对应的处理逻辑对消息进行处理;

  2. 1个topology可以有多个worker进程,1个worker进程只为1个topology服务。即1个worker进程执行的是1个topology的子集

  3. 一个线程Executor,运行时只会运行一个task,如果有多个,循环执行,即其他的task出去等待状态;

  4. task,最终运行spout或bolt中代码的执行单元,即task可能是spout组件也有可能是bolt组件

  5. 默认情况下: 1个supervisor节点最多可以启动4个worker进程,每1个topology默认占用1个worker进程,每个spout或者bolt会占用1个executor,每个executor启动1个task。

 

 

  • 注意 : 在同一个线程中,如果有多个task,这些task一定是相同组件实例;

举例说明:

假设现在有一个Topology实例,在该Topology实例中,配置为整个Topology实例服务的进程数量为10,配置了Spout单元和Bolt单元以及Spout单元和Bolt单元之间的数据流,为这些Spout和Bolt单元服务的线程和任务数量分别为:Spout单元,10个线程,20个任务;Bolt单元,20个线程,20个任务。 
那么一旦将该Topology实例提交给Nimbus,接下来就会由Nimbus控制运行。 
在Nimbus的控制下,有些Supervisor会在所在的worker node上建立一个进程,整个Supervisor集群中共建立10个进程,这些进程都为该Topology实例服务。这些进程可以运行在多个worker node上,也可以运行在同一台worker node上。每个进程都持有对项目JAR包的引用。 
现在一共需要30个线程来为Spout和Bolt单元服务,那么10个进程中,每个进程上运行3个线程。一个进程中的3个线程可以分别为不同的Spout单元和Bolt单元服务。每个线程都创建一份Spout单元或者Bolt单元的实例。 
Spout单元共有10个线程,20个任务为其服务,那么每个线程上运行2个任务,同理为Bolt单元服务的20个线程中的每个线程上运行1个任务。 
每个线程中的任务使用线程所持有的Spout实例或者Bolt实例,同一个线程中的多个任务间是串行执行的关系,因而在一个线程有多个任务的情况下,不会产生并发问题。 
比如某个线程中持有一个Spout实例spoutInstance,配置该线程中需要运行5个任务,那么Storm的框架代码有可能是这么实现的: 

for(int i=0;i<5;i++) {    
  spoutInstance.nextTuple(); 
}

 

 

 

故障容忍

概念

  • Worker进程不会因Nimbus或者Supervisor的挂掉而受到影响

    1. worker进程死掉: Supervisor会重启它。如果这个Worker连续在启动时失败,并且无法让Nimbus观察到它的心跳,Nimbus将这个Worker重新分配到另一台机器上。

    2. supervisor进程死掉: 这样不会影响之前已经提交的topology的运行,只是后期不会再向这个节点分配任务了。

    3. nimbus进程死掉: 这样不会影响之前已经提交的topology的运行,只是后期不能向集群中提交topology了。

  • Nimbus和Supervisor daemon进程,设计成快速失败(无论何时当遇到任何异常情况,将会执行自毁)和无状态(所有的状态都保存在Zookeeper或者磁盘上)。

  • Nimbus和Supervisor daemon进程,必须在监控下运行,如使用daemontools或者monit工具。

  • Nimbus是会有单点故障的问题,但Nimbus进程挂掉也不会引起任何灾难发生。

 

三种消息保证机制

Storm提供的三种不同消息保证机制中,利用Spout,Bolt以及Acker的组合可以实现At Most Once以及At Least Once语义,Storm在At Least Once的基础上进行了一次封装(Trident),从而实现Exactly Once语义

At Most Once语义

Storm的消息保证机制中,如果需要实现At Most Once语义,只需要满足下面任何一条即可:

1.关闭ACK机制,即Acker数目设置为0

2.Spout不实现可靠性传输:Spout发送消息是使用不带message ID的API。不实现fail函数。

3.Bolt不把处理成功或失败的消息发送给Acker

At Least Once语义

如果需要实现At Least Once 语义,则需要同时保证如下几条:

1.开启ACK机制,即Acker数目大于0

2.Spout实现可靠性传输保证:Spout发送消息附带message 的ID。如果收到Acker的处理失败反馈,需要进行消息重传,即实现fail函数。

3.Bolt在处理成功或失败后需要调用相应的方法通知Acker

At least once 的消息处理机制,在运用时需要格外小心,Storm 采用 ack/fail 机制来追踪消息的流向,当一个消息(tuple)发送到下游时,如果超时未通知 spout,或者发送失败,Storm 默认会根据配置策略进行重发,可通过调节重发策略来尽量减少消息的重复发送。一个常见情况是,Storm 集群经常会超负载运行,导致下游的 bolt 未能及时 ack,从而导致 spout 不断的重发一个 tuple,进而导致消息大量的重复消费。

 

  • 不发生任何异常的情况下,消息不会重复不会丢失。

  • Spout 发生异常的情况下,消息的重复数目约等于 spout.max.pending(Spout 的配置项,每次可以发送的最多消息条数) * NumberOfException(异常次数)。

  • Acker 发生异常的情况下,消息重复的数目等于 spout.max.pending * NumberOfException。

  • Bolt 发生异常的情况:

1.emit 之前发生异常,消息不会重复。

2.emit 之后发生异常,消息重复的次数等于异常的次数。

 

Exactly Once语义

实现Exactly Once语义,则需要在At Least Once的基础上进行状态的存储,用来防止重复发送的数据被重复处理,在Storm中使用Trident API实现

 

 

数据流分组

定义数据流数据流应该发送到那些bolt中。数据流分组就是将数据流进行分组,按需要进入不同的bolt中。可以使用Storm提供的分组规则,也可以实现backtype.storm.grouping.CustomStreamGrouping自定义分组规则。Storm定义了8种内置的数据流分组方法:

  1. Shuffle grouping(随机分组):随机分发tuple给bolt的各个task,每个bolt实例接收到相同数量的tuple;

  2. Fields grouping(按字段分组):根据指定字段的值进行分组。比如,一个数据流按照"user-id"分组,所有具有相同"user-id"的tuple将被路由到同一bolt的task中,不同"user-id"可能路由到不同bolt的task中;

  3. Partial Key grouping(部分key分组):数据流根据field进行分组,类似于按字段分组,但是将在两个下游bolt之间进行均衡负载,当资源发生倾斜的时候能够更有效率的使用资源。The Power of Both Choices: Practical Load Balancing for Distributed Stream Processing Engines提供了更加详细的说明;

  4. All grouping(全复制分组):将所有tuple复制后分发给所有bolt的task。小心使用。

  5. Global grouping(全局分组):将所有的tuple路由到唯一一个task上。Storm按照最小的task ID来选取接收数据的task;(注意,当时用全局分组是,设置bolt的task并发是没有意义的,因为所有tuple都转发到一个task上。同时需要注意的是,所有tuple转发到一个jvm实例上,可能会引起storm集群某个jvm或服务器出现性能瓶颈或崩溃)

  6. None grouping(不分组):这种分组方式指明不需要关心分组方式。实际上,不分组功能与随机分组相同。预留功能。

  7. Direct grouping(指向型分组):数据源会调用emitDirect来判断一个tuple应该由哪个storm组件接收,只能在声明了指向型的数据流上使用。

  8. Local or shuffle grouping(本地或随机分组):当同一个worker进程中有目标bolt,将把数据发送到这些bolt中。否则,功能将与随机分组相同。该方法取决与topology的并发度,本地或随机分组可以减少网络传输,降低IO,提高topology性能。


 

 

 

参考文章

Storm--实时数据处理框架

Storm 的可靠性保证测试

Storm--并行度

Storm--故障容忍和消息可靠性

Storm中-Worker Executor Task的关系

Storm的消息保证机制

Storm详解

 

Related Posts:

  1. Apache Storm 官方文档 —— Trident Spouts

  2. Apache Storm 官方文档 —— Trident API 概述

  3. Apache Storm 官方文档 —— Trident 教程

  4. Apache Storm 官方文档 —— FAQ

  5. Apache Storm 官方文档 —— 分布式 RPC

  6. Apache Storm 官方文档 —— 定义 Storm 的非 JVM 语言 DSL

  7. Apache Storm 官方文档 —— 常用模式

  8. Apache Storm 官方文档 —— 理解 Storm 拓扑的并行度(parallelism)概念

  9. Apache Storm 官方文档 —— Ack 框架的实现

  10. Apache Storm 官方文档 —— 多语言接口协议

  11. Apache Storm 官方文档 —— 消息的可靠性保障

  12. Apache Storm 官方文档 —— Storm 与 Kestrel

  13. Apache Storm 官方文档 —— 源码组织结构

  14. Apache Storm 官方文档 —— 配置

  15. Apache Storm 官方文档 —— 基础概念