这篇文章在medium上很火,做者以实际案例来分析,讲得很好。前端
咱们常常据说使用Go的goroutine和channel很容易实现高并发,那是否是所有代码都放在goroutine中运行就能够实现高并发程序了呢?很显然并非。这篇文章将教你们如何一步一步写出一个简单的, 高并发的Go程序。golang
我在几家不一样的公司从事反垃圾邮件,防病毒和反恶意软件的工做超过15年,如今我知道这些系统最终会由于咱们要天天处理大量数据而变得愈来愈复杂。web
目前,我是smsjunk.com
的CEO和 KnowBe4
的首席架构师,他们都是网络安全行业的公司。json
有趣的是,在过去的10年里,做为一名软件工程师,我参与过的全部Web后端开发大部分都是使用RubyonRails
完成的。不要误会个人意思,我喜欢 RubyonRails
,我相信这是一个了不得的生态,可是过了一段时间,你开始以 Ruby
的方式思考和设计系统,忘了如何高效和本来能够利用多线程、并行、快速执行和小的内存消耗来简化软件架构。多年来,我是一名C/C++
,Delphi
和 C#
开发人员,并且我刚开始意识到如何正确的使用工具进行工做可能会有多复杂。后端
我对互联网中那些语言和框架战争并不太感兴趣,好比哪门语言更好,哪一个框架更快。 我始终相信效率,生产力和代码可维护性主要取决于如何简单的构建解决方案。
安全
在处理咱们的匿名监测和分析系统时,咱们的目标是可以处理来自数百万端点的大量POST请求。Web处理程序将收到一个JSON文档,该文档可能包含须要写入 AmazonS3
的多个有效内容的集合,以便咱们的 map-reduce
系统稍后对这些数据进行操做。服务器
传统上,咱们会考虑建立一个工做层架构,利用诸如如下的技术栈:markdown
并搭建2个不一样的集群,一个用于web前端,一个用于worker,所以咱们能够随意扩容机器来处理即将到来的请求。网络
从一开始,咱们的团队就知道咱们能够在Go中这样作,由于在讨论阶段咱们看到这多是一个很是大流量的系统。我一直在使用Go,大约快2年时间了,并且咱们也使用Go开发了一些系统,可是没有一个系统的流量可以达到这个数量级。咱们首先建立了几个struct来定义咱们经过POST调用接收到的Web请求,并将其上传到S3存储中。多线程
type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) } 复制代码
最初,咱们对POST处理程序进行了很是简单粗暴的实现,将每一个请求直接放到新的goroutine中运行:
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) } 复制代码
对于通常的并发量,这实际上是可行的,但这很快就证实不能适用于高并发场景。咱们可能有更多的请求,当咱们将第一个版本部署到生产环境时,咱们开始看到的数量级并非如此,咱们低估了并发量。
上述的方法有几个问题。没有办法控制正在工做的goroutine的数量。并且,因为咱们每分钟有100万个POST请求,因此系统很快就崩溃了。
咱们须要找到另外一种的方法。从一开始咱们就开始讨论如何让请求处理程序的生命周期尽量的短,并在后台产生处理。固然,这是在 RubyonRails
必需要作的事情,不然,无论你是使用puma
,unicorn
仍是 passenger
,你的全部的可用的web worker都将阻塞。
那么咱们就须要利用常见的解决方案来完成这项工做,好比Resque
,Sidekiq
, SQS
等。固然还有其余工具,由于有不少方法能够实现。
所以,咱们第二次改进是建立一个buffer channel
,咱们能够将一些做业请求扔进队列并将它们上传到S3,因为咱们能够控制队列的最大长度,而且有足够的RAM来排队处理内存中的做业,所以咱们认为只要在通道队列中缓冲做业就好了。
var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) } func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... } 复制代码
而后,为了将任务从buffer channel
中取出并处理它们,咱们正在使用这样的方式:
func StartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- STILL NOT GOOD } } } 复制代码
说实话,我不知道咱们在想什么,这确定是一个难熬的夜晚。这种方法并无给咱们带来什么提高,咱们用一个缓冲的队列替换了有缺陷的并发,也只是推迟了问题的产生时间而已。咱们的同步处理器每次只向S3上传一个有效载荷,因为传入请求的速率远远大于单个处理器上传到S3的能力,所以咱们的buffer channel
迅速达到极限,队列已经阻塞而且没法再往里边添加做业。
咱们只是简单的绕过了这个问题,最终致使咱们的系统彻底崩溃。在咱们部署这个有缺陷的版本后,咱们的延迟持续的升高。
咱们决定在Go channel上使用一个通用模式来建立一个 2-tier(双重)channel
系统,一个用来处理排队的job,一个用来控制有多少worker在 JobQueue
上并发工做。
这个想法是将上传到S3的并行速度提升到一个可持续的速度,同时不会形成机器瘫痪,也不会引起S3的链接错误。
因此咱们选择建立一个 Job/Worker
模式。对于那些熟悉Java,C#等的人来讲,能够将其视为Golang使用channel来实现WorkerThread-Pool
的方式。
var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") ) // Job represents the job to be run type Job struct { Payload Payload } // A buffered channel that we can send work requests on. var JobQueue chan Job // Worker represents the worker that executes the job type Worker struct { WorkerPool chan chan Job JobChannel chan Job quit chan bool } func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool)} } // Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // we have received a work request. if err := job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s", err.Error()) } case <-w.quit: // we have received a signal to stop return } } }() } // Stop signals the worker to stop listening for work requests. func (w Worker) Stop() { go func() { w.quit <- true }() } 复制代码
咱们修改了咱们的Web请求处理程序以建立具备有效负载的Job struct,并将其发送到 JobQueueChannel
以供worker处理。
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { // let's create a job with the payload work := Job{Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader(http.StatusOK) } 复制代码
在咱们的Web服务器初始化期间,咱们建立一个Dispatcher
并调用Run()
来建立worker池并开始监听JobQueue
中出现的Job。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
复制代码
如下是咱们调度程序实现的代码:
type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } } 复制代码
请注意,咱们实例化了最大数量的worker,并将其保存到worker池中(就是上面的 WorkerPoolChannel
)。因为咱们已经将Amazon Elasticbeanstalk用于Docker化的Go项目,而且咱们始终尝试遵循12要素方法来配置生产中的系统,所以咱们从环境变量中读取这些值,这样咱们就能够快速调整这些值以控制工做队列的数量和最大规模,而不须要从新部署集群。
var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") ) 复制代码
在咱们发布了这个版本以后,咱们当即看到咱们的全部的请求延迟都降低到了一个很低的数字,咱们处理请求的效率大大提高。
在咱们的弹性负载均衡器彻底热身以后的几分钟,咱们看到咱们的ElasticBeanstalk应用程序每分钟提供近100万次请求。一般在早晨的几个小时里,流量高峰会超过每分钟100万个请求。
咱们部署了新的代码,服务器的数量从100台减小到大约20台。
在恰当地配置了集群和自动缩放设置之后,咱们在生成环境用4台EC2 c4就能完成工做了。若是CPU在连续5分钟内超过90%,弹性自动缩放系统就自动扩容一个新的实例。
简单老是个人制胜法宝。咱们能够设计一个拥有多队列,多后台进程和难以部署的复杂系统,可是相反咱们决定利用Elasticbeanstalk的自动缩放和高效简单的方式去并发,Go语言很好的提供了这些功能。
经验告诉咱们,用最合适的工具去完成工做。有时,当你的 RubyonRails
系统须要实现一个很是强大的处理程序时,能够考虑在 Ruby
生态系统以外寻找更简单且更强大的替代解决方案。
做者:MarcioCastilho