Storm 性能优化

目录


  1. 场景假设
  2. 调优步骤和方法
  3. Storm 的部分特性
  4. Storm 并行度
  5. Storm 消息机制
  6. Storm UI 解析
  7. 性能优化

场景假设


在介绍 Storm 的性能调优方法以前,假设一个场景:
项目组部署了3台机器,计划运行且仅运行 Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小规模实验集群,集群的配置状况以下表:html

主机名 硬件配置 角色描述
hd01 2CPUs, 4G RAM, 2TB 机械硬盘 nimbus, supervisor, ui, kafka, zk
hd02 2CPUs, 4G RAM, 2TB 机械硬盘 supervisor, kafka, zk
hd03 2CPUs, 4G RAM, 2TB 机械硬盘 supervisor, kafka, zk

现有一个任务,须要实时计算订单的各项汇总统计信息。订单数据经过 kafka 传输。在 Storm 中建立了一个 topology 来执行此项任务,并采用 Storm kafkaSpout 读取该 topic 的数据。kafka 和 Storm topology 的基本信息以下:java

  • kafka topic partitions = 3
  • topology 的配置状况:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new kafkaSpout(), 3);
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout");
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order"));

Config conf = new Config();
conf.setNumWorkers(2);
StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());

那么,在此假设下,Storm topology 的数据怎么分发?性能如何调优?这就是下文要讨论的内容,其中性能调优是最终目的,数据分发即 Storm 的消息机制,则是进行调优前的知识储备。mysql

调优步骤和方法


Storm topology 的性能优化方法,总体来讲,可依次划分为如下几个步骤:linux

  1. 硬件配置的优化
  2. 代码层面的优化
  3. topology 并行度的优化
  4. Storm 集群配置参数和 topology 运行参数的优化

其中第一点不是讨论的重点,无外乎增长机器的硬件资源,提升机器的硬件配置等,可是这一步却也不能忽略,由于机器配置过低,极可能后面的步骤怎么调优都无济于事。redis

Storm 的一些特性和原理,是进行调优的必要知识储备算法

Storm 的部分特性

目前 Storm 的最新版本为 2.0.0-SNAPSHOT。该版本太新,未通过大量验证和测试,所以本文的讨论都基于 2.0 之前的版本。Storm 有以下几个重要的特性:sql

  • DAG
  • 常驻内存,topology 启动后除非 kill 掉不然一直运行
  • 提供 DRPC 服务
  • Pacemaker(1.0之后的新特性)心跳守护进程,常驻内存,比ZooKeeper性能更好
  • 采用了 ZeroMQ 和 Netty 做为底层框架
  • 采用了 ACK/fail 方式的 tuple 追踪机制
    而且 ack/fail 只能由建立该tuple的task所承载的spout触发

了解这些机制对优化 Storm 的运行性能有必定帮助数据库

Storm 并行度


Storm 是一个分布式的实时计算软件,各节点,各组件间的通讯依赖于 zookeeper。从组件的角度看,Storm 运做机制构建在 nimbus, supervisor, woker, executor, task, spout/bolt 之上,若是再加上 topology,有时也能够称这些组件为 Concepts(概念)。详见官网介绍文章 http://storm.apache.org/releases/2.0.0-SNAPSHOT/Concepts.html 和
http://storm.apache.org/releases/current/Tutorial.htmlapache

在介绍 Storm 并行度以前,先归纳地了解下 Storm 的几个概念,此处假定读者有必定的 Storm 背景知识,至少曾经跑过一个 topology 实例。api

nimbus 和 supervisor

nimbus 是 Storm 集群的管理和调度进程,一个集群启动一个 nimbus,主要用于管理 topology,执行rebalance,管理 supervisor,分发 task,监控集群的健康情况等。nimbus 依赖 zookeeper 来实现上述职责,nimbus 与 supervisor 等其余组件并无直接的沟通。运行 nimbus 的节点成为主节点,运行 supervisor 的节点成为工做节点,nimbus 向 supervisor 分派任务,所以 Storm 集群也是一个 master/slave 集群。简单来讲,nimbus 就是工头,supervisor 就是工人,nimbus 经过 zookeeper 来管理 supervisor。

supervisor 是一个工做进程,负责监听 nimbus 分派的任务。当它接到任务后,会启动一个 worker 进程,由 worker 运行 topology 的一个子集。为何说是子集呢?由于当一个 topology 提交到集群后,nimbus 便会根据该 topology 的配置(此处假定 numWorker=3),将 topology 分配给3个 worker 并行执行(正常状况下是这样,也有不是均匀分配的,好比有一个 supervisor 节点内存不足了)。若是恰好集群有3个 supervisor,则每一个 supervisor 会启动1个 worker,即一个节点启动一个 worker(一个节点只能有一个 supervisor 有效运行)。所以,worker 进程运行的是 topology 的一个子集。supervisor 一样经过 zookeeper 与 nimbus 进行交流,所以 nimbus 和 supervisor 均可以快速失败/中止,由于全部的状态信息都保存在本地文件系统的 zookeeper 中, 当失败中止运行后,只须要从新启动 nimbus 或 supervisor 进程以快速恢复。固然,若是集群中正在工做的 supervisor 中止了,其上运行着的 topology 子集也会跟着中止,不过一旦 supervisor 启动起来,topology 子集又马上恢复正常了。


nimbus 和 supervisor 的协做关系
worker

worker 是一个JVM进程,由 supervisor 启动和关闭。当 supervisor 接到任务后,会根据 topology 的配置启动若干 worker,实际的任务执行便由 worker 进行。worker 进程会占用固定的可由配置进行修改的内存空间(默认768M)。一般使用 conf.setNumWorkers() 函数来指定一个 topolgoy 的 worker 数量。

executor

executor 是一个线程,由 worker 进程派生(spawned)。executor 线程负责根据配置派生 task 线程,默认一个 executor 建立一个 task,可经过 setNumTask() 函数指定每一个 executor 的 task 数量。executor 将实例化后的 spout/bolt 传递给 task。

task

task 能够说是 topology 最终的实际的任务执行者,每一个 task 承载一个 spout 或 bolt 的实例,并调用其中的 spout.nexTuple(),bolt.execute() 等方法,而 spout.nexTuple() 是数据的发射器,bolt.execute() 则是数据的接收方,业务逻辑的代码基本上都在这两个函数里面处理了,所以能够说 task 是最终搬砖的苦逼。

topology

topology 中文翻译为拓扑,相似于 hdfs 上的一个 mapreduce 任务。一个 topology 定义了运行一个 Storm 任务的全部必要元件,主要包括 spout 和 bolt,以及 spout 和 bolt 之间的流向关系。


topology 结构
并行度

什么是并行度?在 Storm 的设定里,并行度大致分为3个方面:

  1. 一个 topology 指定多少个 worker 进程并行运行;
  2. 一个 worker 进程指定多少个 executor 线程并行运行;
  3. 一个 executor 线程指定多少个 task 并行运行。

通常来讲,并行度设置越高,topology 运行的效率就越高,可是也不能一股脑地给出一个很高的值,还得考虑给每一个 worker 分配的内存的大小,还得平衡系统的硬件资源,以免浪费。
Storm 集群能够运行一个或多个 topology,而每一个 topology 包含一个或多个 worker 进程,每一个 worer 进程可派生一个或多个 executor 线程,而每一个 executor 线程则派生一个或多个 task,task 是实际的数据处理单元,也是 Storm 概念里最小的工做单元, spout 或 bolt 的实例即是由 task 承载。


worker executor task 的关系

为了更好地解释 worker、executor 和 task 之间的工做机制,咱们用官网的一个简单 topology 示例来介绍。先看此 topology 的配置:

Config conf = new Config();
conf.setNumWorkers(2); // 为此 topology 配置两个 worker 进程

topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // blue-spout 并行度=2

topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2) // green-bolt 并行度=2
               .setNumTasks(4)  // 为此 green-bolt 配置 4 个 task
               .shuffleGrouping("blue-spout");

topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)  // yellow-bolt 并行度=6
               .shuffleGrouping("green-bolt");

StormSubmitter.submitTopology(
    "mytopology",
    conf,
    topologyBuilder.createTopology()
);

从上面的代码能够知道:

  • 这个 topology 装备了2个 worker 进程,也就是一样的工做会有 2 个进程并行进行,能够确定地说,2个 worker 确定比1个 worker 执行效率要高不少,可是并无2倍的差距;
  • 配置了一个 blue-spout,而且为其指定了 2 个 executor,即并行度为2;
  • 配置了一个 green-bolt,而且为其指定了 2 个 executor,即并行度为2;
  • 配置了一个 yellow-bolt,而且为其指定了 6 个 executor,即并行度为6;

你们看官方给出的下图:


一个 topology 的结构图示

能够看出,这个图片完好无损地还原了代码里设定的 topology 结构:

  • 图左最大的灰色方框,表示这个 topology;
  • topology 里面恰好有两个白色方框,表示2个 worker 进程;
  • 每一个 worker 里面的灰色方框表示 executor 线程,能够看到2个 worker 方框里各有5个 executor,为何呢?由于代码里面指定的 spout 并行度=2,green-bolt并行度=2,yellow-bolt并行度=6,加起来恰好是10,而配置的 worker 数量为2,那么天然地,这10个 executor 会均匀地分配到2个 worker 里面;
  • 每一个 executor 里面的黄蓝绿(写着Task)的方框,就是最小的处理单元 task 了。你们仔细看绿色的 Task 方框,与其余 Task 不一样的是,两个绿色方框同时出如今一个 executor 方框内。为何会这样呢?你们回到上文看 topology 的定义代码,topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2).setNumTasks(4),这里面的 setNumTasks(4) 表示为该 green-bolt 指定了4个 task,且 executor 的并行度为2,那么天然地,这4个 task 会均匀地分配到2个 executor 里面;
  • 图右的三个圆圈,依次是蓝色的 blue-spout,绿色的 green-bolt 和黄色的 yellow-bolt,而且用箭头指示了三个组件之间的关系。spout 是数据的产生元件,而 green-bolt 则是数据的中间接收节点,yellow-bolt 则是数据的最后接收节点。这也是 DAG 的体现,有向的(箭头不能往回走)无环图。

参考
http://storm.apache.org/releases/1.0.1/Understanding-the-parallelism-of-a-Storm-topology.html
http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/

一个 topology 的代码较完整例子
TopologyBuilder builder = new TopologyBuilder();

BrokerHosts hosts       = new ZkHosts(zkConns);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, zkRoot, clintId);
spoutConfig.scheme      = new SchemeAsMultiScheme(new StringScheme());

/** 指示 kafkaSpout 从 kafka topic 最后记录的 ofsset 开始读取数据 */
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();

KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
builder.setSpout("kafkaSpout", kafkaSpout, 3); // spout 并行度=3
builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); // FilterBolt 并行度=3
builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); // AlertBolt 并行度=3

Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(3); // 为此 topology 配置3个 worker 进程
conf.setMaxSpoutPending(10000);

try {
    StormSubmitter.submitTopologyWithProgressBar(topology, conf, builder.createTopology());
} catch (Exception e) {
    e.printStackTrace();
}

Storm 消息机制


Storm 主要提供了两种消息保证机制(Message Processing Guarantee)

  • 至少一次 At least once
  • 仅且一次 exactly once

其中 exactly once 是经过 Trident 方式实现的(exactly once through Trident)。两种模式的选择要视业务状况而定,有些场景要求精确的仅且一次消费,好比订单处理,决不能容许重复的处理订单,由于极可能会致使订单金额、交易手数等计算错误;有些场景容许必定的重复,好比页面点击统计,访客统计等。总之,无论何种模式,Storm 都能保证数据不会丢失,开发者须要关心的是,如何保证数据不会重复消费。

At least once 的消息处理机制,在运用时须要格外当心,Storm 采用 ack/fail 机制来追踪消息的流向,当一个消息(tuple)发送到下游时,若是超时未通知 spout,或者发送失败,Storm 默认会根据配置策略进行重发,可经过调节重发策略来尽可能减小消息的重复发送。一个常见状况是,Storm 集群常常会超负载运行,致使下游的 bolt 未能及时 ack,从而致使 spout 不断的重发一个 tuple,进而致使消息大量的重复消费。

在与 Kafka 集成时,经常使用 Storm 提供的 kafkaSpout 做为 spout 消费 kafka 中的消息。Storm 提供的 kafkaSpout 默认有两种实现方式:至少一次消费的 core Storm spouts 和仅且一次消费的 Trident spouts :(We support both Trident and core Storm spouts)。

在 Storm 里面,消息的处理,经过两个组件进行:spout 和 bolt。其中 spout 负责产生数据,bolt 负责接收并处理数据,业务逻辑代码通常都写入 bolt 中。能够定义多个 bolt ,bolt 与 bolt 之间能够指定单向连接关系。一般的做法是,在 spout 里面读取诸如 kafka,mysql,redis,elasticsearch 等数据源的数据,并发射(emit)给下游的 bolt,定义多个 bolt,分别进行多个不一样阶段的数据处理,好比第一个 bolt 负责过滤清洗数据,第二个 bolt 负责逻辑计算,并产生最终运算结果,写入 redis,mysql,hdfs 等目标源。

Storm 将消息封装在一个 Tuple 对象里,Tuple 对象经由 spout 产生后经过 emit() 方法发送给下游 bolt,下游的全部 bolt 也一样经过 emit() 方法将 tuple 传递下去。一个 tuple 多是一行 mysql 记录,也多是一行文件内容,具体视 spout 如何读入数据源,并如何发射给下游。

以下图,是一个 spout/bolt 的执行过程:


spout/bolt 的执行过程

spout -> open(pending状态) -> nextTuple -> emit -> bolt -> execute -> ack(spout) / fail(spout) -> message-provider 将该消息移除队列(complete) / 将消息从新压回队列

ACK/Fail

上文说到,Storm 保证了数据不会丢失,ack/fail 机制即是实现此机制的法宝。Storm 在内部构建了一个 tuple tree 来表示每个 tuple 的流向,当一个 tuple 被 spout 发射给下游 bolt 时,默认会带上一个 messageId,能够由代码指定但默认是自动生成的,当下游的 bolt 成功处理 tuple 后,会经过 acker 进程通知 spout 调用 ack 方法,当处理超时或处理失败,则会调用 fail 方法。当 fail 方法被调用,消息可能被重发,具体取决于重发策略的配置,和所使用的 spout。

对于一个消息,Storm 提出了『彻底处理』的概念。即一个消息是否被彻底处理,取决于这个消息是否被 tuple tree 里的每个 bolt 彻底处理,当 tuple tree 中的全部 bolt 都彻底处理了这条消息后,才会通知 acker 进程并调用该消息的原始发射 spout 的 ack 方法,不然会调用 fail 方法。

ack/fail 只能由建立该 tuple 的 task 所承载的 spout 触发

默认状况下,Storm 会在每一个 worker 进程里面启动1个 acker 线程,觉得 spout/bolt 提供 ack/fail 服务,该线程一般不太耗费资源,所以也无须配置过多,大多数状况下1个就足够了。


ack/fail 示意
Worker 间通讯

上文所说是在一个 worker 内的状况,可是 Storm 是一个分布式的并行计算框架,而实现并行的一个关键方式,即是一个 topology 能够由多个 worker 进程分布在多个 supervisor 节点并行地执行。那么,多个 worker 之间必然是会有通讯机制的。nimbus 和 supervsor 之间仅靠 zookeeper 进行沟通,那么为什么 worker 之间不经过 zookeeper 之类的中间件进行沟通呢?其中的一个缘由我想,应该是组件隔离的原则。worker 是 supervisor 管理下的一个进程,那么 worker 若是也采用 zookeeper 进行沟通,那么就有一种越级操做的嫌疑了。


Worker 间通讯

你们看上图,一个 worker 进程装配了以下几个元件:

  • 一个 receive 线程,该线程维护了一个 ArrayList,负责接收其余 worker 的 sent 线程发送过来的数据,并将数据存储到 ArrayList 中。数据首先存入 receive 线程的一个缓冲区,可经过 topology.receiver.buffer.size (此项配置在 Storm 1.0 版本之后被删除了)来配置该缓冲区存储消息的最大数量,默认为8(个数,而且得是2的倍数),而后才被推送到 ArrayList 中。receive 线程接收数据,是经过监听 TCP的端口,该端口有 storm 配置文件中 supervisor.slots.prots 来配置,好比 6700;
  • 一个 sent 线程,该线程维护了一个消息队列,负责将队里中的消息发送给其余 worker 的 receive 线程。一样具备缓冲区,可经过 topology.transfer.buffer.size 来配置缓冲区存储消息的最大数量,默认为1024(个数,而且得是2的倍数)。当消息达到此阈值时,便会被发送到 receive 线程中。sent 线程发送数据,是经过一个随机分配的TCP端口来进行的。
  • 一个或多个 executor 线程。executor 内部一样拥有一个 receive buffer 和一个 sent buffer,其中 receive buffer 接收来自 receive 线程的的数据,sent buffer 向 sent 线程发送数据;而 task 线程则介于 receive buffer 和 sent buffer 之间。receive buffer 的大小可经过 Conf.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE 参数配置,sent buffer 的大小可经过 Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE 配置,两个参数默认都是 1024(个数,而且得是2的倍数)。
Config conf = new Config(); conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 16); // 默认8 conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

参考
http://storm.apache.org/releases/1.0.1/Guaranteeing-message-processing.html
http://www.michael-noll.com/blog/2013/06/21/understanding-storm-internal-message-buffers/

Storm UI 解析


首页
  • Cluster Summary

    Cluster Summary
参数名 说明
Supervisors 集群中配置的 supervisor 数量
Used slots 集群中已用掉的 workers 数量
Free slots 集群中空闲的 workers 数量
Total slots 集群中总的的 workers 数量
Executors 当前集群中总的 Executor 线程数量,该值等于集群中全部 topology 的全部 spout/bolt 配置的 executor 数量之和,其中默认状况下每一个 worker 进程还会派生一个 acker executor 线程,也一并计算在内了
Tasks 当前集群中总的 task 数量,也是全部 executor 派生的 task 数量之和
  • Nimbus Summary
    比较简单,就略过了

  • Topology Summary


    Topology Summary

这部分也比较简单,值得注意的是 Assigned Mem (MB),这里值得是分配给该 topolgoy 下全部 worker 工做内存之和,单个 worker 的内存配置可由 Config.WORKER_HEAP_MEMORY_MB 和 Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB 指定,默认为 768M,另外再加上默认 64M 的 logwritter 进程内存空间,则有 832M。
此处 fast-pay 的值为 2496M = 3*832

  • Supervisor Summary

    Supervisor Summary

此处也比较简单,值得注意的是 slot 和 used slot 分别表示当前节点总的可用 worker 数,及已用掉的 worker 数。

  • Nimbus Configuration

    Nimbus Configuration

可搜索和查看当前 topology 的各项配置参数

topology 页面
  • Topology summary

    Topology summary

此处的大部分配置与上文中出现的意义同样,值得注意的是
Num executors 和 Num tasks 的值。其中 Num executors 的数量等于当前 topology 下全部 spout/bolt 的并行度总和,再加上全部 worker 下的 acker executor 线程总数(默认状况下一个 worker 派生一个 acker executor)。

  • Topology actions

    Topology actions
按钮 说明
Activate 激活此 topology
Deactivate 暂停此 topology 运行
Rebalance 调整并行度并从新平衡资源
Kill 关闭并删除此 topology
Debug 调试此 topology 运行,须要设置 topology.eventlogger.executors 数量 > 0
Stop Debug 中止调试
Change Log Level 调整日志级别
  • Topology stats

    Topology stats
参数 说明
Window 时间窗口,好比"10m 0s"表示在topology启动后10m 0s以内
Emitted 此时间窗口内发射的总tuple数
Transferred 此时间窗口内成功转移到下一个bolt的tuple数
Complete latency (ms) 此时间窗口内每一个tuple在tuple tree中彻底处理所花费的平均时间
Acked 此时间窗口内成功处理的tuple数
Failed 此时间窗口内处理失败或超时的tuple数
  • Spouts (All time)

    Spouts (All time)
参数 说明
Id topologoy 中 spout 的名称,通常是在代码里设置的
Executors 当前分配给此 spout 的 executor 线程总数
Tasks 当前分配给此 spout 的 task 线程总数
Emitted 截止当前发射的总tuple数
Transferred 截止当前成功转移到下一个bolt的tuple数
Complete latency (ms) 截止当前每一个tuple在tuple tree中彻底处理所花费的平均时间
Acked 截止当前成功处理的tuple数
Failed 截止当前处理失败或超时的tuple数
  • Bolts (All time)

    Bolts (All time)
参数 说明
Id topologoy 中 bolt 的名称,通常是在代码里设置的
Executors 当前分配给此 bolt 的 executor 线程总数
Tasks 当前分配给此 bolt 的 task 线程总数
Emitted 截止当前发射的总tuple数
Transferred 截止当前成功转移到下一个bolt的tuple数
Complete latency (ms) 截止当前每一个tuple在tuple tree中彻底处理所花费的平均时间
Capacity (last 10m) 性能指标,取值越小越好,当接近1的时候,说明负载很严重,须要增长并行度,正常是在 0.0x 到 0.1 0.2 左右。该值计算方式为 (number executed * average execute latency) / measurement time
Execute latency (ms) 截止当前成功处理的tuple数
Executed 截止当前处理过的tuple数
Process latency (ms) 截止当前单个 tuple 的平均处理时间,越小越好,正常也是 0.0x 级别;若是很大,能够考虑增长并行度,但主要以 Capacity 为准
Acked 截止当前成功处理的tuple数
Failed 截止当前处理失败或超时的tuple数
spout 页面

这个页面,大部分都比较简单,就不一一说明了,值得注意的是下面这个 Tab:

  • Executors (All time)

    Executors (All time)

这个Tab的参数,应该不用解释了,可是要注意看,Emitted,Transferred 和 Acked 这几个参数,看看是否全部的 executor 都均匀地分担了 tuple 的处理工做。

bolt 页面

这个页面与 spout 页面相似,也不赘述了。

参考:这个页面经过 API 的方式,对 UI 界面的参数作了一些解释
http://storm.apache.org/releases/1.0.1/STORM-UI-REST-API.html

Storm debug


Storm 提供了良好的 debug 措施,许多操做能够再 UI 上完成,也能够在命令行完成。好比 Change log level 在不重启 topology 的状况下动态修改日志记录的级别,在 UI 界面上查看某个 bolt 的日志等,固然也能够在命令行上操做。

# 几个与 debug 相关的命令 bin/storm set_log_level [topology name]-l [logger name]=[LEVEL]:[TIMEOUT] bin/storm logviewer &

下面的参考文章写的很详细,你们有兴趣能够去阅读一下,本文就再也不讨论了。

参考
https://community.hortonworks.com/articles/36151/debugging-an-apache-storm-topology.html

性能调优


上文说了这么多,这才进入主题。

一、合理地配置硬件资源

此处暂不讨论

二、优化代码的执行性能

要优化代码的性能,若是严谨一点,首先要有一个衡量代码执行效率的方式。在数学上,一般使用大O函数来衡量一个算法的时间复杂度。咱们能够考虑使用大O函数来近似地估计一个代码片断的执行时间:假定一行代码花费1个单位时间,那么代码片断的时间复杂度能够近似地用大O表示为 O(n),其中n表示代码的行数或执行次数。固然,若是代码里引入了其余的类和函数,或者处于循环体内,那么其余类、函数的代码行数,以及循环体内代码的重复执行次数也须要统计在内。这里说到大O函数的概念,在实际中也不多用到,咱们每每会用第三方工具来较为准确地计算代码的实际执行时间,可是理解这个概念有助于优化咱们的代码。有兴趣的同窗能够阅读《算法概论》这本书。

这里顺便举一个斐波那契数列的例子:

/** C代码:用递归实现的斐波那契数列 */
int fibonacci(unsigned int n)
{
    if (n <= 0) return 0;
    if (n == 1) return 1;
    return fibonacci(n - 1) + fibonacci(n - 2);
}
/** C代码:用循环实现的斐波那契数列 */
int fibonacci(unsigned int n)
{
    int r, a, b;
    int i;
    int result[2] = {0, 1};

    if (n < 2) return result[n];

    a = 0;
    b = 1;
    r = 0;
    for (i = 2; i <= n; i++)
    {
        r = a + b;
        a = b;
        b = r;
    }
    return r;
}

观看两个不一样实现的例子,第一种递归的方式,当传入的n很大时,代码执行的时间将会呈指数级增加,这时T(n)接近于 O(2^n);第二种循环的方式,即便传入很大的n,代码也能够在较短的时间内执行完毕,这时T(n)接近于O(n),为何是O(n)呢,好比说n=1000,那么整个算法的执行时间基本集中在那个 for 循环里了,至关于执行了 for 循环内3行代码1000屡次,因此差很少是n。这其实就是一种用空间换时间的概念,利用循环代替递归的方式,从而大大地优化了代码的执行效率。

回到咱们的 Storm。代码优化,归结起来,应该有这几种:

  • 在算法层面进行优化
  • 在业务逻辑层面进行优化
  • 在技术层面进行优化
  • 特定于 Storm,合理地规划 topology,即安排多少个 bolt,每一个 bolt 作什么,连接关系如何

在技术层面进行优化,手法就很是多了,好比链接数据库时,运用链接池,经常使用的链接池有 alibaba 的 druid,还有 redis 的链接池;好比合理地使用多线程,合理地优化JVM参数等等。这里举一个工做中可能会遇到的例子来介绍一下:

在配置了多个并行度的 bolt 中,存取 redis 数据时,若是不使用 redis 线程池,那么极可能会遇到 topology 运行缓慢,spout 不断重发,甚至直接挂掉的状况。首先 redis 的单个实例并非线程安全的,其次在不使用 redis-pool 的状况下,每次读取 redis 都建立一个 redis 链接,同建立一个 mysql 链接同样,在链接 redis 时所耗费的时间相较于 get/set 自己是很是巨大的。

/**
 * redis-cli 操做工具类
 */
package net.mtide.dbtool;

import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;

public class RedisCli {

    private static JedisPool pool = null;

    private final static Logger logger = LoggerFactory.getLogger(FilterBolt.class);

    /**
     * 同步初始化 JedisPool
     */
    private static synchronized void initPool() {
        if (pool == null) {
            String hosts = "HOST";
            String port  = "PORT";
            String pass  = "PASS";
            pool = new JedisPool(new JedisPoolConfig(), hosts, Integer.parseInt(port), 2000, pass);
        }
    }

    /**
     * 将链接 put back to pool
     * 
     * @param jedis
     */
    private static void returnResource(final Jedis jedis) {
        if (pool != null && jedis != null) {
            pool.returnResource(jedis);
        }
    }

    /**
     * 同步获取 Jedis 实例
     * 
     * @return Jedis
     */
    public synchronized static Jedis getJedis() {
        if (pool == null) {
            initPool();
        }
        return pool.getResource();
    }

    public static void set(final String key, final String value) {
        Jedis jedis = getJedis();
        try {
            jedis.set(key, value);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
    }

    public static void set(final String key, final String value, final int seconds) {
        Jedis jedis = getJedis();
        try {
            jedis.set(key, value);
            jedis.expire(key, seconds);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }
    }

    public static String get(final String key) {
        String value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.get(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }

    public static List<String> mget(final String... keys) {
        List<String> value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.mget(keys);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }

    public static Long del(final String key) {
        Long value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.del(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }

    public static Long expire(final String key, final int seconds) {
        Long value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.expire(key, seconds);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }

    public static Long incr(final String key) {
        Long value = null;

        Jedis jedis = getJedis();
        try {
            value = jedis.incr(key);
        }
        catch (Exception e) {
            logger.error(e.toString());
        }
        finally {
            returnResource(jedis);
        }

        return value;
    }
}
View Code

当一个配置了多个并行度的 topology 运行在集群上时,若是 redis 操做不当,极可能会形成运行该 redis 的 bolt 长时间阻塞,从而形成 tuple 传递超时,默认状况下 spout 在 fail 后会重发该 tuple,然而 redis 阻塞的问题没有解决,重发不只不能解决问题,反而会加剧集群的运行负担,那么 spout 重发愈来愈多,fail 的次数也愈来愈多, 最终致使数据重复消费愈来愈严重。上面贴出来的 RedisCli 工具类,能够在多线程的环境下安全的使用 redis,从而解决了阻塞的问题。

三、合理的配置并行度

有几个手段能够配置 topology 的并行度:

  • conf.setNumWorkers() 配置 worker 的数量
  • builder.setBolt("NAME", new Bolt(), 并行度) 设置 executor 数量
  • spout/bolt.setNumTask() 设置 spout/bolt 的 task 数量

如今回到咱们的一开始的场景假设:

项目组部署了3台机器,计划运行一个 Storm(1.0.1) + Kafka(0.9.0.1) + Redis(3.2.1) 的小规模实验集群,每台机器的配置为 2CPUs,4G RAM

/** 初始配置 */ TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout", new kafkaSpout(), 3); builder.setBolt("filter", new FilterBolt(), 3).shuffleGrouping("kafkaSpout"); builder.setBolt("alert", new AlertBolt(), 3).fieldsGrouping("filter", new Fields("order")); Config conf = new Config(); conf.setNumWorkers(2); StormSubmitter.submitTopologyWithProgressBar("topology-name", conf, builder.createTopology());

那么问题是:

  1. setNumWorkers 应该取多少?取决于哪些因素?
  2. kafkaSpout 的并行度应该取多少?取决于哪些因素?
  3. FilterBolt 的并行度应该取多少?取决于哪些因素?
  4. AlertBolt 的并行度应该取多少?取决于哪些因素?
  5. FilterBolt 用 shuffleGrouping 是最好的吗?
  6. AlertBolt 用 fieldsGrouping 是最好的吗?

回答以下:
第一个问题:关于 worker 的并行度:worker 能够分配到不一样的 supervisor 节点,这也是 Storm 实现多节点并行计算的主要配置手段。据此, workers 的数量,能够说是越多越好,但也不能形成浪费,并且也要看硬件资源是否足够。因此主要考虑集群各节点的内存状况:默认状况下,一个 worker 分配 768M 的内存,外加 64M 给 logwriter 进程;所以一个 worker 会耗费 832M 内存;题设的集群有3个节点,每一个节点4G内存,除去 linux 系统、kafka、zookeeper 等的消耗,保守估计仅有2G内存可用来运行 topology,由此可知,当集群只有一个 topology 在运行的状况下,最多能够配置6个 worker。
另外,咱们还能够调节 worker 的内存空间。这取决于流过 topology 的数据量的大小以及各 bolt 单元的业务代码的执行时间。若是数据量特别大,代码执行时间较长,那么能够考虑增长单个 worker 的工做内存。有一点须要注意的是,一个 worker 下的全部 executor 和 task 都是共享这个 worker 的内存的,也就是假如一个 worker 分配了 768M 内存,3个 executor,6个 task,那么这个 3 executor 和 6 task 实际上是共用这 768M 内存的,可是好处是能够充分利用多核 CPU 的运算性能。

总结起来,worker 的数量,取值因素有:

  • 节点数量,及其内存容量
  • 数据量的大小和代码执行时间

机器的CPU、带宽、磁盘性能等也会对 Storm 性能有影响,可是这些外在因素通常不影响 worker 数量的决策。

须要注意的是,Storm 在默认状况下,每一个 supervisor 节点只容许最多4个 worker(slot)进程运行;若是所配置的 worker 数量超过这个限制,则须要在 storm 配置文件中修改。

第二个问题:关于 FilterBolt 的并行度:若是 spout 读取的是 kafka 的数据,那么正常状况下,设置为 topic 的分区数量便可。计算 kafkaSpout 的最佳取值,有一个最简单的办法,就是在 Storm UI里面,点开 topology 的首页,在 Spouts (All time) 下,查看如下几个参数的值:

  • Emitted 已发射出去的tuple数
  • Transferred 已转移到下一个bolt的tuple数
  • Complete latency (ms) 每一个tuple在tuple tree中彻底处理所花费的平均时间
  • Acked 成功处理的tuple数
  • Failed 处理失败或超时的tuple数

Paste_Image.png

怎么看这几个参数呢?有几个技巧:

  • 正常状况下 Failed 值为0,若是不为0,考虑增长该 spout 的并行度。这是最重要的一个判断依据;
  • 正常状况下,Emitted、Transferred和Acked这三个值应该是相等或大体相等的,若是相差太远,要么该 spout 负载过重,要么下游负载太重,须要调节该 spout 的并行度,或下游 bolt 的并行度;
  • Complete latency (ms) 时间,若是很长,十秒以上就已经算很长的了。固然具体时间取决于代码逻辑,bolt 的结构,机器的性能等。

kafka 只能保证同一分区下消息的顺序性,当 spout 配置了多个 executor 的时候,不一样分区的消息会均匀的分发到不一样的 executor 上消费,那么消息的总体顺序性就难以保证了,除非将 spout 并行度设为 1

第三个问题:关于 FilterBolt 的并行度:其取值也有一个简单办法,就是在 Storm UI里面,点开 topology 的首页,在 Bolts (All time) 下,查看如下几个参数的值:

  • Capacity (last 10m) 取值越小越好,当接近1的时候,说明负载很严重,须要增长并行度,正常是在 0.0x 到 0.1 0.2 左右
  • Process latency (ms) 单个 tuple 的平均处理时间,越小越好,正常也是 0.0x 级别;若是很大,能够考虑增长并行度,但主要以 Capacity 为准

Paste_Image.png

通常状况下,按照该 bolt 的代码时间复杂度,设置一个 spout 并行度的 1-3倍便可。

第四个问题:AlertBolt 的并行度同 FilterBolt。

第五个问题:shuffleGrouping 会将 tuple 均匀地随机分发给下游 bolt,通常状况下用它就是最好的了。

总之,要找出并行度的最佳取值,主要结合 Storm UI 来作决策。

四、优化配置参数
/** tuple发送失败重试策略,通常状况下不须要调整 */ spoutConfig.retryInitialDelayMs = 0; spoutConfig.retryDelayMultiplier = 1.0; spoutConfig.retryDelayMaxMs = 60 * 1000; /** 此参数比较重要,可适当调大一点 */ /** 一般状况下 spout 的发射速度会快于下游的 bolt 的消费速度,当下游的 bolt 还有 TOPOLOGY_MAX_SPOUT_PENDING 个 tuple 没有消费完时,spout 会停下来等待,该配置做用于 spout 的每一个 task。 */ conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10000) /** 调整分配给每一个 worker 的内存,关于内存的调节,上文已有描述 */ conf.put(Config.WORKER_HEAP_MEMORY_MB, 768); conf.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, 768); /** 调整 worker 间通讯相关的缓冲参数,如下是一种推荐的配置 */ conf.put(Config.TOPOLOGY_RECEIVER_BUFFER_SIZE, 8); // 1.0 以上已移除 conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 32); conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384); conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);

能够在 Storm UI 上查看当前集群的 Topology Configuration

五、rebalance

能够直接采用 rebalance 命令(也能够在 Storm UI上操做)从新配置 topology 的并行度:

storm rebalance TOPOLOGY-NAME -n 5 -e SPOUT/BOLT1-NAME=3 -e SPOUT/BOLT2-NAME=10
相关文章
相关标签/搜索