延迟队列,顾名思义它是一种带有延迟功能的消息队列。 那么,是在什么场景下我才须要这样的队列呢?php
先看看一下业务场景:mysql
一般解决以上问题,最简单直接的办法就是定时去扫表。git
扫表存在的问题是:github
延时队列能对于上述需求能很好的解决web
调研了市场上一些开源的方案,如下:redis
1.有赞科技:只有原理,没有开源代码sql
2.github我的的:github.com/ouqiang/del…数据库
1.基于redis实现,redis只能配置一个,若是redis挂了整个服务不可用,可用性差点
2.消费端实现的是拉模式,接入成本大,每一个项目都得去实现一遍接入代码
3.在star使用的人数很少,放在生产环境,存在风险,加之对go语言不了解,出了问题难以维护
复制代码
3.SchedulerX-阿里开源的: 功能很强大,可是运维复杂,依赖组件多,不够轻量性能优化
4.RabbitMQ-延时任务: 自己没有延时功能,须要借助一特性本身实现,并且公司没有部署这个队列,去单独部署一个这个来作延时队列成本有点高,并且还须要专门的运维来维护,目前团队不支持bash
基本以上缘由打算本身写一个,日常使用php多,项目基本redis的zset结构做为存储,用php语言实现 ,实现原理参考了有赞团队:tech.youzan.com/queuing_del…
整体架构
采用master-work架构模式,主要包括6个模块:
环境依赖:PHP 5.4+ 安装sockets,redis,pcntl,pdo_mysql 拓展
create database dq;
#存放告警信息
CREATE TABLE `dq_alert` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`host` varchar(255) NOT NULL DEFAULT '',
`port` int(11) NOT NULL DEFAULT '0',
`user` varchar(255) NOT NULL DEFAULT '',
`pwd` varchar(255) NOT NULL DEFAULT '',
`ext` varchar(2048) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
#存放redis信息
CREATE TABLE `dq_redis` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`t_name` varchar(200) NOT NULL DEFAULT '',
`t_content` varchar(2048) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=14 DEFAULT CHARSET=utf8;
#存储注册信息
CREATE TABLE `dq_topic` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`t_name` varchar(1024) NOT NULL DEFAULT '',
`delay` int(11) NOT NULL DEFAULT '0',
`callback` varchar(1024) NOT NULL DEFAULT '',
`timeout` int(11) NOT NULL DEFAULT '3000',
`email` varchar(1024) NOT NULL DEFAULT '',
`topic` varchar(255) NOT NULL DEFAULT '',
`createor` varchar(1024) NOT NULL DEFAULT '',
`status` tinyint(4) NOT NULL DEFAULT '1',
`method` varchar(32) NOT NULL DEFAULT 'GET',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8;
复制代码
在DqConf.php文件中修改php了路径 $logPath
命令:
php DqHttpServer.php --port 8088
访问:http://127.0.0.1:8088,出现配置界面
php DqInit.php --port 6789 看到以下信息说明启动成功
![]()
<?php
include_once 'DqLoader.php';
date_default_timezone_set("PRC");
//可配置多个
$server=array(
'127.0.0.1:6789',
);
$dqClient = new DqClient();
$dqClient->addServer($server);
$topic ='order_openvip_checker'; //topic在后台注册
$id = uniqid();
$data=array(
'id'=>$id,
'body'=>array(
'a'=>1,
'b'=>2,
'c'=>3,
'ext'=>str_repeat('a',64),
),
//可选,设置后以这个通知时间为准,默认延时时间在注册topic的时候指定
'fix_time'=>date('Y-m-d 23:50:50'),
);
//添加
$boolRet = $dqClient->add($topic, $data);
echo 'add耗时:'.(msectime() - $time)."ms\n";
//查询
$time = msectime();
$result = $dqClient->get($topic, $id);
echo 'get耗时:'.(msectime() - $time)."ms\n";
//删除
$time = msectime();
$boolRet = $dqClient->del($topic,$id);
echo 'del耗时:'.(msectime() - $time)."ms\n";
复制代码
执行php test.php
默认日志目录在项目目录的logs目录下,在DqConf.php修改$logPath
ps -ef | grep dq-master| grep -v grep | head -n 1 | awk '{print $2}' | xargs kill -USR2
须要安装pthreads拓展:
测试原理:使用多线程模拟并发,在1s内能成功返回请求成功的个数
php DqBench concurrency requests
concurrency:并发数
requests: 每一个并发产生的请求数
测试环境:内存 8G ,8核cpu,2个redis和1个dq-server 部署在一个机器上,数据包64字节
qps:2400
复制代码
若是调用通知接口在超时时间内,没有收到回复认为通知失败,系统会从新把数据放入队列,从新通知,系统默认最大通知10次(能够在Dqconf.php文件中修改$notify_exp_nums)通知间隔为2n+1,好比第一次1分钟,通知失败,第二次3分钟后,直到收到回复,超出最大通知次数后系统自动丢弃,同时发邮件通知
ps:网络抖动在所不免,通知接口若是涉及到核心的服务,必定要保证幂等!!
线上部署了两个实例每一个机房部一个,4个redis做存储,服务稳定运行数月,各项指标均符合预期
主要接入业务:
项目地址: github.com/chenlinzhon…