本文将介绍如何实现一个基于websocket分布式聊天(IM)系统。php
使用golang实现websocket通信,单机能够支持百万链接,使用gin框架、nginx负载、能够水平部署、程序内部相互通信、使用grpc通信协议。html
本文内容比较长,若是直接想clone项目体验直接进入项目体验 goWebSocket项目下载 ,文本从介绍webSocket是什么开始,而后开始介绍这个项目,以及在Nginx中配置域名作webSocket的转发,而后介绍如何搭建一个分布式系统。前端
一、项目说明java
二、介绍webSocketnode
三、如何实现基于webSocket的长连接系统python
五、webSocket项目Nginx配置github
本文将介绍如何实现一个基于websocket聊天(IM)分布式系统。
使用golang实现websocket通信,单机支持百万链接,使用gin框架、nginx负载、能够水平部署、程序内部相互通信、使用grpc通信协议。
WebSocket 协议在2008年诞生,2011年成为国际标准。全部浏览器都已经支持了。
它的最大特色就是,服务器能够主动向客户端推送信息,客户端也能够主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。
ws://
无证书 wss://
配置证书的协议标识golang、java、php、node.js、python、nginx 都有不错的支持
Android可使用java-webSocket对webSocket支持
iOS 4.2及更高版本具备WebSockets支持
目前大多数的请求都是使用HTTP,都是由客户端发起一个请求,有服务端处理,而后返回结果,不能够服务端主动向某一个客户端主动发送数据
客户端发起升级协议的请求,采用标准的HTTP报文格式,在报文中添加头部信息
Connection: Upgrade
代表链接须要升级
Upgrade: websocket
须要升级到 websocket协议
Sec-WebSocket-Version: 13
协议的版本为13
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
这个是base64 encode 的值,是浏览器随机生成的,与服务器响应的 Sec-WebSocket-Accept
对应
# Request Headers Connection: Upgrade Host: im.91vh.com Origin: http://im.91vh.com Pragma: no-cache Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== Sec-WebSocket-Version: 13 Upgrade: websocket
服务端接收到升级协议的请求,若是服务端支持升级协议会作以下响应
返回:
Status Code: 101 Switching Protocols
表示支持切换协议
# Response Headers Connection: upgrade Date: Fri, 09 Aug 2019 07:36:59 GMT Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E= Server: nginx/1.12.1 Upgrade: websocket
golang
成功的 main
函数中用协程的方式去启动程序go websocket.StartWebSocket()
// 启动程序 func StartWebSocket() { http.HandleFunc("/acc", wsPage) http.ListenAndServe(":8089", nil) }
func wsPage(w http.ResponseWriter, req *http.Request) { // 升级协议 conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { fmt.Println("升级协议", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"]) return true }}).Upgrade(w, req, nil) if err != nil { http.NotFound(w, req) return } fmt.Println("webSocket 创建链接:", conn.RemoteAddr().String()) currentTime := uint64(time.Now().Unix()) client := NewClient(conn.RemoteAddr().String(), conn, currentTime) go client.read() go client.write() // 用户链接事件 clientManager.Register <- client }
// 链接管理 type ClientManager struct { Clients map[*Client]bool // 所有的链接 ClientsLock sync.RWMutex // 读写锁 Users map[string]*Client // 登陆的用户 // appId+uuid UserLock sync.RWMutex // 读写锁 Register chan *Client // 链接链接处理 Login chan *login // 用户登陆处理 Unregister chan *Client // 断开链接处理程序 Broadcast chan []byte // 广播 向所有成员发送数据 } // 初始化 func NewClientManager() (clientManager *ClientManager) { clientManager = &ClientManager{ Clients: make(map[*Client]bool), Users: make(map[string]*Client), Register: make(chan *Client, 1000), Login: make(chan *login, 1000), Unregister: make(chan *Client, 1000), Broadcast: make(chan []byte, 1000), } return }
string(debug.Stack())
打印调用堆栈信息// 向客户端写数据 func (c *Client) write() { defer func() { if r := recover(); r != nil { fmt.Println("write stop", string(debug.Stack()), r) } }() defer func() { clientManager.Unregister <- c c.Socket.Close() fmt.Println("Client发送数据 defer", c) }() for { select { case message, ok := <-c.Send: if !ok { // 发送数据错误 关闭链接 fmt.Println("Client发送数据 关闭链接", c.Addr, "ok", ok) return } c.Socket.WriteMessage(websocket.TextMessage, message) } } }
// 读取客户端数据 func (c *Client) read() { defer func() { if r := recover(); r != nil { fmt.Println("write stop", string(debug.Stack()), r) } }() defer func() { fmt.Println("读取客户端数据 关闭send", c) close(c.Send) }() for { _, message, err := c.Socket.ReadMessage() if err != nil { fmt.Println("读取客户端数据 错误", c.Addr, err) return } // 处理程序 fmt.Println("读取客户端数据 处理:", string(message)) ProcessData(c, message) } }
json
的数据格式发送和接收数据(人类能够阅读的格式在工做开发中使用是比较方便的){"seq":"1565336219141-266129","cmd":"login","data":{"userId":"马远","appId":101}}
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
/************************ 请求数据 **************************/ // 通用请求数据格式 type Request struct { Seq string `json:"seq"` // 消息的惟一Id Cmd string `json:"cmd"` // 请求命令字 Data interface{} `json:"data,omitempty"` // 数据 json } // 登陆请求数据 type Login struct { ServiceToken string `json:"serviceToken"` // 验证用户是否登陆 AppId uint32 `json:"appId,omitempty"` UserId string `json:"userId,omitempty"` } // 心跳请求数据 type HeartBeat struct { UserId string `json:"userId,omitempty"` }
/************************ 响应数据 **************************/ type Head struct { Seq string `json:"seq"` // 消息的Id Cmd string `json:"cmd"` // 消息的cmd 动做 Response *Response `json:"response"` // 消息体 } type Response struct { Code uint32 `json:"code"` CodeMsg string `json:"codeMsg"` Data interface{} `json:"data"` // 数据 json }
// Websocket 路由 func WebsocketInit() { websocket.Register("login", websocket.LoginController) websocket.Register("heartbeat", websocket.HeartbeatController) }
没有登陆的链接和登陆的链接6分钟没有心跳则断开链接
client_manager.go
// 定时清理超时链接 func ClearTimeoutConnections() { currentTime := uint64(time.Now().Unix()) for client := range clientManager.Clients { if client.IsHeartbeatTimeout(currentTime) { fmt.Println("心跳时间超时 关闭链接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime) client.Socket.Close() } } }
write()
Goroutine写入数据失败,关闭c.Socket.Close()
链接,会关闭read()
Goroutineread()
Goroutine读取数据失败,关闭close(c.Send)
链接,会关闭write()
Goroutine
关闭读写的Goroutine
从ClientManager
删除链接
十个内存溢出有九个和Goroutine有关
添加一个http的接口,能够查看系统的状态,防止Goroutine不回收
查看系统状态
ws = new WebSocket("ws://127.0.0.1:8089/acc"); ws.onopen = function(evt) { console.log("Connection open ..."); }; ws.onmessage = function(evt) { console.log( "Received Message: " + evt.data); data_array = JSON.parse(evt.data); console.log( data_array); }; ws.onclose = function(evt) { console.log("Connection closed."); };
登陆: ws.send('{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}'); 心跳: ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}'); 关闭链接: ws.close();
# 主要使用到的包 github.com/gin-gonic/gin@v1.4.0 github.com/go-redis/redis github.com/gorilla/websocket github.com/spf13/viper google.golang.org/grpc github.com/golang/protobuf
git clone git@github.com:link1st/gowebsocket.git # 或 git clone https://github.com/link1st/gowebsocket.git
cd gowebsocket cd config mv app.yaml.example app.yaml # 修改项目监听端口,redis链接等(默认127.0.0.1:3306) vim app.yaml # 返回项目目录,为之后启动作准备 cd ..
app: logFile: log/gin.log # 日志文件位置 httpPort: 8080 # http端口 webSocketPort: 8089 # webSocket端口 rpcPort: 9001 # 分布式部署程序内部通信端口 httpUrl: 127.0.0.1:8080 webSocketUrl: 127.0.0.1:8089 redis: addr: "localhost:6379" password: "" DB: 0 poolSize: 30 minIdleConns: 30
go run main.go
http://127.0.0.1:8080/home/index
upstream go-im { server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s; keepalive 16; } upstream go-acc { server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s; keepalive 16; } server { listen 80 ; server_name im.91vh.com; index index.html index.htm ; location /acc { proxy_set_header Host $host; proxy_pass http://go-acc; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection $connection_upgrade; proxy_set_header Connection ""; proxy_redirect off; proxy_intercept_errors on; client_max_body_size 10m; } location / { proxy_set_header Host $host; proxy_pass http://go-im; proxy_http_version 1.1; proxy_set_header Connection ""; proxy_redirect off; proxy_intercept_errors on; client_max_body_size 30m; } access_log /link/log/nginx/access/im.log; error_log /link/log/nginx/access/im.error.log; }
/link/server/tengine/sbin/nginx -t
nginx: [emerg] unknown "connection_upgrade" variable configuration file /link/server/tengine/conf/nginx.conf test failed
http{ fastcgi_temp_file_write_size 128k; ..... # 须要添加的内容 #support websocket map $http_upgrade $connection_upgrade { default upgrade; '' close; } ..... gzip on; }
ulimit -n 1000000
vim /etc/sysctl.conf net.ipv4.tcp_tw_reuse = 1 net.ipv4.tcp_tw_recycle = 0
在线用户数 | cup | 内存 | I/O | net.out |
---|---|---|---|---|
1W | ||||
10W | ||||
100W |
获取所有在线的用户,查询单前服务的所有用户+集群中服务的所有用户
发送消息,这里采用的是http接口发送(微信网页版发送消息也是http接口),这里考虑主要是两点:
1.服务分离,让acc系统尽可能的简单一点,不掺杂其它业务逻辑
2.发送消息是走http接口,不使用webSocket链接,才用收和发送数据分离的方式,能够加快收发数据的效率
IM实现细节: