原文连接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/golang
直入本文要描述的问题:网站流量上来了,高并发负载是不可避免滴问题了,当服务端须要处理大量耗时的任务时,咱们通常都会考虑将耗时任务异步处理。那么若是使用Go如何实现?redis
传统上,咱们会考虑使用如下方法建立工做者层架构:json
golang的异步处理之携程:go func()能够带来了很大的方便,虽然协程相对于线程占用的系统资源更少,但这并不表明咱们能够无休止的建立协程。缓存
不停建立协程也有压垮系统的风险。然而绝大多数的时候,咱们不能简单粗暴的建立协程来处理异步任务,缘由是不可控。下面咱们引用原做者的demo,一个执行耗时任务的handler。安全
代码咱们只用看大体的实现流程原理,实现细节暂且不论。架构
package main import ( "bytes" "encoding/json" "fmt" "io" "net/http" "time" ) type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) } func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) }
对于适量的负载,这个方案应该没有问题。可是负载增长之后这个方法就不能很好地工做。当咱们把这个版本部署到生产环境中后,若是咱们遇到了比预期大一个数量级的请求量。并发
那么这个方法就有些不尽如人意了。它没法控制建立goroutine的数量。由于咱们每分钟收到了一百万个POST请求,上面的代码很快就奔溃了。app
这就是咱们遇到的第一个问题,简单粗暴起协程处理耗时任务致使的系统不可控性。咱们天然而然就会想,怎么能让系统更可控呢?异步
建立带缓冲的channel。这样咱们能够把工做任务放到队列里而后再上传到S3。由于能够控制队列的长度而且有充足的内存,我以为把工做任务缓存在channel队列里应该没有问题。函数
因此一个很天然的思路那就是:创建任务队列。golang提供了线程安全的任务队列实现方式:带缓冲的channal。可是这样只是延后了请求的爆发。
做者意识到,要解决这一问题,必须控制协程的数量。如何控制协程的数量?Job/Worker模式!这里我将做者的代码修改了一下,单文件可执行,以记录并理解这一模式。
package main import ( "fmt" "reflect" "time" ) var ( MaxWorker = 10 ) type Payload struct { Num int } //待执行的工做 type Job struct { Payload Payload } //任务channal var JobQueue chan Job //执行任务的工做者单元 type Worker struct { WorkerPool chan chan Job //工做者池--每一个元素是一个工做者的私有任务channal JobChannel chan Job //每一个工做者单元包含一个任务管道 用于获取任务 quit chan bool //退出信号 no int //编号 } //建立一个新工做者单元 func NewWorker(workerPool chan chan Job, no int) Worker { fmt.Println("建立一个新工做者单元") return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), no: no, } } //循环 监放任务和结束信号 func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel fmt.Println("w.WorkerPool <- w.JobChannel", w) select { case job := <-w.JobChannel: fmt.Println("job := <-w.JobChannel") // 收到任务 fmt.Println(job) time.Sleep(100 * time.Second) case <-w.quit: // 收到退出信号 return } } }() } // 中止信号 func (w Worker) Stop() { go func() { w.quit <- true }() } //调度中心 type Dispatcher struct { //工做者池 WorkerPool chan chan Job //工做者数量 MaxWorkers int } //建立调度中心 func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers} } //工做者池的初始化 func (d *Dispatcher) Run() { // starting n number of workers for i := 1; i < d.MaxWorkers+1; i++ { worker := NewWorker(d.WorkerPool, i) worker.Start() } go d.dispatch() } //调度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { //等待空闲worker (任务多的时候会阻塞这里) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 将任务放到上述woker的私有任务channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } } func main() { JobQueue = make(chan Job, 10) dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() time.Sleep(1 * time.Second) go addQueue() time.Sleep(1000 * time.Second) } func addQueue() { for i := 0; i < 20; i++ { // 新建一个任务 payLoad := Payload{Num: 1} work := Job{Payload: payLoad} // 任务放入任务队列channal JobQueue <- work fmt.Println("JobQueue <- work") time.Sleep(1 * time.Second) } } /* 一个任务的执行过程以下 JobQueue <- work 新任务入队 job := <-JobQueue: 调度中心收到任务 jobChannel := <-d.WorkerPool 从工做者池取到一个工做者 jobChannel <- job 任务给到工做者 job := <-w.JobChannel 工做者取出任务 {{1}} 执行任务 w.WorkerPool <- w.JobChannel 工做者在放回工做者池 */
这样,咱们已经可以主动的控制worker的数量。这时候,我问哈你们,咱们彻底解决问题了么?若是有大量的任务同时涌入,会发生什么样的结果。程序会阻塞等待可用的worker
jobChannel := <-d.WorkerPool
下面是咱们的dispatcher实现代码:
//调度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { //等待空闲worker (任务多的时候会阻塞这里) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 将任务放到上述woker的私有任务channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } }
这里咱们提供了建立worker的最大数目做为参数,并把这些worker加入到worker池里。不要忘记,这个调度方法也是在不断的建立协程等待空闲的worker。咱们再改一下代码以下:
package main import ( "fmt" "reflect" "runtime" "time" ) var ( MaxWorker = 10 ) type Payload struct { Num int } //待执行的工做 type Job struct { Payload Payload } //任务channal var JobQueue chan Job //执行任务的工做者单元 type Worker struct { WorkerPool chan chan Job //工做者池--每一个元素是一个工做者的私有任务channal JobChannel chan Job //每一个工做者单元包含一个任务管道 用于获取任务 quit chan bool //退出信号 no int //编号 } //建立一个新工做者单元 func NewWorker(workerPool chan chan Job, no int) Worker { fmt.Println("建立一个新工做者单元") return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool), no: no, } } //循环 监放任务和结束信号 func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel fmt.Println("w.WorkerPool <- w.JobChannel", w) select { case job := <-w.JobChannel: fmt.Println("job := <-w.JobChannel") // 收到任务 fmt.Println(job) time.Sleep(100 * time.Second) case <-w.quit: // 收到退出信号 return } } }() } // 中止信号 func (w Worker) Stop() { go func() { w.quit <- true }() } //调度中心 type Dispatcher struct { //工做者池 WorkerPool chan chan Job //工做者数量 MaxWorkers int } //建立调度中心 func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers} } //工做者池的初始化 func (d *Dispatcher) Run() { // starting n number of workers for i := 1; i < d.MaxWorkers+1; i++ { worker := NewWorker(d.WorkerPool, i) worker.Start() } go d.dispatch() } //调度 func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: fmt.Println("job := <-JobQueue:") go func(job Job) { fmt.Println("等待空闲worker (任务多的时候会阻塞这里") //等待空闲worker (任务多的时候会阻塞这里) jobChannel := <-d.WorkerPool fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel)) // 将任务放到上述woker的私有任务channal中 jobChannel <- job fmt.Println("jobChannel <- job") }(job) } } } func main() { JobQueue = make(chan Job, 10) dispatcher := NewDispatcher(MaxWorker) dispatcher.Run() time.Sleep(1 * time.Second) go addQueue() time.Sleep(1000 * time.Second) } func addQueue() { for i := 0; i < 100; i++ { // 新建一个任务 payLoad := Payload{Num: i} work := Job{Payload: payLoad} // 任务放入任务队列channal JobQueue <- work fmt.Println("JobQueue <- work", i) fmt.Println("当前协程数:", runtime.NumGoroutine()) time.Sleep(100 * time.Millisecond) } }
执行结果以下:
这里咱们发现,咱们依然没能控制住协程数量,咱们只是控制住了worker的数量。这种状况下,若是worker数量设置的合理且异步任务耗时不是特别长的状况下通常没有问题。可是出于安全的考虑,我要把这个阻塞的协程数作一个控制,若是达到限制时候拒绝服务以保护系统该怎么处理?
咱们能够控制并发执行(包括等待执行)的任务数。咱们加入任务使用以下判断。用一个带缓冲的Channel控制并发执行的任务数。
当任务异步处理完成的时候执行<- DispatchNumControl
释放控制便可。用这种方法,
咱们能够根据压测结果设置合适的并发数从而保证系统可以尽量的发挥本身的能力,同时保证不会由于任务量太大而崩溃(由于达到极限的时候,系统会告诉调用方:牛仔我很忙)。
好比定义一个limit函数读取是否存在发送的任务队列:
//用于控制并发处理的协程数 var DispatchNumControl = make(chan bool, 10000) func Limit(work Job) bool { select { case <-time.After(time.Millisecond * 100): fmt.println("牛仔我很忙") return false case DispatchNumControl <- true: // 任务放入任务队列channal jobChannel <- work return true } }
咱们本能够经过大量的队列,后台workers,复杂的调度来设计一套复杂的系统,协程是个好的设计,但任何东西都不能过分使用。
咱们作系统设计的时候,必定也要时刻想着控制:要对本身设计的系统有着足够的控制力。另外综合上面的实现。为何 dispatch 这里要用 协程 呢?阻塞彻底没问题? 欢迎广大博友拍砖留言。。。。