Storm系列三: Storm消息可靠性保障

Storm系列三: Storm消息可靠性保障

在上一篇 Storm系列二: Storm拓扑设计 中咱们已经设计了一个稍微复杂一点的拓扑。html

而本篇就是在上一篇的基础上再作出必定的调整。前端

在这里先大概提一下上一篇的业务逻辑, 咱们会不断收到来自前端的消息,消息包含消息的发送时间,消息内容,结束标识, 消息的发送者, SessionId等其余信息, 咱们须要作的事情是当接收到消息以后,根据SessionId判断是否属于同一消息, 若是是的话将内容拼接, 若是结束标识为 true, 表示会话已结束,则存入数据库或其余地方, 若是不为true, 则等待, 在1分钟后 仍是没有收到消息, 则存入数据库。java

在上一篇中, 消息内容指示的是用户行为, 所以对于消息的可靠性保障并无要求。node

如今咱们将需求微调,消息内容是系统的日志信息, 并保证日志信息没有遗漏, 由于极可能在未来咱们须要查找到系统的日志消息, 断定某些错误发生的缘由,就要保证毫无遗漏。git

那么什么是消息可靠性呢?github

若是咱们的拓扑由于某种意外终止了,当拓扑再度恢复,总不可能从头开始读取数据,又或者数据由于时效性已经丢失,没法被再度获取, 因此首先,咱们要有一个可靠的数据源。 这意味着须要有保存数据的能力, 除非通知数据已经被消费,不然就不能删除数据, 同时要记录每次消费到某个位置。数据库

当有了可靠的数据源以后,由于故障意外,某个bolt所处的节点挂掉,致使正在处理的数据被丢弃了, 因此须要Spout再度发送数据。 那么第二点,须要一个有重发指定消息的能力。json

咱们已经知足了上述两点,那么Spout是如何得知当前数据已经被处理掉了呢? 不管是成功仍是失败,总须要经过某种途径获取其监听信息。 因此第三点就是,须要一个可以被一路跟踪状态信息的元组流。 也叫作锚定。由于下游并不止一个bolt,可能会在任何一个节点出问题, 因此须要持续跟踪。缓存

第四点是, 须要一个具备容错能力的Storm拓扑。ide

固然咱们可以影响的只有前三点。

在这里选择一个可靠的数据源,文本输入固然是能够的, 实际中使用的是 kafka, RabbitMQ, 等消息队列, 做为可靠数据源的输入。

消息可靠性保障

在Storm中的消息可靠性保障意味着, 消息以元组的形式从spout中发射出来,并通过拓扑中的各个bolt完成处理, 若是一个元组在某一个节点处理失败, Storm会马上得知相信的信息,并通知Spout能够进行相应的处理,不管是重发仍是抛弃(固然,若是是抛弃的话, 这里并无必要采起可靠数据源, 也不怎么须要可靠性保障。由于Spout表现出的行为是对元组的成功失败不闻不问。),直到,这个元组被完成掉。

元组状态

元组有两种状态,ack 和 fail,当Spout发射出一个元组以后,下游的bolt在处理完成以后,可能会发出更多的元组, Storm为spout发出的每个tuple都建立了一个元组树(tupletree), 其中Spout发射的元组成为根元组,当一棵元组树的全部叶节点都完成了对元组的处理,此时storm才会认为 对当前tuple已完成处理。

那么storm中的消息保障自己就是可选的, 你可能在任意节点决定, 当前元组已经完成, 后续的全部处理并不须要再度进行保障性操做。既然自由度是这样高, 所以你须要作这样两件事:

  • 在每一个节点发出元组的时候进行 锚定,也就是意味着storm须要继续跟踪当前元组的状态。

  • 确保你的每个叶节点会对tuple进行应答,告诉storm我已经处理完成了。

固然,没有人能保证bolt永远会作出应答,即便bolt挂掉了,storm依然会跟踪元组状态,在得不到回应的状况下,元组树将会报错, 表示当前tuple处于fail状态, 这个时间的配置是TOPOLOGY_MESSAGE_TIMEOUT_SECS, 缺省配置为30s。

Config conf = new Config();
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30);

实现

已经有了相应的概念, 下面的部分咱们开始从spout层面,来一步步说明。

代码基于上一篇中的代码进行修改, 开头已经提到过了。

本篇的代码存储在:

git@github.com:zyzdisciple/storm_study.git

的guaranteed_message包下。

须要提到的一点是, 我继承的都是 BaseRichSpout/BaseRichBolt;

在BaseBasic系列中, 已经默认为你作了相关处理。

FileReaderSpout

if (isValid(info, line)) {
    completingMessage(info);
    //判断当前的时间区间.
    long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
    collector.emit(new Values(timeGroup, info), info.hashCode());
}

在这里,核心在:

collector.emit(new Values(timeGroup, info), info.hashCode());

collector.emit的第二个参数配置,就是 msgId, 若是咱们在从Spout中发射数据时, 没有配置messageId,那么storm并不会跟踪元组状态, 即便后续再怎么处理, 也是无效的。

在这里, 我简单的采起了hashCode, 固然也重写了info的hashCode方法, 在实际中, 咱们从kafka数据源中拉取数据的时候, 通常都会有其ID做为惟一性标识, 并不须要去单首创建。

其余工做暂且不提, 让咱们继续。

ContentStitchingBolt

更改的有这样一个个地方:

if (info.getEnd()) {
    collector.emit(input, new Values(info));
    //发送后须要移除相关数据
    collectMap.remove(key);
} else {
    collectMap.put(key, info);
}

会发如今 emit的同时,在第一个emit中传递了 当前tuple做为参数,这就是进行了锚定行为, 将spout发出的tuple与后续的相关联, 能够监听状态, 若是不监听而直接响应ack,那么系统会认为你已经完成了, 若是不监听也不响应,时间到了,系统会认为你超时了。

对于系统消息咱们并无进行 ack处理, 这是由于storm仅跟踪 spout发出的tuple, 对于系统消息, 并不须要理会。

然而,在数据不知足直接发射条件的时候, 咱们对tuple并无进行ack,考虑若是ack,表示tuple在当前节点已经完成处理,若是不存在后续bolt的话, 则能够认为整个tuple都已经处理完毕, 那么在spout中就会删除对应数据, 基于可靠数据源也会忽略该数据, 然而事实上目前的数据咱们是存在内存中的,当bolt挂掉, 则内存相关数据消息, 那么就真的是彻底没法恢复了。

而不进行ack, 那么就会出现这样一个问题, 当咱们的定时器,messageTimeout超时以前, tuple的定时器已经超时了,此时会从新发出一条数据,形成了更多的困扰, 因此必须有这样一条要求:


咱们的tuple超时时间必须大于 messageTimeout。

但这样就不会形成问题了吗? 并非。

若是咱们的messageTimeout设置的时间原本就很长,好比十分钟, 那么tupleTimeout必须大于十分钟,也就是一条tuple发出去以后, 十分钟我才能将其定义为失败状态, 这没什么, 可是十分钟内会有多少条数据累积?

所以另外一条配置也是比较有用的:

Config conf = new Config();
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 30000);

这一条一样是配置给topology级别的:

表示的意思是, 若是在整个拓扑中, 有超过30000条tuple处于未响应状态, 那么spout就中止发送数据, 将其阻塞掉.

但仔细思考会发现仅仅这样处理是不够的,假定存在数据属于同一SessionId:

1,2,3,4,5

按照目前的假设来看,咱们收到1234时既不进行ack,也不锚定,惟有收到5的时候再作处理, 此时是否应该取出1234所属的tuple一一ack?并不合适,理应须要对1234再度进行锚定,由于惟有下游有权决定数据究竟是处理失败了仍是成功了。那咱们对1234再进行锚定发送? 也不合适,由于这意味着要将数据发送5遍,有4条数据是彻底无效的。

那么首先能够肯定的是,当收到中间数据须要进行ack,当真正须要发送数据的时候再进行锚定,也就是收到5的时候进行锚定,锚定的对象又是谁呢?是1所在的tuple。

那再来分析一下,身为数据源Spout,须要知足怎样的特性才可以保证收到2345的时候并不删除数据,只有收到1的时候,再将数据删除掉。

也就是说,数据源是一个队列,惟有当收到第一个数据的ack时,才按顺序检测,一一删除,不然都不删除。

这里咱们作的是简化处理,毕竟真正的可靠消息,有kafka这些专门的消息组件进行保证。

分析了这么久终于能够开始代码了。

而在这以前,小小的总结一下:

  • 惟有当数据肯定不须要再度进行回放,一是数据已经被完全处理掉了,没有利用价值,二是保存在了另外一种可靠的存储结构中, 此时咱们才能进行ack,通知数据源,数据已经无效了。

  • 做为数据源也须要为消息的可靠性提供必定的保障, 不可以跨节点删除, 最好是只可以按序删除, 进行标记删除的处理方式。

数据源调整

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;

/**
* 本身设计的数据源, 须要完成一系列功能。
* @author zyzdisciple
* @date 2019/4/11
*/
public enum DataSource {

    INSTANCE;

    private BufferedReader br;

    private BlockingQueue<Node> queue;

    private AtomicLong seq;

    private BlockingDeque<Long> ackIndexes;

    private static final Object deleteQueueLock = new Object();

    private static final Logger logger = LoggerFactory.getLogger(DataSource.class);

    DataSource() {
        try {
            br = new BufferedReader(new FileReader("E:\\IdeaProjects\\storm_demo\\src\\main\\resources\\user_behavior_data.txt"));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        queue = new LinkedBlockingDeque<>();
        ackIndexes = new LinkedBlockingDeque<>();
    }

    /**
    * 获取一行数据,没有返回null。
    * @return
    */
    public String nextLine() {
        Node node = null;
        try {
            String line = br.readLine();
            if (!line.trim().isEmpty()) {
                node = new Node(seq.getAndIncrement(), line);
                queue.add(node);
            }
        } catch (IOException e) {
            logger.warn("empty queue, e:" + e);
        }
        return node == null ? null : node.getValue();
    }

    /**
    * 成功响应
    * @param seq
    */
    public void ack(Object seq) {
        if (seq == null) {
            return;
        }
        deleteNode(Long.parseLong(seq.toString()));
    }

    private void deleteNode(long seq) {
        synchronized (deleteQueueLock) {
            Node headNode = queue.peek();
            if (headNode != null && headNode.getIndex() == seq) {
                queue.poll();
                //一直向下删除, 直到不等
                deepDelete();
            } else {
                Long headIndex = ackIndexes.peek();
                if (headIndex == null) {
                    ackIndexes.add(seq);
                } else if (seq > headIndex) {
                    ackIndexes.addLast(seq);
                } else if (seq < headIndex) {
                    ackIndexes.addFirst(seq);
                }
            }
        }
    }

    /**
    * 继续向下删除
    */
    private void deepDelete() {
        Node headNode = queue.peek();
        long seq = ackIndexes.peek();
        boolean hasDeleted = false;
        if (headNode != null && headNode.getIndex() == seq) {
            queue.poll();
            ackIndexes.poll();
            //一直向下删除, 直到不等
            deepDelete();
        }
    }
}

其主要有两个功能, 一个是ack,删除缓存数据, 另外一个是取出数据。

spout

@Override
public void nextTuple() {
    Node node = dataSource.nextLine();
    if (node == null) {
        return;
    }
    String line = node.getValue();
    MessageInfo info = gson.fromJson(line, MessageInfo.class);
    if (isValid(info, line)) {
        completingMessage(info);
        //判断当前的时间区间.
        long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
        collector.emit(new Values(timeGroup, info), node.getIndex());
    }
}

主要是改写了nextTuple, 在这里用咱们的“可靠数据源”接收数据,进行响应。


在这里会发现一点特性,贯穿数据源-spout-bolt的整个过程,其中key,也就是咱们定义的ID,起到了桥梁的做用

那么当数据真正处理完成,收到下游的ack以后,又应该做何处理?这就须要关注Spout的接口了。 咱们会注意到, 还提供了这样一个方法:

void ack(Object msgId);

所以咱们重写这个方法就能够了,告诉dataSource当前数据已经处理完了。

@Override
public void ack(Object msgId) {
    dataSource.ack(msgId);
}

那么若是下有数据处理失败了,天然有另外一个方法,fail:

void fail(Object msgId);

那么当咱们接收到fail时, 该从dataSource... 等等,dataSource并无提供根据msgId取出对应数据的功能啊,是咱们疏忽忘记了吗?并非,消息队列,是一个队列,并不支持根据msgId查询返回特定的数据, 大多数状况下, 咱们都须要本身维护相应的数据。

//加入属性cacheMap
private Map<Long, MessageInfo> cacheMap;
//在open方法中初始化
cacheMap = new HashMap<>();
//在nextTuple发射以前
cacheMap.put(node.getIndex(), info);
//在ack中收到消息以后
cacheMap.remove(msgId);
//在fail中
@Override
public void fail(Object msgId) {
    long timeGroup = System.currentTimeMillis() / (BehaviorConstants.SESSION_TIME_OUT_SECS * 1000);
    collector.emit(new Values(timeGroup, cacheMap.get(msgId)), msgId);
}

这样就完成了一个tuple从生到死的过程处理。

等等,在上一篇咱们提到过一个问题,spout同样是能够设置并行度的,也就是说可能会存在多个线程,咱们这么操做cacheMap而且不加锁真的好吗?

内容处理bolt就不贴在这里了,设计仍有必定不合理的地方, 但已经能说明主要问题。外加代码太多, 有兴趣能够本身去github上看一下。

MessageWriterBolt

@Override
public void execute(Tuple input) {
    MessageInfo info = (MessageInfo) input.getValueByField(BehaviorConstants.FIELD_INFO);
    String jsonMessage = null;
    try {
        jsonMessage = gson.toJson(info, MessageInfo.class);
    } catch (Exception e) {
        logger.warn("格式转换失败, e" + e);
        collector.ack(input);
    }
    try {
        pw.println(jsonMessage);
        pw.flush();
    } catch (Exception e) {
        logger.error("写入文件失败, e:" + e);
        collector.fail(input);
    }
}

在这里,对于两种不一样的状况会看到咱们的 ack ,fail处理方式有所不一样, 为何呢?

对于错误咱们分为,已知的和未知的, 对于已知的错误,也有两种,可重试,和不可重试,对于不可重试错误,数据错了就错了,再试一千次也是错的,因此直接响应ack。

对于可重试错误,如数据库插入失败等其余状况,就能够告知拓扑失败信息,促使重试。

而对于未知错误,那天然是没办法处理了,只能等到它发生变成已知错误,再处理。

结语

在本章主要讲了数据的可靠性保障相关的东西, 了解了实现可靠性的基本要求是, 一个可靠的数据源, 一个锚定的元组流, 一个可以感知并处理元组状态的spout。

还有很重要的一点没有提到, 是 一个容错性的拓扑。

概念比较宽泛,须要考虑到整个拓扑若是挂掉,如何恢复数据,从上次的某个地方继续向下读取数据, 若是某个bolt挂掉,相应的数据ack相关又该怎样处理, 以及与外界交互的,如文件流,数据库写入等等地方, 出现问题又该怎样处理?

同时,对于数据处理可靠性级别有这样几种:

最多一次, 至少一次, 仅一次。

最多一次就意味着能够不处理数据, 不可靠的数据源就是这样的

至少一次,只要咱们可以对拓扑中的 ack,fail使用的谨慎而明白,这一点也是很好保证的。

仅一次, 若是咱们在处理的是扣费项目, 由于数据从新发送,致使重复扣费,别人会投诉你的。 因此须要对数据加入惟一性标识, 而且将数据的处理状态, 处理节点等等都交给另外一个可靠的系统进行维护。

在本身设计的时候,有这样一个简单的处理办法:

对于咱们处理的每个节点,举个例子:

有8个流程须要执行,顺序未必一致。

咱们只须要始终在一个可靠的地方,维护数据状态:

000 000 00 8位0,当某个节点被处理,即置为1, 当节点再度收到数据,便知道是否 处理。但依然要当心,在存储状态及发送数据的中间,拓扑挂掉,等等。

关于Storm中Ack的详细机制:Apache Storm 实时流处理系统ACK机制以及源码分析

相关文章
相关标签/搜索