有一个客户有给云音箱推送临时广播的需求。应用场景:商场、店铺节日大促活动通知等。html
1.支持批量推送; 2.可按指定时间播报; 3.新设置的临时广播覆盖旧设置的; 4.支持取消还没有播报的临时广播;
延时消息的实现方案有不少(本文不作介绍,自行查找),考虑到现有项目中没有集成可用于延时队列的消息中间件等因素,决定采用Redis实现延时消息。redis
1.借助SortedSet的score特性,将消息指定播报的时间timestamp减基线时间N做为socre进行排序(N为2020-01-01 00:00:00对应的秒数),member=DelayMessge({"msg":"广播内容", "timestamp":"指定播报的时间", "appId":"云音箱设备所属的应用ID"}),key=DelayMessageSet。json
2.考虑到延时消息(DelayMessge)与设备(sn)是一对多的关系,sn须要单独存储,使用数据结构list;list的key=timestamp_appID_message,value=sn;(key由DelayMessage计算而来,保证惟一性);缓存
3.考虑到须要支持覆盖(要求3)和取消操做(要求4),本方案增长一个redis缓存,存储hash对象;
key=DelayMessageHash_(appID), filed=sn value={"score": "对应SortedSet中延时消息的分数", ..., "arg":"拓展字段"}。
注:“要求3”和“要求4”是客户后期提的需求,但客户制定的接口传参中未设置消息id这个概念,消息和sn又是一对多的关系。覆盖或取消操做时,咱们后台服务没法经过客户的接口传参找到以前推送的临时广播。因此采用hash存储,覆盖操做时,将hash表中对应sn的score从新hset便可;同理,取消操做时,将hash表中对应的sn删除便可。数据结构
4.定时器Timer,每隔N分钟检查一次DelayMessageSet,若是有N分钟内须要处理的延时消息DelayMessage,则添加延时任务,一条DelayMessage对应一个延时任务执行。app
5.延时任务执行时,计算list的key,调用lpop取出sn,再根据sn调用hget取到score,检查对应score是否一致。分布式
接收到客户发起的延时推送广播请求后,生产延时消息,若是timestamp - currentTIme > 定时器时间间隔,则直接将添加延时任务。ide
/** * 延时推送 */ private void pushDelay(int currentTime, DelayMessage delayMessage) { //此处无论 DelayTaskProducer producer = new DelayTaskProducer(delayMessage); producer.product(); long delay = delayMessage.getScore() + BASELINE_TIME - currentTime; int reloadInterval = SystemProperty.getIntProperty("config_delay_message_interval"); if(reloadInterval == 0) { //默认十分钟 reloadInterval = 600; } if (delay <= reloadInterval){ logger.debug("消息延时时间小于定时检查时间,直接添加延时任务"); try { String element = JSONObject.toJSONString(delayMessage); PushBatchExecutor.getScheduledThreadPoolExecutor().schedule(new DelayTaskConsumer(element), delay, TimeUnit.SECONDS); } catch (Exception e) { logger.error("直接添加延时任务出错", e); } }
/** * 生产延时消息 */ public void product() { try { long score = delayMessage.getScore(); int companyId = delayMessage.getCompanyId(); String data = JSONObject.toJSONString(delayMessage); //将DelayMessage存入SortedSet RedisManager.zadd("DelayMessageSet", score, data); //listkey:(timestamp)_(companyId)_(message) String listKey = delayMessage.getRedisKey(); //hashKey:DelayMessageHash_(companyId) String hashKey = "DelayMessageHash_" + companyId; //存储json,后期可加入拓展字段 JSONObject valueJson = new JSONObject(); //设置有效时间 long expiredTime = score + DelayTaskConstant.BASELINE_TIME - System.currentTimeMillis() / 1000 + 60; //map<sn, sn所属机构> Map<String, String> map = delayMessage.getDeviceMap(); for (String sn: map.keySet()) { //调用rpush,从列表右边开始放入元素 RedisManager.addListItem(redisKey, sn, (int) expiredTime); valueJson.put("score", score); //将设备号对应的score存入hash表中,便于延时消息覆盖及取消等操做。 RedisManager.hset(hashKey, sn, valueJson.toJSONString()); } }catch (Exception e) { logger.error("延时消息生产失败:", e); } }
包含过时消息清除优化
public class DelayMessageTimer implements Runnable{ private static final Logger logger = LoggerFactory.getLogger(DelayMessageTimer.class); private int interval; public DelayMessageTimer(int interval) { this.interval = interval; } @Override public void run() { try { logger.debug("开始延时消息检查"); int currentTime = (int) (System.currentTimeMillis() / 1000); int startTime = currentTime - DelayTaskConstant.BASELINE_TIME; int endTime = startTime + interval; logger.debug("range:{}to:{}", startTime, endTime); //取出范围内的数据zrangeByScoreWithScores Set<Tuple> tupleSet = RedisManager.zrangeByScoreWithScores("DelayMessageSet", startTime, endTime); if (tupleSet == null || tupleSet.isEmpty()) { logger.debug("当前没有{}秒内须要执行的延时消息", interval); return; } logger.debug("取出{}秒内待处理延迟消息{}个", interval, tupleSet.size()); tupleSet.forEach(tuple -> { String element = tuple.getElement(); double delay = tuple.getScore() + DelayTaskConstant.BASELINE_TIME - currentTime ; logger.debug("{}秒后消费延时消息{}", delay, element); //执行延时任务 PushBatchExecutor.getScheduledThreadPoolExecutor().schedule(new DelayTaskConsumer(element), (long) delay, TimeUnit.SECONDS); }); //检查过时的消息,须要删除 Set<Tuple> expiredSet = RedisManager.zrangeByScoreWithScores("DelayMessageSet", 0d, startTime); if (expiredSet == null || expiredSet.isEmpty()) { logger.debug("没有过时的延时消息"); } else { logger.debug("取出{}个过时消息,执行删除element操做", expiredSet.size()); expiredSet.forEach(tuple -> RedisManager.zrem("DelayMessageSet", tuple.getElement())); } logger.debug("延时消息检查结束"); } catch (Exception e) { logger.error("检查延时消息出现错误", e); } }
延时消息消费时,须要判断hash中sn对应的score与DelayMessage对应的score的一致性。this
public class DelayTaskConsumer implements Runnable{ private static final Logger logger = LoggerFactory.getLogger(DelayTaskConsumer.class); private String element; private DelayMessage delayMessage; public DelayTaskConsumer(String element) { this.element = element; } @Override public void run() { logger.debug("开始消费延时消息"); delayMessage = JSONObject.toJavaObject(JSON.parseObject(element), DelayMessage.class); //须要查询设备 String key = delayMessage.getRedisKey(); String hashKey = "DelayMessageHash_" + delayMessage.getCompanyId(); while (true) { //移出并获取列表的第一个元素,当列表不存在或者为空时,返回Null //延时消息生产时,已进行设备号去重,此处从列表左边一个一个的取出。 String sn= RedisManager.lpop(key); if (null == sn|| "nil".equals(sn)) { break; } //从hash表中取出filed为sn的数据 String value = RedisManager.hget(hashKey, sn); if (null == value || "nil".equals(value)) { //表名执行过取消操做,跳过当前设备 continue; } //取出value,判断score是否一致 JSONObject valueJson = JSON.parseObject(value); long score = valueJson.getLong("score"); if (delayMessage.getScore() != score) { //表名执行过覆盖操做,跳过当前设备 logger.debug("score:{}不一致,跳过当前设备:{}", score, sn); continue; } //删除filed RedisManager.hdel(hashKey, sn); //执行推送 pushAndCreateMessage(sn); } //将消费完的消息从redis中剔除 RedisManager.zrem("DelayMessageSet", element); logger.debug("消费延时消息完成"); } }
因为客户要求的特殊性,本文在实现延时消息时,同时使用了sortedset(用于排序、范围查取)、list(利用lpop特性,避免分布式场景中的重复消费)、hash(用于过滤设备)三种数据结构。虽然说方便覆盖(hset设置新的消息对应的score)和取消(hdel对应的filed)未消费的延时消息,但数据存储显得冗余。本方案彻底依赖Redis,暂未作消息持久化存贮。若是你们对方案优化有啥建议,还望不吝赐教。