做者:林冠宏 / 指尖下的幽灵git
博客:http://www.cnblogs.com/linguanh/github
GitHub : https://github.com/af913337456/redis
目前的开发工做主要是将
传统电商应用
和区块链技术
相结合,区块链平台依然是以太坊
,此外地,这几天由我编写,经清华大学出版社出版的书籍,历经八月,终于出版上架了,名称是:《区块链以太坊DApp开发实战》
,现已能够网购。数据库
本文所要分享的思路就是电商应用中经常使用的
订单队列
。json
电商应用中,简单直观的用户从下单到付款,最终完成整个流程的步骤能够用下图表示:后端
其中,订单信息持久化
,就是存储数据到数据库中。而最终客户端完成支付后的更新订单状态
的操做是由第三方支付平台进行回调设置好的回调连接 NotifyUrl
,来进行的。缓存
补全订单状态的更新流程,以下图表示:服务器
服务端的直接瓶颈点
,首先要考虑 TPS
。去除细分点,咱们主要看订单信息持久化
瓶颈点。网络
在高并发业务场景中,例如 秒杀
、优惠价抢购
等。短期内的下单请求数会不少,若是订单信息持久化
部分,不作优化,而是直接对数据库层进行频繁的
读写操做,数据库会承受不了,容易成为第一个垮掉的服务,好比下图的所示的常规写单流程:架构
能够看到,每
持久化一个订单信息,通常要经历网络链接操做(连接数据库),以及多个 I/O
操做。
得益于链接池
技术,咱们能够在连接数据库的时候,不用每次都从新发起一次完整的HTTP请求,而能够直接从池中获取已打开了的链接句柄,而直接使用,这点和线程池的原理差很少。
此外,咱们还能够在上面的流程中加入更多的优化,例如对于一些须要读取的信息,能够事先存置到内存缓存层,并加于更新维护,这样在使用的时候,能够快速读取。
即便咱们都具有了上述的一些优化手段,可是对于写操做
的I/O
阻塞耗时,在高并发请求
的时候,依然容易致使数据库承受不住,容易出现连接多开异常
,操做超时
等问题。
在该层进行优化的操做,除了上面谈到的以外,还有下面一些手段:
每种方式有各自的特色,由于本文谈的是订单队列
的架构思想,因此下面咱们来看下如何在订单系统中引入订单队列。
网上有很多文章谈到订单队列的作法,大部分都漏了说明请求与响应的一致性问题。
第一种订单队列
流程图:上图是大多文章提到的队列模型,有两个没有解析的问题:
notifyUrl
,而此时 ② 还在排队等待处理,这种状况又如何处理。首先,要确定的是,上面的订单流程图是没有问题的。它有下面的优缺点,所提到的两个问题也是有解决方案的。
优势:
搭配中间件
的组合性强。缺点:
上面谈及的问题点,我后面都会给出解决方案。下面咱们来看下另一种订单队列流程图。
第二种订单队列
流程图:第二种订单队列的设计模型,注意它的同步等待
持久化处理的结果,解决了持久化与响应的一致性问题,可是有个严重的耗时等待问题,它的优缺点以下:
优势:
缺点:
这类订单队列,我下面会放出 Golang
实现的版本代码。
对比上面两种常见的订单模型,若是从用户体验的角度
去优先考虑,第一种不须要用户等待持久化处理
结果的是明显优于第二种的。若是技术团队完善,且技术过硬,也应该考虑第一种的实现方式。
若是仅仅想要达到宁愿用户等待到超时
也不肯意存储层服务被冲垮,那么有限考虑第二种。
在这里,咱们进一步细分一下,实现队列模块的功能有哪些选择。
相信不少后端开发经验比较老道的同志已经想到了,使用现有的中间件,好比知名的 Redis
、RocketMQ
,以及 Kafka
等,它们都是一种选择。
此外地,咱们还能够直接编写代码,在当前的服务系统中实现一个消息队列来达到目的,下面我用图来分类下队列类型。
不一样的队列实现方式,能直接致使不一样的功能,也有不一样的优缺点:
一级缓存优势:
一级缓存缺点:
中间件的优势:
增量
持久化,能最大程度减小因不可预料的崩溃致使订单信息丢失;中间件的缺点:
回到第一种订单模型中:
问题1:
若是订单存在第三方支付状况,① 和 ② 的一致性如何保证?
首先咱们看下,不一致性的时候,会产生什么结果:
响应页面
完成了支付动做,用户查看订单信息为空白。上述的状况,明显地,只有 3 是须要恢复订单信息的,应对的方案有:
表A
。同时启动一个定时任务B
专门遍历表A,而后去订单列表寻找是否已经有了对应的订单信息,有则更新,没则继续,或跟随制定的检测策略走。非崩溃性缘由
而致使失败时:
队列头部
,等待下一次的从新持久化处理。崩溃性
缘由而致使失败时:
定时任务B
在进行了屡次检测无果后,那么根据第三方支付平台在回调时候传递过来的订单附属信息
对订单进行恢复。定时任务B
所在服务最好
和回调连接 notifyUrl
所在的接口服务一致,这样能保证当 B 挂掉的时候,回调服务也跟随挂掉,而后第三方支付平台在调用回调失败的状况下,他们会有重试逻辑
,依赖这个,在回调服务重启时,能够完成订单信息恢复。问题2:
若是订单存在第三方支付状况,① 完成了支付,且三方支付平台回调了 notifyUrl,而此时 ② 还在排队等待处理,这种状况又如何处理?
应对的方案参考 问题1
的 定时任务B
检测修改机制。
定义一些常量
const ( QueueOrderKey = "order_queue" QueueBufferSize = 1024 // 请求队列大小 QueueHandleTime = time.Second * 7 // 单个 mission 超时时间 )
定义出入队接口,方便多种实现
// 定义出入队接口,方便多种实现 type IQueue interface { Push(key string,data []byte) error Pop(key string) ([]byte,error) }
定义请求与响应实体
// 定义请求与响应实体 type QueueTimeoutResp struct { Timeout bool // 超时标志位 Response chan interface{} } type QueueRequest struct { ReqId string `json:"req_id"` // 单次请求 id Order *model.OrderCombine `json:"order"` // 订单信息 bean AccessTime int64 `json:"access_time"` // 请求时间 ResponseChan *QueueTimeoutResp `json:"-"` }
定义队列实体
// 定义队列实体 type Queue struct { mapLock sync.Mutex RequestChan chan *QueueRequest // 缓存管道,装载请求 RequestMap map[string]*QueueTimeoutResp Queue IQueue }
实例化队列,接收接口参数
// 实例化队列,接收接口参数 func NewQueue(queue IQueue) *Queue { return &Queue{ mapLock: sync.Mutex{}, RequestChan: make(chan *QueueRequest, QueueBufferSize), RequestMap: make(map[string]*QueueTimeoutResp, QueueBufferSize), Queue: queue, } }
接收请求
// 接收请求 func (q *Queue) AcceptRequest(req *QueueRequest) interface{} { if req.ResponseChan == nil { req.ResponseChan = &QueueTimeoutResp{ Timeout: false, Response: make(chan interface{},1), } } userKey := key(req) // 惟一 key 生成函数 req.ReqId = userKey q.mapLock.Lock() q.RequestMap[userKey] = req.ResponseChan // 内存层存储对应的 req 的 resp 管道指针 q.mapLock.Unlock() q.RequestChan <- req // 接收请求 log("userKey : ", userKey) ticker := time.NewTicker(QueueHandleTime) // 以超时时间 QueueHandleTime 启动一个定时器 defer func() { ticker.Stop() // 释放定时器 q.mapLock.Lock() delete(q.RequestMap,userKey) // 当处理完一个 req,从 map 中移出 q.mapLock.Unlock() }() select { case <-ticker.C: // 超时 req.ResponseChan.Timeout = true Queue_TimeoutCounter++ // 辅助计数,int 类型 log("timeout: ",userKey) return lghError.HandleTimeOut // 返回超时错误的信息 case result := <-req.ResponseChan.Response: // req 被完整处理 return result } }
从请求管道中取出 req 放入到队列容器中,该函数在 gorutine
中运行
// 从请求管道中取出 req 放入到队列容器中,该函数在 gorutine 中运行 func (q *Queue) addToQueue() { for { req := <-q.RequestChan // 取出一个 req data, err := json.Marshal(req) if err != nil { log("redis queue parse req failed : ", err.Error()) continue } if err = q.Queue.Push(QueueOrderKey, data);err != nil { // push 入队,这里有时间消耗 log("lpush req failed. Error : ", err.Error()) continue } log("lpush req success. req time: ", req.AccessTime) } }
取出 req 处理,该函数在 gorutine
中运行
// 取出 req 处理,该函数在 gorutine 中运行 func (q *Queue) readFromQueue() { for { data, err := q.Queue.Pop(QueueOrderKey) // pop 出队,这里也有时间消耗 if err != nil { log("lpop failed. Error : ", err.Error()) continue } if data == nil || len(data) == 0 { time.Sleep(time.Millisecond * 100) // 空数据的 req,停顿下再取 continue } req := &QueueRequest{} if err = json.Unmarshal(data, req);err != nil { log("Lpop: json.Unmarshal failed. Error : ", err.Error()) continue } userKey := req.ReqId q.mapLock.Lock() resultChan, ok := q.RequestMap[userKey] // 取出对应的 resp 管道指针 q.mapLock.Unlock() if !ok { // 中间件重启时,好比 redis 重启而读取旧 key,会进入这里 Queue_KeyNotFound ++ // 计数 int 类型 log("key not found, rollback: ", userKey) continue } simulationTimeOutReq4(req) // 模拟出来任务的函数,入参为 req if resultChan.Timeout { // 处理期间,已经超时,这里作能够拓展回滚操做 Queue_MissionTimeout ++ log("handle mission timeout: ", userKey) continue } log("request result send to chan succeee, userKey : ", userKey) ret := util.GetCommonSuccess(req.AccessTime) resultChan.Response <- &ret // 输入处理成功 } }
启动
func (q *Queue) Start() { go q.addToQueue() go q.readFromQueue() }
运行例子
func test(){ ... runtime.GOMAXPROCS(4) redisQueue := NewQueue(NewFastCacheQueue()) redisQueue.Start() reqNumber := testReqNumber wg := sync.WaitGroup{} wg.Add(reqNumber) for i :=0;i<reqNumber;i++ { go func(index int) { combine := model.OrderCombine{} ret := AcceptRequest(&QueueRequest{ UserId: int64(index), Order: &combine, AccessTime: time.Now().Unix(), ResponseChan: nil, }) fmt.Println("ret: ------------- ",ret.String()) wg.Done() }(i) } wg.Wait() time.Sleep(3*time.Second) fmt.Println("TimeoutCounter: ",Queue_TimeoutCounter,"KeyNotFound: ",Queue_KeyNotFound,"MissionTimeout: ",Queue_MissionTimeout) }