【面试必备】聊聊高性能延时队列应用

延时队列的应用场景:java

下单后,30分钟内未付款就自动取消订单等;
支付后,24小时未评论自动好评;
在咱们实际开发过程当中,应用场景不少...redis

基于Redis Zset 实现

实现原理

Redis因为其自身的Zset数据结构,也一样能够实现延时的操做。
Zset本质就是Set结构上加了个排序的功能,除了添加数据value以外,还提供另外一属性score,这一属性在添加元素时候能够指定,每次指定score后,Zset会自动从新按新的值调整顺序算法

  1. 若是score表明的是想要执行时间的时间戳,在某个时间将它插入Zset集合中,它会按照时间戳大小进行排序,也就是对执行时间先后进行排序。
> ZADD delay_queue 1581309229  taskId_1
(integer) 1
> ZADD delay_queue 1581309129  taskId_2
(integer) 1
> ZADD delay_queue 1581309329  taskId_3
(integer) 1
复制代码
  1. 不断地进行取第一个key值,若是当前时间戳大于等于该key值的socre就将它取出来进行消费删除,就能够达到延时执行的目的。 注意不须要遍历整个Zset集合,以避免形成性能浪费。
> ZRANGE delay_queue 0 -1 withscores
1) "taskId_2"
2) "1581309129"
3) "taskId_1"
4) "1581309229"
5) "taskId_3"
6) "1581309329"
复制代码

使用注意

  • 遍历逻辑,删除逻辑,注意使用 Redis Lua 封装,确保原子性操做。更要注意 Redis Lua 在 Redis Cluster 的伪集群问题。
  • 如果JAVA 语言能够直接使用 redisson,封装了 DelayedQueue 的实现。

源码逻辑 org/redisson/RedissonDelayedQueue.javaspring

Beanstalkd 消息队列

Beanstalkd,一个高性能、轻量级的分布式内存队列系统。支持过有9.5 million用户的Facebook Causes应用。后来开源,如今有PostRank大规模部署和使用,天天处理百万级任务。docker

部署使用

  • Linux 安装 || docker 部署
yum install beanstalkd

||

docker run -d -p 11300:11300 pig4cloud/beanstalkd
复制代码
  • 客户端使用,pom 依赖
<!--封装了 官方的 java sdk,只支持 springboot 2.X-->
<dependency>
    <groupId>com.pig4cloud.beanstalk</groupId>
    <artifactId>beanstalkd-client-spring-boot-starter</artifactId>
    <version>0.0.1</version>
</dependency>
复制代码
  • 默认配置

  • 代码使用
@Autowired
private JobProducer producer;

/**
 * @param delay    是一个整形数,表示将job放入ready队列须要等待的秒数
 * @param ttr      time to run—是一个整形数,表示容许一个worker执行该job的秒数。这个时间将从一个worker 获取一个job开始计算。
 *                 若是该worker没能在<ttr> 秒内删除、释放或休眠该job,这个job就会超时,服务端会主动释放该job。
 *                 最小ttr为1。若是客户端设置了0,服务端会默认将其增长到1。
 * @param priority 优先级 0~2**32的整数,最高优先级是0
 */
@Test
public void testSend() {
	String taskId = "1";// 业务对象信息
    producer.putJob(0, 10, 10, taskId.getBytes());
}
复制代码
@Component
public class DemoJobConsumer extends AbstractTubeConsumerListener {

    @Override
    public void work(JobConsumer consumer) {
        // 阻塞多少秒获取一次 Job
        Job job = consumer.reserveJob(1000L);

        // 消费此Job
        consumer.deleteJob(job.getId());

        // 执行延时的业务逻辑
        String biz = new String(job.getData());
    }
}
复制代码

扩展

  • 数据库,利用定时任务轮询实现,业务量大会性能瓶颈。
  • 延时队列的其余实现,好比 rabbitmq 利用ttl特性能够实现。没法取消已放入队列里面的数据,使用时特别注意死信队列的配置等。
  • 还能够本身根据 时间轮片的算法 自行实现 。
  • 总之一切,都要有补偿的逻辑,不管是业务人员手动触发仍是自动补偿。

相关文章
相关标签/搜索