Storm系列(四)并行度和流分组

原文链接:a870439570.github.io/interview-d…git

并行度(parallelism)概念

  • 一个运行中的拓扑是由什么构成的:工做进程(worker processes),执行器(executors)和任务(tasks)
  • 在 Worker 中运行的是拓扑的一个子集。一个 worker 进程是从属于某一个特定的拓扑的,在 worker 进程中会运行一个或者多个与拓扑中的组件相关联的 executor。一个运行中的拓扑就是由这些运行于 Storm集群中的不少机器上的进程组成的。
  • 一个 executor 是由 worker 进程生成的一个线程。在 executor 中可能会有一个或者多个 task,这些 task 都是为同一个组件(spout 或者 bolt)服务的。
  • task 是实际执行数据处理的最小工做单元(注意,task 并非线程) —— 在你的代码中实现的每一个 spout 或者 bolt 都会在集群中运行不少个 task。在拓扑的整个生命周期中每一个组件的 task 数量都是保持不变的,不过每一个组件的 executor数量倒是有可能会随着时间变化。在默认状况下 task 的数量是和 executor 的数量同样的,也就是说,默认状况下 Storm会在每一个线程上运行一个 task。

Storm的流分组策略

  • Storm的分组策略对结果有着直接的影响,不一样的分组的结果必定是不同的。其次,不一样的分组策略对资源的利用也是有着很是大的不一样
  • 拓扑定义的一部分就是为每一个Bolt指定输入的数据流,而数据流分组则定义了在Bolt的task之间如何分配数据流。

八种流分组定义

Shuffle grouping:github

  • 随机分组:随机的将tuple分发给bolt的各个task,每一个bolt实例接收到相同数量的tuple。

Fields grouping:apache

  • 按字段分组:根据指定的字段的值进行分组,举个栗子,流按照“user-id”进行分组,那么具备相同的“user-id”的tuple会发到同一个task,而具备不一样“user-id”值的tuple可能会发到不一样的task上。这种状况经常用在单词计数,而实际状况是不多用到,由于若是某个字段的某个值太多,就会致使task不均衡的问题。

Partial Key grouping:bash

  • 部分字段分组:流由分组中指定的字段分区,如“字段”分组,可是在两个下游Bolt之间进行负载平衡,当输入数据歪斜时,能够更好地利用资源。优势。有了这个分组就彻底能够不用Fields grouping了

All grouping:网络

  • 广播分组:将全部的tuple都复制以后再分发给Bolt全部的task,每个订阅数据流的task都会接收到一份相同的彻底的tuple的拷贝。

Global grouping:jvm

  • 全局分组:这种分组会将全部的tuple都发到一个taskid最小的task上。因为全部的tuple都发到惟一一个task上,势必在数据量大的时候会形成资源不够用的状况。

None grouping:分布式

  • 不分组:不指定分组就表示你不关心数据流如何分组。目前来讲不分组和随机分组效果是同样的,可是最终,Storm可能会使用与其订阅的bolt或spout在相同进程的bolt来执行这些tuple

Direct grouping:post

  • 指向分组:这是一种特殊的分组策略。以这种方式分组的流意味着将由元组的生成者决定消费者的哪一个task能接收该元组。指向分组只能在已经声明为指向数据流的数据流中声明。tuple的发射必须使用emitDirect种的一种方法。Bolt能够经过使用TopologyContext或经过在OutputCollector(返回元组发送到的taskID)中跟踪emit方法的输出来获取其消费者的taskID。

Local or shuffle grouping: 本地或随机分组:和随机分组相似,可是若是目标Bolt在同一个工做进程中有一个或多个任务,那么元组将被随机分配到那些进程内task。简而言之就是若是发送者和接受者在同一个worker则会减小网络传输,从而提升整个拓扑的性能。有了此分组就彻底能够不用shuffle grouping了。性能

示例

修改上一章节的Topology Storm(三)Java编写第一个本地模式demoui

package com.qxw.topology;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

import com.qxw.bolt.OutBolt;
import com.qxw.bolt.OutBolt2;
import com.qxw.spout.DataSource;

/**
 * 拓扑的并行性
 * @author qxw
 * @data 2018年9月17日下午2:49:09
 */
public class TopologyTest2 {

	public static void main(String[] args) throws Exception {
		//配置
		Config cfg = new Config();
		cfg.setNumWorkers(2);//指定工做进程数  (jvm数量,分布式环境下可用,本地模式设置无心义)
		cfg.setDebug(false);
		
		//构造拓扑流程图
		TopologyBuilder builder = new TopologyBuilder();
		//设置数据源(产生2个执行器和俩个任务)
		builder.setSpout("dataSource", new DataSource(),2).setNumTasks(2);
		//设置数据建流处理组件(产生2个执行器和4个任务)
		builder.setBolt("out-bolt", new OutBolt(),2).shuffleGrouping("dataSource").setNumTasks(4); //随机分组
		//设置bolt的并行度和任务数:(产生6个执行器和6个任务)
//		builder.setBolt("out-bol2", new OutBolt2(),6).shuffleGrouping("out-bolt").setNumTasks(6); //随机分组
		
		//设置字段分组(产生8个执行器和8个任务)字段分组 
		builder.setBolt("out-bol2", new OutBolt2(),8).fieldsGrouping("out-bolt", new Fields("outdata")).setNumTasks(8);
		//设置广播分组
		//builder.setBolt("write-bolt", new OutBolt2(), 4).allGrouping("print-bolt");
		//设置全局分组
		//builder.setBolt("write-bolt", new OutBolt2(), 4).globalGrouping("print-bolt");
		
		//1 本地模式
		LocalCluster cluster = new LocalCluster();
		
		//提交拓扑图  会一直轮询执行
		cluster.submitTopology("topo", cfg, builder.createTopology());

		
		//2 集群模式
//		StormSubmitter.submitTopology("topo", cfg, builder.createTopology());
		
	}
}

}

复制代码
相关文章
相关标签/搜索