Storm的wordCounter计数器详解

点击(此处)折叠或打开 php

  1. package cn.jd.storm;

  2. import backtype.storm.Config;
  3. import backtype.storm.LocalCluster;
  4. import backtype.storm.topology.TopologyBuilder;
  5. import backtype.storm.tuple.Fields;
  6. /**
  7.  * 功能说明:
  8.  * 设计一个topology,来实现对一个句子里面的单词出现的频率进行统计。
  9.  * 整个topology分为三个部分:
  10.  *         WordReader:数据源,负责发送单行文本记录(句子)
  11.  *         WordNormalizer:负责将单行文本记录(句子)切分红单词
  12.  *         WordCounter:负责对单词的频率进行累加
  13.  *
  14.  * @author 毛祥溢
  15.  * Email:frank@maoxiangyi.cn
  16.  * 2013-8-26 下午5:59:06
  17.  */
  18. public class TopologyMain {

  19.     /**
  20.      * @param args 文件路径
  21.      */
  22.     public static void main(String[] args)throws Exception {
  23.         // Storm框架支持多语言,在JAVA环境下建立一个拓扑,须要使用TopologyBuilder进行构建
  24.         TopologyBuilder builder = new TopologyBuilder();
  25.         /* WordReader类,主要是将文本内容读成一行一行的模式
  26.          * 消息源spout是Storm里面一个topology里面的消息生产者。
  27.          * 通常来讲消息源会从一个外部源读取数据而且向topology里面发出消息:tuple。
  28.          * Spout能够是可靠的也能够是不可靠的。
  29.          * 若是这个tuple没有被storm成功处理,可靠的消息源spouts能够从新发射一个tuple,可是不可靠的消息源spouts一旦发出一个tuple就不能重发了。
  30.          *
  31.          * 消息源能够发射多条消息流stream。多条消息流能够理解为多中类型的数据。
  32.          * 使用OutputFieldsDeclarer.declareStream来定义多个stream,而后使用SpoutOutputCollector来发射指定的stream。
  33.          *
  34.          * Spout类里面最重要的方法是nextTuple。要么发射一个新的tuple到topology里面或者简单的返回若是已经没有新的tuple。
  35.          * 要注意的是nextTuple方法不能阻塞,由于storm在同一个线程上面调用全部消息源spout的方法。
  36.          *
  37.          * 另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。storm只对可靠的spout调用ack和fail。
  38.          */
  39.         builder.setSpout("word-reader",new WordReader());
  40.         /* WordNormalizer类,主要是将一行一行的文本内容切割成单词
  41.          *
  42.          * 全部的消息处理逻辑被封装在bolts里面。Bolts能够作不少事情:过滤,聚合,查询数据库等等。
  43.          * Bolts能够简单的作消息流的传递。复杂的消息流处理每每须要不少步骤,从而也就须要通过不少bolts。
  44.          * 好比算出一堆图片里面被转发最多的图片就至少须要两步:
  45.          *     第一步算出每一个图片的转发数量。
  46.          *     第二步找出转发最多的前10个图片。(若是要把这个过程作得更具备扩展性那么可能须要更多的步骤)。
  47.          *
  48.          * Bolts能够发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。
  49.          * Bolts的主要方法是execute, 它以一个tuple做为输入,bolts使用OutputCollector来发射tuple。
  50.          * bolts必需要为它处理的每个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。
  51.          * 通常的流程是: bolts处理一个输入tuple, 发射0个或者多个tuple, 而后调用ack通知storm本身已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。
  52.          *
  53.          *
  54.          */
  55.         builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
  56.         /*
  57.          * 上面的代码和下面的代码中都设定了数据分配的策略stream grouping
  58.          * 定义一个topology的其中一步是定义每一个bolt接收什么样的流做为输入。stream grouping就是用来定义一个stream应该若是分配数据给bolts上面的多个tasks。
  59.          * Storm里面有7种类型的stream grouping
  60.          *         Shuffle Grouping: 随机分组, 随机派发stream里面的tuple,保证每一个bolt接收到的tuple数目大体相同。
  61.          *         Fields Grouping:按字段分组, 好比按userid来分组, 具备一样userid的tuple会被分到相同的Bolts里的一个task,
  62.          *             而不一样的userid则会被分配到不一样的bolts里的task。
  63.          *         All Grouping:广播发送,对于每个tuple,全部的bolts都会收到。
  64.          *         Global Grouping:全局分组, 这个tuple被分配到storm中的一个bolt的其中一个task。再具体一点就是分配给id值最低的那个task。
  65.          *         Non Grouping:不分组,这stream grouping个分组的意思是说stream不关心到底谁会收到它的tuple。
  66.          *             目前这种分组和Shuffle grouping是同样的效果, 有一点不一样的是storm会把这个bolt放到这个bolt的订阅者同一个线程里面去执行。
  67.          *         Direct Grouping: 直接分组, 这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪一个task处理这个消息。
  68.          *             只有被声明为Direct Stream的消息流能够声明这种分组方法。并且这种消息tuple必须使用emitDirect方法来发射。
  69.          *             消息处理者能够经过TopologyContext来获取处理它的消息的task的id (OutputCollector.emit方法也会返回task的id)。
  70.          *         Local or shuffle grouping:若是目标bolt有一个或者多个task在同一个工做进程中,tuple将会被随机发生给这些tasks。
  71.          *             不然,和普通的Shuffle Grouping行为一致。
  72.          *
  73.          */
  74.         builder.setBolt("word-counter", new WordCounter(),1).fieldsGrouping("word-normalizer", new Fields("word"));
  75.  
  76.         /*
  77.          * storm的运行有两种模式: 本地模式和分布式模式.
  78.          * 1) 本地模式:
  79.          *         storm用一个进程里面的线程来模拟全部的spout和bolt. 本地模式对开发和测试来讲比较有用。
  80.          *         你运行storm-starter里面的topology的时候它们就是以本地模式运行的, 你能够看到topology里面的每个组件在发射什么消息。
  81.          * 2) 分布式模式:
  82.          *         storm由一堆机器组成。当你提交topology给master的时候, 你同时也把topology的代码提交了。
  83.          *         master负责分发你的代码而且负责给你的topolgoy分配工做进程。若是一个工做进程挂掉了, master节点会把认为从新分配到其它节点。
  84.          * 下面是以本地模式运行的代码:
  85.          *
  86.          * Conf对象能够配置不少东西, 下面两个是最多见的:
  87.          *        TOPOLOGY_WORKERS(setNumWorkers) 定义你但愿集群分配多少个工做进程给你来执行这个topology.
  88.          *            topology里面的每一个组件会被须要线程来执行。每一个组件到底用多少个线程是经过setBolt和setSpout来指定的。
  89.          *            这些线程都运行在工做进程里面. 每个工做进程包含一些节点的一些工做线程。
  90.          *            好比, 若是你指定300个线程,60个进程, 那么每一个工做进程里面要执行6个线程, 而这6个线程可能属于不一样的组件(Spout, Bolt)。
  91.          *            你能够经过调整每一个组件的并行度以及这些线程所在的进程数量来调整topology的性能。
  92.          *        TOPOLOGY_DEBUG(setDebug), 当它被设置成true的话, storm会记录下每一个组件所发射的每条消息。
  93.          *            这在本地环境调试topology颇有用, 可是在线上这么作的话会影响性能的。
  94.          */
  95.         Config conf = new Config();
  96.         conf.setDebug(true);
  97.         conf.setNumWorkers(2);
  98.         conf.put("wordsFile","/root/workspace1/com.jd.storm.demo/src/main/resources/words.txt");
  99.         conf.setDebug(true);
  100.         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
  101.         /*
  102.          * 定义一个LocalCluster对象来定义一个进程内的集群。提交topology给这个虚拟的集群和提交topology给分布式集群是同样的。
  103.          * 经过调用submitTopology方法来提交topology, 它接受三个参数:要运行的topology的名字,一个配置对象以及要运行的topology自己。
  104.          * topology的名字是用来惟一区别一个topology的,这样你而后能够用这个名字来杀死这个topology的。前面已经说过了, 你必须显式的杀掉一个topology, 不然它会一直运行。
  105.          */
  106.         LocalCluster cluster = new LocalCluster();
  107.         cluster.submitTopology("wordCounterTopology", conf, builder.createTopology());
  108.         Thread.sleep(1000);
  109.         cluster.killTopology("wordCounterTopology");
  110.         cluster.shutdown();
  111.     }

  112. }

读句子

点击(此处)折叠或打开 css

  1. package cn.jd.storm;

  2. import java.io.BufferedReader;
  3. import java.io.FileNotFoundException;
  4. import java.io.FileReader;
  5. import java.util.Map;

  6. import backtype.storm.spout.SpoutOutputCollector;
  7. import backtype.storm.task.TopologyContext;
  8. import backtype.storm.topology.OutputFieldsDeclarer;
  9. import backtype.storm.topology.base.BaseRichSpout;
  10. import backtype.storm.tuple.Fields;
  11. import backtype.storm.tuple.Values;
  12. /**
  13.  *
  14.  * 功能说明:
  15.  *         主要是将文件内容读出来,一行一行
  16.  *
  17.  *     Spout类里面最重要的方法是nextTuple。
  18.  *     要么发射一个新的tuple到topology里面或者简单的返回若是已经没有新的tuple。
  19.  *     要注意的是nextTuple方法不能阻塞,由于storm在同一个线程上面调用全部消息源spout的方法。
  20.  *     另外两个比较重要的spout方法是ack和fail。
  21.  *     storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。
  22.  * storm只对可靠的spout调用ack和fail。
  23.  *
  24.  * @author 毛祥溢
  25.  * Email:frank@maoxiangyi.cn
  26.  * 2013-8-26 下午6:05:46
  27.  */
  28. public class WordReader extends BaseRichSpout {

  29.     private SpoutOutputCollector collector;
  30.     private FileReader fileReader;
  31.     private String filePath;
  32.     private boolean completed = false;
  33.     //storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。
  34.     public void ack(Object msgId) {
  35.         System.out.println("OK:"+msgId);
  36.     }
  37.     public void close() {}
  38.     //storm在检测到一个tuple被整个topology成功处理的时候调用ack,不然调用fail。
  39.     public void fail(Object msgId) {
  40.         System.out.println("FAIL:"+msgId);
  41.     }
  42.  
  43.   /*
  44.    * 在SpoutTracker类中被调用,每调用一次就能够向storm集群中发射一条数据(一个tuple元组),该方法会被不停的调用
  45.    */
  46.     public void nextTuple() {
  47.         if(completed){
  48.             try {
  49.                 Thread.sleep(1000);
  50.             } catch (InterruptedException e) {
  51.             }
  52.             return;
  53.         }
  54.         String str;
  55.         BufferedReader reader =new BufferedReader(fileReader);
  56.         try{
  57.             while((str = reader.readLine()) != null){
  58.                 System.out.println("WordReader类 读取到一行数据:"+ str);
  59.                 this.collector.emit(new Values(str),str);
  60.                 System.out.println("WordReader类 发射了一条数据:"+ str);
  61.             }
  62.         }catch(Exception e){
  63.             throw new RuntimeException("Error reading tuple",e);
  64.         }finally{
  65.             completed = true;
  66.         }
  67.     }
  68.  
  69.     public void open(Map conf, TopologyContext context,SpoutOutputCollector collector) {
  70.         try {
  71.             this.fileReader = new FileReader(conf.get("wordsFile").toString());
  72.         } catch (FileNotFoundException e) {
  73.             throw new RuntimeException("Error reading file ["+conf.get("wordFile")+"]");
  74.         }
  75.         this.filePath    = conf.get("wordsFile").toString();
  76.         this.collector = collector;
  77.     }
  78.     /**
  79.      * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
  80.      * 该declarer变量有很大做用,咱们还能够调用declarer.declareStream();来定义stramId,该id能够用来定义更加复杂的流拓扑结构
  81.      */
  82.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  83.         declarer.declare(new Fields("line"));
  84.     }
  85. }

将句子切割成单词

点击(此处)折叠或打开 html

  1. package cn.jd.storm;

  2. import backtype.storm.topology.BasicOutputCollector;
  3. import backtype.storm.topology.OutputFieldsDeclarer;
  4. import backtype.storm.topology.base.BaseBasicBolt;
  5. import backtype.storm.tuple.Fields;
  6. import backtype.storm.tuple.Tuple;
  7. import backtype.storm.tuple.Values;
  8. /**
  9.  *
  10.  * 功能说明:
  11.  *     将一行文本切割成单词,并封装collector中发射出去
  12.  *
  13.  * @author 毛祥溢
  14.  * Email:frank@maoxiangyi.cn
  15.  * 2013-8-26 下午6:05:59
  16.  */
  17. public class WordNormalizer extends BaseBasicBolt {

  18.     public void cleanup() {
  19.         System.out.println("将一行文本切割成单词,并封装collector中发射出去 ---完毕!");
  20.     }
  21.     
  22.     /**
  23.      * 接受的参数是WordReader发出的句子,即input的内容是句子
  24.      * execute方法,将句子切割造成的单词发出
  25.      */
  26.     public void execute(Tuple input, BasicOutputCollector collector) {
  27.         String sentence = input.getString(0);
  28.         String[] words = sentence.split(" ");
  29.         System.out.println("WordNormalizer类 收到一条数据,这条数据是: "+ sentence);
  30.         for(String word : words){
  31.             word = word.trim();
  32.             if(!word.isEmpty()){
  33.                 word = word.toLowerCase();
  34.                 System.out.println("WordNormalizer类 收到一条数据,这条数据是: "+ sentence+"数据正在被切割,切割出来的单词是 "+ word);
  35.                 collector.emit(new Values(word));
  36.             }
  37.         }
  38.     }
  39.  
  40.     /**
  41.     * 定义字段id,该id在简单模式下没有用处,但在按照字段分组的模式下有很大的用处。
  42.     * 该declarer变量有很大做用,咱们还能够调用declarer.declareStream();来定义stramId,该id能够用来定义更加复杂的流拓扑结构
  43.     */
  44.     public void declareOutputFields(OutputFieldsDeclarer declarer) {
  45.         declarer.declare(new Fields("word"));
  46.     }
  47. }

计数器WordCounter

点击(此处)折叠或打开 java

  1. package cn.jd.storm;

  2. import java.util.HashMap;
  3. import java.util.Map;

  4. import backtype.storm.task.TopologyContext;
  5. import backtype.storm.topology.BasicOutputCollector;
  6. import backtype.storm.topology.OutputFieldsDeclarer;
  7. import backtype.storm.topology.base.BaseBasicBolt;
  8. import backtype.storm.tuple.Tuple;
  9. /**
  10.  *
  11.  * 功能说明:
  12.  *     实现计数器的功能,第一次将collector中的元素存放在成员变量counters(Map)中.
  13.  *     若是counters(Map)中已经存在该元素,getValule并对Value进行累加操做。
  14.  *
  15.  * @author 毛祥溢
  16.  * Email:frank@maoxiangyi.cn
  17.  * 2013-8-26 下午6:06:07
  18.  */
  19. public class WordCounter extends BaseBasicBolt {

  20.     private static final long serialVersionUID = 5678586644899822142L;
  21.     Integer id;
  22.     String name;
  23.      //定义Map封装最后的结果
  24.     Map<String, Integer> counters;
  25.  
  26.     /**
  27.      * 在spout结束时被调用,将最后的结果显示出来
  28.      *
  29.      * 結果:
  30.      * -- Word Counter [word-counter-2] --
  31.      * really: 1
  32.      * but: 1
  33.      * application: 1
  34.      * is: 2
  35.      * great: 2
  36.      */
  37.     @Override
  38.     public void cleanup() {
  39.         System.out.println("-- Word Counter ["+name+"-"+id+"] --");
  40.         for(Map.Entry<String, Integer> entry : counters.entrySet()){
  41.             System.out.println(entry.getKey()+": "+entry.getValue());
  42.         }
  43.         System.out.println("实现计数器的功能 --完畢!");
  44.     }
  45.  
  46.     /**
  47.      * 初始化操做
  48.      */
  49.     @Override
  50.     public void prepare(Map stormConf, TopologyContext context) {
  51.         this.counters = new HashMap<String, Integer>();
  52.         this.name = context.getThisComponentId();
  53.         this.id = context.getThisTaskId();
  54.     }
  55.  
  56.     public void declareOutputFields(OutputFieldsDeclarer declarer) {}
  57.  
  58.     /**
  59.      * 实现计数器的功能,第一次将collector中的元素存放在成员变量counters(Map)中.
  60.      * 若是counters(Map)中已经存在该元素,getValule并对Value进行累加操做。
  61.      */
  62.     public void execute(Tuple input, BasicOutputCollector collector) {
  63.         String str = input.getString(0);
  64.         System.out.println("WordCounter 计数器收到单词 "+ str);
  65.         if(!counters.containsKey(str)){
  66.             counters.put(str, 1);
  67.         }else{
  68.             Integer c = counters.get(str) + 1;
  69.             counters.put(str, c);
  70.         }
  71.     }
  72. }

数据格式
storm ni great he he xi wang
test haha heihei very
are mao xiang yi jd

运行日志

WordReader类 发射了一条数据:storm ni great he he xi wang
WordReader类 发射了一条数据:test haha heihei very

WordNormalizer类 收到一条数据,这条数据是:    storm ni great he he xi wang
WordNormalizer类 收到一条数据,这条数据是:    storm ni great he he xi wang数据正在被切割,切割出来的单词是 storm
WordNormalizer类 收到一条数据,这条数据是:    storm ni great he he xi wang数据正在被切割,切割出来的单词是 ni
WordNormalizer类 收到一条数据,这条数据是:    storm ni great he he xi wang数据正在被切割,切割出来的单词是 great
WordNormalizer类 收到一条数据,这条数据是:    storm ni great he he xi wang数据正在被切割,切割出来的单词是 he
WordNormalizer类 收到一条数据,这条数据是:    storm ni great he he xi wang数据正在被切割,切割出来的单词是 he
WordNormalizer类 收到一条数据,这条数据是:    storm ni great he he xi wang数据正在被切割,切割出来的单词是 xi
WordNormalizer类 收到一条数据,这条数据是:    storm ni great he he xi wang数据正在被切割,切割出来的单词是 wang
WordCounter 计数器收到单词 storm
WordCounter 计数器收到单词 ni
WordCounter 计数器收到单词 great
WordCounter 计数器收到单词 he
WordCounter 计数器收到单词 he
WordCounter 计数器收到单词 xi
WordCounter 计数器收到单词 wang


WordNormalizer类 收到一条数据,这条数据是:    test haha heihei very
WordNormalizer类 收到一条数据,这条数据是:    test haha heihei very 数据正在被切割,切割出来的单词是 test
WordNormalizer类 收到一条数据,这条数据是:    test haha heihei very 数据正在被切割,切割出来的单词是 haha
WordNormalizer类 收到一条数据,这条数据是:    test haha heihei very 数据正在被切割,切割出来的单词是 heihei
WordNormalizer类 收到一条数据,这条数据是:    test haha heihei very 数据正在被切割,切割出来的单词是 very
WordCounter 计数器收到单词 test
WordCounter 计数器收到单词 haha
WordCounter 计数器收到单词 heihei
WordCounter 计数器收到单词 very


WordReader类 发射了一条数据:are mao xiang yi jd
WordNormalizer类 收到一条数据,这条数据是:    are mao xiang yi jd
WordNormalizer类 收到一条数据,这条数据是:    are mao xiang yi jd 数据正在被切割,切割出来的单词是 are
WordNormalizer类 收到一条数据,这条数据是:    are mao xiang yi jd 数据正在被切割,切割出来的单词是 mao
WordNormalizer类 收到一条数据,这条数据是:    are mao xiang yi jd 数据正在被切割,切割出来的单词是 xiang
WordNormalizer类 收到一条数据,这条数据是:    are mao xiang yi jd 数据正在被切割,切割出来的单词是 yi
WordNormalizer类 收到一条数据,这条数据是:    are mao xiang yi jd 数据正在被切割,切割出来的单词是 jd
WordCounter 计数器收到单词 are
WordCounter 计数器收到单词 mao
WordCounter 计数器收到单词 xiang
WordCounter 计数器收到单词 yi
WordCounter 计数器收到单词 jd

-- Word Counter [word-counter-2] --
xi: 1
test: 1
heihei: 1
haha: 1
he: 2
storm: 1
wang: 1
jd: 1
xiang: 1
great: 1
are: 1
ni: 1
yi: 1
very: 1
mao: 1
实现计数器的功能 --完畢!
将一行文本切割成单词,并封装collector中发射出去 ---完毕!
阅读(40) | 评论(0) | 转发(0) |
给主人留下些什么吧!~~
评论热议
相关文章
相关标签/搜索