问题导读:
1.你认为何图形能够显示hadoop与storm的区别?(电梯)
2.本文是如何形象讲解hadoop与storm的?(离线批量处理、实时流式处理)
3.hadoop map/reduce对应storm那两个概念?(spout/bolt)
4.storm流由谁来组成?(Tuples)
5.tuple具体是什么形式?
数据库
什么是Storm?
Storm是:缓存
区别:
咱们知道hadoop是批处理,storm是流式处理,那么是什么是批处理,什么流式处理?
Storm和Hadoop主要区别是实时和批处理的区别:
Storm概念组成:Spout和Bolt组成Topology。
Tuple是Storm的数据模型,如['jdon',12346]
多个Tuple组成事件流:服务器
Spout是读取须要分析处理的数据源,而后转为Tuples,这些数据源能够是Web日志、 API调用、数据库等等。Spout至关于事件流的生产者。
Bolt 处理Tuples而后再建立新的Tuples流,Bolt至关于事件流的消费者。
Bolt 做为真正业务处理者,主要实现大数据处理的核心功能,好比转换数据,应用相应过滤器,计算和聚合数据(好比统计总和等等) 。
以Twitter的某个Tweet为案例,看看Storm如何处理:
这些tweett贴内容是:“No Small Cell Lung #Cancer(没有小细胞肺癌#癌症)” "An #OnCology Consult...."
这些贴被Spout读取之后,产生Tuple,字段名是tweet,内容是"No Small Cell Lung #Cancer",格式相似:['No Small Cell Lung #Cancer',133221]。
而后进入被流 消费者Bolt进行处理,第一个Bolt是SplitSentence,将tuple内容进行分离,结果成为:一个个单词:"No" "Small" "Cell" "Lung" "#Cancer" ;而后通过第二个Bolt进行过滤HashTagFilter处理,Hash标签是单词中用#标注的,也就是Cancer;再通过HasTagCount计数,能够本地内存缓存这个计数结果,最后经过PrinterBolt打印出标签单词统计结果 。网络
咱们使用Stom所要作的就是编制Spout和Bolt代码:架构
1 public class RandomSentenceSpout extends BaseRichSpout { 2 SpoutOutputCollector collector; 3 Random random; 4 //读入外部数据 5 public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { 6 this.collector = collector; 7 random = new Random(); 8 } 9 //产生Tuple 10 public void nextTuple() { 11 String[] sentences = new String[] { 12 "No Small Cell Lung #Cancer", 13 "An #OnCology Consultant apple a day keeps the doctor away", 14 "four score and seven years ago", 15 "snow white and the seven dwarfs", 16 "i am at two with nature" 17 }; 18 String tweet = sentences[random.nextInt(sentences.length)]; 19 //定义字段名"tweet" 的值 20 collector.emit(new Values(tweet)); 21 } 22 // 定义字段名"tweet" 23 public void declareOutputFields(OutputFieldsDeclarer declarer) { 24 declarer.declare(new Fields("tweet")); 25 } 26 @Override 27 public void ack(Object msgId) {} 28 @Override 29 public void fail(Object msgId) {} 30 }
下面是Bolt的代码编写:app
1 public class SplitSentenceBolt extends BaseRichBolt { 2 OutputCollector collector; 3 @Override 4 public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { 5 this.collector = collector; 6 } 7 @Override 消费者激活主要方法:分离成单个单词 8 public void execute(Tuple input) { 9 for (String s : input.getString(0).split("\\s")) { 10 collector.emit(new Values(s)); 11 } 12 } 13 @Override 定义新的字段名 14 public void declareOutputFields(OutputFieldsDeclarer declarer) { 15 declarer.declare(new Fields("word")); 16 }
最后是装配运行Spout和Bolt的客户端调用代码:dom
1 public class WordCountTopology { 2 public static void main(String[] args) throws Exception { 3 TopologyBuilder builder = new TopologyBuilder(); 4 builder.setSpout("tweet", new RandomSentenceSpout(), 2); 5 builder.setBolt("split", new SplitSentenceBolt(), 4) 6 .shuffleGrouping("tweet") 7 .setNumTasks(8); 8 builder.setBolt("count", new WordCountBolt(), 6) 9 .fieldsGrouping("split", new Fields("word")); 10 ..设置多个Bolt 11 Config config = new Config(); 12 config.setNumWorkers(4); 13 14 StormSubmitter.submitTopology("wordcount", config, builder.createTopology()); 15 //Local testing 16 //LocalCluster cluster = new LocalCluster(); 17 //cluster.submitTopology("wordcount", config, builder.createTopology()); 18 //Thread.sleep(10000); 19 //cluster.shutdown(); 20 } 21 }
在这个代码中定义了一些参数好比Works的数目是4,其含义在后面详细分析。
下面咱们要将上面这段代码发布部署到Storm中,首先了解Storm物理架构图:分布式
Nimbus是一个主后台处理器,主要负责:
1.发布分发代码
2.分配任务
3.监控失败。
Supervisor是负责当前这个节点的后台工做处理器的监听。
Work相似Java的线程,采起JDK的Executor 。ide
下面开始将咱们的代码部署到这个网络拓扑中:将代码Jar包上传到Nimbus的inbox,包括全部的依赖包,而后提交。
Nimbus将保存在本地文件系统,而后开始配置网络拓扑,分配开始拓扑。
见下图:Nimbus服务器将拓扑Jar 配置和结构下载到 Supervisor,负载平衡ZooKeeper分配某个特定的Supervisor服务器,而Supervisor开始基于配置分配Work,Work调用JDK的Executor启动线程,开始任务处理。
下面是咱们代码对拓扑分配的参数示意图:
Executor启动的线程数目是12个,组件的实例是16个,那么如何在实际服务器中分配呢?以下图:图中RsSpout表明咱们的代码中RandomSentenceSpout;SplitSentenceBolt简写为SSbolt。oop