可靠的消息 VS 不可靠的消息html
在设计拓扑结构时,始终在头脑中记着的一件重要事情就是消息的可靠性。当有没法处理的消息时,你就要决定该怎么办,以及做为一个总体的拓扑结构该作些什么。举个例子,在处理银行存款时,不要丢失任何事务报文就是很重要的事情。可是若是你要统计分析数以百万的tweeter消息,即便有一条丢失了,仍然能够认为你的结果是准确的。java
对于Storm来讲,根据每一个拓扑的须要担保消息的可靠性是开发者的责任。这就涉及到消息可靠性和资源消耗之间的权衡。高可靠性的拓扑必须管理丢失的消息,必然消耗更多资源;可靠性较低的拓扑可能会丢失一些消息,占用的资源也相应更少。不论选择什么样的可靠性策略,Storm都提供了不一样的工具来实现它。git
要在spout中管理可靠性,你能够在分发时包含一个元组的消息ID(collector.emit(new Values(…),tupleId))。在一个元组被正确的处理时调用ack方法,而在失败时调用fail方法。当一个元组被全部的靶bolt和锚bolt处理过,便可断定元组处理成功(你将在第5章学到更多锚bolt知识)。
发生下列状况之一时为元组处理失败:github
提供数据的spout调用collector.fail(tuple)web
处理时间超过配置的超时时间redis
让咱们来看一个例子。想象你正在处理银行事务,需求以下:apache
若是事务失败了,从新发送消息json
若是失败了太屡次,终结拓扑运行api
建立一个spout和一个bolt,spout随机发送100个事务ID,有80%的元组不会被bolt收到(你能够在例子ch04-spout查看完整代码)。实现spout时利用Map分发事务消息元组,这样就比较容易实现重发消息。缓存
public void nextTuple() { if(!toSend.isEmpty()){ for(Map.Entry<Integer, String> transactionEntry : toSend.entrySet()){ Integer transactionId = transactionEntry.getKey(); String transactionMessage = transactionEntry.getValue(); collector.emit(new Values(transactionMessage),transactionId); } toSend.clear(); }}
若是有未发送的消息,获得每条事务消息和它的关联ID,把它们做为一个元组发送出去,最后清空消息队列。值得一提的是,调用map的clear是安全的,由于nextTuple失败时,只有ack方法会修改map,而它们都运行在一个线程内。
维护两个map用来跟踪待发送的事务消息和每一个事务的失败次数。ack方法只是简单的把事务从每一个列表中删除。
public void ack(Object msgId) { messages.remove(msgId); failCounterMessages.remove(msgId); }
fail方法决定应该从新发送一条消息,仍是已经失败太屡次而放弃它。
NOTE:若是你使用所有数据流组,而拓扑里的全部bolt都失败了,spout的fail方法才会被调用。
public void fail(Object msgId) { Integer transactionId = (Integer) msgId; //检查事务失败次数 Integer failures = transactionFailureCount.get(transactionId) + 1; if(failes >= MAX_FAILS){ //失败数过高了,终止拓扑 throw new RuntimeException("错误, transaction id 【"+ transactionId+"】 已失败太屡次了 【"+failures+"】"); } //失败次数没有达到最大数,保存这个数字并重发此消息 transactionFailureCount.put(transactionId, failures); toSend.put(transactionId, messages.get(transactionId)); LOG.info("重发消息【"+msgId+"】"); }
首先,检查事务失败次数。若是一个事务失败次数太多,经过抛出RuntimeException终止发送此条消息的工人。不然,保存失败次数,并把消息放入待发送队列(toSend),它就会再次调用nextTuple时得以从新发送。
NOTE:Storm节点不维护状态,所以若是你在内存保存信息(就像本例作的那样),而节点又不幸挂了,你就会丢失全部缓存的消息。
Storm是一个快速失败的系统。拓扑会在抛出异常时挂掉,而后再由Storm重启,恢复到抛出异常前的状态。
接下来你会了解到一些设计spout的技巧,帮助你从多数据源获取数据。
在一个直接链接的架构中,spout直接与一个消息分发器链接(见图4-1)。
图4-1 直接链接的spout
这个架构很容易实现,尤为是在消息分发器是已知设备或已知设备组时。已知设备知足:拓扑从启动时就已知道该设备,并贯穿拓扑的整个生命周期保持不变。未知设备就是在拓扑运行期添加进来的。已知设备组就是从拓扑启动时组内全部设备都是已知的。
下面举个例子说明这一点。建立一个spout使用Twitter流API读取twitter数据流。spout把API看成消息分发器直接链接。从数据流中获得符合track参数的公共tweets(参考twitter开发页面)。完整的例子能够在连接https://github.com/storm-book/examples-ch04-spouts/找到。
spout从配置对象获得链接参数(track,user,password),并链接到API(在这个例子中使用Apache的DefaultHttpClient)。它一次读一行数据,并把数据从JSON转化成Java对象,而后发布它。
public void nextTuple() { //建立http客户端 client = new DefaultHttpClient(); client.setCredentialsProvider(credentialProvider); HttpGet get = new HttpGet(STREAMING_API_URL+track); HttpResponse response; try { //执行http访问 response = client.execute(get); StatusLine status = response.getStatusLine(); if(status.getStatusCode() == 200){ InputStream inputStream = response.getEntity().getContent(); BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); String in; //逐行读取数据 while((in = reader.readLine())!=null){ try{ //转化并发布消息 Object json = jsonParser.parse(in); collector.emit(new Values(track,json)); }catch (ParseException e) { LOG.error("Error parsing message from twitter",e); } } } } catch (IOException e) { LOG.error("Error in communication with twitter api ["+get.getURI().toString()+"], sleeping 10s"); try { Thread.sleep(10000); } catch (InterruptedException e1) {} } }
NOTE:在这里你锁定了nextTuple方法,因此你永远也不会执行ack和fail方法。在真实的应用中,咱们推荐你在一个单独的线程中执行锁定,并维持一个内部队列用来交换数据(你会在下一个例子中学到如何实现这一点:消息队列)。
棒极了!
如今你用一个spout读取Twitter数据。一个明智的作法是,采用拓扑并行化,多个spout从同一个流读取数据的不一样部分。那么若是你有多个流要读取,你该怎么作呢?Storm的第二个有趣的特性(译者注:第一个有趣的特性已经出现过,这句话原文都是同样的,不过按照中文的行文习惯仍是不重复使用措词了)是,你能够在任意组件内(spouts/bolts)访问TopologyContext。利用这一特性,你可以把流划分到多个spouts读取。
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { //从context对象获取spout大小 int spoutsSize = context.getComponentTasks(context.getThisComponentId()).size(); //从这个spout获得任务id int myIdx = context.getThisTaskIndex(); String[] tracks = ((String) conf.get("track")).split(","); StringBuffer tracksBuffer = new StringBuffer(); for(int i=0; i< tracks.length;i++){ //Check if this spout must read the track word if( i % spoutsSize == myIdx){ tracksBuffer.append(","); tracksBuffer.append(tracks[i]); } } if(tracksBuffer.length() == 0) { throw new RuntimeException("没有为spout获得track配置" + " [spouts大小:"+spoutsSize+", tracks:"+tracks.length+"] tracks的数量必须高于spout的数量"); this.track =tracksBuffer.substring(1).toString(); } ... }
利用这一技巧,你能够把collector对象均匀的分配给多个数据源,固然也能够应用到其它的情形。好比说,从web服务器收集日志文件见图4-2
图4-2 直连hash
经过上一个例子,你学会了从一个spout链接到已知设备。你也可使用相同的方法链接未知设备,不过这时你须要借助于一个协同系统维护的设备列表。协同系统负责探察列表的变化,并根据变化建立或销毁链接。好比,从web服务器收集日志文件时,web服务器列表可能随着时间变化。当添加一台web服务器时,协同系统探查到变化并为它建立一个新的spout。见图4-3
图4-3 直连协同
第二种方法是,经过一个队列系统接收来自消息分发器的消息,并把消息转发给spout。更进一步的作法是,把队列系统做为spout和数据源之间的中间件,在许多状况下,你能够利用多队列系统的重播能力加强队列可靠性。这意味着你不须要知道有关消息分发器的任何事情,并且添加或移除分发器的操做比直接链接简单的多。这个架构的问题在于队列是一个故障点,另外你还要为处理流程引入新的环节。
图4-4展现了这一架构模型
图4-4 使用队列系统
NOTE:你能够经过轮询队列或哈希队列(把队列消息经过哈希发送给spouts或建立多个队列使队列spouts一一对应)在多个spouts之间实现并行性。
接下来咱们利用Redis和它的java库Jedis建立一个队列系统。在这个例子中,咱们建立一个日志处理器从一个未知的来源收集日志,利用lpush命令把消息插入队列,利用blpop命令等待消息。若是你有不少处理过程,blpop命令采用了轮询方式获取消息。
咱们在spout的open方法建立一个线程,用来获取消息(使用线程是为了不锁定nextTuple在主循环的调用):
new Thread(new Runnable() { @Override public void run() { try{ Jedis client= new Jedis(redisHost, redisPort); List res = client.blpop(Integer.MAX_VALUE, queues); messages.offer(res.get(1)); }catch(Exception e){ LOG.error("从redis读取队列出错",e); try { Thread.sleep(100); }catch(InterruptedException e1){} } }}).start();
这个线程的唯一目的就是,建立redis链接,而后执行blpop命令。每当收到了一个消息,它就被添加到一个内部消息队列,而后会被nextTuple消费。对于spout来讲数据源就是redis队列,它不知道消息分发者在哪里也不知道消息的数量。
NOTE:咱们不推荐你在spout建立太多线程,由于每一个spout都运行在不一样的线程。一个更好的替代方案是增长拓扑并行性,也就是经过Storm集群在分布式环境建立更多线程。
在nextTuple方法中,要作的唯一的事情就是从内部消息队列获取消息并再次分发它们。
public void nextTuple(){ while(!messages.isEmpty()){ collector.emit(new Values(messages.poll())); }}
NOTE:你还能够借助redis在spout实现消息重发,从而实现可靠的拓扑。(译者注:这里是相对于开头的可靠的消息VS不可靠的消息讲的)
DRPC
DRPCSpout从DRPC服务器接收一个函数调用,并执行它(见第三章的例子)。对于最多见的状况,使用backtype.storm.drpc.DRPCSpout就足够了,不过仍然有可能利用Storm包内的DRPC类建立本身的实现。
小结
如今你已经学习了常见的spout实现模式,它们的优点,以及如何确保消息可靠性。不存在适用于全部拓扑的架构模式。若是你知道数据源,而且可以控制它们,你就可使用直接链接;然而若是你须要添加未知数据源或从多种数据源接收数据,就最好使用消息队列。若是你要执行在线过程,你可使用DRPCSpout或相似的实现。
你已经学习了三种常见链接方式,不过依赖于你的需求仍然有无限的可能。