JStorm源码分析系列--02--拓扑分配TopologyAssign

  写在前面的话,笔者第一次阅读框架源码,因此可能有些地方理解错误或者没有详细解释,若是在阅读过程发现错误很欢迎在文章下面评论指出。文章后续会陆续更新,能够关注或者收藏,转发请先私信我,谢谢。对了,笔者看的是2.2.1这个版本。上一篇博客,JStorm源码分析系列--01--Nimbus启动分析笔者讲解了Nimbus启动过程当中作的一些基本的操做,在initFollowerThread方法中,若是当前的Nimbus变成Leader以后,这个方法内会负责执行一些初始化init操做。下面就来说讲第一个初始化操做--拓扑分配。本文将详细(很是长,因此慢慢看)的讲解如何去为一个拓扑分配相应的资源。
  从方法initTopologyAssign开始,TopologyAssign是一个单例对象,在这个类的init方法内,作了简单的赋值操做以后,并初始化一个调度器实例对象以后,就创建一个守护线程,这个守护线程的目的是不断从TopologyAssign内部维护的一个阻塞队列中读取系统提交的拓扑任务,并调用相应的方法doTopologyAssignment进行分配操做。代码都比较简单,就不浪费版面去贴了。
  下面是doTopologyAssignment方法的源码,git

protected boolean doTopologyAssignment(TopologyAssignEvent event) {
        Assignment assignment;
        try {
            Assignment oldAssignment = null;
            boolean isReassign = event.isScratch();
            if (isReassign) {
                //若是存在旧的分配信息,须要先将旧的分配信息存储下来
                oldAssignment = nimbusData.getStormClusterState().assignment_info(event.getTopologyId(), null);
            }
            //调用方法执行新的分配
            assignment = mkAssignment(event);
            //将task添加到集群的metrics中
            pushTaskStartEvent(oldAssignment, assignment, event);

            if (!isReassign) {
                //若是是新建的拓扑,须要把拓扑设置为active状态
                setTopologyStatus(event);
            }
        } catch (Throwable e) {
            LOG.error("Failed to assign topology " + event.getTopologyId(), e);
            event.fail(e.getMessage());
            return false;
        }

        if (assignment != null)
            //将拓扑备份到ZK上
            backupAssignment(assignment, event);
        event.done();
        return true;
    }

  因此,最重要的方法仍是mkAssignment,这里执行了实际的分配操做。下面就来详细的介绍这个方法。github

prepareTopologyAssign

  prepareTopologyAssign这个方法整体的目的为了初始化拓扑分配的上下文信息,生成一个TopologyAssignContext的实例对象。这个上下文对象须要存下拓扑的不少关键信息,包括拓扑的组件信息(用StormTopology对象保存,下文在添加acker的时候会详细介绍这个类),拓扑的配置信息,拓扑上全部的task id,以及死掉的task id,unstopped task id(这里的解释是,那些supervisor死掉可是worker还继续运行的称为unstopworker,而包含在unstopworker内的task则称为unstoppedTask)。以及这个拓扑能分配到的worker,以上说起的这些信息都会在这个方法内慢慢的初始化。下面一步步来看吧。prepareTopologyAssign方法的源码比较长,一部分一部分来说解。编程

//建立一个上下文的实例对象
TopologyAssignContext ret = new TopologyAssignContext();

String topologyId = event.getTopologyId();
ret.setTopologyId(topologyId);

int topoMasterId = nimbusData.getTasksHeartbeat().get(topologyId).get_topologyMasterId();
ret.setTopologyMasterTaskId(topoMasterId);
LOG.info("prepareTopologyAssign, topoMasterId={}", topoMasterId);

Map<Object, Object> nimbusConf = nimbusData.getConf();
//根据拓扑id从nimbus上读取拓扑的配置信息
Map<Object, Object> topologyConf = StormConfig.read_nimbus_topology_conf(topologyId, nimbusData.getBlobStore());
//这里读取拓扑中各个组件的一个结构,后续会讲解这个类的组成
StormTopology rawTopology = StormConfig.read_nimbus_topology_code(topologyId, nimbusData.getBlobStore());
ret.setRawTopology(rawTopology);
//设置一些配置信息
Map stormConf = new HashMap();
stormConf.putAll(nimbusConf);
stormConf.putAll(topologyConf);
ret.setStormConf(stormConf);

  紧接着,根据目前集群的状态,初始化一份集群上全部的supervisor,并获取全部可用的workersegmentfault

StormClusterState stormClusterState = nimbusData.getStormClusterState();

// get all running supervisor, don't need callback to watch supervisor
Map<String, SupervisorInfo> supInfos = Cluster.get_all_SupervisorInfo(stormClusterState, null);
// init all AvailableWorkerPorts
for (Entry<String, SupervisorInfo> supInfo : supInfos.entrySet()) {
     SupervisorInfo supervisor = supInfo.getValue();
     if (supervisor != null)
        //设置所有的端口都为可用,后面经过HB去除掉那些已经被使用的worker
        //supervisor是一个k-v,k是supervisorid,v是保存实例信息
        supervisor.setAvailableWorkerPorts(supervisor.getWorkerPorts());
}
//这个方法就是利用HB去掉那些挂掉的supervisor
//判断的方法是获取每一个supervisor最近的HB时间,
//由当前时间减去最近HB时间和超时时间作对比。
getAliveSupervsByHb(supInfos, nimbusConf);

  接下来获取拓扑中定义的taskid对应上组件,这里要解释下,对于一个拓扑而言,taskid老是从1开始分配的,而且,相同的组件taskid是相邻的。好比你定义了一个SocketSpout(并行度5),一个PrintBolt(并行度4,那么SocketSpout的taskid多是1-5,PrintBolt的taskid多是6-9。负载均衡

//这个k-v,k是taskid,v是拓扑内定义的组件的id。
//写过应用的同窗都应该知道,TopologyBuilder在setSpout或者Bolt的时候,须要指定<组件id,对象,和并行度>。
//eg:builder.setSpout("integer", new ReceiverSpout(), 2);
Map<Integer, String> taskToComponent = Cluster.get_all_task_component(stormClusterState, topologyId, null);
ret.setTaskToComponent(taskToComponent);

//获取全部的taskid。
Set<Integer> allTaskIds = taskToComponent.keySet();
ret.setAllTaskIds(allTaskIds);

  若是原来存在旧的拓扑分配信息,还须要设置unstoppedTasks,deadTasks,unstoppedWorkers等信息。而后调用getFreeSlots方法负责去除那些已经分配出去的worker。处理过程比较直观,获取集群上全部的拓扑分配信息,而后根据每一个分配信息中保存的worker信息,从原先supInfos中移除那些被分配出去的worker。
  若是没有旧的分配信息,说明拓扑分配类型为ASSIGN_TYPE_NEW。若是存在同名的拓扑,也会把同名的拓扑设置旧的分配信息,放到上下文中。若是存在旧的分配信息,须要把旧的分配信息放入到上下文中,此外还要判断是ASSIGN_TYPE_REBALANCE仍是ASSIGN_TYPE_MONITOR,由于还须要设置unstoppedWorkers的信息。到这里,预分配,建立拓扑分配上下文就完成了。目前咱们带有比较重要的信息是拓扑全部的taskid,以及拓扑基本的组件信息。框架

集群assignTasks

  在完成拓扑上下文初始化以后,开始实际给拓扑分配相应的worker,不过这里须要判断是本地模式仍是集群模式,本地模式下比较简单,找个一个合适的端口,而后新建一个worker的资源对象ResourceWorkerSlot,将一些关键信息如hostname,port,allTaskId配置好。由于local模式下比较简单,因此,即便设置多个worker也不会启动多个jvm。而在集群模式下,一个worker表示的是一个jvm进程。下面就重点讲解集群下的分配状况。我把集群上的分配过程(assignTasks这个方法)分红三个主要的部分,分别是资源准备,worker分配,task分配。dom

Set<ResourceWorkerSlot> assignments = null;
if (!StormConfig.local_mode(nimbusData.getConf())) {
    IToplogyScheduler scheduler = schedulers.get(DEFAULT_SCHEDULER_NAME);
    //集群下的分配,见下文讲解
    assignments = scheduler.assignTasks(context);
} else {
    assignments = mkLocalAssignment(context);
}

资源准备

  首先第一步是判断拓扑分配的类型是否符合要求,不符合则抛出异常。紧接着,根据上一个方法生成的拓扑分配上下文来生成一个默认的拓扑分配上下文实例对象,DefaultTopologyAssignContext这个类的构造方法执行了不少很细节的操做。包括为拓扑添加附加的组件,存储下taskid和组件的对应信息,计算拓扑须要的worker数目,计算unstopworker的数目等。jvm

//根据以前的上下文,初始化一个分配的上下文对象
DefaultTopologyAssignContext defaultContext = new DefaultTopologyAssignContext(context);
if (assignType == TopologyAssignContext.ASSIGN_TYPE_REBALANCE) {
    freeUsed(defaultContext);
}

  下面代码是DefaultTopologyAssignContext的构造方法ide

public DefaultTopologyAssignContext(TopologyAssignContext context){
    super(context);
    try {
        sysTopology = Common.system_topology(stormConf, rawTopology);
    } catch (Exception e) {
        throw new FailedAssignTopologyException("Failed to generate system topology");
    }

    sidToHostname = generateSidToHost();
    hostToSid = JStormUtils.reverse_map(sidToHostname);

    if (oldAssignment != null && oldAssignment.getWorkers() != null) {
        oldWorkers = oldAssignment.getWorkers();
    } else {
        oldWorkers = new HashSet<ResourceWorkerSlot>();
    }

    refineDeadTasks();

    componentTasks = JStormUtils.reverse_map(context.getTaskToComponent());

    for (Entry<String, List<Integer>> entry : componentTasks.entrySet()) {
    List<Integer> componentTaskList = entry.getValue();
    Collections.sort(componentTaskList);
}

    totalWorkerNum = computeWorkerNum();
    unstoppedWorkerNum = computeUnstoppedAssignments();
}
添加附加组件

  从上面的代码能够看出在DefaultTopologyAssignContext的构造方法中,第一句是调用父类构造方法先去初始化一些参数,而后调用system_topology这个方法。下面来看看这个方法的内部。第一个方法就是添加一个acker到原来的拓扑中去。拓扑做为JStrom处理的一个逻辑模型,对用户提供了很是简单且强大的编程原语,只要分别继承两大组件,就能够构造一个拓扑模型,可是实际上,一个实际运行的拓扑模型远远不止用户定义的用于处理输入的spout和用于处理业务的bolt,JStorm为了保证消息的可靠性,拓扑Metrics管理,拓扑HB管理,再拓扑实际模型中添加了几个很是重要的bolt,下面就详细的介绍acker,用于保证消息的可靠性。函数

public static StormTopology system_topology(Map storm_conf, StormTopology topology) throws InvalidTopologyException {
    StormTopology ret = topology.deepCopy();
    add_acker(storm_conf, ret);
    addTopologyMaster(storm_conf, ret);
    add_metrics_component(ret);
    add_system_components(ret);
    return ret;
}
StormTopology

  这里先来介绍下StormTopology这个类,才能往下理解。StormTopology这个类用于存储拓扑的组件信息,在这个类内部,有三个很是重要的成员变量,分别存储spout和bolt以及state_spout,第三个笔者暂时没有弄清楚其做用,可是前两个就很是明显,分别存储拓扑的两大组件,spout和bolt

private Map<String,SpoutSpec> spouts; // required
  private Map<String,Bolt> bolts; // required
  private Map<String,StateSpoutSpec> state_spouts; // required

  Map中的key表示咱们定义的组件的id,上文提到过的id。SpoutSpec和Bolt中有两个重要的成员变量。

private ComponentObject spout_object; // required
  private ComponentCommon common; // required

  ComponentObject用于存储序列化后的代码信息,第二个ComponentCommon用于存储很重要的配置信息,包括输入的流,输出的流和分组信息。有三个重要的成员变量

//GlobalStreamId有两个String成员变量,componentId表示这个输入组件的流来源的那个组件id,
  //streamId表示componentId所输出的特定的流
  private Map<GlobalStreamId,Grouping> inputs; // 输入的来源和分组状况
  //StreamInfo有个重要的成员变量List<String> output_fields,表示输出的域。
  private Map<String,StreamInfo> streams; // 输出的流
  private int parallelism_hint; // 并行度

  根据上述的结构,StormTopology可以完整的表示拓扑中每一个组件输出以后的流所流向的位置。

acker

  这一小节笔者不打算先从源码的角度入手,先来将一个acker的做用以及从一个小例子来说解acker是怎样工做的。咱们都知道做为一个流式处理框架,消息的可靠性是一个很是特性之一。除开更加高级的事务框架能保证消息只被处理一次(exactly-once),JStorm自己也提供了at-least-once,这个机制能保证消息必定会被处理。下面从一个例子的角度来说解,这是如何实现的。
[图片]
  如上图所示,integer做为输入的spout,sliding和printer都是负责处理的bolt,Field表示之间输出的元组内的元素对应的key。StreamID为默认,不指定数据流分组的形式,则默认状况下shuffle。上述是一个很是简单的拓扑逻辑结构,而后在通过add_acker这个方法以后,实际的拓扑结构发生了一些变化,以下图
[图片]
  JStrom为原来的拓扑结构添加了一个_ack的bolt,负责维护拓扑的可靠性,大体的状况能够从上图中看出,每当一个元组被发送到拓扑下游bolt中去的时候,也会发送到_ack中去保存下来,而后后续处理的每一个bolt每次调用ack函数都会发送给_ack(bolt),在指定时间间隔内收到最后处理的ack,那么_ack(bolt)就发送一个消息给最初的spout,则保证了一个元组的可靠性。因此综上,_ack这个Bolt就是维护了整个拓扑的可靠性,那么读者可能会问,_ack里面保存了那么多的消息,若是某个元组通过的组件很是多,是否会形成该元组的拓扑树变的很大。这里阿里利用异或,实现了一个很是简单且高效低耗的判断方法。
  其实在_ack中存储的内容很是简单,就是一个k-v键值对,k是一个随机无重复的id(root_id),且在元组被处理的整个过程当中保持不变,将消息存储为<root_id,random>,random由每一个收到元组的组件生成,每通过一个组件,random就会改变一次。如上图,integer在发送一个<root_id,x>给sliding以后,也会发送一个<root_id,x>给_ack,而后sliding通过处理以后,发送<root_id,y>给printer,而且发送一个<root_id,x^y>给_ack,而后当printer处理完以后在发送一个<root_id,y>给_ack,此时的_ack内部对于root_id这个消息的值是x^x^y^y=0。也就是处理成功,若是达到指定超时时间root_id对应的值还不是0,则须要通知给出这个元组的task(_ack也是一个bolt,因此内部也有保存某个消息的来源task),要求重发。以上就是JStorm用于保证消息可靠性所使用的方法,直观且简单。
  后续的几个方法如addTopologyMaster,add_metrics_component,add_system_components都是添加了相应的控件(bolt)来进行协同操做。好比topology master能够负责metrics,也能够负责baskpressure(反压)机制。笔者还没深刻解读,相应部分后续再作相应的添加,这里先挖个坑。

计算worker数目

  在DefaultTopologyAssignContext的构造函数中,添加完附加的组件以后,紧接着获取supervisorid和hostname对应的键值对,若是存在旧的分配信息,则获取原先全部的worker,若是没有,则新建一个worker的集合。去除deadtaskid中那些在unstopworker内的task(这里的目的是分开处理,若是是new的状况下,这两个都是空集)。而后计算须要的worker数目。看下面的源码,

private int computeWorkerNum() {
    //获取拓扑设置的worker数目
    Integer settingNum = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_WORKERS));
    //
    int ret = 0, hintSum = 0, tmCount = 0;

    Map<String, Object> components =     ThriftTopologyUtils.getComponents(sysTopology);
    for (Entry<String, Object> entry : components.entrySet()) {
        String componentName = entry.getKey();
        Object component = entry.getValue();

        ComponentCommon common = null;
        if (component instanceof Bolt) {
            common = ((Bolt) component).get_common();
        }
        if (component instanceof SpoutSpec) {
            common = ((SpoutSpec) component).get_common();
        }
        if (component instanceof StateSpoutSpec) {
            common = ((StateSpoutSpec) component).get_common();
        }
        //获取每一个组件中设置的并行度
        int hint = common.get_parallelism_hint();
        if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
            //若是是属于TM组件,则加到tmCount
            tmCount += hint;
            continue;
        }
        //这个变量存下全部组件并行度的和
        hintSum += hint;
    }
    
    //ret存下较小的值
    if (settingNum == null) {
        ret = hintSum;
    } else {
        ret =  Math.min(settingNum, hintSum);
    }
    //这里还须要判断主TM是否须要一个独立的worker节点用于处理
    Boolean isTmSingleWorker = ConfigExtension.getTopologyMasterSingleWorker(stormConf);
    if (isTmSingleWorker != null) {
        if (isTmSingleWorker == true) {
        ret += tmCount;
        setAssignSingleWorkerForTM(true);
    }
    } else {
        if (ret >= 10) {
            ret += tmCount;
        setAssignSingleWorkerForTM(true);
        }
    }
    return ret;
}

worker分配

  实例化完DefaultTopologyAssignContext以后,若是是rebalance类型,则还须要先将原先占用的那些worker给释放掉,具体作法就是将worker使用的端口放回可用端口集合中。几个变量的含义,needAssignTasks:就是指须要分配的task,也就是除去unstopworker中的那些task。allocWorkerNum:等于原先计算好的worker的数目-减去unstopworker的数目再减去keepAssigns(只有在拓扑类型是ASSIGN_TYPE_MONITOR才有的)的数目。实际worker分配中,最重要是方法WorkerScheduler.getAvailableWorkers。下面就来详细讲解这个方法内部怎么实现。

int workersNum = getAvailableWorkersNum(context);
    if (workersNum < allocWorkerNum) {
        throw new FailedAssignTopologyException("there's no enough worker.allocWorkerNum="+ allocWorkerNum + ", availableWorkerNum="+ workersNum);
}
    workersNum = allocWorkerNum;
    List<ResourceWorkerSlot> assignedWorkers = new ArrayList<ResourceWorkerSlot>();

    getRightWorkers(context,needAssign,assignedWorkers,workersNum,getUserDefineWorkers(context, ConfigExtension.getUserDefineAssignment(context.getStormConf())));

  首先得知集群上可用的所有worker,若是可用的worker小于须要分配的worker数,则须要抛出异常。若是足够,则会分配足量的worker给指定的拓扑。调用getRightWorkers这个方法来获取合适的worker,这里所谓right的worker是指用户自定义的worker,能够指定worker的资源分配状况。

getRightWorkers

  分为两部分来说解这个方法,首先是准备工做--getUserDefineWorkers这个方法,这个方法须要两个参数,拓扑的上下文信息context,用户自定义的worker列表workers。看下面的源码:

private List<ResourceWorkerSlot> getUserDefineWorkers(
            DefaultTopologyAssignContext context, List<WorkerAssignment> workers) {
    List<ResourceWorkerSlot> ret = new ArrayList<ResourceWorkerSlot>();
    //若是没有用户自定义的worker,则不必任何操做
    if (workers == null)
        return ret;
    Map<String, List<Integer>> componentToTask = (HashMap<String, List<Integer>>) ((HashMap<String, List<Integer>>) context
                .getComponentTasks()).clone();
    //若是分配类型不是NEW,则仍是从workers资源分配信息列表中去除unstopworker。
    //这里是用户有指定某些worker资源属于unstopworker才能去掉。
    if (context.getAssignType() != context.ASSIGN_TYPE_NEW) {
        checkUserDefineWorkers(context, workers, context.getTaskToComponent());
}
    //遍历用户定义的worker,去除那些没有分配task的worker
    //用户定义的worker中已经指定哪些task该分配到哪一个worker中
    for (WorkerAssignment worker : workers) {
        ResourceWorkerSlot workerSlot = new ResourceWorkerSlot(worker,componentToTask);
        if (workerSlot.getTasks().size() != 0) {
            ret.add(workerSlot);
        }
    }
return ret;
}

  去除那些没有指定task的worker以后,真正进入getRightWorkers方法内部。源码以下,这里解释下五个参数的含义,context表示以前准备的拓扑上下文信息,needAssign表示这个拓扑须要分配的各个taskid,assignedWorkers表示用来存储那些在这个方法内分配到的worker资源,workersNum表示须要拓扑须要分配的worker数目,workers表示上个方法中用户自定义的可用的worker资源。简而言之,这个方法就是从workers中选出已经分配了指定的task的worker,而后存到assignedWorkers中去。

private void getRightWorkers(DefaultTopologyAssignContext context,
            Set<Integer> needAssign, List<ResourceWorkerSlot> assignedWorkers,
            int workersNum, Collection<ResourceWorkerSlot> workers) {
        Set<Integer> assigned = new HashSet<Integer>();
        List<ResourceWorkerSlot> users = new ArrayList<ResourceWorkerSlot>();
        if (workers == null)
            return;
        for (ResourceWorkerSlot worker : workers) {
            boolean right = true;
            Set<Integer> tasks = worker.getTasks();
            if (tasks == null)
                continue;
            for (Integer task : tasks) {
                if (!needAssign.contains(task) || assigned.contains(task)) {
                    right = false;
                    break;
                }
            }
            if (right) {
                assigned.addAll(tasks);
                users.add(worker);
            }
        }
        if (users.size() + assignedWorkers.size() > workersNum) {
            LOG.warn(
                    "There are no enough workers for user define scheduler / keeping old assignment, userdefineWorkers={}, assignedWorkers={}, workerNum={}",
                    users, assignedWorkers, workersNum);
            return;
        }

        assignedWorkers.addAll(users);
        needAssign.removeAll(assigned);
    }

  上面代码主要的处理逻辑是在for循环中,在这个循环会去判断worker内是否存有本拓扑内的taskid,若是有则把worker存储起来,而且从taskid列表中移除掉那些分配出去的task,没有则直接退出了。

使用旧分配/rebalance

  回到getAvailableWorkers方法内,看下面这段代码。

//若是配置指定要使用旧的分配,则从旧的分配中选出合适的worker。
        if (ConfigExtension.isUseOldAssignment(context.getStormConf())) {
            getRightWorkers(context, needAssign, assignedWorkers, workersNum,
                    context.getOldWorkers());
        } else if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE
                && context.isReassign() == false) {
            //若是是rebalance,且可使用原来的worker,将原来使用的worker存储起来。
            int cnt = 0;
            for (ResourceWorkerSlot worker : context.getOldWorkers()) {
                if (cnt < workersNum) {
                    ResourceWorkerSlot resFreeWorker = new ResourceWorkerSlot();
                    resFreeWorker.setPort(worker.getPort());
                    resFreeWorker.setHostname(worker.getHostname());
                    resFreeWorker.setNodeId(worker.getNodeId());
                    assignedWorkers.add(resFreeWorker);
                    cnt++;
                } else {
                    break;
                }
            }
        }
        // 计算TM bolt的个数
        int workersForSingleTM = 0;
        if (context.getAssignSingleWorkerForTM()) {
            for (Integer taskId : needAssign) {
                String componentName = context.getTaskToComponent().get(taskId);
                if (componentName.equals(Common.TOPOLOGY_MASTER_COMPONENT_ID)) {
                    workersForSingleTM++;
                }
            }
        }
        int restWokerNum = workersNum - assignedWorkers.size();
        if (restWokerNum < 0)
            throw new FailedAssignTopologyException(
                    "Too much workers are needed for user define or old assignments. workersNum="
                            + workersNum + ", assignedWokersNum="
                            + assignedWorkers.size());

  笔者一开始以为上述的代码多是在判断restWokerNum < 0是极可能会成立而致使抛出异常的,由于若是用户一开始就指定了worker分配信息,而后rebalance状况下,不断去添加旧的worker到assignedWorkers内,这样就会致使assignedWorkers的大小比实际须要的worker数目workersNum大。可是还没来得及用实际集群去测试,只是在github问了官方的人,若是有更新解决方案会后续再这里说明。

分配剩下的worker
//restWokerNum是剩下须要的worker的数目,直接添加ResourceWorkerSlot实例对象。
    for (int i = 0; i < restWokerNum; i++) {
        assignedWorkers.add(new ResourceWorkerSlot());
    }
    //这里是获取那些专门指定运行拓扑的supervisor节点。
    List<SupervisorInfo> isolationSupervisors = this.getIsolationSupervisors(context);
    if (isolationSupervisors.size() != 0) {
        putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(isolationSupervisors));
    } else {
        putAllWorkerToSupervisor(assignedWorkers, getResAvailSupervisors(context.getCluster()));
    }
    this.setAllWorkerMemAndCpu(context.getStormConf(), assignedWorkers);
    LOG.info("Assigned workers=" + assignedWorkers);
    return assignedWorkers;

  上述代码中的isolationSupervisors存放的是那些指定给这个拓扑的supervisor节点的id。若是有指定,则在这些特定的节点上分配,若是没有指定,那么,就在全局内分配。因此实际剩下的分配任务的是putAllWorkerToSupervisor这个方法,getResAvailSupervisors这个方法负责剔除那些没法分配worker的supervisor节点,由于节点上分配的worker已经满了。下面来介绍putAllWorkerToSupervisor这个方法的做用。
  putAllWorkerToSupervisor须要两个参数,第一个是已经分配的worker,包含那些尚未设定运行在那个节点的worker(上面直接新建的那些worker),第二个参数是目前可用的supervisor节点。下面是这个方法的代码

private void putAllWorkerToSupervisor( List<ResourceWorkerSlot> assignedWorkers, List<SupervisorInfo> supervisors) {
    for (ResourceWorkerSlot worker : assignedWorkers) {
        if (worker.getHostname() != null) {
            for (SupervisorInfo supervisor : supervisors) {
                if (NetWorkUtils.equals(supervisor.getHostName(), worker.getHostname()) && supervisor.getAvailableWorkerPorts().size() > 0) {
                    putWorkerToSupervisor(supervisor, worker);
                    break;
                }
            }
        }
    }
    supervisors = getResAvailSupervisors(supervisors);
    Collections.sort(supervisors, new Comparator<SupervisorInfo>() {

@Override
        public int compare(SupervisorInfo o1, SupervisorInfo o2) {
            // TODO Auto-generated method stub
            return -NumberUtils.compare( o1.getAvailableWorkerPorts().size(), o2.getAvailableWorkerPorts().size());
        }
    });
    putWorkerToSupervisor(assignedWorkers, supervisors);
}

  进入方法的第一步,首先要作的事情,就是对于那些已经分配好节点的worker,从supervisor节点上给该worker分配一个合适的端口。putWorkerToSupervisor这方法主要的操做是从supervisor节点上获取一个可用的端口,而后设置worker的端口,并将该端口从supervisor节点的可用端口列表中移除。代码结构很是简单,以下:

private void putWorkerToSupervisor(SupervisorInfo supervisor, ResourceWorkerSlot worker) {
    int port = worker.getPort();
    if (!supervisor.getAvailableWorkerPorts().contains(worker.getPort())) {
        port = supervisor.getAvailableWorkerPorts().iterator().next();
    }
    worker.setPort(port);
    supervisor.getAvailableWorkerPorts().remove(port);
    worker.setNodeId(supervisor.getSupervisorId());
}

  设置好了一部分已经分配好的worker以后,继续分配那些没有指定supervisor的worker。根据supervisor中可用端口逆序,从大到小排。而后调用putWorkerToSupervisor这个方法。
  putWorkerToSupervisor方法内部首先统计全部已经使用的端口,而后计算出一个理论的负载平均值{(全部使用掉的+将要分配的)/supervisor的个数,就会获得分配后,集群的一个理论上的负载值theoryAveragePorts,能够平摊到每一个supervisor身上}。而后经过遍历须要分配worker的list,进行第一次分配,能够将worker依次分配到那些负载值(跟理论值的计算方式同样)小于理论平均负载的supervisor上。而超过负载的,则放进到负载列表中。通过一轮分配以后,若是还存在没有分配的worker(源码这里先进行排序再进行判断,很明显形成排序时间浪费的可能性)。根据supervisor中可用端口逆序,从大到小排序。再不断将worker分配进去。
  到这里,worker的分配就顺利结束了,总结一下,首先是根据拓扑信息初始化上下文信息,而后计算出实际使用的worker数目,若是这些worker有指定运行在某个supervisor节点上,那么就在节点上分配合适的worker。若是没有指定,那么就根据节点的负载状况,尽可能平均的分配到每一个supervisor节点上。若是你们的负载都比较大的状况下,再分配到哪些具备比较多的可用端口的节点,完成分配。

task分配

  getAvailableWorkers方法完成了worker的分配,以及若是用户指定了特定的worker上运行指定的task,剩下的taskid将会在接下来的方法中说明如何去分配。主要在TaskScheduler的构造函数中,这里须要三个参数,第一个是拓扑的上下文信息defaultContext,第二个是须要分配的task的列表needAssignTasks,以及上文中获取到的合适的worker列表availableWorkers。(ps:记住,前文若是没有指定特定的worker资源分配的信息,则没有taskid被分配到worker中去,也就是worker内部仅有supervisorid,内存,cpu,端口等信息,不存在tasks信息)。接下来看看TaskScheduler的构造函数。

public TaskScheduler(DefaultTopologyAssignContext context, Set<Integer> tasks, List<ResourceWorkerSlot> workers) {
        this.tasks = tasks;
        LOG.info("Tasks " + tasks + " is going to be assigned in workers " + workers);
        this.context = context;
        this.taskContext =
                new TaskAssignContext(this.buildSupervisorToWorker(workers), Common.buildSpoutOutoputAndBoltInputMap(context), context.getTaskToComponent());
        this.componentSelector = new ComponentNumSelector(taskContext);
        this.inputComponentSelector = new InputComponentNumSelector(taskContext);
        this.totalTaskNumSelector = new TotalTaskNumSelector(taskContext);
        if (tasks.size() == 0)
            return;
        if (context.getAssignType() != TopologyAssignContext.ASSIGN_TYPE_REBALANCE || context.isReassign() != false){
            // warning ! it doesn't consider HA TM now!!
            if (context.getAssignSingleWorkerForTM() && tasks.contains(context.getTopologyMasterTaskId())) {
                assignForTopologyMaster();
            }
        }

        int taskNum = tasks.size();
        Map<ResourceWorkerSlot, Integer> workerSlotIntegerMap = taskContext.getWorkerToTaskNum();
        Set<ResourceWorkerSlot> preAssignWorkers = new HashSet<ResourceWorkerSlot>();
        for (Entry<ResourceWorkerSlot, Integer> worker : workerSlotIntegerMap.entrySet()) {
            if (worker.getValue() > 0) {
                taskNum += worker.getValue();
                preAssignWorkers.add(worker.getKey());
            }
        }
        setTaskNum(taskNum, workerNum);

        // Check the worker assignment status of pre-assigned workers, e.g user defined or old assignment workers.
        // Remove the workers which have been assigned with enough workers.
        for (ResourceWorkerSlot worker : preAssignWorkers) {
            if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){

                Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
                if (doneWorkers != null) {
                    for (ResourceWorkerSlot doneWorker : doneWorkers) {
                        taskNum -= doneWorker.getTasks().size();
                        workerNum--;
                    }
                }

            }

        }
        setTaskNum(taskNum, workerNum);

        // For Scale-out case, the old assignment should be kept.
        if (context.getAssignType() == TopologyAssignContext.ASSIGN_TYPE_REBALANCE && context.isReassign() == false) {
            keepAssignment(taskNum, context.getOldAssignment().getWorkers());
        }
    }
初始化

  在这个构造函数中,首先是构造一个task分配的上下文信息。这个对象主要须要维护的几个重要信息是

  • taskToComponent:一个Map,Key表示taskid,Value表示所对应的组件id。

  • supervisorToWorker:也是一个Map,Key表示这个拓扑分配的supervisorid,Value表示节点上分配到的worker列表。

  • relationship:维护这个拓扑的一个结构信息,依然是个Map,Key表示组件bolt/spout的组件id,Value表示的是,若是Key对应组件是一个bolt,则Value存下是全部输入到组件的对应组件的id。若是Key对应组件是一个spout,则Value存下是这个组件全部输出到的组件id。举个例子,integer(spout)输出到sliding(bolt),sliding(bolt)输出到printer(bolt)。则relationship存下的是[{integer,[sliding]},{sliding,[integer]},{printer,[sliding]}]。

  • workerToTaskNum:Map,Key表示一个worker,Value表示实际在这个worker上运行的task的总数目。

  • workerToComponentNum:Map,Key表示一个worker,Value表示一个Map,存下的是组件id,以及对应的数目。

  紧接着初始化三个selector,第一个是ComponentNumSelector(内部定义了一二WorkerComparator,负责对worker进行比对,对比worker内某个组件的task数目。以及对比每一个supervisor上全部worker内某个组件的总task和),第二个是InputComponentNumSelector(内部也是定义了两个比对函数,一个是获取worker内某个组件的所有输入的task个数,以及在整个supervisor上的所有输入task个数),第三个是TotalTaskNumSelector(worker内所有task的个数,和supervisor上所有task的个数)。这三个selector的目的都是为了后续合理的将task分配到这些worker上作的准备。

分配TM bolt

  若是集群资源足够,用户定义TM须要单独分配到一个独立的worker上,则须要调用assignForTopologyMaster进行单独分配。

private void assignForTopologyMaster() {
        int taskId = context.getTopologyMasterTaskId();
        ResourceWorkerSlot workerAssigned = null;
        int workerNumOfSuperv = 0;
        for (ResourceWorkerSlot workerSlot : taskContext.getWorkerToTaskNum().keySet()){
            List<ResourceWorkerSlot> workers = taskContext.getSupervisorToWorker().get(workerSlot.getNodeId());
            if (workers != null && workers.size() > workerNumOfSuperv) {
                for (ResourceWorkerSlot worker : workers) {
                    Set<Integer> tasks = worker.getTasks();
                    if (tasks == null || tasks.size() == 0) {
                        workerAssigned = worker;
                        workerNumOfSuperv = workers.size();
                        break;
                    }
                }
            }
        }

        if (workerAssigned == null)
            throw new FailedAssignTopologyException("there's no enough workers for the assignment of topology master");
        updateAssignedTasksOfWorker(taskId, workerAssigned);
        taskContext.getWorkerToTaskNum().remove(workerAssigned);
        assignments.add(workerAssigned);
        tasks.remove(taskId);
        workerNum--;
        LOG.info("assignForTopologyMaster, assignments=" + assignments);
    }

  这个方法首先是找出某个最合适的worker,这个worker符合两个条件,一是没有分配其余的task,第二,worker所在的supervisor相对分配了最多的worker,第二点的目的是保证负载均衡。若是找不到合适的worker,那么就抛出异常。若是能找到的话,就把负责TM的task分配给这个worker。updateAssignedTasksOfWorker这个方法的目的就是更新新的分配状况。

task分配

  接下来获取所有的task数目,以及已经分配出去的worker列表preAssignWorkers。根据得到的总task数目来计算每一个worker上平均的task数目avgTaskNum,以及剩下多少尚未分配出去的task(总task%总worker,求得余数leftTaskNum)。而后遍历preAssignWorkers,调用方法removeWorkerFromSrcPool来判断一个worker是否分配了足够的task,而且移除那些已经合理分配的task和worker。

for (ResourceWorkerSlot worker : preAssignWorkers) {
            if (taskContext.getWorkerToTaskNum().keySet().contains(worker)){

                Set<ResourceWorkerSlot> doneWorkers = removeWorkerFromSrcPool(taskContext.getWorkerToTaskNum().get(worker), worker);
                if (doneWorkers != null) {
                    for (ResourceWorkerSlot doneWorker : doneWorkers) {
                        taskNum -= doneWorker.getTasks().size();
                        workerNum--;
                    }
                }

            }

        }

  removeWorkerFromSrcPool这个方法挺有趣的,第一次看的时候有点懵逼,可是其实仔细看下就很明确了。下面我简单讲解下:

private Set<ResourceWorkerSlot> removeWorkerFromSrcPool(int taskNum, ResourceWorkerSlot worker) {
        Set<ResourceWorkerSlot> ret = new HashSet<ResourceWorkerSlot>();

        if (leftTaskNum <= 0) {
            if (taskNum >= avgTaskNum) {
                taskContext.getWorkerToTaskNum().remove(worker);
                assignments.add(worker);
                ret.add(worker);
            }
        } else {
            if (taskNum > avgTaskNum ) {
                taskContext.getWorkerToTaskNum().remove(worker);
                leftTaskNum = leftTaskNum -(taskNum -avgTaskNum);
                assignments.add(worker);
                ret.add(worker);
            }
            if (leftTaskNum <= 0) {
                List<ResourceWorkerSlot> needDelete = new ArrayList<ResourceWorkerSlot>();
                for (Entry<ResourceWorkerSlot, Integer> entry : taskContext.getWorkerToTaskNum().entrySet()) {
                    if (avgTaskNum != 0 && entry.getValue() == avgTaskNum)
                        needDelete.add(entry.getKey());
                }
                for (ResourceWorkerSlot workerToDelete : needDelete) {
                    taskContext.getWorkerToTaskNum().remove(workerToDelete);
                    assignments.add(workerToDelete);
                    ret.add(workerToDelete);
                }
            }
        }

        return ret;
    }

  ret保存的是须要返回给调用者须要移除的worker集合。看这个方法,首先判断,在剩余数小于等于0的状况,若是当前worker内的task数目大于等于平均数,说明这个worker的确分配了合理的task。(缘由是,若是leftTaskNum小于等于0,是否是就当作,平均数会比正常状况下加1。举个例子,有3个盒子,10个球放进去,那么,平均数为3的状况下,余数为1,若是平均数为4,那么余数就是-2了)。若是leftTaskNum大于0,判断就复杂一点,首先若是数目taskNum大于平均的avgTaskNum,说明这个worker多分配了一些task,那么这些多分配的就必须从leftTaskNum减去。甚至可能taskNum的数目大于avgTaskNum+leftTaskNum的数目,那么直接致使leftTaskNum小于等于0。在leftTaskNum小于等于0的状况下,找出分配上下文中worker分配的task数目恰好是平均数的worker,存在needDelete列表中。而后遍历这个列表,把这些worker从加到须要移除的集合ret中,并返回。(由于若是有某个worker分配的数目多于avgTaskNum+leftTaskNum的数目,那么那些分配数是平均数的worker确定是合理的,剩下那些分配小于平均数的才是须要调整的)。
  在执行完上述的操做以后,更新下目前的平均数avgTaskNum和分配剩余的task数目leftTaskNum。(此刻还有一些task还没有实际分配),完成分配的调度是在assign方法中。在这个方法内,若是已经没有须要分配的task,则将原来已经分配好的返回就好了。若是还存在须要分配的task,遍历这个须要分配的task列表,若是task对应的组件属于系统组件(组件id为__acker或者__topology_master的组件),则存下来,若是是通常的task,则调用chooseWorker方法选择一个合适的worker,而后将task分配到worker上。(固然这里还须要作一些额外的操做,好比清除那些已经合理的分配的worker,经过调用removeWorkerFromSrcPool这个方法去清除)。而chooseWorker这个方法利用的就是前文提到的三个selector来选择最佳的supervisor,选择最佳的worker(须要考虑这个task接收的input,须要考虑supervisor节点的负载状况和worker内的负载状况)。分配完普通的task以后,在分配系统组件,分配方式也是同样的。
  至此,task的分配也完成,总结一下,除开那些已经指定的分配外,比较重要的是,定义合理的selector(综合考虑节点负载,worker负载,已经input输入,考虑本地化)。分配的同时不断去检测是否已经有worker已经合理分配了,就不要在继续分配到那个worker上。

HeartBeat操做

  上述完成task和worker的分配以后,回到mkAssignment方法。剩下的操做就是设置task的HB起始时间和超时时间。这些比较简单就再也不细说了。

结束语

  解读拓扑分配的过程可让咱们更加清楚,咱们写的一个逻辑拓扑,其实是如何变成一个能够实际运行在集群的拓扑。以及拓扑如何保证负载均衡等问题。笔者后续还会更新JStorm几个比较重要的特性的源码分析。包括如何实现反压机制,如何实现nimbus和supervisor容错,supervisor启动的时候须要执行那些操做。

相关文章
相关标签/搜索