P133
经过将待执行任务的相关信息放入队列里面,并在以后对队列进行处理,能够推迟执行那些耗时对操做,这种将工做交给任务处理器来执行对作法被称为任务队列 (task queue) 。 P133
git
P133
能够 Redis
的列表结构存储任务的相关信息,并使用 RPUSH
将待执行任务的相关信息推入列表右端,使用阻塞版本的弹出命令 BLPOP
从队列中弹出待执行任务的相关信息(由于任务处理器除了执行任务不须要执行其余工做)。 P134
github
发送任务redis
// 将任务参数推入指定任务对应的列表右端 func SendTask(conn redis.Conn, queueName string, param string) (bool, error) { count, err := redis.Int(conn.Do("RPUSH", queueName, param)) if err != nil { return false, nil } // 只有成功推入 1 个才算成功发送 return count == 1, nil }
执行任务json
// 不断从任务对应的列表中获取任务参数,并执行任务 func RunTask(conn redis.Conn, queueName string, taskHandler func(param string)) { for ; ; { result, err := redis.Strings(conn.Do("BLPOP", queueName, 10)) // 若是成功获取任务信息,则执行任务 if err != nil && len(result) == 2 { taskHandler(result[1]) } } }
以上代码是任务队列与 Redis
交互的通用版本,使用方式简单,只须要将入参信息序列化成字符串传入便可发送一个任务,提供一个处理任务的方法回调便可执行任务。数组
P136
在此基础上能够讲原有的先进先出任务队列改成具备优先级的任务队列,即高优先级的任务须要在低优先级的任务以前执行。 BLPOP
将弹出第一个非空列表的第一个元素,因此咱们只须要将全部任务队列名数组按照优先级降序排序,让任务队列名数组做为 BLPOP
的入参便可实现上述功能(固然这种若是高优先级任务的生成速率大于消费速率,那么低优先级的任务就永远不会执行)。 P136
网络
优先执行高优先级任务数据结构
// 不断从任务对应的列表中获取任务参数,并执行任务 // queueNames 从前日后的优先级依次下降 func RunTasks(conn redis.Conn, queueNames []string, queueNameToTaskHandler map[string]func(param string)) { // 校验是否全部任务都有对应的处理方法 for _, queueName := range queueNames { if _, exists := queueNameToTaskHandler[queueName]; !exists { panic(fmt.Sprintf("queueName(%v) not in queueNameToTaskHandler", queueName)) } } // 将全部入参放入同一个数组 length := len(queueNames) args := make([]interface{}, length + 1) for i := 0; i < length; i++ { args[i] = queueNames[i] } args[length] = 10 for ; ; { result, err := redis.Strings(conn.Do("BLPOP", args...)) // 若是成功获取任务信息,则执行任务 if err != nil && len(result) == 2 { // 找到对应的处理方法并执行 taskHandler := queueNameToTaskHandler[result[0]] taskHandler(result[1]) } } }
P136
实际业务场景中还存在某些任务须要在指定时间进行操做,例如:邮件定时发送等。此时还须要存储任务执行的时间,并将能够执行的任务放入刚刚的任务队列中。可使用有序集合进行存储,时间戳做为分值,任务相关信息及队列名等信息的 json
串做为键。ide
发送延迟任务函数
// 存储延迟任务的相关信息,用于序列化和反序列化 type delayedTaskInfo struct { UnixNano int64 `json:"unixNano"` QueueName string `json:"queueName"` Param string `json:"param"` } // 发送一个延迟任务 func SendDelayedTask(conn redis.Conn, queueName string, param string, executeAt time.Time) (bool, error) { // 若是已到执行时间,则直接发送到任务队列 if executeAt.UnixNano() <= time.Now().UnixNano() { return SendTask(conn, queueName, param) } // 还未到执行时间,须要放入有序集合 // 序列化相关信息 infoJson, err := json.Marshal(delayedTaskInfo{ UnixNano: time.Now().UnixNano(), QueueName:queueName, Param:param, }) if err != nil { return false, err } // 放入有序集合 count, err := redis.Int(conn.Do("ZADD", "delayed_tasks", infoJson, executeAt.UnixNano())) if err != nil { return false, err } // 只有成功加入 1 个才算成功 return count == 1, nil }
拉取可执行的延迟任务,放入任务队列idea
// 轮询延迟任务,将可执行的任务放入任务队列 func PollDelayedTask(conn redis.Conn) { for ; ; { // 获取最先须要执行的任务 infoMap, err := redis.StringMap(conn.Do("ZRANGE", "delayed_tasks", 0, 0, "WITHSCORES")) if err != nil || len(infoMap) != 1 { // 睡 1ms 再继续 time.Sleep(time.Millisecond) continue } for infoJson, unixNano := range infoMap { // 已到时间,放入任务队列 executeAt, err := strconv.Atoi(unixNano) if err != nil { log.Errorf("#PollDelayedTask -> convert unixNano to int error, infoJson: %v, unixNano: %v", infoJson, unixNano) // 作一些后续处理,例如:删除该条信息,防止耽误其余延迟任务 } if int64(executeAt) <= time.Now().UnixNano() { // 反序列化 info := new(delayedTaskInfo) err := json.Unmarshal([]byte(infoJson), info) if err != nil { log.Errorf("#PollDelayedTask -> infoJson unmarshal error, infoJson: %v, unixNano: %v", infoJson, unixNano) // 作一些后续处理,例如:删除该条信息,防止耽误其余延迟任务 } // 从有序集合删除该信息,并放入任务队列 count, err := redis.Int(conn.Do("ZREM", "delayed_tasks", infoJson)) if err != nil && count == 1 { _, _ = SendTask(conn, info.QueueName, info.Param) } } else { // 未到时间,睡 1ms 再继续 time.Sleep(time.Millisecond) } } } }
有序集合不具有列表的阻塞弹出机制,因此程序须要不断循环,并尝试从队列中获取要被执行的任务,这一操做会增大网络和处理器的负载。能够经过在函数里面增长一个自适应方法 (adaptive method) ,让函数在一段时间内都没有发现可执行的任务时,自动延长休眠时间,或者根据下一个任务的执行时间来决定休眠的时长,并将休眠时长的最大值限制为 100ms ,从而确保任务能够被及时执行。 P138
P139
两个或多个客户端在互相发送和接收消息的时候,一般会使用如下两种方法来传递信息: P139
Redis
内置了用于进行消息推送的 PUBLISH
命令和 SUBSCRIBE
命令(05. Redis 其余命令简介 介绍了这两个命令的用法和缺陷)P140
单个接受者时,只须要将发送的信息保存至每一个接收者对应的列表中便可,使用 RPUSH
能够向执行接受者发送消息,使用 LTRIM
能够移除列表中的前几个元素来获取收到的消息。 P140
P141
多个接受者的状况相似群组,即群组内的人发消息,其余人均可以收到。咱们可使用如下几个数据结构存储所需数据,以便实现咱们的所需的功能:
INCR
: 实现 id 自增并获取ZRANGEBYSCORE
: 得到未获取的消息ZCARD
: 获取群组人数ZRANGE
: 通过处理后,可实现哪些消息成功被哪些人接收了的功能ZRANGE
: 获取 id 最小数据,可实现删除被全部人获取过的消息的功能ZCARD
: 获取所在的群组个数ZRANGE
: 通过处理后,可实现批量拉取全部群组的未获取的消息的功能P145
P146
如今拥有每一个 ip 天天进行活动的时间和具体操做,现须要计算天天每一个城市的人操做数量(相似于统计日活)。
原始数据十分巨大,因此须要分批读入内存进行聚合统计,而聚合后的数据相对来讲很小,因此彻底能够在内存中进行聚合统计,完成后再将结果写入 Redis
中,能够有效减小程序与 Redis
服务的通讯次数,缩短任务时间。
如今有一台机器的本地日志须要交给多个日志处理器进行不一样的分析。
这种场景相似群组,因此咱们能够复用上面提到的支持多个接受者的消息拉取组件。
本地机器:
日志处理器:
INCR
,表示当前日志处理器已完成处理本文首发于公众号:满赋诸机(点击查看原文) 开源在 GitHub :reading-notes/redis-in-action