性能优化实战:百万级WebSockets和Go语言

翻译原文连接 转帖/转载请注明出处html

原文连接@medium.com 发表于2017/08/03nginx

你们好!个人名字叫Sergey Kamardin。我是来自Mail.Ru的一名工程师。这篇文章将讲述咱们是如何用Go语言开发一个高负荷的WebSocket服务。即便你对WebSockets熟悉但对Go语言知之甚少,我仍是但愿这篇文章里讲到的性能优化的思路和技术对你有所启发。git

1. 介绍

做为全文的铺垫,我想先讲一下咱们为何要开发这个服务。github

Mail.Ru有许多包含状态的系统。用户的电子邮件存储是其中之一。有不少办法来跟踪这些状态的改变。不外乎经过按期的轮询或者系统通知来获得状态的变化。这两种方法都有它们的优缺点。对邮件这个产品来讲,让用户尽快收到新的邮件是一个考量指标。邮件的轮询会产生大概每秒5万个HTTP请求,其中60%的请求会返回304状态(表示邮箱没有变化)。所以,为了减小服务器的负荷并加速邮件的接收,咱们决定重写一个publisher-subscriber服务(这个服务一般也会称做bus,message broker或者event-channel)。这个服务负责接收状态更新的通知,而后还处理对这些更新的订阅。golang

重写publisher-subscriber服务以前:web

0_pull.png

如今:浏览器

1_push.png

上面第一个图为旧的架构。浏览器(Browser)会按期轮询API服务来得到邮件存储服务(Storage)的更新。缓存

第二张图展现的是新的架构。浏览器(Browser)和通知API服务(notificcation API)创建一个WebSocket链接。通知API服务会发送相关的订阅到Bus服务上。当收到新的电子邮件时,存储服务(Storage)向Bus(1)发送一个通知,Bus又将通知发送给相应的订阅者(2)。API服务为收到的通知找到相应的链接,而后把通知推送到用户的浏览器(3)。性能优化

咱们今天就来讨论一下这个API服务(也能够叫作WebSocket服务)。在开始以前,我想提一下这个在线服务处理将近3百万个链接。服务器

2. 惯用的作法(The idiomatic way)

首先,咱们看一下不作任何优化会如何用Go来实现这个服务的部分功能。在使用net/http实现具体功能前,让咱们先讨论下咱们将如何发送和接收数据。这些数据是定义在WebSocket协议之上的(例如JSON对象)。咱们在下文中会成他们为packet。

咱们先来实现Channel结构。它包含相应的逻辑来经过WebScoket链接发送和接收packet。

2.1. Channel结构

// Packet represents application level data.
type Packet struct {
    ...
}

// Channel wraps user connection.
type Channel struct {
    conn net.Conn    // WebSocket connection.
    send chan Packet // Outgoing packets queue.
}

func NewChannel(conn net.Conn) *Channel {
    c := &Channel{
        conn: conn,
        send: make(chan Packet, N),
    }

    go c.reader()
    go c.writer()

    return c
}

这里我要强调的是读和写这两个goroutines。每一个goroutine都须要各自的内存栈。栈的初始大小由操做系统和Go的版本决定,一般在2KB到8KB之间。咱们以前提到有3百万个在线链接,若是每一个goroutine栈须要4KB的话,全部链接就须要24GB的内存。这还没算上给Channel结构,发送packet用的ch.send和其它一些内部字段分配的内存空间。

2.2. I/O goroutines

接下来看一下“reader”的实现:

func (c *Channel) reader() {
    // We make a buffered read to reduce read syscalls.
    buf := bufio.NewReader(c.conn)

    for {
        pkt, _ := readPacket(buf)
        c.handle(pkt)
    }
}

这里咱们使用了bufio.Reader。每次都会在buf大小容许的范围内尽可能读取多的字节,从而减小read()系统调用的次数。在无限循环中,咱们指望会接收到新的数据。请记住以前这句话:指望接收到新的数据。咱们以后会讨论到这一点。

咱们把packet的解析和处理逻辑都忽略掉了,由于它们和咱们要讨论的优化不相关。不过buf值得咱们的关注:它的缺省大小是4KB。这意味着全部链接将消耗掉额外的12 GB内存。“writer”也是相似的状况:

func (c *Channel) writer() {
    // We make buffered write to reduce write syscalls.
    buf := bufio.NewWriter(c.conn)

    for pkt := range c.send {
        _ := writePacket(buf, pkt)
        buf.Flush()
    }
}

咱们在待发送packet的c.send channel上循环将packet写到缓存(buffer)里。细心的读者确定已经发现,这又是额外的4KB内存。3百万个链接会占用12GB的内存。

2.3. HTTP

咱们已经有了一个简单的Channel实现。如今咱们须要一个WebSocket链接。由于还在一般作法(Idiomatic Way)的标题下,那么就先来看看一般是如何实现的。

注:若是你不知道WebSocket是怎么工做的,那么这里值得一提的是客户端是经过一个叫升级(Upgrade)请求的特殊HTTP机制来创建WebSocket的。在成功处理升级请求之后,服务端和客户端使用TCP链接来交换二进制的WebSocket帧(frames)。这里有关于帧结构的描述。

import (
    "net/http"
    "some/websocket"
)

http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, _ := websocket.Upgrade(r, w)
    ch := NewChannel(conn)
    //...
})

请注意这里的http.ResponseWriter结构包含bufio.Readerbufio.Writer(各自分别包含4KB的缓存)。它们用于\*http.Request初始化和返回结果。

无论是哪一个WebSocket,在成功回应一个升级请求以后,服务端在调用responseWriter.Hijack()以后会接收到一个I/O缓存和对应的TCP链接。

注:有时候咱们能够经过net/http.putBufio{Reader,Writer}调用把缓存释放回net/http里的sync.Pool

这样,这3百万个链接又须要额外的24 GB内存。

因此,为了这个什么都不干的程序,咱们已经占用了72 GB的内存!

3. 优化

咱们来回顾一下前面介绍的用户链接的工做流程。在创建WebSocket以后,客户端会发送请求订阅相关事件(咱们这里忽略相似ping/pong的请求)。接下来,在整个链接的生命周期里,客户端可能就不会发送任何其它数据了。

链接的生命周期可能会持续几秒钟到几天。

因此在大部分时间里,Channel.reader()Channel.writer()都在等待接收和发送数据。与它们一块儿等待的是各自分配的4 KB的I/O缓存。

如今,咱们发现有些地方是能够作进一步优化的,对吧?

3.1. Netpoll

你还记得Channel.reader()的实现使用了bufio.Reader.Read()吗?bufio.Reader.Read()又会调用conn.Read()。这个调用会被阻塞以等待接收链接上的新数据。若是链接上有新的数据,Go的运行环境(runtime)就会唤醒相应的goroutine让它去读取下一个packet。以后,goroutine会被再次阻塞来等待新的数据。咱们来研究下Go的运行环境是怎么知道goroutine须要被唤醒的。

若是咱们看一下conn.Read()的实现,就会看到它调用了net.netFD.Read()

// net/fd_unix.go

func (fd *netFD) Read(p []byte) (n int, err error) {
    //...
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        //...
        break
    }
    //...
}

Go使用了sockets的非阻塞模式。EAGAIN表示socket里没有数据了但不会阻塞在空的socket上,OS会把控制权返回给用户进程。

这里它首先对链接文件描述符进行read()系统调用。若是read()返回的是EAGAIN错误,运行环境就是调用pollDesc.waitRead()

// net/fd_poll_runtime.go

func (pd *pollDesc) waitRead() error {
   return pd.wait('r')
}

func (pd *pollDesc) wait(mode int) error {
   res := runtime_pollWait(pd.runtimeCtx, mode)
   //...
}

若是继续深挖,咱们能够看到netpoll的实如今Linux里用的是epoll而在BSD里用的是kqueue。咱们的这些链接为何不采用相似的方式呢?只有在socket上有可读数据时,才分配缓存空间并启用读数据的goroutine。

在github.com/golang/go上,有一个关于开放(exporting)netpoll函数的问题

3.2. 干掉goroutines

假设咱们用Go语言实现了netpoll。咱们如今能够避免建立Channel.reader()的goroutine,取而代之的是从订阅链接里收到新数据的事件。

ch := NewChannel(conn)

// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {
    // We spawn goroutine here to prevent poller wait loop
    // to become locked during receiving packet from ch.
    go ch.Receive()
})

// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {
    buf := bufio.NewReader(ch.conn)
    pkt := readPacket(buf)
    c.handle(pkt)
}

Channel.writer()相对容易一点,由于咱们只需在发送packet的时候建立goroutine并分配缓存。

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        go ch.writer()
    }
    ch.send <- p
}

注意,这里咱们没有处理write()系统调用时返回的EAGAIN。咱们依赖Go运行环境去处理它。这种状况不多发生。若是须要的话咱们仍是能够像以前那样来处理。

ch.send读取待发送的packets以后,ch.writer()会完成它的操做,最后释放goroutine的栈和用于发送的缓存。

很不错!经过避免这两个连续运行的goroutine所占用的I/O缓存和栈内存,咱们已经节省了48 GB

3.3. 控制资源

大量的链接不只仅会形成大量的内存消耗。在开发服务端的时候,咱们还不停地遇到竞争条件(race conditions)和死锁(deadlocks)。随之而来的是所谓的自我分布式阻断攻击(self-DDOS)。在这种状况下,客户端会悍然地尝试从新链接服务端而把状况搞得更加糟糕。

举个例子,若是由于某种缘由咱们忽然没法处理ping/pong消息,这些空闲链接就会不断地被关闭(它们会觉得这些链接已经无效所以不会收到数据)。而后客户端每N秒就会觉得失去了链接并尝试从新创建链接,而不是继续等待服务端发来的消息。

在这种状况下,比较好的办法是让负载太重的服务端中止接受新的链接,这样负载均衡器(例如nginx)就能够把请求转到其它的服务端上去。

撇开服务端的负载不说,若是全部的客户端忽然(极可能是由于某个bug)向服务端发送一个packet,咱们以前节省的48 GB内存又将会被消耗掉。由于这时咱们又会和开始同样给每一个链接建立goroutine并分配缓存。

Goroutine池

能够用一个goroutine池来限制同时处理packets的数目。下面的代码是一个简单的实现:

package gopool

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()),
        sem:  make(chan struct{}, size),
    }
}

func (p *Pool) Schedule(task func()) error {
    select {
    case p.work <- task:
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }
    for {
        task()
        task = <-p.work
    }
}

咱们使用netpoll的代码就变成下面这样:

pool := gopool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
    // We will block poller wait loop when
    // all pool workers are busy.
    pool.Schedule(func() {
        ch.Receive()
    })
})

如今咱们不只要等可读的数据出如今socket上才能读packet,还必须等到从池里获取到空闲的goroutine。

一样的,咱们修改下Send()的代码:

pool := gopool.New(128)

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        pool.Schedule(ch.writer)
    }
    ch.send <- p
}

这里咱们没有调用go ch.writer(),而是想重复利用池里goroutine来发送数据。 因此,若是一个池有N个goroutines的话,咱们能够保证有N个请求被同时处理。而N + 1个请求不会分配N + 1个缓存。goroutine池容许咱们限制对新链接的Accept()Upgrade(),这样就避免了大部分DDoS的状况。

3.4. 零拷贝升级(Zero-copy upgrade)

以前已经提到,客户端经过HTTP升级(Upgrade)请求切换到WebSocket协议。下面显示的是一个升级请求:

GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket

咱们接收HTTP请求和它的头部只是为了切换到WebSocket协议,而http.Request里保存了全部头部的数据。从这里能够获得启发,若是是为了优化,咱们能够放弃使用标准的net/http服务并在处理HTTP请求的时候避免无用的内存分配和拷贝。

举个例子,http.Request包含了一个叫作Header的字段。标准net/http服务会将请求里的全部头部数据所有无条件地拷贝到Header字段里。你能够想象这个字段会保存许多冗余的数据,例如一个包含很长cookie的头部。

咱们如何来优化呢?

WebSocket实现

不幸的是,在咱们优化服务端的时候全部能找到的库只支持对标准net/http服务作升级。并且没有一个库容许咱们实现上面提到的读和写的优化。为了使这些优化成为可能,咱们必须有一套底层的API来操做WebSocket。为了重用缓存,咱们须要相似下面这样的协议函数:

func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error

若是咱们有一个包含这样API的库,咱们就按照下面的方式从链接上读取packets:

// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)

// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {
    buf := getReadBuf()
    defer putReadBuf(buf)

    buf.Reset(conn)
    frame, _ := ReadFrame(buf)
    parsePacket(frame.Payload)
    //...
}

简而言之,咱们须要本身写一个库。

github.com/gobwas/ws

ws库的主要设计思想是不将协议的操做逻辑暴露给用户。全部读写函数都接受通用的io.Readerio.Writer接口。所以它能够随意搭配是否使用缓存以及其它I/O的库。

除了标准库net/http里的升级请求,ws还支持零拷贝升级。它可以处理升级请求并切换到WebSocket模式而不产生任何内存分配或者拷贝。ws.Upgrade()接受io.ReadWriternet.Conn实现了这个接口)。换句话说,咱们可使用标准的net.Listen() 函数而后把从ln.Accept()收到的链接立刻交给ws.Upgrade()去处理。库也容许拷贝任何请求数据来知足未来应用的需求(举个例子,拷贝Cookie来验证一个session)。

下面是处理升级请求的性能测试:标准net/http库的实现和使用零拷贝升级的net.Listen()

BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP     973 ns/op     0 B/op       0 allocs/op

使用ws以及零拷贝升级为咱们节省了24 GB的空间。这些空间本来被用作net/http里处理请求的I/O缓存。

3.5. 回顾

让咱们来回顾一下以前提到过的优化:

  • 一个包含缓存的读goroutine会占用不少内存。方案: netpoll(epoll, kqueue);重用缓存。
  • 一个包含缓存的写goroutine会占用不少内存。方案: 在须要的时候建立goroutine;重用缓存。
  • 存在大量链接请求的时候,netpoll不能很好的限制链接数。方案: 重用goroutines而且限制它们的数目。
  • net/http对升级到WebSocket请求的处理不是最高效的。方案: 在TCP链接上实现零拷贝升级。

下面是服务端的大体实现代码:

import (
    "net"
    "github.com/gobwas/ws"
)

ln, _ := net.Listen("tcp", ":8080")

for {
    // Try to accept incoming connection inside free pool worker.
    // If there no free workers for 1ms, do not accept anything and try later.
    // This will help us to prevent many self-ddos or out of resource limit cases.
    err := pool.ScheduleTimeout(time.Millisecond, func() {
        conn := ln.Accept()
        _ = ws.Upgrade(conn)

        // Wrap WebSocket connection with our Channel struct.
        // This will help us to handle/send our app's packets.
        ch := NewChannel(conn)

        // Wait for incoming bytes from connection.
        poller.Start(conn, netpoll.EventRead, func() {
            // Do not cross the resource limits.
            pool.Schedule(func() {
                // Read and handle incoming packet(s).
                ch.Recevie()
            })
        })
    })
    if err != nil {
        time.Sleep(time.Millisecond)
    }
}

4. 结论

在程序设计时,过早优化是万恶之源。Donald Knuth

上面的优化是有意义的,但不是全部状况都适用。举个例子,若是空闲资源(内存,CPU)与在线链接数之间的比例很高的话,优化就没有太多意义。固然,知道什么地方能够优化以及如何优化老是有帮助的。

谢谢你的关注!

5. 引用

相关文章
相关标签/搜索