延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才须要这样的队列呢?php
先看看一下业务场景:mysql
一般解决以上问题,最简单直接的办法就是定时去扫表。git
扫表存在的问题是:github
延时队列能对于上述需求能很好的解决web
调研了市场上一些开源的方案,如下:redis
1.基于redis实现,redis只能配置一个,若是redis挂了整个服务不可用,可用性差点
2.消费端实现的是拉模式,接入成本大,每一个项目都得去实现一遍接入代码
3.在star使用的人数很少,放在生产环境,存在风险,加之对go语言不了解,出了问题难以维护sql
基本以上缘由打算本身写一个,日常使用php多,项目基本redis的zset结构做为存储,用php语言实现 ,实现原理参考了有赞团队:https://tech.youzan.com/queui...数据库
可用性:性能优化
整体架构swoole
采用master-work架构模式,主要包括6个模块:
环境依赖:PHP 5.4+ 安装sockets,redis,pcntl,pdo_mysql 拓展
create database dq; #存放告警信息 CREATE TABLE `dq_alert` ( `id` int(11) NOT NULL AUTO_INCREMENT, `host` varchar(255) NOT NULL DEFAULT '', `port` int(11) NOT NULL DEFAULT '0', `user` varchar(255) NOT NULL DEFAULT '', `pwd` varchar(255) NOT NULL DEFAULT '', `ext` varchar(2048) NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8; #存放redis信息 CREATE TABLE `dq_redis` ( `id` int(11) NOT NULL AUTO_INCREMENT, `t_name` varchar(200) NOT NULL DEFAULT '', `t_content` varchar(2048) NOT NULL DEFAULT '', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8; #存储注册信息 CREATE TABLE `dq_topic` ( `id` int(11) NOT NULL AUTO_INCREMENT, `t_name` varchar(1024) NOT NULL DEFAULT '', `delay` int(11) NOT NULL DEFAULT '0', `callback` varchar(1024) NOT NULL DEFAULT '', `timeout` int(11) NOT NULL DEFAULT '3000', `email` varchar(1024) NOT NULL DEFAULT '', `topic` varchar(255) NOT NULL DEFAULT '', `createor` varchar(1024) NOT NULL DEFAULT '', `status` tinyint(4) NOT NULL DEFAULT '1', `method` varchar(32) NOT NULL DEFAULT 'GET', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
在DqConf.php文件中修改php了路径 $logPath
命令:
php DqHttpServer.php --port 8088
访问:http://127.0.0.1:8088,出现配置界面
redis信息格式:host:post:auth 好比 127.0.0.1:6379:12345
php DqInit.php --port 6789
看到以下信息说明启动成功
![]()
<?php include_once 'DqLoader.php'; date_default_timezone_set("PRC"); //可配置多个 $server=array( '127.0.0.1:6789', ); $dqClient = new DqClient(); $dqClient->addServer($server); $topic ='order_openvip_checker'; //topic在后台注册 $id = uniqid(); $data=array( 'id'=>$id, 'body'=>array( 'a'=>1, 'b'=>2, 'c'=>3, 'ext'=>str_repeat('a',64), ), //可选,设置后以这个通知时间为准,默认延时时间在注册topic的时候指定 'fix_time'=>date('Y-m-d 23:50:50'), ); //添加 $boolRet = $dqClient->add($topic, $data); echo 'add耗时:'.(msectime() - $time)."ms\n"; //查询 $time = msectime(); $result = $dqClient->get($topic, $id); echo 'get耗时:'.(msectime() - $time)."ms\n"; //删除 $time = msectime(); $boolRet = $dqClient->del($topic,$id); echo 'del耗时:'.(msectime() - $time)."ms\n";
执行php test.php
默认日志目录在项目目录的logs目录下,在DqConf.php修改$logPath
ps -ef | grep dq-master| grep -v grep | head -n 1 | awk '{print $2}' | xargs kill -USR2
须要安装pthreads拓展:
测试原理:使用多线程模拟并发,在1s内能成功返回请求成功的个数
php DqBench concurrency requests concurrency:并发数 requests: 每一个并发产生的请求数 测试环境:内存 8G ,8核cpu,2个redis和1个dq-server 部署在一个机器上,数据包64字节 qps:2400
若是调用通知接口在超时时间内,没有收到回复认为通知失败,系统会从新把数据放入队列,从新通知,系统默认最大通知10次(能够在Dqconf.php文件中修改$notify_exp_nums)通知间隔为2n+1,好比第一次1分钟,通知失败,第二次3分钟后,直到收到回复,超出最大通知次数后系统自动丢弃,同时发邮件通知
ps:网络抖动在所不免,通知接口若是涉及到核心的服务,必定要保证幂等!!
线上部署了两个实例每一个机房部一个,4个redis做存储,服务稳定运行数月,各项指标均符合预期
主要接入业务: