Java延时队列DelayQueue的使用

问题背景

最近的某个业务中,遇到一个问题,一个用户动做,会产生A和B两个行为,分别经过对应的esb消息总线发出。java

咱们的业务线对AB两条esb消息队列进行监听(两个进程),作数据的同步更新操做。redis

 

在正常过程当中,A行为比B行为先产生,而且A行为优先级高于B行为,数据最终会根据A行为作更新。sql

可是在实际应用中,出现了并发问题,数据最终根据B行为作了更新,覆盖了A行为。缓存

 

最开始经过redis缓存进行上锁,在收到A消息时,在redis中添加一个key,处理完毕后删除key 。处理过程当中收到B消息,直接返回。并发

但测试的时候发现并不可用,可能先收到B消息,后收到A消息, 可是先更新A数据,再更新B数据,仍是进行了覆盖。app

还有一种方法是修改底层代码,经过自定义sql的方法,先比较再update 。ide

 

问题分析

除此以外,还在考虑是否还有别的办法,问题的产生缘由就是A和B的消息队列基本都在同一时间点拿到数据,对程序来讲形成了并发操做。测试

若是咱们能够把B的消息队列的都延迟一个时间点,保证两个消息队列不在同一时间点得到数据,基本上就能够解决这个问题。ui

 

因而就上网开始搜索,查到了延迟队列DelayQueue。this

虽然咱们不能让公司的消息队列延迟发送,可是咱们能够延迟处理。当收到消息时先不处理,放入延迟消息队列中,另一个线程再从延迟队列中得到数据进行处理。

 

类介绍

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E>

DelayQueue 是 Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。若是延迟都尚未期满,则队列没有头部,而且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即便没法使用 take 或 poll 移除未到期的元素,也不会将这些元素做为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不容许使用 null 元素。

 

放入DelayQueue的对象须要实现Delayed接口。

 

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

 

测试demo

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author lujianing01@58.com
 * @Description:
 * @date 2016/6/21
 */
public class DelayQueueTest {

    public static void main(String[] args) {
        DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>();

        //生产者
        producer(delayQueue);

        //消费者
        consumer(delayQueue);

        while (true){
            try {
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 每100毫秒建立一个对象,放入延迟队列,延迟时间1毫秒
     * @param delayQueue
     */
    private static void producer(final DelayQueue<DelayedElement> delayQueue){
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    try {
                        TimeUnit.MILLISECONDS.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    DelayedElement element = new DelayedElement(1000,"test");
                    delayQueue.offer(element);
                }
            }
        }).start();

        /**
         * 每秒打印延迟队列中的对象个数
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("delayQueue size:"+delayQueue.size());
                }
            }
        }).start();
    }

    /**
     * 消费者,从延迟队列中得到数据,进行处理
     * @param delayQueue
     */
    private static void consumer(final DelayQueue<DelayedElement> delayQueue){
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    DelayedElement element = null;
                    try {
                        element =  delayQueue.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(System.currentTimeMillis()+"---"+element);
                }
            }
        }).start();
    }
}


class DelayedElement implements Delayed {

    private final long delay; //延迟时间
    private final long expire;  //到期时间
    private final String msg;   //数据
    private final long now; //建立时间

    public DelayedElement(long delay, String msg) {
        this.delay = delay;
        this.msg = msg;
        expire = System.currentTimeMillis() + delay;    //到期时间 = 当前时间+延迟时间
        now = System.currentTimeMillis();
    }

    /**
     * 须要实现的接口,得到延迟时间   用过时时间-当前时间
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
    }

    /**
     * 用于延迟队列内部比较排序   当前时间的延迟时间 - 比较对象的延迟时间
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS));
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("DelayedElement{");
        sb.append("delay=").append(delay);
        sb.append(", expire=").append(expire);
        sb.append(", msg='").append(msg).append('\'');
        sb.append(", now=").append(now);
        sb.append('}');
        return sb.toString();
    }
}

 

补充说明

1.参考网上一些的例子,有些   compareTo  方法就是错的, 要么形成队列中数据积压,要么不能起到延迟的效果。因此必定要通过本身的用例测试确保没有问题。

2.楼主的使用场景,须要考虑,若是进程关闭时,要先等本地延迟队列中的数据被处理完后,再结束进程。

相关文章
相关标签/搜索