DelayQueue系列(二):基础组件

原文发表于简书DelayQueue之通用组件,本次将作部分优化调整。bash

在咱们产品中有这么一个场景,在医生关闭问诊的3min后,患者将没法继续和医生进行对话。我根据对业务的理解,和对技术实现成本的衡量,决定经过DelayQueue的方式来实现。app

关于DelayQueue的相关内容介绍和核心源码解析已在上一篇DelayQueue系列(一):源码分析说明了。异步

根据个人经验,我认为在生活中有以下场景能够用获得DelayQueue:
1.下单后一段时间(业内基本上都是30分钟)内不付款,就自动取消订单。
2.提交打车申请后,一段时间内(好比说30秒)没有附近的司机接单,就自动将该请求发送给更多距离更远的司机。ide

这类场景都有以下特色:
1.须要有一段时间的延迟,若是仅仅是为了异步执行,那么消息队列显然是是更优的选择。
2.对执行时间的精确度有必定要求,固然异常情况下,也能够对精确度适当放宽松。好比场景1的订单取消,规则设置为30分钟不支付就取消,但实际场景中,精确到30分天然是最好结果,但假如出现故障,那么在可容许的范围内将订单取消也是能够接受的(好比说将取消时间在放宽到32分钟)。
3.执行是高频率的。这点须要和第2点结合起来看,若是仅仅是为了低频率的定时执行,我的认为任务调度多是更优的选择。函数

综合来看,若是不须要延迟执行,那么推荐用消息队列;若是对执行时间的精确度不那么在乎或执行频率并不高,那么推荐使用任务调度;若是须要延迟执行,且执行比较高频,对执行时间的精确度有必定要求,能够考虑使用延迟队列。 以上这些是咱们为什么采用DelayQueue来实现这个业务场景的缘由。源码分析

为了方便使用DelayQueue,我封装了组件对DelayQueue进行了扩展。
首先我定义了一个类TaskMessage,对Delayed进行了扩展,实现了compareTo和getDelay方法。 以下是TaskMessage类的核心代码。post

public class TaskMessage implements Delayed {

    private String body;  //消息内容
    private long executeTime;//执行时间
    private Function function;//具体执行方式
    
    public TaskMessage(Long delayTime,String body,  Function function) {
        this.body = body;
        this.function = function;
        this.executeTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();
    }

    @Override
    public int compareTo(Delayed delayed) {
        TaskMessage msg = (TaskMessage) delayed;
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -msg.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.executeTime - System.nanoTime(), TimeUnit.NANOSECONDS);
    }
}
复制代码

外部调用只须要TaskMessage m1 = new TaskMessage(delayTime, body, function)就能够生成一个延迟任务的元素了,内部自动就根据延迟时间计算出这个延迟任务元素的预期执行时间。优化

Function是1.8版引入的函数式接口,主要方法是R apply(T t),功能是将Function对象应用到输入的参数上,而后返回计算结果。 那么达到延迟任务的预期执行时间时,只须要调用一下function.apply()方法就能够了,不须要关心apply的具体实现。apply的具体实现方法是在调用时才明确的。ui

而后定义一个延迟任务的执行线程类TaskConsumer,实现了Runnable,重写了run方法。由于延迟任务的执行,必然是须要从新起线程去执行的,不能阻碍主线程的操做。this

以下是TaskConsumer类的核心代码。

public class TaskConsumer implements Runnable {

    // 延时队列
    private DelayQueue<TaskMessage> queue;

    //用于标记处理任务线程的id
    private int threadId;
    
    //信号量
    private Boolean signal;

    TaskConsumer(DelayQueue<TaskMessage> queue, int threadId) {
        this.queue = queue;
        this.threadId = threadId;
        this.signal = Boolean.TRUE;
    }

    void finish() {
        this.signal = Boolean.FALSE;
    }
	 	
    @Override
    public void run() {
        while (signal) {
            try {
                TaskMessage take = queue.take();
                if (logger.isInfoEnabled()) {
                    logger.info("处理线程的id为" + threadId + ",消费消息内容为:" + take.getBody() + ",预计执行时间为" +
                            DateFormatUtils.timeStampToString(take.getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis(), "yyyy-MM-dd HH:mm:ss"));
                }
                take.getFunction().apply(take.getBody());
            } catch (InterruptedException e) {
                if (logger.isInfoEnabled()) {
                    logger.info("id为" + threadId + "的处理线程被强制中断");
                }
            } catch (Exception e) {
                logger.error("taskConsumer error", e);
            }
        }
        if (logger.isInfoEnabled()) {
            logger.info("id为" + threadId + "的处理线程已中止");
        }
    }
}
复制代码

这个类核心代码就只有以下两行。
TaskMessage take = queue.take();
获取延迟队列的队首元素。前文已经解释过,Queue的take方法会返回队列的队首元素,不然就会挂起线程。因此只要有返回值,必然就能获取到当前须要执行的TaskMessage元素。

take.getFunction().apply(take.getBody());
执行延迟任务元素的apply方法。applay方法是在定义TaskMessage的时候肯定的,代表了到达预期执行时间所须要进行的一系列操做,那么此时只须要执行对应的apply方法就能够了。

最后是加载TaskConsumer的统一管理类TaskManager。
以下是TaskManager类的核心代码。

public class TaskManager implements InitializingBean,DisposableBean{

    @Override
    public void afterPropertiesSet() {
        for (int i = 0; i < threadCount; i++) {
            TaskConsumer taskConsumer = new TaskConsumer(queue, i);
            taskConsumerList.add(taskConsumer);
            Thread thread = new Thread(taskConsumer);
            threadList.add(thread);
            thread.start();
        }
    }

    @Override
    public void destroy() {
        for (int i = 0; i < threadList.size(); i++) {
            threadList.get(i).interrupt();
            taskConsumerList.get(i).finish();
        }
    }
}
复制代码

这个类的做用在于初始化类后,就启动线程不断的去获取延迟任务。而后在销毁类后,先中断消费者线程,而后设置信号量使得消费者线程的run方法能跳出死循环,从而使得消费线程正常结束。

最后是如何调用的示例。很简单,就只有两步:
一、生成延迟任务元素taskMessage
二、将taskMessage添加到延迟队列中

TaskMessage taskMessage = new TaskMessage(delayTime * 1000, messageBody,
        function -> this.processTask(delayTaskMessage));
DelayQueue<TaskMessage> queue = taskManager.getQueue();
queue.offer(taskMessage);
复制代码

ok,以上是如何扩展DelayQueue的功能构形成高可用的组件的方案,欢迎你们来一块儿讨论。

下一章我准备讲一下咱们项目中运用DelayQueue的过程当中碰到的问题以及如何持久化的方案。

相关文章
相关标签/搜索