在上一篇 Storm系列二: Storm拓扑设计 中咱们已经设计了一个稍微复杂一点的拓扑。html
而本篇就是在上一篇的基础上再作出必定的调整。前端
在这里先大概提一下上一篇的业务逻辑, 咱们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其余信息, 咱们须要作的事情是当接收到消息以后,根据SessionId判断是否属于同一消息, 若是是的话将内容拼接, 若是结束标识为 true, 表示会话已结束,则存入数据库或其余地方, 若是不为true, 则等待, 在1分钟后 仍是没有收到消息, 则存入数据库。java
在上一篇中, 消息内容指示的是用户行为, 所以对于消息的可靠性保障并无要求。node
如今咱们将需求微调,消息内容是系统的日志信息, 并保证日志信息没有遗漏, 由于极可能在未来咱们须要查找到系统的日志消息, 断定某些错误发生的缘由,就要保证毫无遗漏。git
那么什么是消息可靠性呢?github
若是咱们的拓扑由于某种意外终止了,当拓扑再度恢复,总不可能从头开始读取数据,又或者数据由于时效性已经丢失,没法被再度获取, 因此首先,咱们要有一个可靠的数据源。 这意味着须要有保存数据的能力, 除非通知数据已经被消费,不然就不能删除数据, 同时要记录每次消费到某个位置。数据库
当有了可靠的数据源以后,由于故障意外,某个bolt所处的节点挂掉,致使正在处理的数据被丢弃了, 因此须要Spout再度发送数据。 那么第二点,须要一个有重发指定消息的能力。json
咱们已经知足了上述两点,那么Spout是如何得知当前数据已经被处理掉了呢? 不管是成功仍是失败,总须要经过某种途径获取其监听信息。 因此第三点就是,须要一个可以被一路跟踪状态信息的元组流。 也叫作锚定。由于下游并不止一个bolt,可能会在任何一个节点出问题, 因此须要持续跟踪。缓存
第四点是, 须要一个具备容错能力的Storm拓扑。ide
固然咱们可以影响的只有前三点。
在这里选择一个可靠的数据源,文本输入固然是能够的, 实际中使用的是 kafka, RabbitMQ, 等消息队列, 做为可靠数据源的输入。
在Storm中的消息可靠性保障意味着, 消息以元组的形式从spout中发射出来,并通过拓扑中的各个bolt完成处理, 若是一个元组在某一个节点处理失败, Storm会马上得知相信的信息,并通知Spout能够进行相应的处理,不管是重发仍是抛弃(固然,若是是抛弃的话, 这里并无必要采起可靠数据源, 也不怎么须要可靠性保障。由于Spout表现出的行为是对元组的成功失败不闻不问。),直到,这个元组被完成掉。
元组有两种状态,ack 和 fail,当Spout发射出一个元组以后,下游的bolt在处理完成以后,可能会发出更多的元组, Storm为spout发出的每个tuple都建立了一个元组树(tupletree), 其中Spout发射的元组成为根元组,当一棵元组树的全部叶节点都完成了对元组的处理,此时storm才会认为 对当前tuple已完成处理。
那么storm中的消息保障自己就是可选的, 你可能在任意节点决定, 当前元组已经完成, 后续的全部处理并不须要再度进行保障性操做。既然自由度是这样高, 所以你须要作这样两件事:
在每一个节点发出元组的时候进行 锚定,也就是意味着storm须要继续跟踪当前元组的状态。
确保你的每个叶节点会对tuple进行应答,告诉storm我已经处理完成了。
固然,没有人能保证bolt永远会作出应答,即便bolt挂掉了,storm依然会跟踪元组状态,在得不到回应的状况下,元组树将会报错, 表示当前tuple处于fail状态, 这个时间的配置是TOPOLOGY_MESSAGE_TIMEOUT_SECS, 缺省配置为30s。
Config conf = new Config(); conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30);
已经有了相应的概念, 下面的部分咱们开始从spout层面,来一步步说明。
代码基于上一篇中的代码进行修改, 开头已经提到过了。
本篇的代码存储在:
git@github.com:zyzdisciple/storm_study.git
的guaranteed_message包下。
须要提到的一点是, 我继承的都是 BaseRichSpout/BaseRichBolt;
在BaseBasic系列中, 已经默认为你作了相关处理。
if (isValid(info, line)) { completingMessage(info); //判断当前的时间区间. long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); collector.emit(new Values(timeGroup, info), info.hashCode()); }
在这里,核心在:
collector.emit(new Values(timeGroup, info), info.hashCode());
collector.emit的第二个参数配置,就是 msgId, 若是咱们在从Spout中发射数据时, 没有配置messageId,那么storm并不会跟踪元组状态, 即便后续再怎么处理, 也是无效的。
在这里, 我简单的采起了hashCode, 固然也重写了info的hashCode方法, 在实际中, 咱们从kafka数据源中拉取数据的时候, 通常都会有其ID做为惟一性标识, 并不须要去单首创建。
其余工做暂且不提, 让咱们继续。
更改的有这样一个个地方:
if (info.getEnd()) { collector.emit(input, new Values(info)); //发送后须要移除相关数据 collectMap.remove(key); } else { collectMap.put(key, info); }
会发如今 emit的同时,在第一个emit中传递了 当前tuple做为参数,这就是进行了锚定行为, 将spout发出的tuple与后续的相关联, 能够监听状态, 若是不监听而直接响应ack,那么系统会认为你已经完成了, 若是不监听也不响应,时间到了,系统会认为你超时了。
对于系统消息咱们并无进行 ack处理, 这是由于storm仅跟踪 spout发出的tuple, 对于系统消息, 并不须要理会。
然而,在数据不知足直接发射条件的时候, 咱们对tuple并无进行ack,考虑若是ack,表示tuple在当前节点已经完成处理,若是不存在后续bolt的话, 则能够认为整个tuple都已经处理完毕, 那么在spout中就会删除对应数据, 基于可靠数据源也会忽略该数据, 然而事实上目前的数据咱们是存在内存中的,当bolt挂掉, 则内存相关数据消息, 那么就真的是彻底没法恢复了。
而不进行ack, 那么就会出现这样一个问题, 当咱们的定时器,messageTimeout超时以前, tuple的定时器已经超时了,此时会从新发出一条数据,形成了更多的困扰, 因此必须有这样一条要求:
咱们的tuple超时时间必须大于 messageTimeout。
但这样就不会形成问题了吗? 并非。
若是咱们的messageTimeout设置的时间原本就很长,好比十分钟, 那么tupleTimeout必须大于十分钟,也就是一条tuple发出去以后, 十分钟我才能将其定义为失败状态, 这没什么, 可是十分钟内会有多少条数据累积?
所以另外一条配置也是比较有用的:
Config conf = new Config(); conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30000);
这一条一样是配置给topology级别的:
表示的意思是, 若是在整个拓扑中, 有超过30000条tuple处于未响应状态, 那么spout就中止发送数据, 将其阻塞掉.
但仔细思考会发现仅仅这样处理是不够的,假定存在数据属于同一SessionId:
1,2,3,4,5
按照目前的假设来看,咱们收到1234时既不进行ack,也不锚定,惟有收到5的时候再作处理, 此时是否应该取出1234所属的tuple一一ack?并不合适,理应须要对1234再度进行锚定,由于惟有下游有权决定数据究竟是处理失败了仍是成功了。那咱们对1234再进行锚定发送? 也不合适,由于这意味着要将数据发送5遍,有4条数据是彻底无效的。
那么首先能够肯定的是,当收到中间数据须要进行ack,当真正须要发送数据的时候再进行锚定,也就是收到5的时候进行锚定,锚定的对象又是谁呢?是1所在的tuple。
那再来分析一下,身为数据源Spout,须要知足怎样的特性才可以保证收到2345的时候并不删除数据,只有收到1的时候,再将数据删除掉。
也就是说,数据源是一个队列,惟有当收到第一个数据的ack时,才按顺序检测,一一删除,不然都不删除。
这里咱们作的是简化处理,毕竟真正的可靠消息,有kafka这些专门的消息组件进行保证。
分析了这么久终于能够开始代码了。
而在这以前,小小的总结一下:
惟有当数据肯定不须要再度进行回放,一是数据已经被完全处理掉了,没有利用价值,二是保存在了另外一种可靠的存储结构中, 此时咱们才能进行ack,通知数据源,数据已经无效了。
做为数据源也须要为消息的可靠性提供必定的保障, 不可以跨节点删除, 最好是只可以按序删除, 进行标记删除的处理方式。
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.BufferedReader; import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; import java.util.concurrent.BlockingDeque; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicLong; /** * 本身设计的数据源, 须要完成一系列功能。 * @author zyzdisciple * @date 2019/4/11 */ public enum DataSource { INSTANCE; private BufferedReader br; private BlockingQueue<Node> queue; private AtomicLong seq; private BlockingDeque<Long> ackIndexes; private static final Object deleteQueueLock = new Object(); private static final Logger logger = LoggerFactory.getLogger(DataSource.class); DataSource() { try { br = new BufferedReader(new FileReader("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\user_behavior_data.txt")); } catch (FileNotFoundException e) { e.printStackTrace(); } queue = new LinkedBlockingDeque<>(); ackIndexes = new LinkedBlockingDeque<>(); } /** * 获取一行数据,没有返回null。 * @return */ public String nextLine() { Node node = null; try { String line = br.readLine(); if (!line.trim().isEmpty()) { node = new Node(seq.getAndIncrement(), line); queue.add(node); } } catch (IOException e) { logger.warn("empty queue, e:" + e); } return node == null ? null : node.getValue(); } /** * 成功响应 * @param seq */ public void ack(Object seq) { if (seq == null) { return; } deleteNode(Long.parseLong(seq.toString())); } private void deleteNode(long seq) { synchronized (deleteQueueLock) { Node headNode = queue.peek(); if (headNode != null && headNode.getIndex() == seq) { queue.poll(); //一直向下删除, 直到不等 deepDelete(); } else { Long headIndex = ackIndexes.peek(); if (headIndex == null) { ackIndexes.add(seq); } else if (seq > headIndex) { ackIndexes.addLast(seq); } else if (seq < headIndex) { ackIndexes.addFirst(seq); } } } } /** * 继续向下删除 */ private void deepDelete() { Node headNode = queue.peek(); long seq = ackIndexes.peek(); boolean hasDeleted = false; if (headNode != null && headNode.getIndex() == seq) { queue.poll(); ackIndexes.poll(); //一直向下删除, 直到不等 deepDelete(); } } }
其主要有两个功能, 一个是ack,删除缓存数据, 另外一个是取出数据。
@Override public void nextTuple() { Node node = dataSource.nextLine(); if (node == null) { return; } String line = node.getValue(); MessageInfo info = gson.fromJson(line, MessageInfo.class); if (isValid(info, line)) { completingMessage(info); //判断当前的时间区间. long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); collector.emit(new Values(timeGroup, info), node.getIndex()); } }
主要是改写了nextTuple, 在这里用咱们的“可靠数据源”接收数据,进行响应。
在这里会发现一点特性,贯穿数据源-spout-bolt的整个过程,其中key,也就是咱们定义的ID,起到了桥梁的做用
那么当数据真正处理完成,收到下游的ack以后,又应该做何处理?这就须要关注Spout的接口了。 咱们会注意到, 还提供了这样一个方法:
void ack(Object msgId);
所以咱们重写这个方法就能够了,告诉dataSource当前数据已经处理完了。
@Override public void ack(Object msgId) { dataSource.ack(msgId); }
那么若是下有数据处理失败了,天然有另外一个方法,fail:
void fail(Object msgId);
那么当咱们接收到fail时, 该从dataSource... 等等,dataSource并无提供根据msgId取出对应数据的功能啊,是咱们疏忽忘记了吗?并非,消息队列,是一个队列,并不支持根据msgId查询返回特定的数据, 大多数状况下, 咱们都须要本身维护相应的数据。
//加入属性cacheMap private Map<Long, MessageInfo> cacheMap; //在open方法中初始化 cacheMap = new HashMap<>(); //在nextTuple发射以前 cacheMap.put(node.getIndex(), info); //在ack中收到消息以后 cacheMap.remove(msgId); //在fail中 @Override public void fail(Object msgId) { long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000); collector.emit(new Values(timeGroup, cacheMap.get(msgId)), msgId); }
这样就完成了一个tuple从生到死的过程处理。
等等,在上一篇咱们提到过一个问题,spout同样是能够设置并行度的,也就是说可能会存在多个线程,咱们这么操做cacheMap而且不加锁真的好吗?
内容处理bolt就不贴在这里了,设计仍有必定不合理的地方, 但已经能说明主要问题。外加代码太多, 有兴趣能够本身去github上看一下。
@Override public void execute(Tuple input) { MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO); String jsonMessage = null; try { jsonMessage = gson.toJson(info, MessageInfo.class); } catch (Exception e) { logger.warn("格式转换失败, e" + e); collector.ack(input); } try { pw.println(jsonMessage); pw.flush(); } catch (Exception e) { logger.error("写入文件失败, e:" + e); collector.fail(input); } }
在这里,对于两种不一样的状况会看到咱们的 ack ,fail处理方式有所不一样, 为何呢?
对于错误咱们分为,已知的和未知的, 对于已知的错误,也有两种,可重试,和不可重试,对于不可重试错误,数据错了就错了,再试一千次也是错的,因此直接响应ack。
对于可重试错误,如数据库插入失败等其余状况,就能够告知拓扑失败信息,促使重试。
而对于未知错误,那天然是没办法处理了,只能等到它发生变成已知错误,再处理。
在本章主要讲了数据的可靠性保障相关的东西, 了解了实现可靠性的基本要求是, 一个可靠的数据源, 一个锚定的元组流, 一个可以感知并处理元组状态的spout。
还有很重要的一点没有提到, 是 一个容错性的拓扑。
概念比较宽泛,须要考虑到整个拓扑若是挂掉,如何恢复数据,从上次的某个地方继续向下读取数据, 若是某个bolt挂掉,相应的数据ack相关又该怎样处理, 以及与外界交互的,如文件流,数据库写入等等地方, 出现问题又该怎样处理?
同时,对于数据处理可靠性级别有这样几种:
最多一次, 至少一次, 仅一次。
最多一次就意味着能够不处理数据, 不可靠的数据源就是这样的
至少一次,只要咱们可以对拓扑中的 ack,fail使用的谨慎而明白,这一点也是很好保证的。
仅一次, 若是咱们在处理的是扣费项目, 由于数据从新发送,致使重复扣费,别人会投诉你的。 因此须要对数据加入惟一性标识, 而且将数据的处理状态, 处理节点等等都交给另外一个可靠的系统进行维护。
在本身设计的时候,有这样一个简单的处理办法:
对于咱们处理的每个节点,举个例子:
有8个流程须要执行,顺序未必一致。
咱们只须要始终在一个可靠的地方,维护数据状态:
000 000 00 8位0,当某个节点被处理,即置为1, 当节点再度收到数据,便知道是否 处理。但依然要当心,在存储状态及发送数据的中间,拓扑挂掉,等等。
关于Storm中Ack的详细机制:Apache Storm 实时流处理系统ACK机制以及源码分析