继以前用rabbitMQ实现延时队列,Redis因为其自身的Zset数据结构,也一样能够实现延时的操做java
Zset本质就是Set结构上加了个排序的功能,除了添加数据value以外,还提供另外一属性score,这一属性在添加修改元素时候能够指定,每次指定后,Zset会自动从新按新的值调整顺序。能够理解为有两列字段的数据表,一列存value,一列存顺序编号。操做中key理解为zset的名字,那么对延时队列又有何用呢?试想若是score表明的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它变会按照时间戳大小进行排序,也就是对执行时间先后进行排序,这样的话,起一个死循环线程不断地进行取第一个key值,若是当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就能够达到延时执行的目的, 注意不须要遍历整个Zset集合,以避免形成性能浪费。redis
Zset的排列效果以下图:apache
java代码实现以下:缓存
package cn.chinotan.service.delayQueueRedis; import org.apache.commons.lang3.StringUtils; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple; import java.text.SimpleDateFormat; import java.util.Calendar; import java.util.Date; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; /** * @program: test * @description: redis实现延时队列 * @author: xingcheng * @create: 2018-08-19 **/ public class AppTest { private static final String ADDR = "127.0.0.1"; private static final int PORT = 6379; private static JedisPool jedisPool = new JedisPool(ADDR, PORT); private static CountDownLatch cdl = new CountDownLatch(10); public static Jedis getJedis() { return jedisPool.getResource(); } /** * 生产者,生成5个订单 */ public void productionDelayMessage() { for (int i = 0; i < 5; i++) { Calendar instance = Calendar.getInstance(); // 3秒后执行 instance.add(Calendar.SECOND, 3 + i); AppTest.getJedis().zadd("orderId", (instance.getTimeInMillis()) / 1000, StringUtils.join("000000000", i + 1)); System.out.println("生产订单: " + StringUtils.join("000000000", i + 1) + " 当前时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())); System.out.println((3 + i) + "秒后执行"); } } //消费者,取订单 public static void consumerDelayMessage() { Jedis jedis = AppTest.getJedis(); while (true) { Set<Tuple> order = jedis.zrangeWithScores("orderId", 0, 0); if (order == null || order.isEmpty()) { System.out.println("当前没有等待的任务"); try { TimeUnit.MICROSECONDS.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } continue; } Tuple tuple = (Tuple) order.toArray()[0]; double score = tuple.getScore(); Calendar instance = Calendar.getInstance(); long nowTime = instance.getTimeInMillis() / 1000; if (nowTime >= score) { String element = tuple.getElement(); Long orderId = jedis.zrem("orderId", element); if (orderId > 0) { System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + ":redis消费了一个任务:消费的订单OrderId为" + element); } } } } static class DelayMessage implements Runnable{ @Override public void run() { try { cdl.await(); consumerDelayMessage(); } catch (InterruptedException e) { e.printStackTrace(); } } } public static void main(String[] args) { AppTest appTest = new AppTest(); appTest.productionDelayMessage(); for (int i = 0; i < 10; i++) { new Thread(new DelayMessage()).start(); cdl.countDown(); } } }
实现效果以下:数据结构
生产环境使用注意:并发
因为这种实现方式简单,但在生产环境下大可能是多实例部署,因此存在并发问题,即缓存的查找和删除不具备原子性(zrangeWithScores和zrem操做不是一个命令,不具备原子性),会致使消息的屡次发送问题,这个问题的避免方法以下:app
1.能够采用单独一个实例部署解决(不具有高可用特性,容易单机出现故障后消息不能及时发送)分布式
2.采用redis的lua脚本进行原子操做,即原子操做查找和删除(实现难度大)ide
所以,延时队列的实现最好采用rabbitMQ来实现,rabbitMQ自然具有分布式的特性,能够很好的用在多服务,多实例环境下,具体的实现参考http://www.javashuo.com/article/p-xqhwqaxt-cr.html性能