最近项目中有个业务,须要对用户新增任务到期后进行业务处理。使用定时任务定时扫描过时时间,浪费资源,且不实时。只能使用延时队列处理。java
第一想到的是java自带的延时队列delayqueue。算法
首先实现一个Delyed类。数据库
实现两个最重要方法。第一个是队列里面的消息排序。DelayQueue底层使用的是阻塞队列。队列的消费端会去take队列的头部元素,没有元素就阻塞在那里。所以,延迟队列中的元素必须按执行时间顺序排列。数据结构
@Override public int compareTo(Delayed delayed) { Message message = (Message) delayed; return this.exceptTime > message.getExceptTime() ? 1 : 0; }
第二个方法是剩余时间延迟时间。每加入一个元素时将延迟时间传入,获得一个预期执行时间。每当执行此方法的时候,使用预期时间减去当前时间,即时剩余延迟时间。换句话说,还有多长时间执行。为0时当即执行。框架
@Override public long getDelay(TimeUnit unit) { System.out.println(exceptTime - System.nanoTime()); return unit.convert(exceptTime - System.nanoTime(), TimeUnit.SECONDS); }
所有代码:dom
import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; public class Message implements Delayed{ private Integer id; private String content; private long delay;//延迟时间 private long exceptTime;//执行时间 public Message() {} public Message(Integer id, String content, long delay) { this.id = id; this.content = content; this.delay = delay; this.exceptTime = System.nanoTime() + delay; } @Override public int compareTo(Delayed delayed) { Message message = (Message) delayed; return this.exceptTime > message.getExceptTime() ? 1 : 0; } @Override public long getDelay(TimeUnit unit) { System.out.println(exceptTime - System.nanoTime()); return unit.convert(exceptTime - System.nanoTime(), TimeUnit.SECONDS); } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public long getDelay() { return delay; } public void setDelay(long delay) { this.delay = delay; } public long getExceptTime() { return exceptTime; } public void setExceptTime(long exceptTime) { this.exceptTime = exceptTime; } }
而后初始化一个DelayQueue,加入任务。并建立一个线程异步执行。异步
DelayQueue<Message> delayqueue = new DelayQueue<>(); Random random = new Random(); for (int i = 0; i < 10; i++) { Message message = new Message(i, "content" + i, random.nextInt(1000000)); delayqueue.add(message); } new Thread(new Runnable() { @Override public void run() { while (true) { Message message; try { message = delayqueue.take(); System.out.println("message = " + message.getId()); } catch (InterruptedException e) { e.printStackTrace(); } } } }).start();
缺陷分布式
1.毕竟是jdk级别的,不可能作过多的封装。不少API并非那么好直接使用。好比直接传入一个延迟时间是并不能自动实现的,须要手动封装。ide
2.DelayQueue并无长度限制。有内存占用的风险。post
3.效率,稳定性方面,在DelayQueue自己确定是没有问题的,可是在项目中使用,势必须要作一些封装,直接上生产环境内心并无底。
netty毕竟是一个大名鼎鼎的框架,普遍使用于业界。它有许多心跳检测等定时任务,使用延时队列来实现。HashedWheelTimer底层数据结构依然是使用DelayedQueue。加上一种叫作时间轮的算法来实现。
关于时间轮算法,有点相似于HashMap。在new 一个HashedWheelTimer实例的时候,能够传入几个参数。
第一,一个时间长度,这个时间长度跟具体任务什么时候执行没有关系,可是跟执行精度有关。这个时间能够看做手表的指针循环一圈的长度。
而后第二,刻度数。这个能够看做手表的刻度。好比第一个参数为24小时,刻度数为12,那么每个刻度表示2小时。时间精度只能到两小时。时间长度/刻度数值越大,精度越大。
而后添加一个任务的时候,根据hash算法获得hash值并对刻度数求模获得一个下标,这个下标就是刻度的位置。
然而有一些任务的执行周期超过了第一个参数,好比超过了24小时,就会获得一个圈数round。
简点说,添加一个任务时会根据任务获得一个hash值,并根据时间轮长度和刻度获得一个商值round和模index,好比时间长度24小时,刻度为12,延迟时间为32小时,那么round=1,index=8。时间轮从开启之时起每24/12个时间走一个指针,即index+1,第一圈round=0。当走到第7个指针时,此时index=7,此时刚才的任务并不能执行,由于刚才的任务round=1,必需要等到下一轮index=7的时候才能执行。
如图所示
对于Delayed两个重要实现方法,第一排序,实际上是经过hash求商和模决定放入哪一个位置。这些位置自己就已经按照时间顺序排序了。第二,延迟时间,已经被封装好了,传入一个延迟的时间就行了。
代码实例:
获得一个延迟队列实例
HashedWheelTimer timer = new HashedWheelTimer(24, //时间轮一圈的长度 TimeUnit.SECONDS, 12);//时间轮的度刻
建立一个任务
TimerTask task = new TimerTask() { @Override public void run(Timeout timeout) throws Exception { System.out.println("任务执行"); } };
将任务加入延迟队列
timer.newTimeout(task, 1000, TimeUnit.SECONDS);
以上两种方案都没有实现持久化和分布式。持久化能够借助数据库来达到。分布式的话仍是使用消息中间件吧。RabbitMq据说已经能够借助某些参数实现。