采用简易的环形延时队列处理秒级定时任务的解决方案

 业务背景

在稍微复杂点业务系统中,不可避免会碰到作定时任务的需求,好比淘宝的交易超时自动关闭订单、超时自动确认收货等等。对于一些定时做业比较多的系统,一般都会搭建专门的调度平台来管理,经过建立定时器来周期性执行任务。如刚才所说的场景,咱们能够给订单建立一个专门的任务来处理交易状态,每秒轮询一次订单表,找出那些符合超时条件的订单而后标记状态。这是最简单粗暴的作法,但明显也很low,本身都下不去手写这样的代码,全部必需要找个更好的方案。web

回到真实项目中的场景,系统中某个活动上线后要给目标用户发送短信通知,这些通知须要按时间点批量发送。虽然已经基于quartz.net给系统搭建了任务调度平台,但着实不想用上述方案来实现。在网上各类搜索和思考,找到一篇文章让我眼前一亮,稍加分析发现里面的思路彻底符合如今的场景,因而决定在本身项目中实现出来。数据库

 

原理分析

 这种方案的核心就是构造一种数据结构,称之为环形队列,但实际上仍是一个数组,加上对它的循环遍历,达到一种环状的假象。而后再配合定时器,就能够实现按需延时的效果。上面提到的文章中也介绍了实现思路,这里我采用个人理解再更加详细的解释一下。数组

咱们先为这个数组分配一个固定大小的空间,好比60,每一个数组的元素用来存听任务的集合。而后开启一个定时器每隔一秒来扫描这个数组,扫完一圈恰好是一分钟。若是提早设置好任务被扫描的圈数(CycleNum)和在数组中的位置(Slot),在恰好扫到数组的Slot位置时,集合里那些CycleNum为0的任务就是达到触发条件的任务,拉出来作业务操做而后移除掉,其余的把圈数减掉一次,而后留到下次继续扫描,这样就实现了延时的效果。原理以下图所示:数据结构

能够看出中间的重点是计算出每一个任务所在的位置以及须要循环的圈数。假设当前时间为15:20:08,当前扫描位置是2,个人任务要在15:22:35这个时刻触发,也就是147秒后。那么我须要循环的圈数就是147/60=2圈,须要被扫描的位置就是(147+2)%60=29的地方。计算好任务的坐标后塞到数组中属于它的位置,而后静静等待被消费就好啦。测试

 

撸码实现

光讲原理不上代码怎么能行呢,根据上面的思路,下面一步步在.net平台下实现出来。ui

先作一些基础封装。spa

首先构造任务参数的基类,用来记录任务的位置信息和定义业务回调方法:.net

    public class DelayQueueParam
    {
        internal int Slot { get; set; }

        internal int CycleNum { get; set; }

        public Action<object> Callback { get; set; }
    }

接下来是核心地方。再构造队列的泛型类,真实类型必须派生自上面的基类,用来扩展一些业务字段方便消费时使用。队列的主要属性有当前位置指针以及数组容器,主要的操做有插入、移除和消费。插入任务时须要传入执行时间,用来计算这个任务的坐标。线程

    public class DelayQueue<T> where T : DelayQueueParam
    {
        private List<T>[] queue;

        private int currentIndex = 1;

        public DelayQueue(int length)
        {
            queue = new List<T>[length];
        }

        public void Insert(T item, DateTime time)
        {
            //根据消费时间计算消息应该放入的位置
            var second = (int)(time - DateTime.Now).TotalSeconds;
            item.CycleNum = second / queue.Length;
            item.Slot = (second + currentIndex) % queue.Length;
            //加入到延时队列中
            if (queue[item.Slot] == null)
            {
                queue[item.Slot] = new List<T>();
            }
            queue[item.Slot].Add(item);
        }

        public void Remove(T item)
        {
            if (queue[item.Slot] != null)
            {
                queue[item.Slot].Remove(item);
            }
        }

        public void Read()
        {
            if (queue.Length >= currentIndex)
            {
                var list = queue[currentIndex - 1];
                if (list != null)
                {
                    List<T> target = new List<T>();
                    foreach (var item in list)
                    {
                        if (item.CycleNum == 0)
                        {
                            //在本轮命中,用单独线程去执行业务操做
                            Task.Run(()=> { item.Callback(item); });
                            target.Add(item);
                        }
                        else
                        {
                            //等下一轮
                            item.CycleNum--;
                            System.Diagnostics.Debug.WriteLine($"@@@@@索引:{item.Slot},剩余:{item.CycleNum}");
                        }
                    }
                    //把已过时的移除掉
                    foreach (var item in target)
                    {
                        list.Remove(item);
                    }
                }
                currentIndex++;
                //下一遍从头开始
                if (currentIndex > queue.Length)
                {
                    currentIndex = 1;
                }
            }
        }
    }

接下来是使用方法。设计

建立一个管理队列实例的静态类,里面封装对队列的操做:

    public static class NotifyPlanManager
    {
        private static DelayQueue<NotifyPlan> _queue = new DelayQueue<NotifyPlan>(60);

        public static void Insert(NotifyPlan plan, DateTime time)
        {
            _queue.Insert(plan, time);
        }

        public static void Read()
        {
            _queue.Read();
        }
    }

构建咱们的实际业务参数类,派生自DelayQueueParam:

    public class NotifyPlan : DelayQueueParam
    {
        public Guid CamId { get; set; }

        public int PreviousTotal { get; set; }

        public int Amount { get; set; }
    }

生产端往队列中插入数据:

    Action<object> callback = (result) =>
    {
        var np = result as NotifyPlan;
        //这里作本身的业务操做
        //举个例子:
        Debug.WriteLine($"活动ID:{np.CamId},已发送数量:{np.PreviousTotal},本次发送数量:{np.Amount}");
    };
    NotifyPlanManager.Insert(new NotifyPlan
    {
        Amount = set.MainAmount,
        CamId = camId,
        PreviousTotal = 0,
        Callback = callback
    }, smsTemplate.SendDate);

再建立一个每秒执行一次的定时器用作消费端,我这里使用的是FluentScheduler,核心代码:

    internal class NotifyPlanJob : IJob
    {
        /// <summary>
        /// 执行计划
        /// </summary>
        public void Execute()
        {
            NotifyPlanManager.Read();
        }
    }

    internal class JobFactory : Registry
    {
        public JobFactory()
        {
            //每秒运行一次
            Schedule<NotifyPlanJob >().ToRunEvery(1).Seconds();
        }
    }

  JobManager.Initialize(new JobFactory());

而后开启调试运行,打开本机的系统时间面板,对着时间看输出结果。亲测有效。

 

总结

 这种方案的好处是避免了频繁地扫描数据库和没必要要的业务操做,另外也很方便控制时间精度。带来的问题是若是web服务异常或重启可能会发生任务丢失的状况,我目前的处理方法是在数据库中标记任务状态,服务启动时把状态为“排队中”的任务从新加载到队列中等待消费。

以上方案在单机环境测试没问题,多节点状况下暂时没有深究。如有设计实现上的缺陷,欢迎讨论与指正,要是有更好的方案,那就当抛砖引玉,再好不过了~

相关文章
相关标签/搜索