最近项目里有需求,在接口调用完毕后将一些消息经过MQ通知给另外一个服务,而且由于业务的缘由,须要停留一分钟再投递到MQ,另外一个团队来消费,我原本想用RabbitMQ(如下简称RMQ)来实现,但通过和同事讨论决定不用RMQ来实现延时,RMQ只充当消息通知,延时在本地进行实现。本地采用一个单机的延时队列,是我另外一个同事写的简单组件,拿过来直接用就好了,把功能作完,顺利上线,可是以后问题仍是暴露了出来,MQ消费者那边的团队反馈消息有时候没收到,因而我开始排查问题所在,确认了我业务代码是没问题的,可是看服务端的日志显示的确没发出去,可是由于是本地的一个延时队列,消息放在内存里,我也无法查队列里的具体消息状况,也不敢下结论是本地延时队列的问题,由于开发环境和测试环境都没出现问题。只能先跟踪一下,次日把前面三天的日志都拉下来看发现问题所在,许多实例都出现了消息在次日要么就当天的很晚的时候才发出去的状况,排除了多是由于虚拟机没有同步宿主机的时间的缘由以后(由于延时队列里面是获取的当前时间和消息建立时间的差来判断时间间隔),我这下基本肯定这个是我同事写的延时队列的问题。html
其实这个延时队列逻辑很简单,数据结构就是一个数组,入队列就是把消息放在可用位置上,到了数量知足必定条件的时候就扩容,出队列的时候全数组扫描,当碰到到期的消息的时候,将消息取出。其实如今看来这个延时队列其实设计得不是很优雅,若是取元素,须要通过不少次扫描大数组,而且扩容的时候对内存的消耗也大,这里代码就不帖了,这个故事告诉咱们,一个组件要给别人用,必需要通过多方面专业的测试才行,此次的问题后来我和同事讨论其实仍是由于有些case没有测试到致使的。另外就是,选用组件的时候最好仍是选用已知的稳定的,由于通过检验的才是出故障可能性比较小的。redis
延时队列在业务中常常会用到,好比网上买个东西,订单生成了可是多少时间内没支付就关闭订单,定时逻辑等等。以前我在学RabbitMQ的时候也实现过相似的功能。具体能够看RabbitMQ延时队列,今天来整理一下去设计一个延时队列须要些什么而且有哪些方案。算法
来粗略看一下里面带的结构有啥数组
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue<E> q = new PriorityQueue<E>(); .....
能够看到实际上这里面的结构是一个PriorityQueue,咱们知道实际上这个队列能够用来作最大堆或者最小堆,取出的元素是经过比较器的规则比较出来的最大值或者最小值。再来看泛型里的参数,是都须要实现一个Delayed的接口的,再来看看接口数据结构
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
从注释中,能够得知这里面的方法是用来获取剩余的时间的。而且这接口仍是实现了比较器的接口的,因此不难推出,这里其实就是经过堆排序,来找到最先过时的元素。也就是最早应该出队列的元素。分布式
简单作个demo,是实现如下Delayed的接口ide
public class DelayTask implements Delayed { /** * 消息编号 * */ private int index; /** * 延时时长+入队时间的值 * */ private long dealAt; public DelayTask(long time,int index){ this.dealAt = time; this.index = index; } @Override public long getDelay(TimeUnit unit) { return unit.convert( dealAt-System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { if(getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS)) { return 1; }else { return -1; } } //getter setter }
作个简单的测试测试
public class DelayQueueTest { public static void main(String[] args) throws InterruptedException{ DelayQueue<DelayTask> queue = new DelayQueue<>(); long currentTime = System.currentTimeMillis(); long[] delayTimes = {10000L,5000L,15000L}; for (int i = 0;i < delayTimes.length;i++) { DelayTask t = new DelayTask(delayTimes[i]+currentTime,i); queue.add(t); } while(!queue.isEmpty()) { DelayTask t = queue.take(); if (t != null) { queue.poll(); System.out.println("当前执行的任务编号为:" + t.getIndex()); long timeSpan = System.currentTimeMillis()-currentTime; System.out.println("时间间隔为:"+timeSpan); } Thread.sleep(1000); } } }
运行结果this
当前执行的任务编号为:1 时间间隔为:5001 当前执行的任务编号为:0 时间间隔为:10000 当前执行的任务编号为:2 时间间隔为:15000
那么上面的方法有啥利弊呢,首先优势确定是简单,缺点也显而易见,可靠性差,而且内存占用的问题也很明显。线程
像我前面提到我公司的同事的作法中,循环去遍历整个数组去检测消息是否达到延时时间的方法其实只能适用于小服务而且调用量不大的状况,一旦像调用量大了起来,实际上轮询整个数组去检测消息是否达到延时时间是很低效的。那么在这基础上,能够采用时间轮的办法,一个时间轮表明一个周期,一个周期里分为几个时间间隔,每个时间间隔里包含在这一分钟内全部的定时任务,时间轮在结构上是一个双向链表。
如图所示
假设这里一个时间节点表明一分钟,这里一个时间轮也就是周期为八分钟,当当前时间到达时间节点2的时候,这说明1中的任务已经所有过时且处理完成,时间节点2对应的定时任务就开始处理。这样作的优势是能够经过一个线程监控多个定时任务,可是缺点也很明显,就是时间颗粒度由节点的间隔决定,而且这些任务的时间间隔还须要用一样的时间颗粒度。而且须要考虑,不在时间周期里的任务如何处理。而后延时队列的其余特性都还须要经过本身实现来补上。
代码这里先挖个坑,以后我补上。
Zset的排序功能,直接提供了很方便的解决办法,只要咱们把Score设置为定时任务预计执行时间的时间戳,也就是当前时间+延时的时间,这样排序后首先拿到的就是最先过时的,命令也很简单,就是
ZRANGEBYSCORE key min max
就能够获取到max对应时间戳以前的全部任务。这种作法的优势是,许多功能redis都实现了,好比持久化,高可用性这些。可是缺点也有,那就是消息的延时和咱们轮询读redis的速度有关,获取当前时间以前的定时任务,可能有任务离当前时间比较远,而且消息过多的状况下,redis自己会受必定影响
这个我在前面有写过相似的文章。
阿里的开源消息队列,但我目前还没作太多了解,在我补齐这个中间件的技能点的时候一起补上。