在学习过程当中发现redis的zset还能够用来实现轻量级的延时消息队列功能,虽然可靠性还有待提升,可是对于一些对数据可靠性要求不那么高的功能要求彻底能够实现。本次主要采用了redis中zset中的zadd, zrangebyscore 和 zdel来实现一个小demo。git
由于用的是macOS, 直接github
$ brew install redis
$ go get github.com/garyburd/redigo/redis
复制代码
又由于比较懒,生成任务的惟一id时,直接采用了bson中的objectId,因此:redis
$ go get gopkg.in/mgo.v2/bson
复制代码
惟一id不是必须有,但若是以后有实际应用须要携带,便于查找相应任务。shell
经过一个for循环生成10w个任务, 每个任务有不一样的时间json
func producer() {
count := 0
//生成100000个任务
for count < 100000 {
count++
dealTime := int64(rand.Intn(5)) + time.Now().Unix()
uuid := bson.NewObjectId().Hex()
redis.Client.AddJob(&job.JobMessage{
Id: uuid,
DealTime: dealTime,
}, + int64(dealTime))
}
}
复制代码
其中AddJob函数在另外一个包中, 将上一个函数中随机生成的时间做为须要处理的时间戳.bash
// 添加任务
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}
复制代码
消费者处理流程分为两个步骤:函数
由于在获取小于等于当前时间戳的任务时,可能有多个go routine同时读到了当前任务,而只有一个任务能够来处理当前任务。所以咱们须要经过一个方案来判断究竟由谁来处理这个任务(固然若是只有一个消费者能够读到就直接处理):这个时候能够经过redis的删除操做来获取,由于删除指定value时只有成功的操做才会返回不为0,因此咱们能够认为删除当前队列成功的那个go routine拿到了当前的任务。学习
下面是代码:ui
// 消费者
func consumer() {
// 启动10个go routine一块儿去拿
count := 0
for count < 10 {
go func() {
for {
jobs := redis.Client.GetJob()
if len(jobs) <= 0 {
time.Sleep(time.Second * 1)
continue
}
currentJob := jobs[0]
// 若是当前抢redis队列成功,
if redis.Client.DelJob(currentJob) > 0 {
var jobMessage job.JobMessage
util.JsonDecode(currentJob, &jobMessage) //自定义的json解析函数
handleMessage(&jobMessage)
}
}
}()
count++
}
}
// 处理任务用函数
func handleMessage(msg *job.JobMessage) {
fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
go func() {
countChan <- true
}()
}
复制代码
redis部分的代码,获取任务和删除任务spa
// 获取任务
func (client *RedisClient) GetJob() []string {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
timeNow := time.Now().Unix()
ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
if err != nil {
panic(err)
}
return ret
}
// 删除当前任务, 用来判断是否抢到了当前任务
func (client *RedisClient) DelJob(value string) int {
conn := client.Get()
defer conn.Close()
key := "JOB_MESSAGE_QUEUE"
ret, err := redis.Int(conn.Do("zrem", key, value))
if err != nil {
panic(err)
}
return ret
}
复制代码
代码大抵如此。最后跑起来以后,大概每3-4秒钟可以处理掉1w个任务,速度上确实是...