本文中代码能够在 github.com/alfred-zhon… 获取。html
最近拿到需求要在网页上展现报警信息。以往报警信息都是经过短信,微信和 App 推送给用户的,如今要让登陆用户在网页端也能实时接收到报警推送。html5
依稀记得之前工做的时候遇到过相似的需求。由于之前的浏览器标准比较陈旧,而且那时用 Java 较多,因此那时候解决这个问题就用了 Comet4J。具体的原理就是长轮询,长连接。但如今毕竟 html5 流行开来了,IE 都被 Edge 接替了,再用之前这种技术就显得过期。jquery
很早之前就听过 WebSocket 的大名,但由于那时不少用户的浏览器还不支持,因此对这个技术也就是浅尝辄止,没有太深刻研究过。如今趁着项目须要,就来稍微深刻了解一下。git
以往浏览器要获取服务端数据,都是经过发送 HTTP 请求,而后等待服务端回应的。也就是说浏览器端一直是整个请求的发起者,只有它主动,才能获取到数据。而要让浏览器一侧可以获取到服务端的实时数据,就须要不停地向服务端发起请求。虽然大多数状况下并无获取到实际数据,但这大大增长了网络压力,对于服务端来讲压力也直线上升。github
后来咱们学会了使用长链接 + 长轮询的方式。换句话说,也就是延长 HTTP 请求的存在时间,尽可能保持 HTTP 链接。虽然这在必定程度上下降了很多压力,但仍然须要不停地进行轮询,也作不到真正的实时性。(借用一张图)golang
随着 HTML5 的到来,WebSocket 在 2011 年被定为标准(详情请参见 RFC 6455)。web
借用 《Go Web 编程》的话。WebSocket 采用了一些特殊的报头,使得浏览器和服务器只须要作一个握手的动做,就能够在浏览器和服务器之间创建一条链接通道。且此链接会保持在活动状态,你可使用 JavaScript 来向链接写入或从中接收数据,就像在使用一个常规的 TCP Socket 同样。它解决了 Web 实时化的问题。ajax
因为 WebSocket 是全双工通讯,因此当创建了 WebSocket 链接以后,接下来的通讯就相似于传统的 TCP 通讯了。客户端和服务端能够相互发送数据,再也不有实时性的问题。编程
在 Go 官方的 SDK 中,并不包含对 WebSocket 的支持,因此必须使用第三方库。json
要使用 Golang 开发 WebSocket,选择基本就在 x/net/websocket 和 gorilla/websocket 之间。《Go Web 编程》一书中的例子使用了 x/net/websocket
做为开发包,并且貌似它也更加官方且正式。而实际根据我在网上查询获得的反馈看来,并不是如此。x/net/websocket
貌似 Bug 较多,且较为不稳定,问题解决也并不及时。相比之下,gorilla/websocket
则更加优秀。
还有对于 Gorilla web toolkit 组织的贡献,必须予以感谢。🙏。其下不只有 WebSocket 的实现,也有一些其余工具。欢迎你们使用而且可以给予反馈或贡献。
项目初步设计以下:
server 启动之后会注册两个 Handler。
浏览器首先链接 websocketHandler (默认地址为 ws://ip:port/ws
)升级请求为 WebSocket 链接,当链接创建以后须要发送注册信息进行注册。这里注册信息中包含一个 token 信息。server 会对提供的 token 进行验证并获取到相应的 userId(一般来讲,一个 userId 可能同时关联许多 token),并保存维护好 token, userId 和 conn(链接)之间的关系。
推送端发送推送数据的请求到 pushHandler(默认地址为 ws://ip:port/push
),请求中包含了 userId 字段和 message 字段。server 会根据 userId 获取到全部此时链接到该 server 的 conn,而后将 message 一一进行推送。
因为推送服务的实时性,推送的数据并无也不须要进行缓存。
我在此处会稍微讲述一下代码的基本构成,也顺便说说 Go 语言中一些经常使用的写法和模式(本人也是从其余语言转向 Go 语言,毕竟 Go 语言也至关年轻。因此有建议的话,敬请提出。)。因为 Go 语言的发明人和一些主要维护者大都来自于 C/C++ 语言,因此 Go 语言的代码也更偏向于 C/C++ 系。
首先先看一下 Server
的结构:
// Server defines parameters for running websocket server.
type Server struct {
// Address for server to listen on
Addr string
// Path for websocket request, default "/ws".
WSPath string
// Path for push message, default "/push".
PushPath string
// Upgrader is for upgrade connection to websocket connection using
// "github.com/gorilla/websocket".
//
// If Upgrader is nil, default upgrader will be used. Default upgrader is
// set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always
// returns true.
Upgrader *websocket.Upgrader
// Check token if it's valid and return userID. If token is valid, userID
// must be returned and ok should be true. Otherwise ok should be false.
AuthToken func(token string) (userID string, ok bool) // Authorize push request. Message will be sent if it returns true, // otherwise the request will be discarded. Default nil and push request // will always be accepted. PushAuth func(r *http.Request) bool wh *websocketHandler ph *pushHandler } 复制代码
PS: 因为我整个项目的注释都是用英文写的,因此见谅了,但愿不妨碍阅读。
这里说一下 Upgrader *websocket.Upgrader
,这是 gorilla/websocket
包的对象,它用来升级 HTTP 请求。
若是一个结构体参数过多,一般不建议直接初始化,而是使用它提供的 New 方法。这里是:
// NewServer creates a new Server.
func NewServer(addr string) *Server {
return &Server{
Addr: addr,
WSPath: serverDefaultWSPath,
PushPath: serverDefaultPushPath,
}
}
复制代码
这也是 Go 语言对外提供初始化方法的一种常见用法。
而后 Server
使用 ListenAndServe
方法启动并监听端口,与 http
包的使用相似:
// ListenAndServe listens on the TCP network address and handle websocket
// request.
func (s *Server) ListenAndServe() error {
b := &binder{
userID2EventConnMap: make(map[string]*[]eventConn),
connID2UserIDMap: make(map[string]string),
}
// websocket request handler
wh := websocketHandler{
upgrader: defaultUpgrader,
binder: b,
}
if s.Upgrader != nil {
wh.upgrader = s.Upgrader
}
if s.AuthToken != nil {
wh.calcUserIDFunc = s.AuthToken
}
s.wh = &wh
http.Handle(s.WSPath, s.wh)
// push request handler
ph := pushHandler{
binder: b,
}
if s.PushAuth != nil {
ph.authFunc = s.PushAuth
}
s.ph = &ph
http.Handle(s.PushPath, s.ph)
return http.ListenAndServe(s.Addr, nil)
}
复制代码
这里咱们生成了两个 Handler
,分别为 websocketHandler
和 pushHandler
。websocketHandler
负责与浏览器创建链接并传输数据,而 pushHandler
则处理推送端的请求。能够看到,这里两个 Handler
都封装了一个 binder
对象。这个 binder
用于维护 token <-> userID <-> Conn 的关系:
// binder is defined to store the relation of userID and eventConn
type binder struct {
mu sync.RWMutex
// map stores key: userID and value of related slice of eventConn
userID2EventConnMap map[string]*[]eventConn
// map stores key: connID and value: userID
connID2UserIDMap map[string]string
}
复制代码
具体看一下 websocketHandler
的实现。
// websocketHandler defines to handle websocket upgrade request.
type websocketHandler struct {
// upgrader is used to upgrade request.
upgrader *websocket.Upgrader
// binder stores relations about websocket connection and userID.
binder *binder
// calcUserIDFunc defines to calculate userID by token. The userID will
// be equal to token if this function is nil.
calcUserIDFunc func(token string) (userID string, ok bool) } 复制代码
很简单的结构。websocketHandler
实现了 http.Handler
接口:
// First try to upgrade connection to websocket. If success, connection will
// be kept until client send close message or server drop them.
func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
wsConn, err := wh.upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer wsConn.Close()
// handle Websocket request
conn := NewConn(wsConn)
conn.AfterReadFunc = func(messageType int, r io.Reader) {
var rm RegisterMessage
decoder := json.NewDecoder(r)
if err := decoder.Decode(&rm); err != nil {
return
}
// calculate userID by token
userID := rm.Token
if wh.calcUserIDFunc != nil {
uID, ok := wh.calcUserIDFunc(rm.Token)
if !ok {
return
}
userID = uID
}
// bind
wh.binder.Bind(userID, rm.Event, conn)
}
conn.BeforeCloseFunc = func() {
// unbind
wh.binder.Unbind(conn)
}
conn.Listen()
}
复制代码
首先将传入的 http.Request
转换为 websocket.Conn
,再将其分装为咱们自定义的一个 wserver.Conn
(封装,或者说是组合,是 Go 语言的典型用法。记住,Go 语言没有继承,只有组合)。而后设置了 Conn
的 AfterReadFunc
和 BeforeCloseFunc
方法,接着启动了 conn.Listen()
。AfterReadFunc
意思是当 Conn
读取到数据后,尝试验证并根据 token
计算 userID
,然乎 bind
注册绑定。BeforeCloseFunc
则为 Conn
关闭前进行解绑操做。
pushHandler
则容易理解。它解析请求而后推送数据:
// Authorize if needed. Then decode the request and push message to each
// realted websocket connection.
func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
// authorize
if s.authFunc != nil {
if ok := s.authFunc(r); !ok {
w.WriteHeader(http.StatusUnauthorized)
return
}
}
// read request
var pm PushMessage
decoder := json.NewDecoder(r.Body)
if err := decoder.Decode(&pm); err != nil {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(ErrRequestIllegal.Error()))
return
}
// validate the data
if pm.UserID == "" || pm.Event == "" || pm.Message == "" {
w.WriteHeader(http.StatusBadRequest)
w.Write([]byte(ErrRequestIllegal.Error()))
return
}
cnt, err := s.push(pm.UserID, pm.Event, pm.Message)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
return
}
result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt))
io.Copy(w, result)
}
复制代码
Conn
(此处指 wserver.Conn
) 为 websocket.Conn
的包装。
// Conn wraps websocket.Conn with Conn. It defines to listen and read
// data from Conn.
type Conn struct {
Conn *websocket.Conn
AfterReadFunc func(messageType int, r io.Reader) BeforeCloseFunc func() once sync.Once id string stopCh chan struct{}
}
复制代码
最主要的方法为 Listen()
:
// Listen listens for receive data from websocket connection. It blocks
// until websocket connection is closed.
func (c *Conn) Listen() {
c.Conn.SetCloseHandler(func(code int, text string) error {
if c.BeforeCloseFunc != nil {
c.BeforeCloseFunc()
}
if err := c.Close(); err != nil {
log.Println(err)
}
message := websocket.FormatCloseMessage(code, "")
c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second))
return nil
})
// Keeps reading from Conn util get error.
ReadLoop:
for {
select {
case <-c.stopCh:
break ReadLoop
default:
messageType, r, err := c.Conn.NextReader()
if err != nil {
// TODO: handle read error maybe
break ReadLoop
}
if c.AfterReadFunc != nil {
c.AfterReadFunc(messageType, r)
}
}
}
}
复制代码
主要设置了当 websocket 链接关闭时的处理和不停地读取数据。
文中很难全面地描述整个代码的运做流程,像具体阅读代码,请前往 github.com/alfred-zhon… 获取。
代码我已经进行了必定的测试,也已经在正式环境中运行了一段时间。可是代码可能仍然不够稳定,因此在使用过程当中出现问题,也实属正常。随意随时欢迎你们给我提 issues 或者 PRs。