手把手实现一条延时消息

前言

近期在维护公司的调度平台,其中有个关键功能那就是定时任务;定时任务你们平时确定接触的很多,好比 JDK 中的 TimerScheduledExecutorService、调度框架 Quartz 等。java

一般用于实现 XX 时间后的延时任务,或周期性任务;git

好比一个常见的业务场景:用户下单 N 分钟未能支付便自动取消订单。github

实现这类需求一般有两种方式:数据库

  • 轮询定时任务:给定周期内扫描全部未支付的订单,查看时间是否到期。
  • 延时消息:订单建立的时候发送一条 N 分钟到期的信息,一旦消息消费后即可判断订单是否能够取消。

先看第一种,这类方式实现较为简单,只须要启动一个定时任务便可;但缺点一样也很明显,这个间隔扫描的时间很差控制。segmentfault

给短了会形成不少无心义的扫描,增大数据库压力,给长了又会使得偏差较大。api

固然最大的问题仍是效率较低,随着订单增多耗时会呈线性增加,最差的状况甚至会出现上一波轮询尚未扫描完,下一波调度又来了。数组


这时第二种方案就要显得靠谱多了,经过延时消息能够去掉没必要要的订单扫描,实时性也比较高。数据结构

延时消息

这里咱们不过多讨论这类需求如何实现;重点聊聊这个延时消息,看它是如何实现的,基于实现延时消息的数据结构还能实现定时任务。框架

我在以前的开源 IM 项目中也加入了此类功能,能够很直观的发送一条延时消息,效果以下:函数

使用 :delay hahah 2 发送了一条两秒钟的延时消息,另一个客户端将会在两秒钟以后收到该消息。

具体的实现步骤会在后文继续分析。

时间轮

要实现延时消息就不得不提到一种数据结构【时间轮】,时间轮听这名字能够很直观的抽象出它的数据结构。

其实本质上它就是一个环形的数组,如图所示,假设咱们建立了一个长度为 8 的时间轮。


task0 = 当咱们须要新建一个 5s 延时消息,则只须要将它放到下标为 5 的那个槽中。

task1 = 而若是是一个 10s 的延时消息,则须要将它放到下标为 2 的槽中,但同时须要记录它所对应的圈数,否则就和 2 秒的延时消息重复了。

task2= 当建立一个 21s 的延时消息时,它所在的位置就和 task0 相同了,都在下标为 5 的槽中,因此为了区别须要为他加上圈数为 2。

经过这张图能够更直观的理解。

当咱们须要取出延时消息时,只须要每秒往下移动这个指针,而后取出该位置的全部任务便可。

固然取出任务以前还得判断圈数是否为 0 ,不为 0 时说明该任务还得再轮几圈,同时须要将圈数 -1 。

这样就可避免轮询全部的任务,不过若是时间轮的槽比较少,致使某一个槽上的任务很是多那效率也比较低,这就和 HashMaphash 冲突是同样的。

编码实现

理论讲完后咱们来看看实际的编码实现,为此我建立了一个 RingBufferWheel 类。

它的主要功能以下:

  • 能够添加指定时间的延时任务,在这个任务中能够实现本身的业务逻辑。
  • 中止运行(包含强制中止和全部任务完成后中止)。
  • 查看待执行任务数量。

首先直接看看这个类是如何使用的。

我在这里建立了 65 个延时任务,每一个任务都比前一个延后 1s 执行;同时自定义了一个 Job 类来实现本身的业务逻辑,最后调用 stop(false) 会在全部任务执行完毕后退出。

构造函数

先来看看其中的构造函数,这里一共有两个构造函数,用于接收一个线程池及时间轮的大小。

线程池的做用会在后面讲到。

这里的时间轮大小也是有讲究的,它的长度必须得是 2∧n,至于为何有这个要求后面也会讲到。

默认状况下会初始化一个长度为 64 的数组。

添加任务

下面来看看添加任务的逻辑,根据咱们以前的那张抽象图其实很容易实现。


首先咱们要定义一个 Task 类,用于抽象任务;它自己也是一个线程,一旦延时到期便会执行其中的 run 函数,因此使用时即可继承该类,将业务逻辑写在 run() 中便可。

它其中还有两个成员变量,也很好理解。

  • cycleNum 用于记录该任务所在时间轮的圈数。
  • key 在这里其实就是延时时间。

//经过 key 计算应该存放的位置
    private Set<Task> get(int key) {
        int index = mod(key, bufferSize);
        return (Set<Task>) ringBuffer[index];
    }

    private int mod(int target, int mod) {
        // equals target % mod
        target = target + tick.get() ;
        return target & (mod - 1);
    }

首先是根据延时时间 (key) 计算出所在的位置,其实就和 HashMap 同样的取模运算,只不过这里使用了位运算替代了取模,同时效率会高上很多。

这样也解释了为何数组长度必定得是 2∧n

而后查看该位置上是否存在任务,不存在就新建一个;存在天然就是将任务写入这个集合并更新回去。

private int cycleNum(int target, int mod) {
        //equals target/mod
        return target >> Integer.bitCount(mod - 1);
    }
其中的 cycleNum() 天然是用于计算该任务所处的圈数,也是考虑到效率问题,使用位运算替代了除法。
private void put(int key, Set<Task> tasks) {
        int index = mod(key, bufferSize);
        ringBuffer[index] = tasks;
    }

put() 函数就很是简单了,就是将任务写入指定数组下标便可。

启动时间轮

任务写进去后下一步即是启动这个时间轮了,我这里定义了一个 start() 函数。

其实本质上就是开启了一个后台线程来作这个事情:

它会一直从时间轮中取出任务来运行,而运行这些任务的线程即是咱们在初始化时传入的线程池;因此全部的延时任务都是由自定义的线程池调度完成的,这样能够避免时间轮的阻塞。

这里调用的 remove(index) 很容易猜到是用于获取当前数组中的全部任务。

逻辑很简单就再也不赘述,不过其中的 size2Notify() 却是值得说一下。

他是用于在中止任务时,主线程等待全部延时任务执行完毕的唤醒条件。这类用法几乎是全部线程间通讯的常规套路,值得收入技能包。

中止时间轮

刚才提到的唤醒主线程得配合这里的中止方法使用:

若是是强制中止那便什么也无论,直接更新中止标志,同时关闭线程池便可。

但若是是软中止(等待全部任务执行完毕)时,那就得经过上文提到的方式阻塞主线程,直到任务执行完毕后被唤醒。

CIM 中的应用

介绍了核心原理和基本 API 后,咱们来看看实际业务场景如何结合使用(背景是一个即时通信项目)。

我这里所使用的场景在文初也提到了,就是真的发送一条延时消息;

现有的消息都是实时消息,因此要实现一个延时消息即是在现有的发送客户端处将延时消息放入到这个时间轮中,在任务到期时再执行真正的消息发送逻辑。

因为项目自己结合了 Spring,因此第一步天然是配置 bean

bean 配置好后其实就可使用了。

每当发送的是延时消息时,只须要将这个消息封装为一个 Job 放到时间轮中,而后在本身的业务类中完成业务便可。

后续能够优化下 api,不用每次新增任务都要调用 start() 方法。

这样一个延时消息的应用便完成了。

总结

时间轮这样的应用还很是多,好比 Netty 中的 HashedWheelTimer 工具原理也差很少,能够用于维护长链接心跳信息。

甚至 Kafka 在这基础上还优化出了层级时间轮,这些都是后话了,你们感兴趣的话能够自行搜索资料或者抽时间我再完善一次。

这篇文章从前期准备到撸码实现仍是花了很多时间,若是对你有帮助的话还请点赞转发。

本文的全部源码均可在此处查阅:

https://github.com/crossoverJie/cim

你的点赞与分享是对我最大的支持

相关文章
相关标签/搜索