原文:medium.com/smsjunk/han…c++
我在不一样公司从事反爬虫、反病毒、反恶意程序已经有15年了,我知道,因为天天须要处理和应对的大量数据,这些系统最终会所以变得十分复杂。golang
目前我是smsjunk.com的CEO以及KnowBe4的首席架构师,两家公司都是活跃与网络安全行业。web
有趣的是在过去10年做为一名软件工程师,几乎全部我参与的后端开发项目里面都是用Ruby on Rails来完成的。但是你不要误会,我热爱Ruby on Rails而且我认为它是一个很是出色的开发环境,但当你用ruby的思路在设计和开发系统一段时间之后,你每每会忘记,其实你还能够利用多线程,并行化,高速执行以及更小的内存开销来开发系统。我是一名c/c++,Delphi以及c#的开发人员已经不少年了,而后我开始慢慢意识到,使用合适的工具让系统变得更加简单明了才是一件正确的事情。docker
编程界对于编程语言以及框架的争论从未停歇,而我并不想参与到其中去。我相信效率高低,生产力大小以及代码的可维护性很大一部分取决于你所设计的架构是否足够简单。编程
当咱们开发一个匿名遥测以及数据分析系统的时候,其中一个需求是可以处理和应付百万数量级的POST请求,网络请求处理器会接收一个POST过来JSON,这个JSON里面会包含许多须要写入到Amazon S3的数据集合,以便咱们的map-reduce系统能够在后续来处理这些数据。json
通常状况下咱们会考虑构建一个worker分层的结构,而且利用一些中间件,例如:c#
而后设立两个不一样的集群,一个是给web客户端,另外一个是给worker,而后咱们能够将worker扩容到咱们处理业务时所须要的数量。后端
但在最开始的时候,咱们的团队就意识到能够用Go来实现全部这些,由于在讨论期间咱们认为这将会是一个很是高访问量的系统。我利用Go来开发也已经有两年了,用它来开发过一些系统,可是负载规模远没有这次的需求这么大。安全
咱们先定义一些struct来规定咱们POST接收的请求体,以及定义一个上传到S3 bucket的方法UploadToS3
ruby
type PayloadCollection struct { WindowsVersion string `json:"version"` Token string `json:"token"` Payloads []Payload `json:"data"` } type Payload struct { // [redacted] } func (p *Payload) UploadToS3() error { // the storageFolder method ensures that there are no name collision in // case we get same timestamp in the key name storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano()) bucket := S3Bucket b := new(bytes.Buffer) encodeErr := json.NewEncoder(b).Encode(payload) if encodeErr != nil { return encodeErr } // Everything we post to the S3 bucket should be marked 'private' var acl = s3.Private var contentType = "application/octet-stream" return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{}) } 复制代码
最开始的时候咱们很是天真地实现一个POST的钩子方法以下,只是简单地将每一个请求体的上传动做放到Go rutinues中让他们并行执行:
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) } 复制代码
在中等规模的负载状况下,这种方法对大部分人都是没有问题的,但在应对更大规模的请求量时候,咱们很快就招架不住了。当咱们把这个版本的代码部署到生产环境之后,咱们期待能有大量的请求进来但实际还不能达到百万级别的数量级。咱们彻底低估了这个系统要处理的流量数。
但无论怎么说上面的方法都是欠妥的。由于它没有任何方法让咱们去控制Go runtinues启动的数量。因此当咱们的系统在面对每分钟百万级POST请求的时候很快就垮掉了。
咱们须要找到另外的方法。在一开始咱们就在讨论如何让咱们的请求处理程序的生命周期尽量地缩短以及上传到S3的操做能在后台或者异步运行。固然,在Ruby on Rails里面你必须这么作,不然你将会阻塞到全部其余的网络请求处理程序。不管您使用的是美洲狮,独角兽仍是过路人(请不要参与JRuby讨论)。而后咱们想到使用消息队列这种比较常见的方法来处理来达到咱们的目的,例如Resque, Sidekiq, SQS等等,还有数不清的工具由于实在有太多方法来实现这个功能。
因此在第二次迭代的时候,咱们须要建立一个缓冲队列,咱们会将任务放入队列里面而后再一个个地上传到S3上,但因为咱们但愿达到可以控制这个队列的最大容量的目的,而且咱们有足够的RAM来容许咱们将请求体储存到内存当中,因此咱们认为直接使用了Go提供的channel,而后将咱们的请求直接入队到channel中处理就能够了。
var Queue chan Payload func init() { Queue = make(chan Payload, MAX_QUEUE) } func payloadHandler(w http.ResponseWriter, r *http.Request) { ... // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { Queue <- payload } ... } 复制代码
咱们会从channel中获取任务而且执行他们的上传操做
func StartProcessor() { for { select { case job := <-Queue: job.payload.UploadToS3() // <-- STILL NOT GOOD } } } 复制代码
但说句老实话,我并不知道这是在干吗。确定是由于那时已经太晚还有咱们已经喝了太多的红牛。😌😌
这个改动并无让咱们的困境获得任何改善,咱们将并发任务放到了队列中执行仅仅是看上去好像解决了问题。可是咱们的异步程序一次只会上传一个请求体到S3上面,可是咱们的请求数此时远远大于咱们上传到S3的数量,可想而知咱们的缓冲队列很快就到达了他的极限爆满了,而后它阻挡了其余网络请求的入队操做。
至关于咱们仅仅回避了问题,而且让咱们的系统的崩溃时间进入了倒数。咱们这个缺陷的版本发布之后,整个系统的延迟率在持续性地每分钟在上涨。
咱们决定采用协同的方式来改进咱们的Go channel,经过创建一个带有2个的channel处理系统,一个用于将请求体入队,另外一个是负责控制worker
在JobQueue
中并发运行时的数量。
这个想法的核心是以一个相对稳定的频率去并行上传数据到S3,这样的话既不会把咱们的服务器弄垮,也不会由于链接过多形成不少S3的链接错误。因此咱们开始着手于Job/Worker模式。这个对于熟悉Java,c#开发 来讲并不陌生,你能够理解为这是Go利用channel来实现worker线程池的方法。
var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") ) // Job represents the job to be run type Job struct { Payload Payload } // A buffered channel that we can send work requests on. var JobQueue chan Job // Worker represents the worker that executes the job type Worker struct { WorkerPool chan chan Job JobChannel chan Job quit chan bool } func NewWorker(workerPool chan chan Job) Worker { return Worker{ WorkerPool: workerPool, JobChannel: make(chan Job), quit: make(chan bool)} } // Start method starts the run loop for the worker, listening for a quit channel in // case we need to stop it func (w Worker) Start() { go func() { for { // register the current worker into the worker queue. w.WorkerPool <- w.JobChannel select { case job := <-w.JobChannel: // we have received a work request. if err := job.Payload.UploadToS3(); err != nil { log.Errorf("Error uploading to S3: %s", err.Error()) } case <-w.quit: // we have received a signal to stop return } } }() } // Stop signals the worker to stop listening for work requests. func (w Worker) Stop() { go func() { w.quit <- true }() } 复制代码
接下来修改咱们网络请求的钩子函数,负责建立一个Job的结构体的实例而后将其放入JobQueue channel中等待worker来获取执行。
func payloadHandler(w http.ResponseWriter, r *http.Request) { if r.Method != "POST" { w.WriteHeader(http.StatusMethodNotAllowed) return } // Read the body into a string for json decoding var content = &PayloadCollection{} err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content) if err != nil { w.Header().Set("Content-Type", "application/json; charset=UTF-8") w.WriteHeader(http.StatusBadRequest) return } // Go through each payload and queue items individually to be posted to S3 for _, payload := range content.Payloads { // let's create a job with the payload work := Job{Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader(http.StatusOK) } 复制代码
在咱们网络服务初始化的时候建立一个Dispather
而且调用Run()
建立一个装有必定数量worker的线程池,用来接收和处理来自JobQueue
的Job
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
复制代码
下面是咱们Dispather
的实现
type Dispatcher struct { // A pool of workers channels that are registered with the dispatcher WorkerPool chan chan Job } func NewDispatcher(maxWorkers int) *Dispatcher { pool := make(chan chan Job, maxWorkers) return &Dispatcher{WorkerPool: pool} } func (d *Dispatcher) Run() { // starting n number of workers for i := 0; i < d.maxWorkers; i++ { worker := NewWorker(d.pool) worker.Start() } go d.dispatch() } func (d *Dispatcher) dispatch() { for { select { case job := <-JobQueue: // a job request has been received go func(job Job) { // try to obtain a worker job channel that is available. // this will block until a worker is idle jobChannel := <-d.WorkerPool // dispatch the job to the worker job channel jobChannel <- job }(job) } } } 复制代码
注意咱们限制了worker在线程池的最大数量。咱们的应用运行在一个docker化的Go环境中,部署在Amazon的Elasticbeanstalk上,而且尽可能遵循12要素原则来配置咱们的生产环境,在环境变量中获取对应的参数值,这样咱们就能够控制worker的数量以及JobQueue
的最大容量经过直接修改对应的值而不须要从新去部署咱们的应用。
var ( MaxWorker = os.Getenv("MAX_WORKERS") MaxQueue = os.Getenv("MAX_QUEUE") ) 复制代码
当咱们将这个版本发布到生产环境之后咱们的延迟率立刻有明显的降低,咱们处理请求的能力有一个质的飞跃。
并且当咱们发布完新代码之后服务器的数量就从100台降低到并稳定在了20台。
当给集群加上合适的配置以及设置自动伸缩之后,甚至能够降到仅仅用4台c4.Large的EC2实例来处理平常业务。而且集群会自动增长新的实例当CPU使用率持续5分钟达到90%时。
简洁化设计永远是我所追求的东西。咱们能够设计一个复杂的系统用不少的队列,后台运行worker,复杂的部署等等,但取而代之咱们决定利用Elasticbeanstalk强大的自动伸缩功能以及Go所提供开箱即用的并发特性。
总会有一个工具适合你的工做,在有的时候当你Ruby on Rails系统须要一个强大的网络请求处理功能的时候,能够试着考虑一下除了ruby生态圈之外的更增强大和简洁的替代方案。
若是你能关注一下个人Twittwer而且分享给身边的朋友的话,我会很是感谢的!个人Twitter是 twitter.com/mcastilho