storm之8:并行度


(一)storm拓扑的并行度能够从如下4个维度进行设置:
一、node(服务器):指一个storm集群中的supervisor服务器数量。
二、worker(jvm进程):指整个拓扑中worker进程的总数量,这些数量会随机的平均分配到各个node。
三、executor(线程):指某个spout或者bolt的总线程数量,这些线程会被随机平均的分配到各个worker。
四、task(spout/bolt实例):task是spout和bolt的实例,它们的nextTuple()和execute()方法会被executors线程调用。除非明确指定,storm会给每一个executor分配一个task。若是设置了多个task,即一个线程持有了多个spout/bolt实例.
注意:以上设置的都是总数量,这些数量会被平均分配到各自的宿主上,而不是设置每一个宿主进行多少个进程/线程。详见下面的例子。


(二)并行度的设置方法
一、node:买机器吧,而后加入集群中……
二、worker:Config#setNumWorkers() 或者配置项 TOPOLOGY_WORKERS
三、executor:Topology.setSpout()/.setBolt()
四、task:ComponentConfigurationDeclarer#setNumWorker()

(三)例子:
        // 三、建立topology
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5);//设置executor数量为5
        builder.setBolt("filter-bolt", new FilterBolt(), 3).shuffleGrouping(
                "kafka-reader");//设置executor数量为3
        builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
                .shuffleGrouping("filter-bolt");//设置executor数量为5
        builder.setBolt("hdfs-bolt", hdfsBolt, 2).shuffleGrouping(
                "log-splitter");//设置executor数量为2

        // 四、启动topology
        Config conf = new Config();
        conf.put(Config.NIMBUS_HOST, nimbusHost);
        conf.setNumWorkers(3);      //设置worker数量
        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf,
                builder.createTopology());

一、经过config.setNumWorkers(3)将worker进程数量设置为3,假设集群中有3个node,则每一个node会运行一个worker。
二、executor的数量分别为:
spout:5
filter-bolt:3
log-splitter:3
hdfs-bolt:2
总共为13个executor,这13个executor会被随机分配到各个worker中去。
注:这段代码是从kafka中读取消息源的,而这个topic在kafka中的分区数量设置为5,所以这里spout的线程娄为5.
三、这个示例都没有单独设置task的数量,即便用每一个executor一个task的默认配置。若须要设置,能够:
        builder.setBolt("log-splitter", new LogSplitterBolt(), 3)
                .shuffleGrouping("filter-bolt").setNumTasks(5);
来进行设置,这5个task会被分配到3个executor中。

(四)并行度的动态调整
对storm拓扑的并行度进行调整有2种方法:
一、kill topo—>修改代码—>编译—>提交拓扑
二、动态调整
第1种方法太不方便了,有时候topo不能说kill就kill,另外,若是加几台机器,难道要把全部topo kill掉还要修改代码?
所以storm提供了动态调整的方法,动态调整有2种方法:
一、ui方式:进入某个topo的页面,点击rebalance便可,此时能够看到topo的状态是rebalancing。但此方法只是把进程、线程在各个机器上从新分配,即适用于增长机器,或者减小机器的情形,不能调整worker数量、executor数量等
二、cli方式:storm rebalance
举个例子
storm rebalance toponame -n 7 -e filter-bolt=6 -e hdfs-bolt=8
将topo的worker数量设置为7,并将filter-bolt与hdfs-bolt的executor数量分别设置为六、8.
此时,查看topo的状态是rebalancing,调整完成后,能够看到3台机器中的worker数量分别为三、二、2

node

相关文章
相关标签/搜索