[译] Go 实现百万 WebSocket 链接

你们好!我是 Sergey Kamardin,是 Mail.Ru 的一名工程师。html

本文主要介绍如何使用 Go 开发高负载的 WebSocket 服务。linux

若是你熟悉 WebSockets,但对 Go 了解很少,仍但愿你对这篇文章的想法和性能优化方面感兴趣。nginx

1. 简介

为了定义本文的讨论范围,有必要说明咱们为何须要这个服务。git

Mail.Ru 有不少有状态系统。用户的电子邮件存储就是其中之一。咱们有几种方法能够跟踪该系统的状态变化以及系统事件,主要是经过按期系统轮询或者状态变化时的系统通知来实现。github

两种方式各有利弊。可是对于邮件而言,用户收到新邮件的速度越快越好。golang

邮件轮询大约每秒 50,000 个 HTTP 查询,其中 60% 返回 304 状态,这意味着邮箱中没有任何更改。web

所以,为了减小服务器的负载并加快向用户发送邮件的速度,咱们决定经过用发布 - 订阅服务(也称为消息总线,消息代理或事件管道)的模式来造一个轮子。一端接收有关状态更改的通知,另外一端订阅此类通知。编程

以前的架构:浏览器

如今的架构:性能优化

第一个方案是以前的架构。浏览器按期轮询 API 并查询存储(邮箱服务)是否有更改。

第二种方案是如今的架构。浏览器与通知 API 创建了 WebSocket 链接,通知 API 是总线服务的消费者。一旦接收到新邮件后,Storage 会将有关它的通知发送到总线(1),总线将其发送给订阅者(2)。 API 经过链接发送这个收到的通知,将其发送到用户的浏览器(3)。

因此如今咱们将讨论这个 API 或者这个 WebSocket 服务。展望一下将来,咱们的服务未来可能会有 300 万个在线链接。

2. 经常使用的方式

咱们来看看如何在没有任何优化的状况下使用 Go 实现服务器的某些部分。

在咱们继续使用 net/http 以前,来谈谈如何发送和接收数据。这个数据位于 WebSocket 协议上(例如 JSON 对象),咱们在下文中将其称为包。

咱们先来实现 Channel 结构体,该结构体将包含在 WebSocket 链接上发送和接收数据包的逻辑。

2.1 Channel 结构体

// WebSocket Channel 的实现
// Packet 结构体表示应用程序级数据
type Packet struct {
    ...
}

// Channel 装饰用户链接
type Channel struct {
    conn net.Conn    // WebSocket 链接
    send chan Packet // 传出的 packets 队列
}

func NewChannel(conn net.Conn) *Channel {
    c := &Channel{
        conn: conn,
        send: make(chan Packet, N),
    }

    go c.reader()
    go c.writer()

    return c
}
复制代码

我想让你注意的是 readerwriter goroutines。每一个 goroutine 都须要内存栈,初始大小可能为 2 到 8 KB,具体取决于操做系统和 Go 版本。

关于上面提到的 300 万个线上链接,为此咱们须要消耗 24 GB 的内存(假设单个 goroutine 消耗 4 KB 栈内存)用于全部的链接。而且这还没包括为 Channel 结构体分配的内存,ch.send传出的数据包占用的内存以及其余内部字段的内存。

2.2 I/O goroutines

让咱们来看看 reader 的实现:

// Channel’s reading goroutine.
func (c *Channel) reader() {
    // 建立一个缓冲 read 来减小 read 的系统调用
    buf := bufio.NewReader(c.conn)

    for {
        pkt, _ := readPacket(buf)
        c.handle(pkt)
    }
}
复制代码

这里咱们使用了 bufio.Reader 来减小 read() 系统调用的次数,并尽量多地读取 buf 中缓冲区大小所容许的数量。在这个无限循环中,咱们等待新数据的到来。请先记住这句话:等待新数据的到来。咱们稍后会回顾。

咱们先不考虑传入的数据包的解析和处理,由于它对咱们讨论的优化并不重要。可是,buf 值得咱们关注:默认状况下,它是 4 KB,这意味着链接还须要 12 GB 的内存。writer 也有相似的状况:

// Channel’s writing goroutine.
func (c *Channel) writer() {
    // 建立一个缓冲 write 来减小 write 的系统调用
    buf := bufio.NewWriter(c.conn)

    for pkt := range c.send {
        _ := writePacket(buf, pkt)
        buf.Flush()
    }
}
复制代码

咱们经过 Channel 的 c.send 遍历将数据包传出 并将它们写入缓冲区。细心的读者可能猜到了,这是咱们 300 万个链接的另外 12 GB 的内存消耗。

2.3 HTTP

已经实现了一个简单的 Channel,如今咱们须要使用 WebSocket 链接。因为仍然处于经常使用的方式的标题下,因此咱们以经常使用的方式继续。

注意:若是你不知道 WebSocket 的运行原理,须要记住客户端会经过名为 Upgrade 的特殊 HTTP 机制转换到 WebSocket 协议。在成功处理 Upgrade 请求后,服务端和客户端将使用 TCP 链接来传输二进制的 WebSocket 帧。这里是链接的内部结构的说明。

// 经常使用的转换为 WebSocket 的方法
import (
    "net/http"
    "some/websocket"
)

http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {
    conn, _ := websocket.Upgrade(r, w)
    ch := NewChannel(conn)
    //...
})
复制代码

须要注意的是,http.ResponseWriterbufio.Readerbufio.Writer(均为 4 KB 的缓冲区)分配了内存,用于对 *http.Request 的初始化和进一步的响应写入。

不管使用哪一种 WebSocket 库,在 Upgrade 成功后,服务端在调用 responseWriter.Hijack() 以后都会收到 I/O 缓冲区和 TCP 链接。

提示:在某些状况下,go:linkname 可被用于经过调用 net/http.putBufio {Reader, Writer} 将缓冲区返回给 net/http 内的 sync.Pool

所以,咱们还须要 24 GB 的内存用于 300 万个链接。

那么,如今为了一个什么功能都没有的应用程序,一共须要消耗 72 GB 的内存!

3. 优化

咱们回顾一下在简介部分中谈到的内容,并记住用户链接的方式。在切换到 WebSocket 后,客户端会经过链接发送包含相关事件的数据包。而后(不考虑 ping/pong 等消息),客户端可能在整个链接的生命周期中不会发送任何其余内容。

链接的生命周期可能持续几秒到几天。

所以,大部分时间 Channel.reader()Channel.writer() 都在等待接收或发送数据。与它们一块儿等待的还有每一个大小为 4 KB 的 I/O 缓冲区。

如今咱们对哪些地方能够作优化应该比较清晰了。

3.1 Netpoll

Channel.reader() 经过给 bufio.Reader.Read() 内的 conn.Read() 加锁来等待新数据的到来(译者注:上文中的伏笔),一旦链接中有数据,Go runtime(译者注:runtime 包含 Go 运行时的系统交互的操做,这里保留原文)“唤醒” goroutine 并容许它读取下一个数据包。在此以后,goroutine 再次被锁定,同时等待新的数据。让咱们看看 Go runtime 来理解 goroutine 为何必须“被唤醒”。

若是咱们查看 conn.Read() 的实现,将会在其中看到 net.netFD.Read() 调用

// Go 内部的非阻塞读.
// net/fd_unix.go

func (fd *netFD) Read(p []byte) (n int, err error) {
    //...
    for {
        n, err = syscall.Read(fd.sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN {
                if err = fd.pd.waitRead(); err == nil {
                    continue
                }
            }
        }
        //...
        break
    }
    //...
}
复制代码

Go 在非阻塞模式下使用套接字。 EAGAIN 表示套接字中没有数据,而且读取空套接字时不会被锁定,操做系统将返回控制权给咱们。(译者注:EAGAIN 表示目前没有可用数据,请稍后再试)

咱们从链接文件描述符中看到一个 read() 系统调用函数。若是 read 返回 EAGAIN 错误,则 runtime 调用 pollDesc.waitRead()

// Go 内部关于 netpoll 的使用
// net/fd_poll_runtime.go

func (pd *pollDesc) waitRead() error {
   return pd.wait('r')
}

func (pd *pollDesc) wait(mode int) error {
   res := runtime_pollWait(pd.runtimeCtx, mode)
   //...
}
复制代码

若是深刻挖掘,咱们将看到 netpoll 在 Linux 中是使用 epoll 实现的,而在 BSD 中是使用 kqueue 实现的。为何不对链接使用相同的方法?咱们能够分配一个 read 缓冲区并仅在真正须要时启动 read goroutine:当套接字中有可读的数据时。

在 github.com/golang/go 上,有一个导出 netpoll 函数的 issue

3.2 去除 goroutines 的内存消耗

假设咱们有 Go 的 netpoll 实现。如今咱们能够避免在内部缓冲区启动 Channel.reader() goroutine,而是在链接中订阅可读数据的事件:

// 使用 netpoll
ch := NewChannel(conn)

// 经过 netpoll 实例观察 conn
poller.Start(conn, netpoll.EventRead, func() {
    // 咱们在这里产生 goroutine 以防止在轮询从 ch 接收数据包时被锁。
    go Receive(ch)
})

// Receive 从 conn 读取数据包并以某种方式处理它。
func (ch *Channel) Receive() {
    buf := bufio.NewReader(ch.conn)
    pkt := readPacket(buf)
    c.handle(pkt)
}
复制代码

Channel.writer() 更简单,由于咱们只能在发送数据包时运行 goroutine 并分配缓冲区:

// 当咱们须要时启动 writer goroutine
func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        go ch.writer()
    }
    ch.send <- p
}
复制代码

须要注意的是,当操做系统在 write() 调用上返回 EAGAIN 时,咱们不处理这种状况。咱们依靠 Go runtime 来处理这种状况,由于这种状况在服务器上不多见。然而,若是有必要,它能够以与 reader() 相同的方式处理。

当从 ch.send(一个或几个)读取传出数据包后,writer 将完成其操做并释放 goroutine 的内存和发送缓冲区的内存。

完美!咱们经过去除两个运行的 goroutine 中的内存消耗和 I/O 缓冲区的内存消耗节省了 48 GB。

3.3 资源控制

大量链接不只仅涉及到内存消耗高的问题。在开发服务时,咱们遇到了反复出现的竞态条件和 self-DDoS 形成的死锁。

例如,若是因为某种缘由咱们忽然没法处理 ping/pong 消息,可是空闲链接的处理程序继续关闭这样的链接(假设链接被破坏,没有提供数据),客户端每隔 N 秒失去链接并尝试再次链接而不是等待事件。

被锁或超载的服务器中止服务,若是它以前的负载均衡器(例如,nginx)将请求传递给下一个服务器实例,这将是不错的。

此外,不管服务器负载如何,若是全部客户端忽然(多是因为错误缘由)向咱们发送数据包,以前的 48 GB 内存的消耗将不可避免,由于须要为每一个链接分配 goroutine 和缓冲区。

Goroutine 池

上面的状况,咱们可使用 goroutine 池限制同时处理的数据包数量。下面是这种池的简单实现:

// goroutine 池的简单实现
package gopool

func New(size int) *Pool {
    return &Pool{
        work: make(chan func()), sem: make(chan struct{}, size), } } func (p *Pool) Schedule(task func()) error {
    select {
    case p.work <- task:
    case p.sem <- struct{}{}:
        go p.worker(task)
    }
}

func (p *Pool) worker(task func()) {
    defer func() { <-p.sem }
    for {
        task()
        task = <-p.work
    }
}
复制代码

如今咱们的 netpoll 代码以下:

// 处理 goroutine 池中的轮询事件。
pool := gopool.New(128)

poller.Start(conn, netpoll.EventRead, func() {
    // 咱们在全部 worker 被占用时阻塞 poller
    pool.Schedule(func() {
        Receive(ch)
    })
})
复制代码

如今咱们不只在套接字中有可读数据时读取,并且还能够占用池中的空闲的 goroutine。

一样,咱们修改 Send()

// 复用 writing goroutine
pool := gopool.New(128)

func (ch *Channel) Send(p Packet) {
    if c.noWriterYet() {
        pool.Schedule(ch.writer)
    }
    ch.send <- p
}
复制代码

取代 go ch.writer() ,咱们想写一个复用的 goroutines。所以,对于拥有 N 个 goroutines 的池,咱们能够保证同时处理 N 个请求而且在 N + 1的时候, 咱们不会分配 N + 1 个缓冲区。 goroutine 池还容许咱们限制新链接的 Accept()Upgrade() ,并避免大多数的 DDoS 攻击。

3.4 upgrade 零拷贝

如前所述,客户端使用 HTTP Upgrade 切换到 WebSocket 协议。这就是 WebSocket 协议的样子:

## HTTP Upgrade 示例

GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocket

HTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket
复制代码

也就是说,在咱们的例子中,须要 HTTP 请求及其 Header 用于切换到 WebSocket 协议。这些知识以及 http.Request 中存储的内容代表,为了优化,咱们须要在处理 HTTP 请求时放弃没必要要的内存分配和内存复制,并弃用 net/http 库。

例如,http.Request 有一个与 Header 具备相同名称的字段,这个字段用于将数据从链接中复制出来填充请求头。想象一下,该字段须要消耗多少额外内存,例如碰到比较大的 Cookie 头。

WebSocket 的实现

不幸的是,在咱们优化的时候全部存在的库都是使用标准的 net/http 库进行升级。并且,(两个)库都不能使用上述的读写优化方案。为了采用这些优化方案,咱们须要用一个比较低级的 API 来处理 WebSocket。要重用缓冲区,咱们须要把协议函数变成这样:

func ReadFrame(io.Reader) (Frame, error) func WriteFrame(io.Writer, Frame) error 复制代码

若是有一个这种 API 的库,咱们能够按下面的方式从链接中读取数据包(数据包的写入也同样):

// 预期的 WebSocket 实现API
// getReadBuf, putReadBuf 用来复用 *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader func putReadBuf(*bufio.Reader) // 当 conn 中的数据可读取时,readPacket 被调用 func readPacket(conn io.Reader) error {
    buf := getReadBuf()
    defer putReadBuf(buf)

    buf.Reset(conn)
    frame, _ := ReadFrame(buf)
    parsePacket(frame.Payload)
    //...
}
复制代码

简单来讲,咱们须要本身的 WebSocket 库。

github.com/gobwas/ws

在乎识形态上,编写 ws 库是为了避免将其协议操做逻辑强加给用户。全部读写方法都实现了标准的 io.Reader 和 io.Writer 接口,这样就可使用或不使用缓冲或任何其余 I/O 。

除了来自标准库 net/http 的升级请求以外,ws 还支持零拷贝升级,升级请求的处理以及切换到 WebSocket 无需分配内存或复制内存。ws.Upgrade() 接受 io.ReadWriternet.Conn 实现了此接口)。换句话说,咱们可使用标准的 net.Listen() 将接收到的链接从 ln.Accept() 转移给 ws.Upgrade() 。该库使得能够复制任何请求数据以供应用程序使用(例如,Cookie 用来验证会话)。

下面是升级请求的基准测试结果:标准库 net/http 的服务与用零拷贝升级的 net.Listen()

BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP     973 ns/op     0 B/op       0 allocs/op
复制代码

切换到 ws零拷贝升级为咱们节省了另外的 24 GB 内存 - 在 net/http 处理请求时为 I/O 缓冲区分配的空间。

3.5 摘要

咱们总结一下这些优化。

  • 内部有缓冲区的 read goroutine 是代价比较大的。解决方案:netpoll(epoll,kqueue); 重用缓冲区。
  • 内部有缓冲区的 write goroutine 是代价比较大的。解决方案:须要的时候才启动 goroutine; 重用缓冲区。
  • 若是有大量的链接,netpoll 将没法正常工做。解决方案:使用 goroutines 池并限制池的 worker 数。
  • net/http 不是处理升级到 WebSocket 的最快方法。解决方案:在裸 TCP 链接上使用内存零拷贝升级。

服务的代码看起来以下所示:

// WebSocket 服务器示例,包含 netpoll,goroutine 池和内存零拷贝的升级。
import (
    "net"
    "github.com/gobwas/ws"
)

ln, _ := net.Listen("tcp", ":8080")

for {
    // 尝试在空闲池的 worker 内的接收传入的链接。若是超过 1ms 没有空闲 worker,则稍后再试。这有助于防止 self-ddos 或耗尽服务器资源的状况。
    err := pool.ScheduleTimeout(time.Millisecond, func() {
        conn := ln.Accept()
        _ = ws.Upgrade(conn)

        // 使用 Channel 结构体包装 WebSocket 链接
        // 将帮助咱们处理应用包
        ch := NewChannel(conn)

        // 等待链接传入字节
        poller.Start(conn, netpoll.EventRead, func() {
            // 不要超过资源限制
            pool.Schedule(func() {
                // 读取并处理传入的包
                ch.Recevie()
            })
        })
    })
    if err != nil {
        time.Sleep(time.Millisecond)
    }
}
复制代码

总结

过早优化是编程中全部邪恶(或至少大部分)的根源。 -- Donald Knuth

固然,上述优化是和需求相关的,但并不是全部状况下都是如此。例如,若是空闲资源(内存,CPU)和线上链接数之间的比率比较高,则优化可能没有意义。可是,经过了解优化的位置和内容,咱们会受益不浅。

感谢你的关注!

引用


via: www.freecodecamp.org/news/millio…

做者:Sergey Kamardin 译者:咔叽咔叽 校对:polaris1119

本文由 GCTT 原创编译,Go 中文网 荣誉推出

相关文章
相关标签/搜索