I'm new to Storm and had some trouble understanding the more complex sliding-window top-N model. While reading other blogs I found two good introductions to non-sliding-window top-N, which I'm reposting here.
Here is the first one:
Another common Storm pattern is the so-called "streaming top N" computation over a data stream. Its characteristic is that it continuously maintains the top N items in memory by some statistic (such as occurrence count), and emits the current top-N result at a fixed interval.
Streaming top-N has many applications, for example computing the hottest topics or most-clicked images on Twitter over a recent time window.
Using an example from storm-starter, here is an implementation that scales easily: several bolts run in parallel across multiple machines, each computing a top N over its share of the data, and a single global bolt then merges those partial top-N results into the final global top N.
The entry point for this example is the RollingTopWords class, which computes the N most frequent words. First, look at the topology structure.
The topology is built as follows:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("word", new TestWordSpout(), 5);
builder.setBolt("count", new RollingCountObjects(60, 10), 4)
.fieldsGrouping("word", new Fields("word"));
builder.setBolt("rank", new RankObjects(TOP_N), 4)
.fieldsGrouping("count", new Fields("obj"));
builder.setBolt("merge", new MergeObjects(TOP_N))
.globalGrouping("rank");
(1) First, TestWordSpout() is the topology's data source spout. It continuously emits randomly chosen words, producing the "word" stream with output field "word". Core code:
public void nextTuple() {
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    _collector.emit(new Values(word));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("word"));
}
(2) Next, the "word" stream flows into the RollingCountObjects bolt for word counting. To guarantee that all tuples for the same word are processed by the same bolt task, the stream is field-grouped on "word". RollingCountObjects counts each word's occurrences and produces the "count" stream with two output fields, "obj" and "count". The synchronized lock here could also be replaced with a thread-safe container such as ConcurrentHashMap. Core code:
public void execute(Tuple tuple) {
    Object obj = tuple.getValue(0);
    int bucket = currentBucket(_numBuckets);
    synchronized(_objectCounts) {
        long[] curr = _objectCounts.get(obj);
        if (curr == null) {
            curr = new long[_numBuckets];
            _objectCounts.put(obj, curr);
        }
        curr[bucket]++;
        _collector.emit(new Values(obj, totalObjects(obj)));
        _collector.ack(tuple);
    }
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("obj", "count"));
}
(3) Then the RankObjects bolt field-groups the "count" stream on its "obj" field. The bolt maintains an ordered list of the top N words, evicting the last entry whenever the list exceeds N, and at a fixed interval (2 seconds) it emits the "rank" stream with a "list" field, sending the current top-N result downstream to the merge bolt. Core code:
public void execute(Tuple tuple, BasicOutputCollector collector) {
    Object tag = tuple.getValue(0);
    Integer existingIndex = _find(tag);
    if (null != existingIndex) {
        _rankings.set(existingIndex, tuple.getValues());
    } else {
        _rankings.add(tuple.getValues());
    }
    Collections.sort(_rankings, new Comparator<List>() {
        public int compare(List o1, List o2) {
            return _compare(o1, o2);
        }
    });
    if (_rankings.size() > _count) {
        _rankings.remove(_count);
    }
    long currentTime = System.currentTimeMillis();
    if (_lastTime == null || currentTime >= _lastTime + 2000) {
        collector.emit(new Values(new ArrayList(_rankings)));
        _lastTime = currentTime;
    }
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("list"));
}
(4) Finally, the MergeObjects bolt receives the "rank" stream with a global grouping, so every upstream RankObjects task sends its partial result to this single task. Its logic is similar to RankObjects, except that it merges the partial rankings from all RankObjects tasks to produce the final global top-N result. Core code:
public void execute(Tuple tuple, BasicOutputCollector collector) {
    List<List> merging = (List) tuple.getValue(0);
    for (List pair : merging) {
        Integer existingIndex = _find(pair.get(0));
        if (null != existingIndex) {
            _rankings.set(existingIndex, pair);
        } else {
            _rankings.add(pair);
        }
        Collections.sort(_rankings, new Comparator<List>() {
            public int compare(List o1, List o2) {
                return _compare(o1, o2);
            }
        });
        if (_rankings.size() > _count) {
            _rankings.subList(_count, _rankings.size()).clear();
        }
    }
    long currentTime = System.currentTimeMillis();
    if (_lastTime == null || currentTime >= _lastTime + 2000) {
        collector.emit(new Values(new ArrayList(_rankings)));
        LOG.info("Rankings: " + _rankings);
        _lastTime = currentTime;
    }
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("list"));
}
There is also a clever variant: execute() only inserts data without emitting, while prepare() creates a background thread that watches the clock and does the emitting.
package test.storm.topology;

import test.storm.bolt.WordCounter;
import test.storm.bolt.WordWriter;
import test.storm.spout.WordReader;
import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class WordTopN {
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        if (args == null || args.length < 1) {
            System.err.println("Usage: N");
            System.err.println("such as : 10");
            System.exit(-1);
        }
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("wordreader", new WordReader(), 2);
        builder.setBolt("wordcounter", new WordCounter(), 2).fieldsGrouping("wordreader", new Fields("word"));
        builder.setBolt("wordwriter", new WordWriter()).globalGrouping("wordcounter");
        Config conf = new Config();
        conf.put("N", args[0]);
        conf.setDebug(false);
        StormSubmitter.submitTopology("topN", conf, builder.createTopology());
    }
}
A few points to note here. The first bolt uses fieldsGrouping, i.e. grouping by field. This is important: it guarantees that the same word is always routed to the same bolt task, which wordcount- and top-N-style applications require.
The last bolt uses globalGrouping, so all tuples are routed to a single bolt task for the final aggregation.
To add some parallelism, both the spout and the first bolt are given a parallelism of 2 (my test machine is not very powerful).
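The routing behavior of the two groupings can be modeled in a few lines. This is a simplified illustration of the idea, not Storm's exact internal hashing: fieldsGrouping hashes the grouping field's value modulo the number of target tasks, so identical values always land on the same task, while globalGrouping sends every tuple to one fixed task.

```java
public class GroupingSketch {
    // fieldsGrouping: the same field value always maps to the same task index.
    public static int fieldsGrouping(String fieldValue, int numTasks) {
        return Math.floorMod(fieldValue.hashCode(), numTasks);
    }

    // globalGrouping: every tuple goes to the same single task.
    public static int globalGrouping(int numTasks) {
        return 0;
    }
}
```

This is why per-word counts stay consistent with parallelism 2: both occurrences of "a" hash to the same WordCounter task, so no word's count is split across tasks.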
package test.storm.spout;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {
    private static final long serialVersionUID = 2197521792014017918L;
    private SpoutOutputCollector collector;
    private static AtomicInteger i = new AtomicInteger();
    private static String[] words = new String[] { "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m",
            "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z" };

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        if (i.intValue() < 100) {
            Random rand = new Random();
            String word = words[rand.nextInt(words.length)];
            collector.emit(new Values(word));
            i.incrementAndGet();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
The spout emits random words, 100 in total. Since the parallelism is 2, two spout instances are created, so the counter is a static AtomicInteger to keep the shared count thread-safe. (Note that a static field is only shared within one JVM; if the two spout executors were placed in different worker processes, each would have its own counter.)
package test.storm.bolt;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCounter implements IRichBolt {
    private static final long serialVersionUID = 5683648523524179434L;
    private static Map<String, Integer> counters = new ConcurrentHashMap<String, Integer>();
    private volatile boolean edit = true;

    @Override
    public void prepare(final Map stormConf, TopologyContext context, final OutputCollector collector) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // if the counters have not changed for 5 seconds, assume the spout has finished sending
                    if (!edit) {
                        if (counters.size() > 0) {
                            List<Map.Entry<String, Integer>> list = new ArrayList<Map.Entry<String, Integer>>();
                            list.addAll(counters.entrySet());
                            Collections.sort(list, new ValueComparator());
                            // emit the top N words to the next bolt
                            for (int i = 0; i < list.size(); i++) {
                                if (i < Integer.parseInt(stormConf.get("N").toString())) {
                                    collector.emit(new Values(list.get(i).getKey() + ":" + list.get(i).getValue()));
                                }
                            }
                        }
                        // clear the counters after emitting, in case the spout sends more words later
                        counters.clear();
                    }
                    edit = false;
                    try {
                        Thread.sleep(5000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getString(0);
        if (counters.containsKey(str)) {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        } else {
            counters.put(str, 1);
        }
        edit = true;
    }

    private static class ValueComparator implements Comparator<Map.Entry<String, Integer>> {
        @Override
        public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {
            return entry2.getValue() - entry1.getValue();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word_count"));
    }

    @Override
    public void cleanup() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
WordCounter uses a thread-safe container, a ConcurrentHashMap, to store each word and its count. In prepare() it starts a thread that polls the edit flag at a 5-second interval. When edit is false, execute() has not run since the last check and the container is no longer changing, so we can assume the spout has finished sending and it is safe to sort and take the top N. Note that edit is a volatile boolean (recall when volatile alone suffices: writes must not depend on the variable's current value; here we only ever assign true or false, so there is no such dependency).
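The quiescence-detection pattern described above can be isolated into a few lines. This is a minimal sketch of the idea (the class and method names are mine, not from the post): the worker flips a volatile flag on every update, and the watcher clears it and later checks whether anything flipped it back in the meantime.

```java
public class QuiescenceFlag {
    private volatile boolean edit = false;

    // Called by the worker (execute()) on every update.
    public void touch() {
        edit = true;
    }

    // Called by the watcher thread once per interval: returns true when no
    // touch() happened since the previous check, then resets the flag.
    public boolean quiescentSinceLastCheck() {
        boolean quiet = !edit;
        edit = false;
        return quiet;
    }
}
```

The volatile keyword guarantees the watcher thread sees the worker's writes promptly; no lock is needed because neither thread's write depends on the value it read.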
package test.storm.bolt;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordWriter extends BaseBasicBolt {
    private static final long serialVersionUID = -6586283337287975719L;
    private FileWriter writer = null;

    public WordWriter() {
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            writer = new FileWriter("/data/tianzhen/output/" + this);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String s = input.getString(0);
        try {
            writer.write(s);
            writer.write("\n");
            writer.flush();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // do not close the writer here: execute() keeps being called
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
The last bolt does the global aggregation. I took a shortcut here and wrote the results straight to a file, skipping the step of truncating to the top N; since I only have one supervisor node, the result is still correct.
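For completeness, here is a hedged sketch of the truncation step the author skipped: parse the "word:count" strings that WordCounter emits and keep only the N highest counts. The class and method names here are illustrative, not from the post.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

public class GlobalTopN {
    // Keep the N highest-count "word:count" entries, highest count first.
    public static List<String> topN(List<String> entries, int n) {
        List<String> sorted = new ArrayList<String>(entries);
        Collections.sort(sorted, new Comparator<String>() {
            public int compare(String a, String b) {
                int ca = Integer.parseInt(a.split(":")[1]);
                int cb = Integer.parseInt(b.split(":")[1]);
                return cb - ca; // descending by count
            }
        });
        return sorted.subList(0, Math.min(n, sorted.size()));
    }
}
```

With more than one WordCounter task, WordWriter would collect each task's partial top-N entries and apply this truncation before writing, so the file holds the true global top N.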
Reference links:
http://blog.itpub.net/28912557/viewspace-1579860/
http://www.cnblogs.com/panfeng412/archive/2012/06/16/storm-common-patterns-of-streaming-top-n.html