基于websocket单台机器支持百万链接分布式聊天(IM)系统

基于websocket单台机器支持百万链接分布式聊天(IM)系统

本文将介绍如何实现一个基于websocket分布式聊天(IM)系统。php

使用golang实现websocket通信,单机能够支持百万链接,使用gin框架、nginx负载、能够水平部署、程序内部相互通信、使用grpc通信协议。html

本文内容比较长,若是直接想clone项目体验直接进入项目体验 goWebSocket项目下载 ,文本从介绍webSocket是什么开始,而后开始介绍这个项目,以及在Nginx中配置域名作webSocket的转发,而后介绍如何搭建一个分布式系统。前端

目录

一、项目说明

1.1 goWebSocket

本文将介绍如何实现一个基于websocket聊天(IM)分布式系统。

使用golang实现websocket通信,单机支持百万链接,使用gin框架、nginx负载、能够水平部署、程序内部相互通信、使用grpc通信协议。

  • 通常项目中webSocket使用的架构图

网站架构图

1.2 项目体验

二、介绍webSocket

2.1 webSocket 是什么

WebSocket 协议在2008年诞生,2011年成为国际标准。全部浏览器都已经支持了。

它的最大特色就是,服务器能够主动向客户端推送信息,客户端也能够主动向服务器发送信息,是真正的双向平等对话,属于服务器推送技术的一种。

  • HTTP和WebSocket在通信过程的比较

HTTP协议和WebSocket比较

  • HTTP和webSocket都支持配置证书,ws:// 无证书 wss:// 配置证书的协议标识

HTTP协议和WebSocket比较

2.2 webSocket的兼容性

  • 浏览器的兼容性,开始支持webSocket的版本

浏览器开始支持webSocket的版本

  • 服务端的支持

golang、java、php、node.js、python、nginx 都有不错的支持

  • Android和IOS的支持

Android可使用java-webSocket对webSocket支持

iOS 4.2及更高版本具备WebSockets支持

2.3 为何要用webSocket

    1. 从业务上出发,须要一个主动通达客户端的能力
目前大多数的请求都是使用HTTP,都是由客户端发起一个请求,有服务端处理,而后返回结果,不能够服务端主动向某一个客户端主动发送数据

服务端处理一个请求

    1. 大多数场景咱们须要主动通知用户,如:聊天系统、用户完成任务主动告诉用户、一些运营活动须要通知到在线的用户
    1. 能够获取用户在线状态
    1. 在没有长连接的时候经过客户端主动轮询获取数据
    1. 能够经过一种方式实现,多种不一样平台(H5/Android/IOS)去使用

2.4 webSocket创建过程

    1. 客户端先发起升级协议的请求

客户端发起升级协议的请求,采用标准的HTTP报文格式,在报文中添加头部信息

Connection: Upgrade代表链接须要升级

Upgrade: websocket须要升级到 websocket协议

Sec-WebSocket-Version: 13 协议的版本为13

Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== 这个是base64 encode 的值,是浏览器随机生成的,与服务器响应的 Sec-WebSocket-Accept对应

# Request Headers
Connection: Upgrade
Host: im.91vh.com
Origin: http://im.91vh.com
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
Sec-WebSocket-Version: 13
Upgrade: websocket

浏览器 Network

    1. 服务器响应升级协议

服务端接收到升级协议的请求,若是服务端支持升级协议会作以下响应

返回:

Status Code: 101 Switching Protocols 表示支持切换协议

# Response Headers
Connection: upgrade
Date: Fri, 09 Aug 2019 07:36:59 GMT
Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=
Server: nginx/1.12.1
Upgrade: websocket
    1. 升级协议完成之后,客户端和服务器就能够相互发送数据

websocket接收和发送数据

三、如何实现基于webSocket的长连接系统

3.1 使用go实现webSocket服务端

3.1.1 启动端口监听

  • websocket须要监听端口,因此须要在golang 成功的 main 函数中用协程的方式去启动程序
  • main.go 实现启动
go websocket.StartWebSocket()
  • init_acc.go 启动程序
// 启动程序
func StartWebSocket() {
    http.HandleFunc("/acc", wsPage)
    http.ListenAndServe(":8089", nil)
}

3.1.2 升级协议

  • 客户端是经过http请求发送到服务端,咱们须要对http协议进行升级为websocket协议
  • 对http请求协议进行升级 golang 库gorilla/websocket 已经作得很好了,咱们直接使用就能够了
  • 在实际使用的时候,建议每一个链接使用两个协程处理客户端请求数据和向客户端发送数据,虽然开启协程会占用一些内存,可是读取分离,减小收发数据堵塞的可能
  • init_acc.go
func wsPage(w http.ResponseWriter, req *http.Request) {

    // 升级协议
    conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
        fmt.Println("升级协议", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])

        return true
    }}).Upgrade(w, req, nil)
    if err != nil {
        http.NotFound(w, req)

        return
    }

    fmt.Println("webSocket 创建链接:", conn.RemoteAddr().String())

    currentTime := uint64(time.Now().Unix())
    client := NewClient(conn.RemoteAddr().String(), conn, currentTime)

    go client.read()
    go client.write()

    // 用户链接事件
    clientManager.Register <- client
}

3.1.3 客户端链接的管理

  • 当前程序有多少用户链接,还须要对用户广播的须要,这里咱们就须要一个管理者(clientManager),处理这些事件:
  • 记录所有的链接、登陆用户的能够经过 appId+uuid 查到用户链接
  • 使用map存储,就涉及到多协程并发读写的问题,因此须要加读写锁
  • 定义四个channel ,分别处理客户端创建链接、用户登陆、断开链接、全员广播事件
// 链接管理
type ClientManager struct {
    Clients     map[*Client]bool   // 所有的链接
    ClientsLock sync.RWMutex       // 读写锁
    Users       map[string]*Client // 登陆的用户 // appId+uuid
    UserLock    sync.RWMutex       // 读写锁
    Register    chan *Client       // 链接链接处理
    Login       chan *login        // 用户登陆处理
    Unregister  chan *Client       // 断开链接处理程序
    Broadcast   chan []byte        // 广播 向所有成员发送数据
}

// 初始化
func NewClientManager() (clientManager *ClientManager) {
    clientManager = &ClientManager{
        Clients:    make(map[*Client]bool),
        Users:      make(map[string]*Client),
        Register:   make(chan *Client, 1000),
        Login:      make(chan *login, 1000),
        Unregister: make(chan *Client, 1000),
        Broadcast:  make(chan []byte, 1000),
    }

    return
}

3.1.4 注册客户端的socket的写的异步处理程序

  • 防止发生程序崩溃,因此须要捕获异常
  • 为了显示异常崩溃位置这里使用string(debug.Stack())打印调用堆栈信息
  • 若是写入数据失败了,可能链接有问题,就关闭链接
  • client.go
// 向客户端写数据
func (c *Client) write() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("write stop", string(debug.Stack()), r)

        }
    }()

    defer func() {
        clientManager.Unregister <- c
        c.Socket.Close()
        fmt.Println("Client发送数据 defer", c)
    }()

    for {
        select {
        case message, ok := <-c.Send:
            if !ok {
                // 发送数据错误 关闭链接
                fmt.Println("Client发送数据 关闭链接", c.Addr, "ok", ok)

                return
            }

            c.Socket.WriteMessage(websocket.TextMessage, message)
        }
    }
}

3.1.5 注册客户端的socket的读的异步处理程序

  • 循环读取客户端发送的数据并处理
  • 若是读取数据失败了,关闭channel
  • client.go
// 读取客户端数据
func (c *Client) read() {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("write stop", string(debug.Stack()), r)
        }
    }()

    defer func() {
        fmt.Println("读取客户端数据 关闭send", c)
        close(c.Send)
    }()

    for {
        _, message, err := c.Socket.ReadMessage()
        if err != nil {
            fmt.Println("读取客户端数据 错误", c.Addr, err)

            return
        }

        // 处理程序
        fmt.Println("读取客户端数据 处理:", string(message))
        ProcessData(c, message)
    }
}

3.1.6 接收客户端数据并处理

  • 约定发送和接收请求数据格式,为了js处理方便,采用了json的数据格式发送和接收数据(人类能够阅读的格式在工做开发中使用是比较方便的)
  • 登陆发送数据示例:
{"seq":"1565336219141-266129","cmd":"login","data":{"userId":"马远","appId":101}}
  • 登陆响应数据示例:
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
  • websocket是双向的数据通信,能够连续发送,若是发送的数据须要服务端回复,就须要一个seq来肯定服务端的响应是回复哪一次的请求数据
  • cmd 是用来肯定动做,websocket没有相似于http的url,因此规定 cmd 是什么动做
  • 目前的动做有:login/heartbeat 用来发送登陆请求和链接保活(长时间没有数据发送的长链接容易被浏览器、移动中间商、nginx、服务端程序断开)
  • 为何须要AppId,UserId是表示用户的惟一字段,设计的时候为了作成通用性,设计AppId用来表示用户在哪一个平台登陆的(web、app、ios等),方便后续扩展
  • request_model.go 约定的请求数据格式
/************************  请求数据  **************************/
// 通用请求数据格式
type Request struct {
    Seq  string      `json:"seq"`            // 消息的惟一Id
    Cmd  string      `json:"cmd"`            // 请求命令字
    Data interface{} `json:"data,omitempty"` // 数据 json
}

// 登陆请求数据
type Login struct {
    ServiceToken string `json:"serviceToken"` // 验证用户是否登陆
    AppId        uint32 `json:"appId,omitempty"`
    UserId       string `json:"userId,omitempty"`
}

// 心跳请求数据
type HeartBeat struct {
    UserId string `json:"userId,omitempty"`
}
  • response_model.go
/************************  响应数据  **************************/
type Head struct {
    Seq      string    `json:"seq"`      // 消息的Id
    Cmd      string    `json:"cmd"`      // 消息的cmd 动做
    Response *Response `json:"response"` // 消息体
}

type Response struct {
    Code    uint32      `json:"code"`
    CodeMsg string      `json:"codeMsg"`
    Data    interface{} `json:"data"` // 数据 json
}

3.1.7 使用路由的方式处理客户端的请求数据

  • 使用路由的方式处理由客户端发送过来的请求数据
  • 之后添加请求类型之后就能够用类是用http相相似的方式(router-controller)去处理
  • acc_routers.go
// Websocket 路由
func WebsocketInit() {
    websocket.Register("login", websocket.LoginController)
    websocket.Register("heartbeat", websocket.HeartbeatController)
}

3.1.8 防止内存溢出和Goroutine不回收

    1. 定时任务清除超时链接

没有登陆的链接和登陆的链接6分钟没有心跳则断开链接

client_manager.go

// 定时清理超时链接
func ClearTimeoutConnections() {
    currentTime := uint64(time.Now().Unix())

    for client := range clientManager.Clients {
        if client.IsHeartbeatTimeout(currentTime) {
            fmt.Println("心跳时间超时 关闭链接", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)

            client.Socket.Close()
        }
    }
}
    1. 读写的Goroutine有一个失败,则相互关闭

write()Goroutine写入数据失败,关闭c.Socket.Close()链接,会关闭read()Goroutine
read()Goroutine读取数据失败,关闭close(c.Send)链接,会关闭write()Goroutine

    1. 客户端主动关闭

关闭读写的Goroutine
ClientManager删除链接

    1. 监控用户链接、Goroutine数

十个内存溢出有九个和Goroutine有关
添加一个http的接口,能够查看系统的状态,防止Goroutine不回收
查看系统状态

    1. Nginx 配置不活跃的链接释放时间,防止忘记关闭的链接
    1. 使用 pprof 分析性能、耗时

3.2 使用javaScript实现webSocket客户端

3.2.1 启动并注册监听程序

  • js 创建链接,并处理链接成功、收到数据、断开链接的事件处理
ws = new WebSocket("ws://127.0.0.1:8089/acc");

 
ws.onopen = function(evt) {
  console.log("Connection open ...");
};
 
ws.onmessage = function(evt) {
  console.log( "Received Message: " + evt.data);
  data_array = JSON.parse(evt.data);
  console.log( data_array);
};
 
ws.onclose = function(evt) {
  console.log("Connection closed.");
};

3.2.2 发送数据

  • 须要注意:链接创建成功之后才能够发送数据
  • 创建链接之后由客户端向服务器发送数据示例
登陆:
ws.send('{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}');

心跳:
ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}');
 
关闭链接:
ws.close();

四、goWebSocket 项目

4.1 项目说明

  • 本项目是基于webSocket实现的分布式IM系统
  • 客户端随机分配用户名,全部人进入一个聊天室,实现群聊的功能
  • 单台机器(24核128G内存)支持百万客户端链接
  • 支持水平部署,部署的机器之间能够相互通信
  • 项目架构图

网站架构图

4.2 项目依赖

  • 本项目只须要使用 redis 和 golang
  • 本项目使用govendor管理依赖,克隆本项目就能够直接使用
# 主要使用到的包
github.com/gin-gonic/gin@v1.4.0
github.com/go-redis/redis
github.com/gorilla/websocket
github.com/spf13/viper
google.golang.org/grpc
github.com/golang/protobuf

4.3 项目启动

  • 克隆项目
git clone git@github.com:link1st/gowebsocket.git
# 或
git clone https://github.com/link1st/gowebsocket.git
  • 修改项目配置
cd gowebsocket
cd config
mv app.yaml.example app.yaml
# 修改项目监听端口,redis链接等(默认127.0.0.1:3306)
vim app.yaml
# 返回项目目录,为之后启动作准备
cd ..
  • 配置文件说明
app:
  logFile: log/gin.log # 日志文件位置
  httpPort: 8080 # http端口
  webSocketPort: 8089 # webSocket端口
  rpcPort: 9001 # 分布式部署程序内部通信端口
  httpUrl: 127.0.0.1:8080
  webSocketUrl:  127.0.0.1:8089


redis:
  addr: "localhost:6379"
  password: ""
  DB: 0
  poolSize: 30
  minIdleConns: 30
  • 启动项目
go run main.go
  • 进入IM聊天地址

http://127.0.0.1:8080/home/index

  • 到这里,就能够体验到基于webSocket的IM系统

五、webSocket项目Nginx配置

5.1 为何要配置Nginx

  • 使用nginx实现内外网分离,对外只暴露Nginx的Ip(通常的互联网企业会在nginx以前加一层LVS作负载均衡),减小入侵的可能
  • 使用Nginx能够利用Nginx的负载功能,前端再使用的时候只须要链接固定的域名,经过Nginx将流量分发了到不一样的机器
  • 同时咱们也可使用Nginx的不一样的负载策略(轮询、weight、ip_hash)

5.2 nginx配置

  • 使用域名 im.91vh.com 为示例,参考配置
  • 一级目录im.91vh.com/acc 是给webSocket使用,是用nginx stream转发功能(nginx 1.3.31 开始支持,使用Tengine配置也是相同的),转发到golang 8089 端口处理
  • 其它目录是给HTTP使用,转发到golang 8080 端口处理
upstream  go-im
{
    server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

upstream  go-acc
{
    server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}


server {
    listen       80 ;
    server_name  im.91vh.com;
    index index.html index.htm ;


    location /acc {
        proxy_set_header Host $host;
        proxy_pass http://go-acc;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Connection "";
        proxy_redirect off;
        proxy_intercept_errors on;
        client_max_body_size 10m;
    }

    location /
    {
        proxy_set_header Host $host;
        proxy_pass http://go-im;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_redirect off;
        proxy_intercept_errors on;
        client_max_body_size 30m;
    }

    access_log  /link/log/nginx/access/im.log;
    error_log   /link/log/nginx/access/im.error.log;
}

5.3 问题处理

  • 运行nginx测试命令,查看配置文件是否正确
/link/server/tengine/sbin/nginx -t
  • 若是出现错误
nginx: [emerg] unknown "connection_upgrade" variable
configuration file /link/server/tengine/conf/nginx.conf test failed
  • 处理方法
  • nginx.com添加
http{
    fastcgi_temp_file_write_size 128k;
..... # 须要添加的内容

    #support websocket
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }

.....
    gzip on;
    
}
  • 缘由:Nginx代理webSocket的时候就会遇到Nginx的设计问题 End-to-end and Hop-by-hop Headers

六、压测

6.1 Linux内核优化

  • 设置文件打开句柄数
ulimit -n 1000000
  • 设置sockets链接参数
vim /etc/sysctl.conf
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0

6.2 压测准备

  • 待压测,若是你们有压测的结果欢迎补充

6.3 压测数据

  • 项目在实际使用的时候,每一个链接约占 24Kb内存,一个Goroutine 约占11kb
  • 支持百万链接须要22G内存
在线用户数 cup 内存 I/O net.out
1W
10W
100W

七、如何基于webSocket实现一个分布式Im

7.1 说明

获取所有在线的用户,查询单前服务的所有用户+集群中服务的所有用户
发送消息,这里采用的是http接口发送(微信网页版发送消息也是http接口),这里考虑主要是两点:
1.服务分离,让acc系统尽可能的简单一点,不掺杂其它业务逻辑
2.发送消息是走http接口,不使用webSocket链接,才用收和发送数据分离的方式,能够加快收发数据的效率

7.2 架构

  • 项目启动注册和用户链接时序图

用户链接时序图

  • 其它系统(IM、任务)向webSocket(acc)系统链接的用户发送消息时序图

分布是系统随机给用户发送消息

八、回顾和反思

8.1 在其它系统应用

  • 本系统设计的初衷就是:和客户端保持一个长连接、对外部系统两个接口(查询用户是否在线、给在线的用户推送消息),实现业务的分离
  • 只有和业务分离可,才能够供多个业务使用,而不是每一个业务都创建一个长连接

8.2 已经实现的功能

  • gin log日志(请求日志+debug日志)
  • 读取配置文件 完成
  • 定时脚本,清理过时未心跳连接 完成
  • http接口,获取登陆、连接数量 完成
  • http接口,发送push、查询有多少人在线 完成
  • grpc 程序内部通信,发送消息 完成
  • appIds 一个用户在多个平台登陆
  • 界面,把全部在线的人拉倒一个群里面,发送消息 完成
  • 单聊、群聊 完成
  • 实现分布式,水平扩张 完成
  • 压测脚本
  • 文档整理
  • 文档目录、百万长连接的实现、为何要实现一个IM、怎么实现一个Im
  • 架构图以及扩展

IM实现细节:

  • 定义文本消息结构 完成
  • html发送文本消息 完成
  • 接口接收文本消息并发送给全体 完成
  • html接收到消息 显示到界面 完成
  • 界面优化 须要持续优化
  • 有人加入之后广播全体 完成
  • 定义加入聊天室的消息结构 完成
  • 引入机器人 待定

8.2 须要完善、优化

  • 登陆,使用微信登陆 获取昵称、头像等
  • 有帐号系统、资料系统
  • 界面优化、适配手机端
  • 消息 文本消息(支持表情)、图片、语音、视频消息
  • 微服务注册、发现、熔断等
  • 添加配置项,单台机器最大链接数量

8.3 总结

  • 虽然实现了一个分布式在聊天的IM,可是有不少细节没有处理(登陆没有鉴权、界面还待优化等),可是能够经过这个示例能够了解到:经过WebSocket解决不少业务上需求
  • 本文虽然号称单台机器能有百万长连接(内存上能知足),可是实际在场景远比这个复杂(cpu有些压力),固然了若是你有这么大的业务量能够购买更多的机器更好的去支撑你的业务,本程序只是演示如何在实际工做用使用webSocket.
  • 参考本文,你能够实现出来符合你须要的程序

九、参考文献

维基百科 WebSocket

阮一峰 WebSocket教程

WebSocket协议:5分钟从入门到精通

link1st gowebsocket

相关文章
相关标签/搜索