我在几家不一样的公司从事反垃圾邮件,防病毒和反恶意软件行业工做超过15年,如今我知道这些系统最终会由于咱们天天处理的大量数据而变得复杂。前端
目前,我是smsjunk.com的CEO和KnowBe4的首席架构师,他们都是网络安全行业的公司。golang
有趣的是,在过去的10年左右,做为一名软件工程师,我参与过的全部Web后端开发大部分都是在Ruby on Rails中完成的。不要误会个人意思,我喜欢Ruby on Rails,我相信这是一个了不得的环境,可是过了一段时间,你开始用ruby的方式思考和设计系统,并且若是你忘记了软件架构的效率和简单性-能够利用多线程,并行化,快速执行和小内存开销。多年来,我是一名C / C ++,Delphi和C#开发人员,并且我刚开始意识到使用正确的工具进行工做可能会有多复杂。web
我对互联网老是争论的语言和框架战争并不太感兴趣。我相信效率,生产力和代码可维护性主要取决于您构建解决方案的简单程度。
json
在处理咱们的匿名遥测和分析系统时,咱们的目标是可以处理来自数百万端点的大量POST请求。Web处理程序将收到一个JSON文档,该文档可能包含须要写入Amazon S3的多个有效内容的集合,以便咱们的map-reduce系统稍后对这些数据进行操做。后端
传统上,咱们会考虑建立一个工做层架构,利用诸如如下方面的内容:安全
并搭建2个不一样的集群,一个用于web前端,一个用于worker,所以咱们能够扩大咱们能够处理的后台工做量。ruby
可是从一开始,咱们的团队就知道咱们应该在Go中这样作,由于在讨论阶段咱们看到这多是一个很是大的交通系统。我一直在使用Go,大约快2年时间了,并且咱们这里开发了一些Go的系统,可是没有一个系统可以达到这个数量级。咱们首先建立了几个struct来定义咱们经过POST调用接收到的Web请求负载,并将其上传到S3存储中。bash
type PayloadCollection struct {
WindowsVersion string `json:"version"`
Token string `json:"token"`
Payloads []Payload `json:"data"`
}
type Payload struct {
// [redacted]
}
func (p *Payload) UploadToS3() error {
// the storageFolder method ensures that there are no name collision in
// case we get same timestamp in the key name
storage_path := fmt.Sprintf("%v/%v", p.storageFolder, time.Now().UnixNano())
bucket := S3Bucket
b := new(bytes.Buffer)
encodeErr := json.NewEncoder(b).Encode(payload)
if encodeErr != nil {
return encodeErr
}
// Everything we post to the S3 bucket should be marked 'private'
var acl = s3.Private
var contentType = "application/octet-stream"
return bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
复制代码
最初,咱们对POST处理程序进行了很是幼稚的实现,试图将做业处理并行化为一个简单的goroutine:服务器
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
go payload.UploadToS3() // <----- DON'T DO THIS } w.WriteHeader(http.StatusOK) } 复制代码
对于中等负载,这能够适用于大多数人,但这很快就证实不能很好地大规模工做。咱们期待着不少请求,可是在咱们将第一个版本部署到生产环境时,咱们开始看到的数量级并非如此。咱们忽视了流量。网络
上述的方法有几个问题。没有办法控制正在工做的go routine的数量。并且,因为咱们每分钟得到100万POST请求,因此系统很快崩溃了。
咱们须要找到一种不一样的方式。从一开始咱们就开始讨论如何保持请求处理程序的生命周期很是短,并在后台产生处理。固然,这就是Ruby on Rails必需要作的事情,不然,无论你是使用puma, unicorn仍是passenger,你的全部的可用的web worker都将阻塞。
那么咱们就须要利用常见的解决方案来完成这项工做,好比Resque,Sidekiq,SQS等。这个名单还在继续,由于有不少方法能够实现这一目标。
所以,第二次迭代是建立一个buffer channel,咱们能够将一些做业排队并将它们上传到S3,因为咱们能够控制队列中的最大物品数量,而且有足够的RAM来排队处理内存中的做业,所以咱们认为只要在通道队列中缓冲做业就好了。
var Queue chan Payload
func init() {
Queue = make(chan Payload, MAX_QUEUE)
}
func payloadHandler(w http.ResponseWriter, r *http.Request) {
...
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
Queue <- payload
}
...
}
复制代码
而后,为了将任务从buffer channel中取出并处理它们,咱们正在使用这样的方式:
func StartProcessor() {
for {
select {
case job := <-Queue:
job.payload.UploadToS3() // <-- STILL NOT GOOD
}
}
}
复制代码
说实话,我不知道咱们在想什么。这确定是一个全线飘红的深夜。这种方法并无给咱们带来什么,咱们用一个缓冲的队列交换了有缺陷的并发,这只是简单地推迟了这个问题。咱们的同步处理器每次只向S3上传一个有效载荷,因为传入请求的速率远远大于单个处理器上传到S3的能力,所以咱们的buffer channel迅速达到极限,并阻止了处理程序继续往里面添加更多的请求数据。
咱们只是避免了这个问题,并最终开始倒计时,直到咱们的系统死亡。在咱们部署这个有缺陷的版本后,咱们的延迟率以不变的速度持续增加。
咱们决定在Go channel上使用一个通用模式来建立一个双层channel系统,一个用来处理排队的job,一个用来控制有多少worker在JobQueue 上并发工做。
这个想法是将上传到S3的并行化速度提升到一个可持续的速度,不会形成机器瘫痪,也不会引起S3的链接错误。 因此咱们选择建立一个Job / Worker模式。对于那些熟悉Java,C#等的人来讲,能够将其视为Golang使用channel来实现Worker Thread-Pool的方式。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
// Job represents the job to be run
type Job struct {
Payload Payload
}
// A buffered channel that we can send work requests on.
var JobQueue chan Job
// Worker represents the worker that executes the job
type Worker struct {
WorkerPool chan chan Job
JobChannel chan Job
quit chan bool
}
func NewWorker(workerPool chan chan Job) Worker {
return Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan bool)}
}
// Start method starts the run loop for the worker, listening for a quit channel in
// case we need to stop it
func (w Worker) Start() {
go func() {
for {
// register the current worker into the worker queue.
w.WorkerPool <- w.JobChannel
select {
case job := <-w.JobChannel:
// we have received a work request.
if err := job.Payload.UploadToS3(); err != nil {
log.Errorf("Error uploading to S3: %s", err.Error())
}
case <-w.quit:
// we have received a signal to stop
return
}
}
}()
}
// Stop signals the worker to stop listening for work requests.
func (w Worker) Stop() {
go func() {
w.quit <- true
}()
}
复制代码
咱们修改了咱们的Web请求处理程序以建立具备有效负载的Job struct,并将其发送到JobQueue channel以供worker获取处理。
func payloadHandler(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// Read the body into a string for json decoding
var content = &PayloadCollection{}
err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
if err != nil {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusBadRequest)
return
}
// Go through each payload and queue items individually to be posted to S3
for _, payload := range content.Payloads {
// let's create a job with the payload work := Job{Payload: payload} // Push the work onto the queue. JobQueue <- work } w.WriteHeader(http.StatusOK) } 复制代码
在咱们的Web服务器初始化期间,咱们建立一个Dispatcher并调用Run()来建立worker池并开始监听JobQueue中出现的Job。
dispatcher := NewDispatcher(MaxWorker)
dispatcher.Run()
复制代码
如下是咱们调度程序实现的代码:
type Dispatcher struct {
// A pool of workers channels that are registered with the dispatcher
WorkerPool chan chan Job
}
func NewDispatcher(maxWorkers int) *Dispatcher {
pool := make(chan chan Job, maxWorkers)
return &Dispatcher{WorkerPool: pool}
}
func (d *Dispatcher) Run() {
// starting n number of workers
for i := 0; i < d.maxWorkers; i++ {
worker := NewWorker(d.pool)
worker.Start()
}
go d.dispatch()
}
func (d *Dispatcher) dispatch() {
for {
select {
case job := <-JobQueue:
// a job request has been received
go func(job Job) {
// try to obtain a worker job channel that is available.
// this will block until a worker is idle
jobChannel := <-d.WorkerPool
// dispatch the job to the worker job channel
jobChannel <- job
}(job)
}
}
}
复制代码
请注意,咱们实例化了最大数量的worker,并将其保存到worker池中(就是上面的WorkerPool channel)。因为咱们已经将Amazon Elasticbeanstalk用于Docker化的Go项目,而且咱们始终尝试遵循12因子方法来配置生产中的系统,所以咱们从环境变量中读取这些值。这样咱们就能够控制工做队列的数量和最大规模,因此咱们能够快速调整这些值,而不须要从新部署集群。
var (
MaxWorker = os.Getenv("MAX_WORKERS")
MaxQueue = os.Getenv("MAX_QUEUE")
)
复制代码
在咱们部署它以后,咱们当即看到咱们的全部延迟率都降低到微不足道的数字,咱们处理请求的能力急剧上升。
在咱们的弹性负载均衡器彻底热身以后的几分钟,咱们看到咱们的ElasticBeanstalk应用程序每分钟提供近100万次请求。咱们一般在早上的几个小时里,咱们的流量高达每分钟100多万。
只要咱们部署了新代码,服务器的数量就会从100台服务器大幅降低到大约20台服务器。
在咱们正确地配置了咱们的集群和自动缩放设置后,咱们能够将它下降到只有4x EC2 c4的配置。大型实例和Elastic Auto-Scaling设置为在CPU连续5分钟超过90%时产生一个新实例。
朴素老是在个人书中获胜。咱们能够设计一个拥有许多队列,后台工做人员和复杂部署的复杂系统,但咱们决定利用Elasticbeanstalk自动扩展的强大功能以及Golang为咱们提供开箱即用的并发性效率和简单方法。
并非天天都是只有4台机器的集群,这可能远不及我如今的MacBook Pro,它可以每分钟处理100w次的请求。
老是有适合指定需求的工具。有时,当您的Ruby on Rails系统须要一个很是强大的Web处理程序时,请考虑在Ruby生态系统以外寻找更简单但更强大的替代解决方案。
原文地址:https://medium.com/smsjunk/handling-1-million-requests-per-minute-with-golang-f70ac505fcaa