gmq
是基于redis
提供的特性,使用go
语言开发的一个简单易用的队列;关于redis使用特性能够参考以前本人写过一篇很简陋的文章Redis 实现队列; gmq
的灵感和设计是基于有赞延迟队列设计,文章内容清晰并且很好理解,可是没有提供源码,在文章的最后也提到了一些将来架构方向; gmq
不是简单按照有赞延迟队列的设计实现功能,在它的基础上,作了一些修改和优化,主要以下:php
dispatcher
调度分配各个bucket
,而不是由timer
bucket
维护一个timer
,而不是全部bucket一个timer
timer
每次扫描bucket
到期job
时,会一次性返回多个到期job
,而不是每次只返回一个job
timer
的扫描时钟由bucket
中下个job
到期时间决定,而不是每秒扫描一次(TTR)
没有执行完毕或程序被意外中断,则消息从新回到队列再次被消费,通常用于数据比较敏感,不容丢失的dispatcher
任务调度器,负责将job
分配到bucket
或直接推送到ready queue
bucket
任务桶,用于存放延迟任务;每一个bucket
会维护一个timer
定时器,而后将到期的job
推送到ready queue
ready queue
存放已准备好的job
,等待被consumer
消费ready queue
,等待被消费ready queue
,等待被消费参考第一个图的流程,当job被消费者读取后,若是job.TTR>0
,即job设置了执行超时时间,那么job会在读取后会被添加到TTRBucket(专门存放设置了超时时间的job),而且设置job.delay = job.TTR
,若是在TTR时间内没有获得消费者ack确认而后删除job,job将在TTR时间以后添加到ready queue
,而后再次被消费(若是消费者在TTR时间以后才请求ack,会获得失败的响应)python
主要和TTR的设置有关系,确认机制能够分为两种:git
pop
出job时,即会自动删除job pool
中的job元数据pop
出job时开始到用户ack
确认删除结束这段时间,若是在这段时间没有ACK
,job会被再次加入到ready queue
,而后再次被消费,只有用户调用了ACK
,才会去删除job pool
中job元数据配置文件位于gmq/conf.ini
,能够根据本身项目需求修改配置github
git clone https://github.com/wuzhc/gmq.git
cd gmq
go get -u -v github.com/kardianos/govendor # 若是有就不须要安装了
govendor sync
go run main.go
# go build # 可编译成可执行文件
复制代码
# 启动
./gmq start
# 中止
./gmq stop
# 守护进程模式启动,不输出日志到console
nohup ./gmq start >/dev/null 2>&1 &
# 守护进程模式下查看日志输出(配置文件conf.ini须要设置target_type=file,filename=gmq.log)
tail -f gmq.log
复制代码
目前只实现python,go,php语言的客户端的demo,参考:github.com/wuzhc/demo/…golang
# php
# 生产者
php producer.php
# 消费者
php consumer.php
# python
# 生产者
python producer.py
# 消费者
python consumer.py
复制代码
{
"id": "xxxx", # 任务id,这个必须是一个惟一值,将做为redis的缓存键
"topic": "xxx", # topic是一组job的分类名,消费者将订阅topic来消费该分类下的job
"body": "xxx", # 消息内容
"delay": "111", # 延迟时间,单位秒
"TTR": "11111", # 执行超时时间,单位秒
"status": 1, # job执行状态,该字段由gmq生成
"consumeNum":1, # 被消费的次数,主要记录TTR>0时,被重复消费的次数,该字段由gmq生成
}
复制代码
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '1800', // 单位秒,半个小时后执行
'TTR' => '0'
];
复制代码
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '100' // 100秒后还未获得消费者ack确认,则再次添加到队列,将再次被被消费
];
复制代码
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_xxx"],
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];
复制代码
$data = [
'id' => 'xxxx_id' . microtime(true) . rand(1,999999999),
'topic' => ["topic_A","topic_B","topic_C"], //优先消费topic_A,消费完后消费topic_B,最后再消费topic_C
'body' => 'this is a rpc test',
'delay' => '0',
'TTR' => '0'
];
复制代码
gmq
提供了一个简单web监控平台(后期会提供根据job.Id追踪消息的功能),方便查看当前堆积任务数,默认监听端口为8000
,例如:http://127.0.0.1:8000, 界面以下: web
如下是开发遇到的问题,以及一些粗糙的解决方案redis
若是强行停止gmq
的运行,可能会致使一些数据丢失,例以下面一个例子:
json
bucket
中,也不在
ready queue
,这就出现了job丢失的状况,并且将没有任何机会去删除
job pool
中已丢失的job,长久以后
job pool
可能会堆积不少的已丢失job的元数据;因此安全退出须要在接收到退出信号时,应该等待全部
goroutine
处理完手中的事情,而后再退出
gmq
退出流程gmq
经过context传递关闭信号给
dispatcher
,
dispatcher
接收到信号会关闭
dispatcher.closed
,每一个
bucket
会收到
close
信号,而后先退出
timer
检索,再退出
bucket
,
dispatcher
等待全部bucket退出后,而后退出
dispatcher
退出顺序流程: timer
-> bucket
-> dispatcher
segmentfault
不要使用kill -9 pid
来强制杀死进程,由于系统没法捕获SIGKILL信号,致使gmq可能执行到一半就被强制停止,应该使用kill -15 pid
,kill -1 pid
或kill -2 pid
,各个数字对应信号以下:缓存
bucket
都会维护一个timer
,不一样于有赞设计,timer
不是每秒轮询一次,而是根据bucket
下一个job到期时间来设置timer
的定时时间 ,这样的目的在于若是bucket
没有job或job到期时间要好久才会发生,就能够减小没必要要的轮询;timer
只有处理完一次业务后才会重置定时器;,这样的目的在于可能出现上一个时间周期还没执行完毕,下一个定时事件又发生了timer
就会频繁重置定时器时间,就目前使用来讲,还没出现什么性能上的问题咱们知道redis的命令是排队执行,在一个复杂的业务中可能会屡次执行redis命令,若是在大并发的场景下,这个业务有可能中间插入了其余业务的命令,致使出现各类各样的问题;
redis保证整个事务原子性和一致性问题通常用multi/exec
或lua脚本
,gmq
在操做涉及复杂业务时使用的是lua脚本
,由于lua脚本
除了有multi/exec
的功能外,还有Pipepining
功能(主要打包命令,减小和redis server
通讯次数),下面是一个gmq
定时器扫描bucket集合到期job的lua脚本:
-- 获取到期的50个job
local jobIds = redis.call('zrangebyscore',KEYS[1], 0, ARGV[4], 'withscores', 'limit', 0, 50)
local res = {}
for k,jobId in ipairs(jobIds) do
if k%2~=0 then
local jobKey = string.format('%s:%s', ARGV[3], jobId)
local status = redis.call('hget', jobKey, 'status')
-- 检验job状态
if tonumber(status) == tonumber(ARGV[1]) or tonumber(status) == tonumber(ARGV[2]) then
-- 先移除集合中到期的job,而后到期的job返回给timer
local isDel = redis.call('zrem', KEYS[1], jobId)
if isDel == 1 then
table.insert(res, jobId)
end
end
end
end
local nextTime
-- 计算下一个job执行时间,用于设置timer下一个时钟周期
local nextJob = redis.call('zrange', KEYS[1], 0, 0, 'withscores')
if next(nextJob) == nil then
nextTime = -1
else
nextTime = tonumber(nextJob[2]) - tonumber(ARGV[4])
if nextTime < 0 then
nextTime = 1
end
end
table.insert(res,1,nextTime)
return res
复制代码
可能通常phper写业务不多会接触到链接池,其实这是由php自己所决定他应用不大,固然在php的扩展swoole
仍是颇有用处的
gmq
的redis链接池是使用gomodule/redigo/redis
自带链接池,它带来的好处是限制redis链接数,经过复用redis链接来减小开销,另外能够防止tcp被消耗完,这在生产者大量生成数据时会颇有用
// gmq/mq/redis.go
Redis = &RedisDB{
Pool: &redis.Pool{
MaxIdle: 30, // 最大空闲连接
MaxActive: 10000, // 最大连接
IdleTimeout: 240 * time.Second, // 空闲连接超时
Wait: true, // 当链接池耗尽时,是否阻塞等待
Dial: func() (redis.Conn, error) {
c, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword(""))
if err != nil {
return nil, err
}
return c, nil
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
if time.Since(t) < time.Minute {
return nil
}
_, err := c.Do("PING")
return err
},
},
}
复制代码
job pool
是惟一的,它将做为redis的缓存键;gmq
不自动为job生成惟一id值是为了用户能够根据本身生成的job.id来追踪job状况,若是job.id是重复的,push时会报重复id的错误ready queue
的速度取决与redis性能,而不是bucket数量netstat -anp | grep 9503 | wc -l
tcp 0 0 10.8.8.188:41482 10.8.8.185:9503 TIME_WAIT -
复制代码
这个是正常现象,由tcp四次挥手能够知道,当接收到LAST_ACK发出的FIN后会处于TIME_WAIT
状态,主动关闭方(客户端)为了确保被动关闭方(服务端)收到ACK,会等待2MSL时间,这个时间是为了再次发送ACK,例如被动关闭方可能由于接收不到ACK而重传FIN;另外也是为了旧数据过时,不影响到下一个连接,; 若是要避免大量TIME_WAIT
的链接致使tcp被耗尽;通常方法以下:
TIME_WAIT
状态的链接json
外,可支持protobuf
序列化