基于redis的zSet集合做数据缓存实现分页查询 java

      需要场景:最近公司要做手机页面展示新闻文章数据查询的优化工作,让我提个优化方案。现状是目前手机页面的数据请求系统后台,系统后台然后调用其他系统的接口,返回分页数据到前台展示,这样一来,用户每次下拉到页面底部加载更多数据都要调用其他接口,用户体验显然不是很好,那有没有更好的方案呢?

      优化方案:redis正好适合在这种场景下使用,用户每次下拉到页面底部,此时从前台页面到系统后台分页(假如每次取10条)取数据,可以直接到redis里取数据,如果redis返回的数据为空或者小于你要取的10条数据,那么调用接口取10条分页数据放入缓存,然后再从缓存里取数据返回到前台。这样的话,只有当其中一个用户第一次查询的时候会调用接口数据存入缓存,以后这个用户或者其他用户再看这个文章信息的时候,就是直接从缓存里取数据,就相当快捷,提高用户体验。经测试,之前每次调用接口在700~800ms左右,现在每次从缓存里取数据,只需要200ms左右,性能显然提升很大。

     前面只提到下拉到页面底部加载更多数据时的情况,其实我们当刷新最新的数据时,这时候该怎么处理呢?事实上系统后台用到了kafka消费者接收从其他后台实时发送的文章数据,这里接收的文章有三种类型:一种是add,就说明这个文章是新增的发布到手机页面的数据;一种是update,就说明这个文章是要更新已经发布的数据,最后一种是del,就说明这个文章是要从手机页面删除的。也就是说,我们一方面可以从接口获取历史的数据,另一方面可以实时获取最新的被发送来的新增文章数据(或者是要修改和删除的)。另外补充一点,为了提升用户一打开手机就能快速的看到新闻信息的体验度,我们在系统启动成功后,默认先调用接口存入缓存10条记录,这样,用户第一次进入手机页面默认就能先从缓存里取10条新闻信息。

    上面说了那么多业务,无非是两点,一:从缓存里获取分页数据;二:对缓存数据进行增删改查的操作。而redis定义了5种数据结构,这5种数据结构类型分别为String(字符串)、List(列表)、Set(集合)、Hash(散列)和 Zset(有序集合)。

下面来对这5种数据结构类型作简单的介绍(表格引用https://www.jianshu.com/p/7bf5dc61ca06文章里的):

结构 类型                 结构存储的值 结构的读写能力
String 可以是字符串、整数或者浮点数 对整个字符串或者字符串的其中一部分执行操作;对象和浮点数执行自增(increment)或者自减(decrement)
List 一个链表,链表上的每个节点都包含了一个字符串 从链表的两端推入或者弹出元素;根据偏移量对链表进行修剪(trim);读取单个或者多个元素;根据值来查找或者移除元素
Set 包含字符串的无序收集器(unorderedcollection),并且被包含的每个字符串都是独一无二的、各不相同 添加、获取、移除单个元素;检查一个元素是否存在于某个集合中;计算交集、并集、差集;从集合里卖弄随机获取元素
Hash 包含键值对的无序散列表 添加、获取、移除单个键值对;获取所有键值对
Zset 字符串成员(member)与浮点数分值(score)之间的有序映射,元素的排列顺序由分值的大小决定 添加、获取、删除单个元素;根据分值范围(range)或者成员来获取元素
       所以,Zset结构正是我们想要的缓存类型,我们把分数score用文章的主键news_id,把每个文章的的内容用json字符串放入member里,redis的数据会根据socre也就是news_id自动排序,我们只需要对redis进行新增、删除的操作就行了(修改可以先删除再新增)。最后还要考虑redis里的数据定期删除的问题,一般来说,设置缓存的过期时间即可,但是设置过期时间是针对key来设置,这里最好的解决方法就是限制缓存的数据的个数,当数据的个数超过设置的限制个数之后,就是从score最低的值开始删除即可。也就是score最低的值,也就是news_id按照自增长的规则,最小的news_id的数据就是比较早的数据,。

    补充说明一些Zset里说数据不重复是指:如果新增一个数据里的member如果缓存里存在,这个数据的socre和member就会覆盖缓存里的数据,也就是说score是在数据里会重复,而member在数据里是不重复的。      

 来一张全部的逻辑图:


   贴出一些主要代码:

一、kafka新增数据(keyNewsList为redis的key值,jm为文章的json数据格式,redisImpNewsListNum是从配置文件里取的redis大小的限值)

//创建zset格式的数据,scorenews_iddouble型,member为每个稿件的数据
double score = Double.parseDouble(jm.getString("news_id"));
redisTemplate.opsForZSet().removeRangeByScore(keyNewsList, score,score);
redisTemplate.opsForZSet().add(keyNewsList, jm.toJSONString(), score);
LOG.debug("rediskafka缓存首次数据成功score:"+score+",key:"+keyNewsList+",member:"+jm.toString());
//测试
System.out.println("新增之后的个数" + redisTemplate.opsForZSet().zCard(keyNewsList));
//追加逻辑:限制keyNewsList的个数
String redisImpNewsListNum = ApplicationSetting.getProperty("redis.impnews.listNum");
if (StringUtils.isNotBlank(redisImpNewsListNum)){
    //配置文件里设置个数限制
    Long redisImpNewsListNumLong=Long.parseLong(redisImpNewsListNum);
    //keyNewsList的个数
    Long keyNewsListSize=redisTemplate.opsForZSet().zCard(keyNewsList);
    //如果keyNewsList的个数 超过 设置的限制的话,从socre最小的值开始删除
    if (keyNewsListSize > redisImpNewsListNumLong){
        redisTemplate.opsForZSet().removeRange(keyNewsList,0,keyNewsListSize-redisImpNewsListNumLong-1);
        LOG.debug("redis里的keyNewsList的个数:"+keyNewsListSize+",超过设置的限值redis.impnews.listNum:"+redisImpNewsListNumLong+",删除超出的数据。");
    }
}
System.out.println("删除之后的个数" + redisTemplate.opsForZSet().zCard(keyNewsList));

二、kafka更新数据

//先删除后新增
redisTemplate.opsForZSet().removeRangeByScore(keyNewsList, score,score);
Boolean aBoolean= redisTemplate.opsForZSet().add(keyNewsList, jo.toJSONString(), score);
if (aBoolean){
    LOG.debug("kafka更新数据,update成功,key:"+keyNewsList+",score:"+score+",member:"+jo.toJSONString());
}

三、kafka删除数据

//通过score来删除缓存里的数据
Double score = Double.parseDouble(data.getString("news_id"));
redisTemplate.opsForZSet().removeRangeByScore(keyNewsList, score,score);
LOG.debug("kafka删除数据,直接delete成功,key:" + keyNewsList + ",score:" + score);

四、系统首次加载存入缓存数据

BSPResponse bspRes = bspClient.getList("",
        "topmaceco,topcptmkt,topmoney,topfxmkt,topbond,topcom", "1", "100", "","0","");
//String keyNewsList = "newsList_redis_*";
String keyNewsList = "newsList_redis_impNews";
LOG.info("redis首页要闻请求bsp接口状态:"+bspRes.getMessage());
if (bspRes.isSuccess()) {
    JSONArray ja = bspRes.getBodyResult().getJSONArray("LIST");
    List<JSONObject> obj = new ArrayList<JSONObject>();
    if(null != ja && ja.size() > 0){
        for (int i = 0; i < ja.size(); i++) {
            JSONObject jm = (JSONObject) ja.get(i);
            obj.add(jm);
        }
    }
    if (obj != null&& obj.size()>0) {
        //清除所有
        redisTemplate.opsForZSet().removeRange(keyNewsList,0,-1);
        for (int i = 0; i < obj.size(); i++) {
            JSONObject jm = obj.get(i);
            String news_id = jm.getString("news_id");
            String info_id = jm.getString("info_id");
            //判断资讯阅读数是否应该增加
            //is_NewReadertrue 为阅读数增加1
            String is_NewReader = "true";
            String keyName_1 = "";
            jm.put("is_newreader", is_NewReader);
            jm.put("flag", "");
            //接口有摘要(news_abst),作者(author), 正文length(data_content_size)、 可分享字段(is_share)
            // 返回给终端的字段有:摘要(news_abst),作者(author), 是否有正文(hasContent)、 是否可分享(isShare)
            jm.put("isShare","0".equals(jm.getString("is_share"))?false:true);
            jm.put("hasContent","0".equals(jm.getString("data_content_size"))? false:true);
            jm.put("news_type", jm.getString("info_type")==null?"":jm.getString("info_type"));

            //放入缓存里(防止数据重复,先删除在新增)
           Double score = Double.parseDouble(news_id);
            redisTemplate.opsForZSet().removeRangeByScore(keyNewsList, score,score);
            redisTemplate.opsForZSet().add(keyNewsList,jm.toJSONString(),score);

        }
        LOG.info("redis首页要闻缓存:" + redisTemplate.opsForZSet().reverseRange(keyNewsList,0,-1));
    }
} else {
    LOG.error("redis首页要闻请求bsp接口返回失败");
}

五、前台调用系统后台

说明:page_news_id是前台传递到后台的最小news_id,根据这值,我们可以定位到缓存的数据位置,然后开始取多条数据。

举例:注意在score 在redis里是double类型

 score           member

44390         {“news_id”:44390,"title":............}

44389         {“news_id”:44389,"title":............}

44385         {“news_id”:44385,"title":............}

44378         {“news_id”:44378,"title":............}

44376         {“news_id”:44376,"title":............}

44374         {“news_id”:44374,"title":............}

44373         {“news_id”:44373,"title":............}

44372         {“news_id”:44372,"title":............}

44370         {“news_id”:44370,"title":............}

44369         {“news_id”:44369,"title":............}

44367         {“news_id”:44367,"title":............}

44365         {“news_id”:44365,"title":............}

....               ........

假如说前台app展示数据已经到44376了,当他下拉数据调用后台接口传递参数page_news_id=44376,pageSize=5,

那么利用reverseRangeByscore(keyNewsList,0,pageScore,1,pageSize)方法,取到的数据就会按照score从大到小排序(RangeByscore是按照从小到大排序):

第一个参数 表示 keyNewsList是key,你要从哪个缓存取数;

第二第三个参数 表示 0 pageScore 表示从socre范围最小是0,最大是pageScore;

第四第五个参数 表示 你要从数据下标开始从1取到pageSize,你要取多个。如果从0开始就会把44376这条数据也会取出来,所以要从1开始取。

取出的结果就是如下数据:

44374         {“news_id”:44374,"title":............}

44373         {“news_id”:44373,"title":............}

44372         {“news_id”:44372,"title":............}

44370         {“news_id”:44370,"title":............}

44369         {“news_id”:44369,"title":............}


if(StringUtils.isNotBlank(page_news_id)){
    System.out.println("下滑分页加载数据");
    //下滑分页加载数据
    pageScore = Double.parseDouble(page_news_id);
    System.out.println("pageScore = " + pageScore);
    set = redisTemplate.opsForZSet().reverseRangeByScore(keyNewsList,0,pageScore,1,pageSize);
    System.out.println("缓存数据大小前"+set.size());

    //缓存里没有数据,则调用渠道整合接口向缓存里插入数据
    if (set == null || set.size()<10){
        bspDataAddToRedis(user_id, classify_code, page_num, page_size,delay, page_news_id,keyNewsList,pageScore,pageSize);
        set = redisTemplate.opsForZSet().reverseRangeByScore(keyNewsList,0,pageScore,1,pageSize);
        System.out.println("缓存数据大小后"+set.size());
    }

} else {
    System.out.println("前台首次加载和下拉刷新最新数据");
    set = redisTemplate.opsForZSet().reverseRange(keyNewsList,0,pageSize-1);
}

 参考资料:

https://www.jianshu.com/p/7bf5dc61ca06

https://www.cnblogs.com/knowledgesea/p/4999288.html

https://my.oschina.net/1107156537/blog/1617252