实战|我仍是很建议你用DelayQueue搞定超时订单的-(1)

1、用三根鸡毛作引言

  • 真的! 不骗大家的喔~ 相信你们都遇到相似于:订单30min后未支付自动取消的开发任务
  • 那么今日份就来了解一下怎么用延时队列 DelayQueue搞定单机版的超时订单

2、延时队列使用场景

那么何时须要用延时队列呢?常见的延时任务场景 举栗子:java

  1. 订单在30分钟以内未支付则自动取消。
  2. 重试机制实现,把调用失败的接口放入一个固定延时的队列,到期后再重试。
  3. 新建立的店铺,若是在十天内都没有上传过商品,则自动发送消息提醒。
  4. 用户发起退款,若是三天内没有获得处理则通知相关运营人员。
  5. 预约会议后,须要在预约的时间点前十分钟通知各个与会人员参加会议。
  6. 关闭空闲链接,服务器中,有不少客户端的链接,空闲一段时间以后须要关闭之。
  7. 清理过时数据业务。好比缓存中的对象,超过了空闲时间,须要从缓存中移出。
  8. 多考生考试,到期所有考生必须交卷,要求时间很是准确的场景。

3、解决办法多如鸡毛

  1. 按期轮询(数据库等)
  2. JDK DelayQueue
  3. JDK Timer
  4. ScheduledExecutorService 周期性线程池
  5. 时间轮(kafka)
  6. 时间轮(Netty的HashedWheelTimer)
  7. Redis有序集合(zset)
  8. zookeeper之curator
  9. RabbitMQ
  10. Quartz,xxljob等定时任务框架
  11. Koala(考拉)
  12. JCronTab(仿crontab的java调度器)
  13. SchedulerX(阿里)
  14. 有赞延迟队列
  15. .....(鸡毛)
  • 解决问题方法真是不胜枚举,正所谓一呼百应,一千个读者眼里有一千个哈姆雷特

🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱 🌱git

  • 那咱们第一篇先来实战JDK的DelayQueue,万祖归宗,万法同源,学会了最基础的Queue,就不愁其余的了
  • 后续再写几篇使用Redis,Zk,MQ的一些机制,实战分布式状况下的使用

4、先认亲

延时队列,首先,它是一种队列,队列意味着内部的元素是有序的,元素出队入队是有方向性的,元素从一端进入,从另外一端取出。github

其次,延时队列,最重要的特性就体如今它的延时属性上,跟普通的队列不同的是,普通队列中的元素老是等着但愿被早点取出处理,而延时队列中的元素则是但愿被在指定时间获得取出和处理,因此延时队列中的元素是都是带时间属性的,一般来讲是须要被处理的消息或者任务。数据库

一言以蔽之曰 : 延时队列就是用来存放须要在指定时间被处理的元素的队列。编程

1) DelayQueue 是谁,上族谱 数组

看的出来到 DelayQueue这一代已经第五代传人了,

要知道 DelayQueue自幼生在八戒家,长大就往外面拉,熊熊烈火它不怕,水是水来渣是渣。缓存

不过它真的是文韬武略,有一把ReentrantLock就是它的九齿钉耙,抗的死死の捍卫着本身的PriorityQueue.安全

有典故曰:服务器

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 用于控制并发的 可重入 全局 锁
private final transient ReentrantLock lock = new ReentrantLock();
// 根据Delay时间排序的 无界的 优先级队列
private final PriorityQueue<E> q = new PriorityQueue<E>();
// 用于优化阻塞通知的线程元素leader,标记当前是否有线程在排队(仅用于取元素时)
private Thread leader = null;
// 条件,用于阻塞和通知的Condition对象,表示如今是否有可取的元素
private final Condition available = lock.newCondition();

       /** * 省洛方法代码..... 大家懂个人省洛吗? */
复制代码
  • 注释的已经很清楚他们的意思了,也具有了并发编程之艺术的 锁,队列,状态(条件)
  • 他的几个方法也是经过 锁-->维护队列-->出队,入队-->根据Condition进行条件的判断-->进行线程之间的通讯和唤起
  • 以支持优先级无界队列的PriorityQueue做为一个容器,容器里面的元素都应该实现Delayed接口,在每次往优先级队列中添加元素时以元素的过时时间做为排序条件,最早过时的元素放在优先级最高。
  • DelayQueue是一个没有大小限制的队列,所以往队列中插入数据的操做(生产者)永远不会被阻塞,而只有获取数据的操做(消费者)才会被阻塞。

2) 优先级队列 PriorityQueue多线程

由于咱们的DelayQueue里面维护了一个优先级的队列PriorityQueue 简单的看下:

//默认容量11
     private static final int DEFAULT_INITIAL_CAPACITY = 11;
    //存储元素的地方 数组
    transient Object[] queue; // non-private to simplify nested class access
    //元素个数
    private int size = 0;
    //比较器
    private final Comparator<? super E> comparator;
复制代码
  1. 默认容量是11;
  2. queue,元素存储在数组中,这跟咱们以前说的堆通常使用数组来存储是一致的;
  3. comparator,比较器,在优先级队列中,也有两种方式比较元素,一种是元素的天然顺序,一种是经过比较器来比较;
  4. modCount,修改次数,有这个属性表示PriorityQueue也是fast-fail的;
  5. PriorityQueue不是有序的,只有堆顶存储着最小的元素;
  6. PriorityQueue 是非线程安全的;

3) DelayQueue的方法简介

  • 入队方法 : 若添加的元素是队首(堆顶)元素,就把leader置为空,并唤醒等待在条件available上的线程;
public boolean add(E e) {    return offer(e);}
public void put(E e) {    offer(e);}
public boolean offer(E e, long timeout, TimeUnit unit) {    return offer(e);}
public boolean offer(E e) {    
    final ReentrantLock lock = this.lock;    
    lock.lock();   //加锁 由于优先队列线程不安全
    try {
        q.offer(e);  //判断优先级 进行入队 
    if (q.peek() == e) {    //-----[1]
        //leader记录了被阻塞在等待队列头生效的线程 新增一个元素到队列头,
        //表示等待原来队列头生效的阻塞的线程已经失去了阻塞的意义
        //,此时须要获取新的队列头进行返回了
        leader = null;      
        //获取队列头的线程被唤起,主要有两种场景:
        //1. 以前队列为空,致使被阻塞的线程
        //2. 以前队列非空,可是队列头没有生效(到期)致使被阻塞的线程
        available.signal();     
    }        
        return true; //由于是无界队列 因此添加元素确定成功 直到OOM
    } finally {    
        lock.unlock();   //释放锁
    }
}
复制代码

offer()方法,首先获取独占锁,而后添加元素到优先级队列,因为q是优先级队列,因此添加元素后,peek并不必定是当前添加的元素,若是[1]为true,说明当前元素e的优先级最小也就即将过时的,这时候激活avaliable变量条件队列里面的线程,通知他们队列里面有元素了。

  • 出队方法 take()

请看我详细的注释,毫不是走马观花

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock; //获取锁 
    lock.lockInterruptibly(); //可中断锁 并发类里面凡是调用了await的方法获取锁时候都是使用的lockInterruptibly方法而不是Lock. 
    //也是一种fail-fast思想吧,await()方法会在中断标志设置后抛出InterruptedException异常后退出 不至于死死的等待
    try {
        for (;;) {//会写死循环的都是高手
            E first = q.peek();//get队头元素
            if (first == null)
                // 队列头为空,则阻塞,直到新增一个入队为止(1)
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);//获取剩余时间
                if (delay <= 0)
                    // 若队列头元素已生效,则直接返回(2)
                    return q.poll();
                first = null; // don't retain ref while waiting 等待的时候不能引用,表示释放当前引用的(3)
                if (leader != null)
                    // leader 非空时,表示有其余的一个线程在出队阻塞中 (4.1)
                    // 此时挂住当前线程,等待另外一个线程出队完成
                    available.await();
                else {
                    //标识当前线程处于等待队列头生效的阻塞中 (4.2.1)
                    Thread thisThread = Thread.currentThread(); 
                    leader = thisThread;
                    try {
                        // 等待队列头元素生效(4.2.2)
                        available.awaitNanos(delay);
                    } finally {
                        //最终释放当前的线程 设置leader为null (4.2.3)
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }     //(5)
    } finally {
        if (leader == null && q.peek() != null)
            // 当前线程出队完成,通知其余出队阻塞的线程继续执行(6)
            available.signal();
            lock.unlock();//解锁结束
    }
}
复制代码

那么,下面的结论肉眼可见:

  1. 若是队列为空,则阻塞,直到有个线程(生产者投递数据)完成入队操做
  2. 获取队列头,若队列头已生效,则直接返回
  3. 未生效则释放当前引用
  4. 当队列头部没有生效时候:
    1. 如有另外一个线程已经处于等待队列头生效的阻塞过程当中,则阻塞当前线程,直到另外一个线程完成出队操做
    2. 若没有其余线程阻塞在出队过程当中,即当前线程为第一个获取队列头的线程
      • 标识当前线程处于等待队列头生效的阻塞中(leader = thisThread
      • 阻塞当前线程,等待队列头生效
      • 队列头生效以后,清空标识(leader=null)
  5. 再次进入循环,获取队列头并返回
  6. 最后,当前线程出队完成,通知其余出队阻塞的线程继续执行

4) Leader/Follower模式

  1. 若是不是队首节点,根本不须要唤醒操做!
  2. 假设取值时,延时时间尚未到,那么须要等待,但这个时候,队列中新加入了一个延时更短的,并放在了队首,那么 此时,for循环由开始了,取得是新加入的元素,那以前的等待就白等了,明显能够早点退出等待!
  3. 还有就是若是好多线程都在此等待,若是时间到了,同时好多线程会充等待队列进入锁池中,去竞争锁资源,但结果只能是一个成功, 多了写无畏的竞争!(屡次的等待和唤醒)

5)Delayed

public interface Delayed extends Comparable<Delayed> { 
    long getDelay(TimeUnit unit);
}
复制代码

据情报显示:Delayed是一个继承自Delayed的接口,而且定义了一个Delayed方法,用于表示还有多少时间到期,到期了应返回小于等于0的数值。

很简答就是定义了一个,一个哈,一个表延迟的接口,就是个规范接口,目的就是骗咱们去实现它的方法.哼~

5、再实战

说了那么多废话,让我想起了那句名言:一切没有代码实操的讲解都是耍流氓 至今深深的烙在我心中,因此我必定要实战给大家看,显得我不是流氓...

  • 实战以 订单下单后三十分钟内未支付则自动取消 为业务场景

  • 该场景的代码逻辑分析以下:

    1. 下单后将订单直接放入未支付的延时队列中
    2. 若是超时未支付,则从队列中取出,进行修改成取消状态的订单
    3. 若是支付了,则不去进行取消,或者取消的时候作个状态筛选,便可避免更新
    4. 或者支付完成后,作个主动出队
    5. 还有就是用户主动取消订单,也作个主动出队
  • 那么咱们写代码必定要通用,先来写个通用的Delayed 通用...嗯! 泛型的

import lombok.Getter;
import lombok.Setter;

import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** * @author LiJing * @ClassName: ItemDelayed * @Description: 数据延迟实现实例 用以包装具体的实例转型 * @date 2019/9/16 15:53 */

@Setter
@Getter
public class ItemDelayed<T> implements Delayed {

    /**默认延迟30分钟*/
    private final static long DELAY = 30 * 60 * 1000L;
    /**数据id*/
    private Long dataId;
    /**开始时间*/
    private long startTime;
    /**到期时间*/
    private long expire;
    /**建立时间*/
    private Date now;
    /**泛型data*/
    private T data;
    
    public ItemDelayed(Long dataId, long startTime, long secondsDelay) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + (secondsDelay * 1000);
        this.now = new Date();
    }

    public ItemDelayed(Long dataId, long startTime) {
        super();
        this.dataId = dataId;
        this.startTime = startTime;
        this.expire = startTime + DELAY;
        this.now = new Date();
    }

    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }
}
复制代码
  • 再写个通用的接口,用于规范和方便统一实现 这样任何类型的订单均可以实现这个接口 进行延时任务的处理
public interface DelayOrder<T> {


    /** * 添加延迟对象到延时队列 * * @param itemDelayed 延迟对象 * @return boolean */
    boolean addToOrderDelayQueue(ItemDelayed<T> itemDelayed);

    /** * 根据对象添加到指定延时队列 * * @param data 数据对象 * @return boolean */
    boolean addToDelayQueue(T data);

    /** * 移除指定的延迟对象从延时队列中 * * @param data */
    void removeToOrderDelayQueue(T data);
}
复制代码
  • 来具体的任务,具体的逻辑具体实现
@Slf4j
@Lazy(false)
@Component
public class DelayOwnOrderImpl implements DelayOrder<Order> {

    @Autowired
    private OrderService orderService;

    @Autowired
    private ExecutorService delayOrderExecutor;

    private final static DelayQueue<ItemDelayed<Order>> DELAY_QUEUE = new DelayQueue<>();

    /** * 初始化时加载数据库中需处理超时的订单 * 系统启动:扫描数据库中未支付(要在更新时:加上已支付就不用更新了),未过时的的订单 */
    @PostConstruct
    public void init() {
        log.info("系统启动:扫描数据库中未支付,未过时的的订单");
        List<Order> orderList = orderService.selectFutureOverTimeOrder();
        for (Order order : orderList) {
            ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
            this.addToOrderDelayQueue(orderDelayed);
        }
        log.info("系统启动:扫描数据库中未支付的订单,总共扫描了" + orderList.size() + "个订单,推入检查队列,准备到期检查...");

        /*启动一个线程,去取延迟订单*/
        delayOrderExecutor.execute(() -> {
            log.info("启动处理的订单线程:" + Thread.currentThread().getName());
            ItemDelayed<Order> orderDelayed;
            while (true) {
                try {
                    orderDelayed = DELAY_QUEUE.take();
                    //处理超时订单
                    orderService.updateCloseOverTimeOrder(orderDelayed.getDataId());
                } catch (Exception e) {
                    log.error("执行自营超时订单的_延迟队列_异常:" + e);
                }
            }
        });
    }

    /** * 加入延迟消息队列 **/
    @Override
    public boolean addToOrderDelayQueue(ItemDelayed<Order> orderDelayed) {
        return DELAY_QUEUE.add(orderDelayed);
    }

    /** * 加入延迟消息队列 **/
    @Override
    public boolean addToDelayQueue(Order order) {
        ItemDelayed<Order> orderDelayed = new ItemDelayed<>(order.getId(), order.getCreateDate().getTime());
        return DELAY_QUEUE.add(orderDelayed);
    }

    /** * 从延迟队列中移除 主动取消就主动从队列中取出 **/
    @Override
    public void removeToOrderDelayQueue(Order order) {
        if (order == null) {
            return;
        }
        for (Iterator<ItemDelayed<Order>> iterator = DELAY_QUEUE.iterator(); iterator.hasNext(); ) {
            ItemDelayed<Order> queue = iterator.next();
            if (queue.getDataId().equals(order.getId())) {
                DELAY_QUEUE.remove(queue);
            }
        }
    }
}
复制代码

解释一番上面的写的东东

  1. delayOrderExecutor是注入的一个专门处理出队的一个线程
  2. @PostConstruct是啥呢,是在容器启动后只进行一次初始化动做的一个注解,至关实用
  3. 启动后呢,咱们去数据库扫描一遍,防止有漏网之鱼,由于单机版吗,队列的数据是在内存中的,重启后确定原先的数据会丢失,因此为保证服务质量,咱们可能会录音.....因此为保证重启后数据的恢复,咱们须要从新扫描数据库把未支付的数据从新装载到内存的队列中
  4. 接下来就是用这个线程去一直不停的访问队列的take()方法,当队列无数据就一直阻塞,或者数据没到期继续阻塞着,直到到期出队,而后获取订单的信息,去处理订单的更新操做

6、后总结

  • 这就是单机的很差处,也是一个痛点,因此确定是不太适合订单量特别大的场景 你们也要酌情考虑和运用
  • 相对于同等量级的数据库轮询操做来讲,真是节省了很多数据库的压力和链接,仍是值得一用的,咱们能够只保存订单的id到延时实例中,这样缩减队列单个实例内存存储
  • 那还有技巧就是更新的时候注意控制好幂等性,控制好幂等性,会让你轻松不少,顺畅不少,可是数据量大了,要蛀牙的哦

那今日份的讲解就到此结束,具体的代码请移步个人gitHub的mybot项目Master分支查阅,fork体验一把,或者评论区留言探讨,写的很差,请多多指教~~

相关文章
相关标签/搜索