1.storm总体架构(画图+描述)java
storm的数据流程:web
storm分布式计算结构称为topology(拓扑),由stream(数据流)、spout(数据流的生成者)、bolt(运算)组成服务器
stream:网络
storm的核心数据结构是tuple(元祖),tuple是包含一个或者多个键值对的列表,Stream就是由无限制的tuple组成的序列数据结构
spout:架构
spout为一个storm topology的主要数据入口,充当采集器的角色,链接到数据源,将数据转化为一个tuple,并将tuple做为数据负载均衡
storm为spout提供了简易的API,开发一个spout的主要工做就是编写代码从数据源或者API消费数据jvm
数据源的种类:分布式
web或者移动程序的点击数据ide
应用程序的日志数据
传感器的输出
bolt:
bolt能够理解为计算程序的运算,将一个或者多个数据流做为输入,对数据进行实施运算后,
Storm定义了八种内置数据流分组的定义:
① 随机分组(Shuffle grouping):这种方式下元组会被尽量随机地分配到 bolt 的不一样任务(tasks)中,使得每一个任务所处理元组数量可以可以保持基本一致,以确保集群的负载均衡。
② 按字段分组(Fields grouping):这种方式下数据流根据定义的“字段”来进行分组。例如,若是某个数据流是基于一个名为“user-id”的字段进行分组的,那么全部包含相同的“user-id”的元组都会被分配到同一个task中,这样就能够确保消息处理的一致性。
③ 彻底分组(All grouping):将全部的tuple复制后分发给全部的bolt task。每一个订阅的task都会接收到tuple的拷贝,全部在使用此分组时需当心使用。
④ 全局分组(Global grouping):这种方式下全部的数据流都会被发送到 Bolt 的同一个任务中,也就是 id 最小的那个任务。
⑤不分组(None grouping):使用这种方式说明你不关心数据流如何分组。目前这种方式的结果与随机分组彻底等效,不过将来 Storm 社区可能会考虑经过非分组方式来让 Bolt 和它所订阅的 Spout 或 Bolt 在同一个线程中执行
⑥ 指向型分组(Direct grouping):数据源会调用emitDirect()方法来判断一个tuple应该由那个Storm组件来接收。只能在声明了是指向型的数据流上使用。
这个是物理流程图
Storm集群由一个主节点(nimbus)和一个或者多个工做节点(Supervisor)组成
nimbus:Storm的主节点,相似于hadoop中的Jobtracker,管理,协调和监控在集群上运行的topology。包括topology的发布,事务处理失败时从新指派任务。
Supervisor:等待nimbus分配任务后生成并监控worker(jvm进程)执行任务
zookeeper:storm主要使用zookeeper来协调集群中的状态信息,
任务提交描述
1.client:提交topology
2.numbus:这个角色所作的操做相对较多,具体以下:
a.会把提交的jar包放到nimbus所在服务器的nimbus/inbox目录下
b.submitTopology方法会负责topology的处理;包括检查集群是否有active节点,配置文件是否正确,是否有重复的topology名称,各个bolt/spout名是否使用相同的id等
c.创建topology的本地目录:nimbus/stormdist/topology-uuid
该目录包括三个文件:
stormjar.jar --从nimbus/inbox目录拷贝
stormcode.ser --此topology对象的序列化
stormconf.ser --此topology的配置文件序列化
d.nimbus 任务分配,根据topology中的定义,给是spout/bolt设置Task的数据,并分配对应的task-id,最后把分配好的信息写入到zookeeper的。
e.nimbus在zookeeper上建立taskbeats目录,要求每一个task定时向nimbus汇报
f.将分配好的任务写入到zookeeper,此时任务提交完毕,zk上的目录为assignments/topology-uuid
g.将topology信息写入到zookeeper/storms目录
3.Supervisor
a.按期扫描zookeeper上的storm目录,看看是否有新的任务,有就下载。
b.删除本地不须要的topology
c.根据nimbus指定的任务信息启动worker
4.worker
a.查看须要执行的任务,根据任务id分辨出spout/bolt任务
b.计算出所表明的spout/bolt会给那些task发送信息
c.执行spout任务或者bolt任务
Supervisor会定时从zookeeper获取拓扑信息topologies、任务分配信息assignment及各种心跳信息,以此为依据进行任务分配。
在Supervisor同步时,会根据新的任务分配状况来启动新的worker或者关闭旧的worker并进行负载均衡。
worker经过按期的更新connection信息,来获取其应该通信的其余worker
worker启动时,会根据其分配到的任务启动一个或多个execute线程,这些线程仅会处理惟一的topology
若是有新的topolog被提交到集群,nimbus会从新分配任务,这个到后面会说到
execute线程负责处理多个spout或者多个bolts的逻辑,这写spout或者bolts,也称为tasks
具体有多少个worker,多少个execute,每一个execute负责多少个task,是由配置和指定的parallelism-hint共同决定的,但这个值并不必定等于实际运行的数目
若是计算出的总的executors超过了nimbus的限制,此topology将不会获得执行。
2.storm完成一个单词计数功能
注意:在咱们编写Java代码以前必定要将storm的安装目录下的lib包导入到项目中,否则程序在编译和本地运行会报错的
1.wordcount 的数据发送端,这一端就是进行数据的收集,而后发送给bolt进行逻辑处理的
1 import java.util.HashMap; 2 import java.util.Map; 3 4 import backtype.storm.Config; 5 import backtype.storm.LocalCluster; 6 import backtype.storm.StormSubmitter; 7 import backtype.storm.generated.AlreadyAliveException; 8 import backtype.storm.generated.InvalidTopologyException; 9 import backtype.storm.topology.TopologyBuilder; 10 public class WordCountTest { 11 12 public static void main(String[] args) { 13 14 //建立一个Topology对象 15 TopologyBuilder builder=new TopologyBuilder(); 16 //设置一个数据的输入源 17 builder.setSpout("spout", new WordCountSpoutSource(),1); 18 //设置一个数据的逻辑处理bolt,就是对每个单词进行计数的 19 builder.setBolt("bolt", new WordCountBoldHandle(),1).shuffleGrouping("spout"); 20 builder.setBolt("bolt1", new WordCountBoldOut(),1).shuffleGrouping("bolt"); 21 22 //设置是在本地运行仍是在集群中运行 23 Map map=new HashMap(); 24 map.put(Config.TOPOLOGY_WORKERS, 2); 25 if(args.length>0){ 26 try { 27 //在集群中晕运行这些topology 28 StormSubmitter.submitTopology("topology", map, builder.createTopology()); 29 } catch (AlreadyAliveException | InvalidTopologyException e) { 30 // TODO Auto-generated catch block 31 e.printStackTrace(); 32 } 33 }else { 34 //在本地运行这些topology 35 LocalCluster loadCluster=new LocalCluster(); 36 loadCluster.submitTopology("topology", map, builder.createTopology()); 37 } 38 39 40 41 42 43 } 44 }
2.第一个bolt接收到来自spout的数据,将每个单词放到map集合中,使用containsKey方法检查map是否这个单词的key,若是有就进行value进行加1操做,没有就进行put操做,将单词作为key,1做为value放到map集合中,最后将每一个map发送给下一个bolt,
1 import java.util.HashMap; 2 import java.util.Map; 3 4 import backtype.storm.task.OutputCollector; 5 import backtype.storm.task.TopologyContext; 6 import backtype.storm.topology.IRichBolt; 7 import backtype.storm.topology.OutputFieldsDeclarer; 8 import backtype.storm.tuple.Fields; 9 import backtype.storm.tuple.Tuple; 10 import backtype.storm.tuple.Values; 11 12 public class WordCountBoldHandle implements IRichBolt{ 13 14 TopologyContext context; 15 OutputCollector collector; 16 17 Map map=new HashMap<String, Integer>(); 18 19 @Override 20 public void cleanup() { 21 // TODO Auto-generated method stub 22 23 } 24 25 @Override 26 public void execute(Tuple tuple) { 27 // TODO Auto-generated method stub 28 String word=(String)tuple.getValueByField("word"); 29 System.out.println("接收数据1"+"----"+word); 30 if(map.containsKey(word)){ 31 //这里面是将咱们的额Integer对象拆箱成一个int类型 32 int num=(int) map.get(word)+1; 33 map.put(word, num); 34 }else { 35 map.put(word, 1); 36 } 37 38 collector.emit(new Values(map)); 39 40 41 } 42 43 @Override 44 public void prepare(Map arg0, TopologyContext context, OutputCollector collector) { 45 // TODO Auto-generated method stub 46 this.collector=collector; 47 this.context=context; 48 49 } 50 51 @Override 52 public void declareOutputFields(OutputFieldsDeclarer declarer) { 53 // TODO Auto-generated method stub 54 declarer.declare(new Fields("WordMap")); 55 56 } 57 58 @Override 59 public Map<String, Object> getComponentConfiguration() { 60 // TODO Auto-generated method stub 61 return null; 62 }
3.第二个bolt接收到来自的bolt的数据,将map里的数据进行遍历输出。
1 package com.cgh.storm.wordcount; 2 3 import java.util.Map; 4 import java.util.Set; 5 6 import backtype.storm.task.OutputCollector; 7 import backtype.storm.task.TopologyContext; 8 import backtype.storm.topology.IRichBolt; 9 import backtype.storm.topology.OutputFieldsDeclarer; 10 import backtype.storm.tuple.Tuple; 11 12 public class WordCountBoldOut implements IRichBolt{ 13 14 @Override 15 public void cleanup() { 16 // TODO Auto-generated method stub 17 18 } 19 20 @Override 21 public void execute(Tuple arg0) { 22 // TODO Auto-generated method stub 23 //接收数据并删除数据 24 Map map=(Map<String, Integer>)arg0.getValueByField("WordMap"); 25 Set<String> set=map.keySet(); 26 for(String key:set){ 27 System.out.println("单词:"+key+", 出现次数:"+map.get(key)); 28 } 29 30 } 31 32 @Override 33 public void prepare(Map arg0, TopologyContext arg1, OutputCollector arg2) { 34 // TODO Auto-generated method stub 35 36 } 37 38 @Override 39 public void declareOutputFields(OutputFieldsDeclarer arg0) { 40 // TODO Auto-generated method stub 41 42 } 43 44 @Override 45 public Map<String, Object> getComponentConfiguration() { 46 // TODO Auto-generated method stub 47 return null; 48 } 49 50 }
4.本地运行测试代码:
3.storm 任务提交流程:
4.storm 的worker间的通讯
5.storm与hadoop的有哪些异同点
hadoop是磁盘级计算,进行计算时,数据在磁盘上,须要读写磁盘;storm是内存级计算,数据直接经过网络导入内存。读写比磁盘块n个数量级。
hadoop 的mapreduce基于hdfs ,须要切分输入的数据,产生中间数据文件、排序、数据压缩、多份复制等,效率较低
storm基于zeroMQ这个高性能的消息通信库,不持久化数据。
最主要的方面:hadoop使用磁盘做为中间交换的介质,而storm的数据是一直在内存中流转的,二者的面向的领域也不彻底相同,一个批处理,基于任务调度的,另外一个是实时处理,基于流的,以水为例,hadoop能够当作做为一桶一桶的搬,而storm是用水管,预先接好(topology),而后打开水龙头,水就会源源不断的流出来。