最近接到一个需求,用一句话来讲就是:展现关注人发布的动态,这个涉及到 feed 流系统的设计。本文主要介绍一个通常企业可用的 Feed 流解决方案。java
下面先介绍一下关于 Feed 流的简单概念。redis
Feed
:Feed 流中的每一条状态或者消息都是 Feed,好比微博中的一条微博就是一个 Feed。Feed流
:持续更新并呈现给用户内容的信息流。每一个人的朋友圈,微博关注页等等都是一个 Feed 流。Feed 流常见的分类有两种:数据库
Timeline
:按发布的时间顺序排序,产品若是选择 Timeline 类型,那么就是认为 Feed 流中的 Feed 很少,可是每一个 Feed 都很重要,都须要用户看到。相似于微信朋友圈,微博等。Rank
:按某个非时间的因子排序,通常是按照用户的喜爱度排序,通常用于新闻推荐类、商品推荐等。设计一个 Feed 流系统,两个关键步骤,一个是 Feed 流的
初始化
,一个是推送
。关于 Feed 流的存储其实也是一个核心的点,可是笔主持久化使用的仍是 MySQL,后续能够考虑优化。缓存
Feed 流【关注页 Feed 流】的初始化指的是,当用户的 Feed 流还不存在的时候,为该用户建立一个属于他本身的关注页 Feed 流,具体怎么作呢?其实很简单,遍历一遍关注列表,取出全部关注用户的 feed,将 feedId 存放到 redis 的 sortSet
中便可。这里面有几个关键点:微信
通过上面的初始化,已经把 feed 流放在了 redis 缓存中了。接下来就是须要更新 feed 流了,在下面四种状况须要进行更新:网络
上面四步具体怎么操做,会在下面的实现步骤中详细描述,在这里先咱们重点讨论一下第1、二种状况。由于在处理 大V 【千万级别粉丝】的时候,咱们是须要对 大V 的全部粉丝的 feed 流进行处理的,这时候涉及到的量就会很是巨大,须要多加斟酌。关于推送,通常有两种 推/拉。并发
推
:A用户发布新的动态时,要往 A用户全部的粉丝 feed 流中推。拉
:A用户发布新的动态时,先不进行推送,而是等 粉丝进来的时候,才主动到 A用户的我的页TimeLine 拉取最新的 feed,而后进行一个 merge。若是关注了多个大V,能够并发的向多个大V 我的页TimeLine 中拉取。当用户发布一条新的 Feed 时,处理流程以下:app
当刷新本身的Feed流的时候,处理流程以下:异步
至此,使用推拉结合方式的发布,读取Feed流的流程都结束了。优化
若是只是用推模式了,则会变的比较简单:
推拉结合存在一个弊端,就是刷新本身的Feed流时,大V的我的页Timeline 的读压力会很大。
如何解决:
笔主主要采用纯推模式实现了一个普通企业基本可用的 Feed 流系统,下面介绍一下具体的实现代码,主要包括3大个部分:
当用户第一进来刷新Feed 流,且 Feed 流还不存在时,咱们须要进行初始化,初始化的具体代码以下:核心思想就是从数据库中load出 feed 信息,塞到 zSet 中,而后分页返回。
/** * 获取关注的人的信息流 */
public List<FeedDto> listFocusFeed(Long userId, Integer page, Integer size) {
String focusFeedKey = "focusFeedKey" + userId;
// 若是 zset 为空,先初始化
if (!zSetRedisTemplate.exists(focusFeedKey)) {
initFocusIdeaSet(userId);
}
// 若是 zset 存在,可是存在 0 值
Double zscore = zSetRedisTemplate.zscore(focusFeedKey, "0");
if (zscore != null && zscore > 0) {
return null;
}
//分页
int offset = (page - 1) * size;
long score = System.currentTimeMillis();
// 按 score 值从大到小从 zSet 中取出 FeedId 集合
List<String> list = zSetRedisTemplate.zrevrangeByScore(focusFeedKey, score, 0, offset, size);
List<FeedDto> result = new ArrayList<>();
if (QlchatUtil.isNotEmpty(list)) {
for (String s : list) {
// 根据 feedId 从缓存中 load 出 feed
FeedDto feedDto = this.loadCache(Long.valueOf(s));
if (feedDto != null) {
result.add(feedDto);
}
}
}
return result;
}
/** * 初始化关注的人的信息流 zSet */
private void initFocusFeedSet( Long userId) {
String focusFeedKey = "focusFeedKey" + userId;
zSetRedisTemplate.del(focusIdeaKey);
// 从数据库中加载当前用户关注的人发布过的 Feed
List<Feed> list = this.feedMapper.listFocusFeed(userId);
if (QlchatUtil.isEmpty(list)) {
//保存0,避免空数据频繁查库
zSetRedisTemplate.zadd(focusFeedKey, 1, "0");
zSetRedisTemplate.expire(focusFeedKey, RedisKeyConstants.ONE_MINUTE * 5);
return;
}
// 遍历 FeedList,把 FeedId 存到 zSet 中
for (Feed feed : list) {
zSetRedisTemplate.zadd(focusFeedKey, feed.getCreateTime().getTime(), feed.getId().toString());
}
zSetRedisTemplate.expire(focusFeedKey, 60 * 60 * 60);
}
复制代码
每当用户发布/删除新的 feed,咱们须要更新该用户全部的粉丝的 Feed流,该步骤通常比较耗时,因此建议异步处理,为了不一次性load出太多的粉丝数据,这里采用循环分页查询。为了不粉丝的 Feed流过大,咱们会限制 Feed 流的长度为1000,当Feed流长度超过1000时,会移除最旧的 Feed。
/** * 新增/删除 feed时,处理粉丝 feed 流 * * @param userId 新增/删除 feed的用户id * @param feedId 新增/删除 的feedId * @param type feed_add = 新增feed feed_sub = 删除feed */
public void handleFeed(Long userId, Long feedId, String type) {
Integer currentPage = 1;
Integer size = 1000;
List<FansDto> fansDtos;
while (true) {
Page page = new Page();
page.setSize(size);
page.setPage(currentPage);
fansDtos = this.fansService.listFans(userId, page);
for (FansDto fansDto : fansDtos) {
String focusFeedKey = "focusFeedKey" + userId;
// 若是粉丝 zSet 不存在,退出
if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
continue;
}
// 新增Feed
if ("feed_add".equals(type)) {
this.removeOldestZset(focusFeedKey);
zSetRedisTemplate.zadd(focusFeedKey, System.currentTimeMillis(), feedId);
}
// 删除Feed
else if ("feed_sub".equals(type)) {
zSetRedisTemplate.zrem(focusFeedKey, feedId);
}
}
if (fansDtos.size() < size) {
break;
}
currentPage++;
}
}
/** * 删除 zSet 中最旧的数据 */
private void removeOldestZset(String focusFeedKey){
// 若是当前 zSet 大于1000,删除最旧的数据
if (this.zSetRedisTemplate.zcard(focusFeedKey) >= 1000) {
// 获取当前 zSet 中 score 值最小的
List<String> zrevrange = this.zSetRedisTemplate.zrevrange(focusFeedKey, -1, -1, String.class);
if (QlchatUtil.isNotEmpty(zrevrange)) {
this.zSetRedisTemplate.zrem(focusFeedKey, zrevrange.get(0));
}
}
}
复制代码
这里比较简单,新增/取消关注,把新关注的 Feed 往本身的 Feed流中增长/删除 便可,可是一样须要异步处理。
/** * 关注/取关 时,处理followerId的zSet * * @param followId 被关注的人 * @param followerId 当前用户 * @param type focus = 关注 unfocus = 取关 */
public void handleFocus( Long followId, Long followerId, String type) {
String focusFeedKey = "focusFeedKey" + userId;
// 若是粉丝 zSet 不存在,退出
if (!this.zSetRedisTemplate.exists(focusFeedKey)) {
return;
}
List<FeedDto> FeedDtos = this.listFeedByFollowId(source, followId);
for (FeedDto feedDto : FeedDtos) {
// 关注
if ("focus".equals(type)) {
this.removeOldestZset(focusFeedKey);
this.zSetRedisTemplate.zadd(focusFeedKey, feedDto.getCreateTime().getTime(), feedDto.getId());
}
// 取关
else if ("unfocus".equals(type)) {
this.zSetRedisTemplate.zrem(focusFeedKey, feedDto.getId());
}
}
}
复制代码
上面展现的是核心代码,仅仅是为你们提供一个实现思路,并非直接可运行的代码,毕竟真正实现还会涉及到不少其余的无关要紧的类。
在这里已经介绍完一个简单可用的 Feed流系统,欢迎各路大神指出错误,多提意见!
参考文章: