Recently I have been building a monitoring system that tracks the call volume and processing time of each business function on our site, so that problems can be discovered and handled promptly. For a real-time statistics system like this, Storm was the natural first choice, so I learned it as I went along, and just as naturally ran into some pitfalls, many of them problems that are hard to find answers to online. This post is a record of the error that frustrated me the most.
My business logic aggregates call counts per minute, so I started a timer inside a bolt that periodically sends the aggregated statistics to the next bolt for persistence. That means the timer's callback code called OutputCollector to emit to the downstream bolt. Local debugging showed no problems, so I deployed it to the external test environment. Most of the time nothing went wrong, but occasionally this error appeared. As a developer, the most hateful kind of bug is exactly this kind, with a very low reproduction rate.
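The per-minute aggregation pattern I used can be sketched without any Storm dependencies. This is a minimal illustration, not my production code; the class and method names here are my own, and in the real bolt the timer thread would pass the flushed snapshot to `OutputCollector.emit(...)` instead of returning it:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Sketch of the aggregation pattern: the executor thread bumps counters for
// every tuple, and a timer thread swaps the whole map out once a minute and
// hands the finished window downstream.
public class MinuteCounter {
    private volatile ConcurrentHashMap<String, AtomicLong> counts = new ConcurrentHashMap<>();

    // Called from the bolt's execute() for every incoming tuple.
    public void record(String key) {
        counts.computeIfAbsent(key, k -> new AtomicLong()).incrementAndGet();
    }

    // Called from the timer thread once per minute: install a fresh map and
    // return the completed window (in the real bolt, this is what gets emitted).
    public Map<String, AtomicLong> flush() {
        ConcurrentHashMap<String, AtomicLong> snapshot = counts;
        counts = new ConcurrentHashMap<>();
        return snapshot;
    }
}
```

Note that the counters themselves are thread-safe; the trouble described below comes purely from the emit call that the timer thread makes afterwards.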
Here is the error log:
```
5675 [Thread-7-disruptor-executor[2 2]-send-queue] ERROR backtype.storm.daemon.executor -
java.lang.RuntimeException: java.lang.NullPointerException
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:128) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$consume_loop_STAR_$fn__1460.invoke(disruptor.clj:94) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_15]
Caused by: java.lang.NullPointerException: null
	at clojure.lang.RT.intCast(RT.java:1087) ~[clojure-1.5.1.jar:na]
	at backtype.storm.daemon.worker$mk_transfer_fn$fn__3549.invoke(worker.clj:129) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.daemon.executor$start_batch_transfer__GT_worker_handler_BANG_$fn__3283.invoke(executor.clj:258) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.disruptor$clojure_handler$reify__1447.onEvent(disruptor.clj:58) ~[storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125) ~[storm-core-0.9.3.jar:0.9.3]
	... 6 common frames omitted
5697 [Thread-7-disruptor-executor[2 2]-send-queue] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
	at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
	at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
	at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
	at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
	at java.lang.Thread.run(Thread.java:722) [na:1.7.0_15]
```
If you have run into this problem too, I'm sure the first sight of this error was painful: the log contains no trace of your own business code whatsoever, so there is simply nothing to go on when trying to locate the problem. What makes it truly agonizing is that it is not easy to reproduce either.
After many rounds of guessing and experimenting, I finally pinned down the cause. First, here is a minimal example that reproduces the error:
```java
public class Main {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new TestWordSpout());
        builder.setBolt("dispatch", new WordDispatchBolt()).shuffleGrouping("spout");
        builder.setBolt("print", new PrintBolt()).fieldsGrouping("dispatch", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(1);
        //conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("test-kafka-1", conf, builder.createTopology());
    }
}
```
```java
public class TestWordSpout extends BaseRichSpout {
    private static final long serialVersionUID = 1L;
    boolean _isDistributed;
    SpoutOutputCollector _collector;
    String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};

    public TestWordSpout() {
        this(true);
    }

    public TestWordSpout(boolean isDistributed) {
        _isDistributed = isDistributed;
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    public void close() {
    }

    public void nextTuple() {
        Utils.sleep(1000);
        final Random rand = new Random();
        final String word = words[rand.nextInt(words.length)];
        _collector.emit(new Values(word), word + new Random().nextDouble());
    }

    public void ack(Object msgId) {
        System.out.println("### ack:" + msgId);
    }

    public void fail(Object msgId) {
        System.out.println("### fail:" + msgId);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
```java
public class WordDispatchBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    send(); // deliberately no sleep: with a pause, the exception becomes too rare to observe
                }
            }
        }).start();
    }

    public void send() {
        this.collector.emit(new Values(new Random().nextDouble()));
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        this.collector.emit(new Values(word));
        this.collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
```java
public class PrintBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple input) {
        System.out.println(input.getValue(0));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}
```
The code is simple, so I won't walk through it in detail. In WordDispatchBolt I start a separate thread that emits data to the next bolt. My production code is similar, except it emits on a Timer (a Timer is backed by a thread under the hood, so the situation is the same). But since the Timer only fires once a minute, the probability of hitting the problem is tiny; here I deliberately emit in a tight loop with no pause so the exception shows up far more often.
If you run the topology code above, you are bound to hit the error shown earlier. If you don't know that this is a synchronization problem with OutputCollector, it is absolutely miserable to debug. Once you know it is a synchronization issue, the fix is either to avoid calling the collector from other threads altogether, or to make the calls synchronized. Below is the simple solution I came up with. (If anyone has a better one, please leave a comment.)
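The root cause is that OutputCollector in Storm 0.9.x is evidently not safe to call from multiple threads. The same hazard can be shown without Storm at all. In this Storm-free sketch, `FakeCollector` is a stand-in of my own (a plain ArrayList with no locking, roughly analogous to the collector's internal buffering), and `send` is the same synchronized-wrapper idea as the fix below:

```java
import java.util.ArrayList;
import java.util.List;

// Minimal illustration of why calling a non-thread-safe object from a second
// thread is dangerous. FakeCollector stands in for OutputCollector.
public class EmitRace {
    static class FakeCollector {
        final List<Object> emitted = new ArrayList<>();
        void emit(Object v) { emitted.add(v); } // not thread-safe
    }

    static final FakeCollector collector = new FakeCollector();

    // The fix from this article: funnel every emit through one synchronized method.
    static synchronized void send(Object v) { collector.emit(v); }

    public static void main(String[] args) throws InterruptedException {
        Runnable producer = () -> { for (int i = 0; i < 50_000; i++) send(i); };
        Thread t1 = new Thread(producer);
        Thread t2 = new Thread(producer);
        t1.start(); t2.start();
        t1.join(); t2.join();
        // With the synchronized wrapper, no emits are lost. If both threads
        // instead called collector.emit() directly, elements could be silently
        // dropped or an ArrayIndexOutOfBoundsException could be thrown.
        if (collector.emitted.size() != 100_000) throw new AssertionError();
        System.out.println("emitted " + collector.emitted.size());
    }
}
```

Storm's failure mode looks different (the corruption surfaces later as an NPE deep inside the disruptor queue, and the worker dies), but the mechanism is the same: two threads mutating unsynchronized shared state.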
Modify the WordDispatchBolt class as follows:
```java
public class WordDispatchBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    // deliberately no sleep: with a pause, the exception becomes too rare to observe
                    send(new Values(new Random().nextDouble()));
                }
            }
        }).start();
    }

    public synchronized void send(List<Object> tuple) {
        this.collector.emit(tuple);
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getStringByField("word");
        send(new Values(word));
        this.collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```
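The other direction mentioned above, never touching the collector from a second thread, can also be sketched without Storm. In this hypothetical variant (names are mine), the timer thread only enqueues finished windows into a concurrent queue, and the executor thread drains the queue at the top of `execute()`, where emitting is safe:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hand-off sketch: the timer thread is the only producer, the executor
// thread is the only consumer, and only the consumer ever emits.
public class QueueHandoff {
    private final Queue<Object> pending = new ConcurrentLinkedQueue<>();

    // Called from the timer thread instead of collector.emit(...).
    public void enqueue(Object window) {
        pending.offer(window);
    }

    // Called at the top of execute(); in a real bolt, each drained item would
    // be passed to collector.emit(...) on this (safe) thread. Returns the
    // number of items drained.
    public int drain(Consumer<Object> emit) {
        int n = 0;
        for (Object o; (o = pending.poll()) != null; n++) {
            emit.accept(o);
        }
        return n;
    }
}
```

One caveat of this approach is that the queue only drains when tuples arrive; if that matters, Storm's tick tuples (Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, available since 0.8) can deliver a periodic tuple to execute() and remove the need for a separate timer thread entirely.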
With that, this pitfall is basically resolved. I expect to be using Storm heavily from here on, and I'll keep writing up the pitfalls as I hit them.
"Write down the pitfalls you hit, so that those who hit them later have more to find when they search, and spend less time and anguish tracking the problem down."