最近的某个业务中,遇到一个问题,一个用户动做,会产生A和B两个行为,分别经过对应的esb消息总线发出。java
咱们的业务线对AB两条esb消息队列进行监听(两个进程),作数据的同步更新操做。redis
在正常过程当中,A行为比B行为先产生,而且A行为优先级高于B行为,数据最终会根据A行为作更新。sql
可是在实际应用中,出现了并发问题,数据最终根据B行为作了更新,覆盖了A行为。缓存
最开始经过redis缓存进行上锁,在收到A消息时,在redis中添加一个key,处理完毕后删除key 。处理过程当中收到B消息,直接返回。并发
但测试的时候发现并不可用,可能先收到B消息,后收到A消息, 可是先更新A数据,再更新B数据,仍是进行了覆盖。app
还有一种方法是修改底层代码,经过自定义sql的方法,先比较再update 。ide
除此以外,还在考虑是否还有别的办法,问题的产生缘由就是A和B的消息队列基本都在同一时间点拿到数据,对程序来讲形成了并发操做。测试
若是咱们能够把B的消息队列的都延迟一个时间点,保证两个消息队列不在同一时间点得到数据,基本上就能够解决这个问题。ui
因而就上网开始搜索,查到了延迟队列DelayQueue。this
虽然咱们不能让公司的消息队列延迟发送,可是咱们能够延迟处理。当收到消息时先不处理,放入延迟消息队列中,另一个线程再从延迟队列中得到数据进行处理。
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
DelayQueue 是 Delayed 元素的一个无界阻塞队列,只有在延迟期满时才能从中提取元素。该队列的头部 是延迟期满后保存时间最长的 Delayed 元素。若是延迟都尚未期满,则队列没有头部,而且 poll 将返回 null。当一个元素的 getDelay(TimeUnit.NANOSECONDS) 方法返回一个小于等于 0 的值时,将发生到期。即便没法使用 take 或 poll 移除未到期的元素,也不会将这些元素做为正常元素对待。例如,size 方法同时返回到期和未到期元素的计数。此队列不容许使用 null 元素。
放入DelayQueue的对象须要实现Delayed接口。
public interface Delayed extends Comparable<Delayed> { /** * Returns the remaining delay associated with this object, in the * given time unit. * * @param unit the time unit * @return the remaining delay; zero or negative values indicate * that the delay has already elapsed */ long getDelay(TimeUnit unit); }
import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; /** * @author lujianing01@58.com * @Description: * @date 2016/6/21 */ public class DelayQueueTest { public static void main(String[] args) { DelayQueue<DelayedElement> delayQueue = new DelayQueue<DelayedElement>(); //生产者 producer(delayQueue); //消费者 consumer(delayQueue); while (true){ try { TimeUnit.HOURS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 每100毫秒建立一个对象,放入延迟队列,延迟时间1毫秒 * @param delayQueue */ private static void producer(final DelayQueue<DelayedElement> delayQueue){ new Thread(new Runnable() { @Override public void run() { while (true){ try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } DelayedElement element = new DelayedElement(1000,"test"); delayQueue.offer(element); } } }).start(); /** * 每秒打印延迟队列中的对象个数 */ new Thread(new Runnable() { @Override public void run() { while (true){ try { TimeUnit.MILLISECONDS.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("delayQueue size:"+delayQueue.size()); } } }).start(); } /** * 消费者,从延迟队列中得到数据,进行处理 * @param delayQueue */ private static void consumer(final DelayQueue<DelayedElement> delayQueue){ new Thread(new Runnable() { @Override public void run() { while (true){ DelayedElement element = null; try { element = delayQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(System.currentTimeMillis()+"---"+element); } } }).start(); } } class DelayedElement implements Delayed { private final long delay; //延迟时间 private final long expire; //到期时间 private final String msg; //数据 private final long now; //建立时间 public DelayedElement(long delay, String msg) { this.delay = delay; this.msg = msg; expire = System.currentTimeMillis() + delay; //到期时间 = 当前时间+延迟时间 now = System.currentTimeMillis(); } /** * 须要实现的接口,得到延迟时间 用过时时间-当前时间 * @param unit * @return */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.expire - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } /** * 用于延迟队列内部比较排序 当前时间的延迟时间 - 比较对象的延迟时间 * @param o * @return */ @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); } @Override public String toString() { final StringBuilder sb = new StringBuilder("DelayedElement{"); sb.append("delay=").append(delay); sb.append(", expire=").append(expire); sb.append(", msg='").append(msg).append('\''); sb.append(", now=").append(now); sb.append('}'); return sb.toString(); } }
补充说明
1.参考网上一些的例子,有些 compareTo 方法就是错的, 要么形成队列中数据积压,要么不能起到延迟的效果。因此必定要通过本身的用例测试确保没有问题。
2.楼主的使用场景,须要考虑,若是进程关闭时,要先等本地延迟队列中的数据被处理完后,再结束进程。