延时队列 DelayQueue
搞定单机版的超时订单
那么何时须要用延时队列呢?常见的延时任务场景 举栗子:java
🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱git
DelayQueue
,万祖归宗,万法同源,学会了最基础的Queue
,就不愁其余的了延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另外一端取出。github
其次,延时队列,最重要的特性就体如今它的延时属性上,跟普通的队列不同的是,普通队列中的元素老是等着但愿被早点取出处理,而延时队列中的元素则是但愿被在指定时间获得取出和处理,因此延时队列中的元素是都是带时间属性的,一般来讲是须要被处理的消息或者任务。数据库
一言以蔽之曰 : 延时队列就是用来存放须要在指定时间被处理的元素的队列。编程
1) DelayQueue 是谁,上族谱 数组
DelayQueue
这一代已经第五代传人了,
要知道 DelayQueue
自幼生在八戒家,长大就往外面拉,熊熊烈火它不怕,水是水来渣是渣。缓存
不过它真的是文韬武略,有一把ReentrantLock
就是它的九齿钉耙,抗的死死の捍卫着本身的PriorityQueue
.安全
有典故曰:服务器
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 用于控制并发的 可重入 全局 锁
private final transient ReentrantLock lock = new ReentrantLock();
// 根据Delay时间排序的 无界的 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化阻塞通知的线程元素leader,标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
// 条件,用于阻塞和通知的Condition对象,表示如今是否有可取的元素
private final Condition available = lock.newCondition();
/** * 省洛方法代码..... 大家懂个人省洛吗? */
复制代码
锁,队列,状态(条件)
Condition
进行条件的判断-->进行线程之间的通讯和唤起PriorityQueue
做为一个容器,容器里面的元素都应该实现Delayed
接口,在每次往优先级队列中添加元素时以元素的过时时间做为排序条件,最早过时的元素放在优先级最高。DelayQueue
是一个没有大小限制的队列,所以往队列中插入数据的操做(生产者)永远不会被阻塞,而只有获取数据的操做(消费者)才会被阻塞。2) 优先级队列 PriorityQueue多线程
由于咱们的DelayQueue
里面维护了一个优先级的队列PriorityQueue
简单的看下:
//默认容量11
private static final int DEFAULT_INITIAL_CAPACITY = 11;
//存储元素的地方 数组
transient Object[] queue; // non-private to simplify nested class access
//元素个数
private int size = 0;
//比较器
private final Comparator<? super E> comparator;
复制代码
3) DelayQueue的方法简介
leader
置为空,并唤醒等待在条件available
上的线程;public boolean add(E e) { return offer(e);}
public void put(E e) { offer(e);}
public boolean offer(E e, long timeout, TimeUnit unit) { return offer(e);}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock(); //加锁 由于优先队列线程不安全
try {
q.offer(e); //判断优先级 进行入队
if (q.peek() == e) { //-----[1]
//leader记录了被阻塞在等待队列头生效的线程 新增一个元素到队列头,
//表示等待原来队列头生效的阻塞的线程已经失去了阻塞的意义
//,此时须要获取新的队列头进行返回了
leader = null;
//获取队列头的线程被唤起,主要有两种场景:
//1. 以前队列为空,致使被阻塞的线程
//2. 以前队列非空,可是队列头没有生效(到期)致使被阻塞的线程
available.signal();
}
return true; //由于是无界队列 因此添加元素确定成功 直到OOM
} finally {
lock.unlock(); //释放锁
}
}
复制代码
offer()
方法,首先获取独占锁,而后添加元素到优先级队列,因为q是优先级队列,因此添加元素后,peek并不必定是当前添加的元素,若是[1]为true,说明当前元素e的优先级最小也就即将过时的,这时候激活avaliable变量条件队列里面的线程,通知他们队列里面有元素了。
请看我详细的注释,毫不是走马观花
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock; //获取锁
lock.lockInterruptibly(); //可中断锁 并发类里面凡是调用了await的方法获取锁时候都是使用的lockInterruptibly方法而不是Lock.
//也是一种fail-fast思想吧,await()方法会在中断标志设置后抛出InterruptedException异常后退出 不至于死死的等待
try {
for (;;) {//会写死循环的都是高手
E first = q.peek();//get队头元素
if (first == null)
// 队列头为空,则阻塞,直到新增一个入队为止(1)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);//获取剩余时间
if (delay <= 0)
// 若队列头元素已生效,则直接返回(2)
return q.poll();
first = null; // don't retain ref while waiting 等待的时候不能引用,表示释放当前引用的(3)
if (leader != null)
// leader 非空时,表示有其余的一个线程在出队阻塞中 (4.1)
// 此时挂住当前线程,等待另外一个线程出队完成
available.await();
else {
//标识当前线程处于等待队列头生效的阻塞中 (4.2.1)
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 等待队列头元素生效(4.2.2)
available.awaitNanos(delay);
} finally {
//最终释放当前的线程 设置leader为null (4.2.3)
if (leader == thisThread)
leader = null;
}
}
}
} //(5)
} finally {
if (leader == null && q.peek() != null)
// 当前线程出队完成,通知其余出队阻塞的线程继续执行(6)
available.signal();
lock.unlock();//解锁结束
}
}
复制代码
那么,下面的结论肉眼可见:
leader = thisThread
)leader=null
)4) Leader/Follower模式
5)Delayed
public interface Delayed extends Comparable<Delayed> {
long getDelay(TimeUnit unit);
}
复制代码
据情报显示:Delayed
是一个继承自Delayed
的接口,而且定义了一个Delayed
方法,用于表示还有多少时间到期,到期了应返回小于等于0的数值。
很简答就是定义了一个,一个哈,一个表延迟的接口,就是个规范接口,目的就是骗咱们去实现它的方法.哼~
说了那么多废话,让我想起了那句名言:一切没有代码实操的讲解都是耍流氓
至今深深的烙在我心中,因此我必定要实战给大家看,显得我不是流氓...
实战以 订单下单后三十分钟内未支付则自动取消 为业务场景
该场景的代码逻辑分析以下:
那么咱们写代码必定要通用,先来写个通用的Delayed
通用...嗯! 泛型的
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/** * @author LiJing * @ClassName: ItemDelayed * @Description: 数据延迟实现实例 用以包装具体的实例转型 * @date 2019/9/16 15:53 */
@Setter
@Getter
public class ItemDelayed<T> implements Delayed {
/**默认延迟30分钟*/
private final static long DELAY = 30 * 60 * 1000L;
/**数据id*/
private Long dataId;
/**开始时间*/
private long startTime;
/**到期时间*/
private long expire;
/**建立时间*/
private Date now;
/**泛型data*/
private T data;
public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = startTime + (secondsDelay * 1000);
this.now = new Date();
}
public ItemDelayed(Long dataId, long startTime) {
super();
this.dataId = dataId;
this.startTime = startTime;
this.expire = startTime + DELAY;
this.now = new Date();
}
@Override
public int compareTo(Delayed o) {
return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
}
复制代码
public interface DelayOrder<T> {
/** * 添加延迟对象到延时队列 * * @param itemDelayed 延迟对象 * @return boolean */
boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);
/** * 根据对象添加到指定延时队列 * * @param data 数据对象 * @return boolean */
boolean addToDelayQueue(T data);
/** * 移除指定的延迟对象从延时队列中 * * @param data */
void removeToOrderDelayQueue(T data);
}
复制代码
@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {
@Autowired
private OrderService orderService;
@Autowired
private ExecutorService delayOrderExecutor;
private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();
/** * 初始化时加载数据库中需处理超时的订单 * 系统启动:扫描数据库中未支付(要在更新时:加上已支付就不用更新了),未过时的的订单 */
@PostConstruct
public void init() {
log.info("系统启动:扫描数据库中未支付,未过时的的订单");
List<Order> orderList = orderService.selectFutureOverTimeOrder();
for (Order order : orderList) {
ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
this.addToOrderDelayQueue(orderDelayed);
}
log.info("系统启动:扫描数据库中未支付的订单,总共扫描了" + orderList.size() + "个订单,推入检查队列,准备到期检查...");
/*启动一个线程,去取延迟订单*/
delayOrderExecutor.execute(() -> {
log.info("启动处理的订单线程:" + Thread.currentThread().getName());
ItemDelayed<Order> orderDelayed;
while (true) {
try {
orderDelayed = DELAY_QUEUE.take();
//处理超时订单
orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
} catch (Exception e) {
log.error("执行自营超时订单的_延迟队列_异常:" + e);
}
}
});
}
/** * 加入延迟消息队列 **/
@Override
public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
return DELAY_QUEUE.add(orderDelayed);
}
/** * 加入延迟消息队列 **/
@Override
public boolean addToDelayQueue(Order order) {
ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
return DELAY_QUEUE.add(orderDelayed);
}
/** * 从延迟队列中移除 主动取消就主动从队列中取出 **/
@Override
public void removeToOrderDelayQueue(Order order) {
if (order == null) {
return;
}
for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
ItemDelayed<Order> queue = iterator.next();
if (queue.getDataId().equals(order.getId())) {
DELAY_QUEUE.remove(queue);
}
}
}
}
复制代码
解释一番上面的写的东东
delayOrderExecutor
是注入的一个专门处理出队的一个线程@PostConstruct
是啥呢,是在容器启动后只进行一次初始化动做的一个注解,至关实用- 启动后呢,咱们去数据库扫描一遍,防止有漏网之鱼,由于单机版吗,队列的数据是在内存中的,重启后确定原先的数据会丢失,因此为保证服务质量,咱们可能会录音.....因此为保证重启后数据的恢复,咱们须要从新扫描数据库把未支付的数据从新装载到内存的队列中
- 接下来就是用这个线程去一直不停的访问队列的
take()
方法,当队列无数据就一直阻塞,或者数据没到期继续阻塞着,直到到期出队,而后获取订单的信息,去处理订单的更新操做
id
到延时实例中,这样缩减队列单个实例内存存储那今日份的讲解就到此结束,具体的代码请移步个人gitHub的mybot项目Master分支查阅,fork体验一把,或者评论区留言探讨,写的很差,请多多指教~~