Storm源码阅读总结(1) -- Client Nimbus Supervisor

Client

客户端提交做业

NimbusClient: RPC 客户端, 向RPC服务端即Nimbus Server发起RPC调用.node

App经过StormSubmitter提交计算拓扑做业submitTopology:
首先提交jar包, 会向Nimbus服务器发起beginFileUpload, 申请到要上传的路径后, 调用uploadChunk开始将jar包上传到nimbus服务器
文件传输完毕, 调用finishFileUpload结束上传jar包过程. 最后会调用NimbusClient的submitTopology算法

Nimbus启动初始化

Nimbus启动时, 建立匿名的INimbus内部类.
初始时会根据系统中的supervisors建立可用的WorkSlot空闲槽位. --> allSlotsAvailableForScheduling服务器

在一个机器上一个supervisor能够配置多个端口, 每一个端口都会对应一个工做者线程, 即建立对应数量的WorkSlot.
Nimbus要分配任务给Supervisor上的Worker进行工做, 而每一个Supervisor会有多个worker.架构

Thrift RPC步骤

编写thrift文件
自动生成接口相关的类, 好比Iface, Processor, Client内部类
自定义Handler实现Iface业务逻辑, 对应thrift文件中的方法
服务端使用ServerTransport和Handler建立的Processor建立出Server并启动服务器
客户端使用Transport链接服务端, 使用Protocol协议构造出Client代理类, 而后调用接口的方法, 完成RPC调用负载均衡

Nimbus

Thrift RPC Server

启动Nimbus会启动它的thrift服务. Nimbus会做为Thrift RPC协议的服务端, 处理客户端发起的RPC调用请求.
nimbus的thrift服务的实现类定义在storm.thrift文件中, 对应的实现方法是service-handler[服务处理器].dom

Nimbus与ZooKeeper

Nimbus启动时会建立调度器, 它为集群中的须要调度的计算拓扑分配任务. 新的任务能够经过cluster.getAssignments()获取.异步

nimbus在启动时会链接ZooKeeper, 并在zookeeper中建立节点
以便在做业运行过程当中将Storm的全部的状态信息都是保存在Zookeeper里面.分布式

nimbus经过在zookeeper上面写状态信息来分配任务,
supervisor,task经过从zookeeper中读状态来领取任务,
同时supervisor, task也会定义发送心跳信息到zookeeper,
使得nimbus能够监控整个storm集群的状态, 从而能够重启一些挂掉的task函数

注意Nimbus和Supervisor之间没有直接交互, 状态都是保存在Zookeeper上ui

问题1: storm的jar包是上传到zookeeper上,仍是nimbus, supervisor上?
答: jar包上传到nimbus上, 在zookeeper的assignments中只是记录了topology做业在nimbus上的代码目录.

watcher和callback

storm使用curator framework建立districtbuted-cluster-state[分布式的集群状态].
建立客户端时指定watcher函数, 当节点状态发生改变时, 会触发curator上注册的监听器, 回调watcher方法.

注册的watcher使用callbacks. ClusterState自定义的register实现会将回调实现传入, 最终触发回调的调用.
callback和watcher都是function, 都经过回调的方式.

Storm的集群状态StormClusterState包括任务的分配assignments, supervisors, workers, hearbeat, errors等信息

Topology执行流程

定义在nimbus的service-handler的submitTopology里, 步骤包括:
上传topology代码
运行topology前的校验
在nimbus上创建topology的本地目录: setup-storm-code
创建Zookeeper heartbeats: setup-heartbeats!
启动storm: start-storm, 在zk上写入StormBase信息,会记录component和任务并行度的关系. 用于后面任务的分配.
分配任务: mk-assignments

Topology Supervisor Worker Executor Task
1. topology 物理上由一个nimbus机器和多台supervisor机器组成

  1. supervisor 能够配置多个slots. 每一个slots占用一个端口, 配置在storm.yaml的supervisor.slots.ports
    一个worker对应supervisor的一个端口. 所以一个supervisor配置了几个slots, 就对应有几个worker.

  2. topology 逻辑上由 spout 和 bolt 组成. spout和bolt统称为component
    spout和bolt均可以配置并行度, 这个并行度的数量就是component的executors的数量.

component若是没有设置task的数量, 默认一个executor运行一个task.
若是设置了task的数量(task数量要大于并行度的值), 则一个executor能够运行多个task.
固然由于executor是针对具体component的(即指定的spout或bolt), 因此executor里运行的多个task都是同一种component.

  1. executors 是topology中全部component每一个task起始和结束编号的序列: ([start-task end-task]), 可是没有对应的component信息.
    executors->component 是一个Map: {[start-task end-task] component-id}, 给上面的executors添加了component-id信息.
    topology->executors 也是一个Map: {storm-id [start-task end-task]}, 把executors做为topology-id的value

假设builder.setSpout("spout", new RandomSentenceSpout(), 5).setNumTasks(10);
则表示RandomSentenceSpout这个component有5个executors, 设置了10个任务tasks.
那么这个component的executors = ([1 2] [3 4] [5 6] [7 8] [9 10]).
对应的executors->component = {[1 2] "spout" [3 4] "spout" [5 6] "spout" [7 8] "spout" [9 10] "spout"}
注意: executors和executors->component的数据应该是包括了topology全部的component!
这里由于只列出一个component,因此只给出部分数据.

  1. executor是属于逻辑上的, 而任务是要运行在物理机器上的. 因此就涉及到逻辑上的executor运行在物理上的supervisor的问题.
    而咱们知道supervisor经过slots端口配置能够分红多个worker. worker由node-id+port组成. node-id实际上就是supervisor-id.
    所以一个worker能够运行多个executor. 每一个executor是运行在node-id + port上的.
    executor->node+port 的结构是: {executor [node port]}. 表示每一个executor运行在哪一个supervisor的哪一个端口上.
    经过这种方式能够将同一个component的多个executor分布在多个机器上执行
    topology->executor->node+port 的结构: {topology-id -> {executor [node port]}}

任务的分配

产生executor->node+port关系, 将executor分配到哪一个node的哪一个slot上

mk-assignments的第一步的流程: 准备阶段读取全部active topology的信息
1. storm-cluster-state.active-storms 获取集群中全部活动的topology-id. 对每一个topology-id作以下处理:
2. read-topology-details 根据topology-id读取每一个topology的详细信息, 并组合成{topology-id TopologyDetails}
2.1 storm-cluster-state.storm-base 读取/storm/storms/topology-id节点的数据为StormBase对象,得到保存在其中的numWorkers
2.2 读取storm-conf和storm-topology造成topology-conf和topology对象
2.3 executor->component 的映射关系: {ExecutorDetails component-id}
2.3.1 compute-executor->component 返回: {[start-task end-task] component-id}
2.3.1.1 component->executors 是component-id和executors并行度的数量的映射
2.3.1.2 storm-task-info 给每一个executor添加编号
2.3.1.3 compute-executors 列出全部executor的[start-task end-task]序列
2.3.1.4 通过join等一些列操做, 造成最后的{[start-task end-task] component-id}
2.3.2 将上面的[start-task end-task]封装成ExecutorDetails, 造成最终的{ExecutorDetails component-id}
2.4 将上面的storm-id, topology-conf, topology, numWorkers, executor->component构形成TopologyDetails
3. 将全部的{topology-id TopologyDetails} 封装成Topologies.

分配新任务

计算拓扑和compoent的并行度: topology->executors和topology->alive-executors
找出Supervisor中dead的WorkerSlot: supervisor->dead-ports, 用于获得Supervisor的信息SupervisorDetail
为alive-executors生成SchedulerAssignment: topology->scheduler-assignment
用supervisors信息和topology->scheduler-assignment会生成Cluster
Nimbus的调度器开始调度拓扑做业, 使用集群信息topologies和cluster开始调度做业
分配的任务会写到zk的assignments节点中, 下次调度时会根据已经分配的任务找出须要新分配的任务

计算拓扑和集群信息

Topologies包含当前集群里面运行的全部Topology的信息:StormTopology对象,配置信息,
以及从task到组件(bolt, spout)id的映射信息(executor->component-id)。

Cluster对象则包含了当前集群的全部状态信息:全部的supervisor信息,
系统全部Topology的task分配信息(topology->scheduler-assignment, executor->slot),

任务调度

nimbus的任务分配算法:
在slot充沛的状况下,可以保证全部topology的task被均匀的分配到整个机器的全部机器上
在slot不足的状况下,它会把topology的全部的task分配到仅有的slot上去,这时候其实不是理想状态,因此。。
在nimbus发现有多余slot的时候,它会从新分配topology的task分配到空余的slot上去以达到理想状态。
在没有slot的时候,它什么也不作

Nimbus的边城世界

nimbus中最重要的方法是mk-assignments分配任务. 它的调用有几个地方:

service-handler的定时线程schedule-recurring会定时调用.
service-handler的匿名内部类Nimbus$Iface的submitTopology第一次提交拓扑做业时
do-rebalance进行负载均衡时, 是由state-transitions来断定nimbus的状态达到rebalance状态时调用的.

第一次提交的topology做业, 就会马上调用mk-assignments由nimbus分配任务.
可是做业一提交并不必定可以分配到任务. 好比集群的计算资源很是紧张没有可用slot的状况下.
所以须要一个定时器定时调用mk-assignments方法, 在计算资源可用的状况下为没有分配的topology分配任务.
也有一种多是初次提交做业后, nimbus只为这个topology分配了一部分任务, 所以在下一轮回时还要继续为这个做业分配任务.
这种状况常见于: 一个很长的topology流计算做业, 前面的blot任务还没完成时, 若是为后面的bolt分配任务,
后面的blot任务就一直占用这计算资源, 与其这样, 还不如集中精力把计算资源都分配给前面的blot task.
或者是集群的资源比较紧张, 只够分配topology的一部分任务, 剩余的任务因为没有可用的空闲槽位须要等到下次才能申请到资源.

mk-assignments中最重要的方法当属compute-new-topology->executor->node+port
计算新的topology到executor, 再到executor所属的supervisor node+port的映射关系.

用户编写的topology做业是由自定义的spout和blot组成, 它们统称为component.
能够为component定义并行度, 这就是topology的executors. executors其实是自定义任务的封装.
集群的supervisor可配置多个port, 对应的是集群中的计算资源WorkerSlot.
任务是要运行在WorkerSlot上的. 因此分配任务就是要将executor分配到指定的WorkerSlot上.

因为上面集中做业的任务分配的不肯定性, 为topology分配完任务后, 须要记录已经分配了哪些任务,
这样下次的任务分配就不会为已经分配的任务再次申请计算资源了. nimbus将任务的分配写入到zk的assignments中.

任务分配后要进行调度才算真正的执行. 调度器的工做调用cluster.assign传入
做业topology, 计算资源worker-slot和表示任务的executors.
经过topology获得做业的SchedulerAssignment, 将slot和executors传给SchedulerAssignment的assign.
虽然SchedulerAssignmentImpl只是在内存中记录了ExecutorDetails和WorkerSlot的映射关系.
可是这对于任务的运行而言已经足够了, 由于任务编号记录在ExecutorDetails中, 任务执行的节点记录在WorkerSlot里.
接下来任务就能够真正跑在集群的计算资源里了.

Supervisor

物理上的Supervisor配置了多个端口, 对于集群而言工做节点就是计算资源. 一个端口是一个WorkerSlot.
Nimbus负责任务的调度分配, 将任务分配信息Assignment写到ZooKeeper中. Supervisor会负责读取ZK中的任务分配信息.

mk-synchronize-supervisor和synchronize-processes是zk的assignment节点发生变化触发执行的回调函数.
当有任务写到ZK中, Supervisor会调用回调函数同步topology代码到本地,
因为Supervisor负责启动Worker进程,也会同步Worker信息.
同步topology和worker信息的事件会放到一个队列线程里异步地执行.

同步supervisor会从nimbus下载topology代码到本地目录, 并持久化到LocalState状态信息里.
同步worker会找出新分配的worker-id, 负责启动Worker进程.

Nimbus是集群的总管, 只有一台. 而Supervisor监督者有多个, 对于Master-Slave的架构, Slave要按期发送心跳信息给Master. 一个监督者也有多个Worker, 因此Worker也要发送心跳信息给Supervisor. Supervisor和Worker的心跳信息都保存在ZK节点上. 可是注意supervisor从nimbus下载的topology信息和已经处理完成的worker信息是保存在supervisor的本地目录中.

相关文章
相关标签/搜索