深度解析某头条的一道面试题

首先,某头条的文章量、用户量都是很大的,点击量那就更恐怖了。 请问,若是实时展示热门文章,好比近8小时点击量最大的文章前100名。 若是是你来开发这个功能,你怎么作?node

这个好办啊,redis一个sortedset搞定啊,score计数,key是文章ID,不就ok了么?redis

回答的不错,你能够走了!算法

要听清题目,说好的8小时动态时间窗口,计数是会过时的。还有,头条的量有这么小么,一个redis就搞定了?同窗啊,我告诉你,文章的量你起码得估计个几十万,用户你得估计几个亿,点击量你至少得估计个1M/s吧。数据库

数据接收

1M/s的点击并发量,确定是须要分布式了。客户端可能会为了减轻服务器的压力而选择延迟合并点击请求进行批量发送。简单起见,这里就使用HTTP协议吧。咱们先不考虑恶意用户刷点击的行为。安全

服务器确定会有多台机器多进程部署来接受点击请求,接收到的请求在进行参数解析后,被发送到存储单元。为了减轻存储的压力,每一个进程可能会使用小窗口聚合数据,每隔一小段时间将窗口内的数据聚合起来一块儿发给存储单元。bash

数据存储

点击数据是很重要的数据,用户的兴趣偏好就靠它了。这么大的点击数据若是所有用内存装的话,成本过高。因此别期望彻底使用redis了。服务器

拿kafka存是一个好办法,ZeroCopy机制并发量很高,数据持久化在磁盘里成本低。不过kafka的数据通常是有过时时间的,若是想彻底记住用户的点击以便作长期的数据分析,少不了要使用hdfs了。多线程

可是由于要作准实时统计,hdfs可不适合干这个,hdfs适合作离线统计的数据源。因此还得靠kafka接数据,而后消费者一边入hdfs,一边作实时统计。并发

实时统计可使用spark stream、storm接受kafka的输入,也能够本身手写。机器学习

分布式TopN算法

用户太多,用户表按用户ID哈希分红了1024张子表。用户表里有一个字段score,表示这个用户的积分数。如今咱们要计算前100名积分最多的用户以及积分数,该怎么查询?

若是是单个表,一个SQL也就搞定了

select id, score from user order by score desc limit 100
复制代码

若是是多个子表,你得在每一个子表上都进行一次TopN查询,而后聚合结果再作一次TopN查询。下面是伪代码

candidates = []
for k in range(1024):
    # 每一个表都取topn
    rows = select id, score from user_${k} order by score desc limit 100
    # 聚合结果
    candidates.extend(rows)
# 根据score倒排
candidates = sorted(candidates, key=lambda t: t[1], reverse=True)
# 再取topn
candidates[:100]
复制代码

子表查询能够多线程并行,提升聚合效率。

滑动窗口

8小时的滑动窗口,意味着新的数据源源不断的进来,旧的数据时时刻刻在淘汰。严格来讲,精准的8小时滑动窗口要求每条数据要严格的过时,差了1秒都不行,到点了就当即被淘汰。

精准的代价是咱们要为每条点击记录都设置过时时间,过时时间自己也是须要存储的,并且过时策略还须要定时扫描时间堆来确认哪些记录过时了。量大的时候这些都是不容小嘘的负担。

可是在业务上来说,排行版没有必要作到如此的精准,误差个几分钟这都不是事。

业务上的折中给服务的资源优化带来了机遇。咱们对时间片进行了切分,一分钟一个槽来进行计数。下面是伪代码

class HitSlot {
    long timestamp; # earlies timestamp
    map[int]int hits;  # post_id => hits
    
    void onHit(int postId, int hits) {
        this.hits[postId] += hits;
    }
}

class WindowSlots {
    HitSlot currentSlot;  # current active slots
    LinkedList<HitSlot> historySlots;  # history unactive slots
    map[int]int topHits; # topn posts
    
    void onHit(int postId, int hits) {  # 由于上游有合并点击,因此有了hits参数
        long ts = System.currentTimeMillis();
        if(this.currentSlot == null) { # 建立第一个槽
            this.currentSlot == new HitSlot(ts);
        } elif(ts - this.currentSlot.timestamp > 60 * 1000) {  # 建立下一个槽,一分钟一个槽
            this.historySlots.add(this.currentSlot);
            this.currentSlot = new HitSlot(ts);
        }
        this.currentSlot.onHit(postId, hits);
    }
    
    void onBeat() {  # 维护窗口,移除过时的槽,而后统计topn,30s~60s调用一次
        if(historySlots.isEmpty()) {
            return;
        }
        HitSlot slot = historySlots[0];
        long ts = System.currentTimeMillis();
        if(ts - slot.timestamp > 8 * 60 * 60 * 1000) {  # 过时了8小时,移掉第一个
            historySlots.remove(0);
            topHits = topn(aggregateSlots(historySlots));  # 计算topn的帖子
        }
    }
}

复制代码

上面的代码表明着每一个分布式子节点的逻辑,由于是伪代码,因此加锁问题就不细写了。 它的目标就是定时维持一个8小时的统计窗口,并汇聚topn的热帖放在内存里。 这个topn的数据并非特别实时,有一个大约1分钟的短暂的时间窗口。

定时任务

每一个子节点都会有一个定时任务去负责维持统计窗口,过时失效的统计数据,计算局部的topn热帖。

如今每一个子节点都有了各自的局部topn热帖,那么还须要一个主节点去汇总这些局部热点,而后计算去全局热帖。

主节点也不必特别实时,按期从子节点拉取topn数据便可,也可让字节点主动汇报。

class HotPostsAggregator {
    map[int]map[int]int localTopnPosts;  # nodeId => topn posts
    map[int]int globalTopnPosts;
    
    void onBeat() {
        // do aggregate
        // save globalTopnPosts to redis
    }
    
    void onLocalReport(int nodeId, map[int]int topnPosts) {
        // 子节点上报局部热帖
    }
}
复制代码

散列

按照头条的文章至少几十万篇,若是每一个子节点都要对全部的文章统计点击数,彷佛也会占用很多内存,聚合和排序热帖也会有很多计算量。最好的想法是每一个子节点只负责一部分文章的统计,这样能够明显节省计算资源。

咱们将kafka的分区数设置为字节点的数量,这样每一个节点负责消费一个分区的数据。在kafka生产端,对点击记录的帖子ID进行散列,保证相同文章ID的点击流进入相同的分区,最终流向同一个统计子节点。

消费者挂了

当机器增多时,节点挂掉的几率也会增大。硬件可能损坏,电源可能掉电,人为操做失误。若是没有作任何防范措施,当一个字节点挂掉时,该节点上8个小时时间窗口的统计数据将会丢失。该节点所管理的局部热点文章就丧失了进入全局热帖的机会。

这可能不会对产品和体验上带来很大的伤害,节点重启8小时以后也就彻底恢复了。并且这8小时以内,丧失了部分文章的热点投票权也不会对总体业务带来巨大影响。

可是咱们都但愿系统能够更加完美一点不是么?当节点挂掉时,咱们但愿能够快速恢复状态,这也是能够作到的,难度也不是很大,不过是定时作一下checkpoint,将当前的状态持久化到本地文件或者数据库中。由于每一个子节点管理的文章不会太多,因此须要序列化的内容也不会太大。当节点重启时,从持久化的checkpoint中将以前的状态恢复出来,而后继续进行消费和统计。

若是你使用的是spark-stream,它内置的checkpoint功能会让你实现备份和恢复会更加简单,更加安全。

若是你不想作checkpoint,办法仍是有的,就是可能耗时旧一点。那就是对hdfs中的存储的全部的点击流数据进行一次mapreduce,将8小时窗口内的点击流的点击量统计出来,而后想办法导入到字节点进程中去。

这要求hdfs的数据也是散列存储的,和kafka对应,这样能够快速圈出须要统计的数据范围。也许会由于mapreduce自己会耗时一点时间,最终致使恢复的数据没有那么准确,不过这关系也不大,咱们用这样粗糙的方法,能对得起那9.5成的数据已经作的很不错了。

点击去重

上面讲了一堆堆,代码敲了很多图画了很多,彷佛颇有道理。可是还有个重要的没提到,那就是点击去重。若是一个用户反复点击了不少次,那该如何计数比较合理。

一篇好的文章若是它不是过短的话,通常会吸引读者反复阅读不少次。这个计数若是彻底去重了记为一次彷佛也不太合理。可是若是是故意被人反复点击而被记了太屡次明显也很差。那该如何选择呢?

首先要从客户端下手,客户端自己能够过滤一部分无效点击。同一篇文章在过短的时间内被当前用户反复点击,这个模式仍是很好发现的。若是间隔时间比较长,那就是读者的回味点击,属于文章的正向反馈,应该记录下来。

客户端作好了,而后再从服务器端下手,服务器端下手就比较困难了。要探测用户的行为模式意味着要对用户的行为状态化,这样就会大量加剧服务器的存储负担。

服务器还须要防止用户的防刷行为。若是缺失防刷控制,一个头条号能够经过这种漏洞来使得本身的文章非法得到大量点击,进入热门文章列表,打上热门标签,被海量的用户看到,就会得到较大的经济效益,即便这篇文章内容自己吸引力并不足够。

当用户发现这样差劲的内容也能上热门榜单时,无疑会对产品产生必定的质疑。若是这种行为泛滥开来,那就可能对产品形成比较致命的负面影响。

防刷是一门大型课题,本篇内容就不作详细讲解了,笔者在这方面也不是什么专家。简单点说放刷本质上就是提取恶意行为的特征。常见的策略就是同一篇文章被来自于同一个IP或者有限的几个IP的频繁点击请求,这时就可使用封禁IP的招数来搞定。还可使用用户反馈机制来识别非正常的热门内容,而后人工干预等。业界还有一些更高级的如机器学习深度学习等方法来防刷,这些读者均可以自行搜索研究。

阅读相关文章,关注公众号【码洞】

相关文章
相关标签/搜索