原文记录: http://www.codedata.cn/hackne...前端
我在反广告、杀病毒、检木马等行业的不一样软件公司里已经工做 15 年以上了,很是了解这类系统软件因天天处理海量数据而致使的复杂性。golang
目前我做为 smsjunk.com 的 CEO 和 KnowBe4 的主架构师,在这两个网络安全领域的公司里工做。 web
有趣的是,在过去的 10 年里,做为软件工程师,我接触到的 web 后端代码大可能是用 Ruby on Rails 开发的。请不要误会,我很喜欢 Ruby on Railds 框架,并且我认为它是一套使人称赞的框架,不过期间一长,你就会习惯于使用 ruby 语言的方式思考和设计系统,会忘记利用多线程,并行化,快速执行和小的内存消耗,软件架构本能够如此高效且简单。不少年来,我也是一个 C/C++,Delphi 以及 C# 的使用者,并且我开始认识到使用正确的工具能让事情变得更简单。docker
我对互联网上没完没了的语言框架之间的论战并不感冒。由于我相信解决方案的效能及代码可维护性主要倚仗于你的架构能作到多简单。json
在实现某个遥测分析系统时,咱们遇到一个实际问题,要处理来自数百万终端的 POST 请求。其中的 web 请求处理过程会接收到一个 JSON 文档,它包含一个由许多荷载数据组成的集合,咱们要把它写到 Amazon S3 存储中,以后咱们的 map-reduce 系统就能够对这些数据进行处理。swift
通常咱们会利用以下的组件去建立一个有后台工做层的架构,如:后端
而且创建两个不一样的服务集群,一个用做 web 前端接收数据,另外一个执行具体的工做,这样咱们就能动态调整后台处理工做的能力了。安全
不过从项目伊始,咱们的团队就认为应该用 Go 语言来实现这项工做,由于在讨论过程当中咱们发现这多是一个流量巨大的系统。我已经使用 Go 语言快两年了,并且咱们已经在工做中用它开发了一些系统,只是还没遇到过负载如此大的系统。ruby
咱们从定义一些 web 的 POST 请求载荷数据结构开始,还有一个用于上传到 S3 存储的方法。服务器
type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) }
起初咱们实现了一个很是简单的 POST 处理接口,尝试用一个简单的 goroutine 并行工做处理过程:
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) }
在普通负载的状况下,这段代码对于大多数人已经够用了,不过很快就被证实了不适合大流量的情形。当咱们把第一个版本的代码部署到生产环境后,才发现实际状况远远超出咱们的预期,系统流量比以前预计的大许多,咱们低估了数据负载量。
上面的处理方式从几个方面来看都有问题。咱们没法办法控制建立的 go routines 的数量。并且咱们每分钟收到一百万次的 POST 请求,代码必然很快就崩溃。
咱们须要寻找别的出路。从一开始,咱们就在讨论怎样保证请求处理时间较短,而后在后台进行工做处理。固然,在 Ruby on Rails 里必须这样作,不然你会阻塞掉全部的 web 处理进程,不管你是否使用了 puma,unicorn,passenger(咱们这里就不讨论 JRuby 了)。而后咱们可能会使用常见的解决方案,好比 Resque,Sidkiq,SQS,等等。有许多方法能够完成这个任务。
因此第二次迭代采用了缓冲通道( buffered channel ),咱们能够将一些工做先放入队列,再将它们上传至 S3,因为咱们可以控制队列的大小,并且有充足的内存可用,因此咱们觉得将任务缓冲到 channel 队列中就能够了。
var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) } func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... }
而后将任务从队列中取出再进行处理,咱们使用了相似下面的代码:
func StartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- 仍然很差使! } } }
老实说,我都不知道当时咱们在想些什么。这必定是喝红牛熬夜致使的结果。这个方案没给咱们带来任何好处,咱们只是将一个有问题的并发过程替换为了一个缓冲队列,它只是将问题推后了而已。咱们的同步处理过程每次只将一份载荷数据上传到 S3,因为接受到请求的速率远大于单例程上传到 S3 的能力,咱们的缓冲队列很快就满了,致使请求处理过程阻塞,没法将更多的数据送入队列。
咱们傻乎乎地忽略了问题,最终开始了系统的死亡倒计时。在部署了这个问题版本以后几分钟里,系统的延迟以固定的速率不断增长。
咱们决定使用 Go 通道的一种经常使用模式构建一个两层的通道系统,一个通道用做任务队列,另外一个来控制处理任务时的并发量。
这个办法是想以一种可持续的速率、并发地上传数据至 S3 存储,这样既不会把机器跑挂掉也不会产生 S3 的链接错误。所以咱们选择使用了一种 Job/Worker 模式。若是你熟悉 Java,C# 等语言,能够认为这是使用通道以 Go 语言的方式实现了一个工做线程池。
var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") ) // Job represents the job to be run type Job struct { Payload Payload } // A buffered channel that we can send work requests on. var JobQueue chan Job // Worker represents the worker that executes the job type Worker struct { WorkerPool chan chan Job JobChannel chan Job quit chan bool } func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool)} } // Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // we have received a work request. if err := job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s", err.Error()) } case <-w.quit: // we have received a signal to stop return } } }() } // Stop signals the worker to stop listening for work requests. func (w Worker) Stop() { go func() { w.quit <- true }() }
咱们修改了 web 请求处理过程,使用数据载荷建立了一个 Job
实例,而后将其送入 JobQueue
通道中供工做例程使用。
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { // let's create a job with the payload work := Job{Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader(http.StatusOK) }
在 web 服务初始化的过程当中,咱们建立了一个 Dispatcher
实例,调用 Run()
方法建立了工做例程池,而且经过监听 JobQueue
获取工做任务。
dispatcher := NewDispatcher(MaxWorker) dispatcher.Run()
下面的代码是任务分派器的具体实现:
type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } }
注意咱们提供了一个最大数量的参数,用于控制工做池中初始的例程数量。由于这个项目使用了 Amazon Elasticbeanstalk 以及 docker 中的 Go 环境,因此咱们努力遵循 12-factor 的方法,从环境变量中读取配置值,便于在生产环境中进行系统配置。经过这种方式,咱们能够控制工做例程的数量和工做队列的长度,无需对集群进行从新部署,咱们就能快速调整参数值。
var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") )
在部署这份代码后,咱们发现系统的延迟马上大幅降低,而咱们处理请求的能力获得了巨大的提高。
在咱们的 Elastic Load Balancers 所有预热完成几分钟后,能够看到咱们的 ElasticBeanstalk 应用每分钟能够处理近一百万的请求,经常会在流量早高峰的时候突破每分钟一百万。
咱们刚把新代码部署上去,服务器数量就从 100 台服务器大幅降低到大约 20 台服务器。
在咱们调整集群配置和自动缩放配置后,咱们能将服务器的使用数量下降到四个 EC2 c4.Large 实例,再将 Elastic Auto-Scaling 设置为 CPU 使用率持续五分钟超 90% 的时候,增长一个实例。
在个人认知中,「简单化」才是常胜秘诀。咱们本可能设计一个更复杂的系统,拥有许多队列和后台工做例程,部署也更复杂。可是咱们最终利用了 Elasticbeanstalk 的自动缩放能力和 Go 语言为咱们带来的高效简单的并发解决方案。
并非天天都能发生这样的事情:一个只有四台机器集群处理着每分钟一百万的 POST 请求,把数据写入 Amazon S3 存储中,并且这些机器可能比我如今的 MacBook Pro 性能还差。
每件工做总会有更合适的工具。当你的 Ruby on Rails 系统须要强大的请求处理能力时,不妨尝试一下 ruby 生态圈外那些更加简单有效的解决方案。