基于 Serverless 与 Websocket 的聊天工具实现

传统业务实现 Websocket 并不难,然而函数计算基本上都是事件驱动,不支持长连接操做。若是将函数计算与 API 网关结合,是否能够有 Websocket 的实现方案呢?javascript

API 网关触发器实现 Websocket

WebSocket 协议是基于 TCP 的一种新的网络协议。它实现了浏览器与服务器全双工 (full-duplex) 通讯,即容许服务器主动发送信息给客户端。WebSocket 在服务端有数据推送需求时,能够主动发送数据至客户端。而原有 HTTP 协议的服务端对于需推送的数据,仅能经过轮询或 long poll 的方式来让客户端得到。java

因为云函数是无状态且以触发式运行,即在有事件到来时才会被触发。所以,为了实现 WebSocket,云函数 SCF 与 API 网关相结合,经过 API 网关承接及保持与客户端的链接。您能够认为云函数与 API 网关一块儿实现了服务端。当客户端有消息发出时,会先传递给 API 网关,再由 API 网关触发云函数执行。当服务端云函数要向客户端发送消息时,会先由云函数将消息 POST 到 API 网关的反向推送连接,再由 API 网关向客户端完成消息的推送。python

具体的实现架构以下:git

实现架构

对于 WebSocket 的整个生命周期,主要由如下几个事件组成:github

  • 链接创建:客户端向服务端请求创建链接并完成链接创建;
  • 数据上行:客户端经过已经创建的链接向服务端发送数据;
  • 数据下行:服务端经过已经创建的链接向客户端发送数据;
  • 客户端断开:客户端要求断开已经创建的链接;
  • 服务端断开:服务端要求断开已经创建的链接。

对于 WebSocket 整个生命周期的事件,云函数和 API 网关的处理过程以下:web

  • 链接创建:客户端与 API 网关创建 WebSocket 链接,API 网关将链接创建事件发送给 SCF;
  • 数据上行:客户端经过 WebSocket 发送数据,API 网关将数据转发送给 SCF;
  • 数据下行:SCF 经过向 API 网关指定的推送地址发送请求,API 网关收到后会将数据经过 WebSocket 发送给客户端;
  • 客户端断开:客户端请求断开链接,API 网关将链接断开事件发送给 SCF;
  • 服务端断开:SCF 经过向 API 网关指定的推送地址发送断开请求,API 网关收到后断开 WebSocket 链接。

所以,云函数与 API 网关之间的交互,须要由 3 类云函数来承载:express

  • 注册函数:在客户端发起和 API 网关之间创建 WebSocket 链接时触发该函数,通知 SCF WebSocket 链接的 secConnectionID。一般会在该函数记录 secConnectionID 到持久存储中,用于后续数据的反向推送;
  • 清理函数:在客户端主动发起 WebSocket 链接中断请求时触发该函数,通知 SCF 准备断开链接的 secConnectionID。一般会在该函数清理持久存储中记录的该 secConnectionID;
  • 传输函数:在客户端经过 WebSocket 链接发送数据时触发该函数,告知 SCF 链接的 secConnectionID 以及发送的数据。一般会在该函数处理业务数据。例如,是否将数据推送给持久存储中的其余 secConnectionID。

Websocket 功能实现

根据腾讯云官网提供的该功能的总体架构图:json

总体架构图

这里咱们能够使用对象存储 COS 做为持久化的方案,当用户创建连接存储 ConnectionId 到 COS 中,当用户断开链接删除该连接 ID。api

其中注册函数:浏览器

# -*- coding: utf8 -*-
import os
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))


def main_handler(event, context):
    print("event is %s" % event)

    connectionID = event['websocket']['secConnectionID']

    retmsg = {}
    retmsg['errNo'] = 0
    retmsg['errMsg'] = "ok"
    retmsg['websocket'] = {
        "action": "connecting",
        "secConnectionID": connectionID
    }

    cosClient.put_object(
        Bucket=bucket,
        Body='websocket'.encode("utf-8"),
        Key=str(connectionID),
        EnableMD5=False
    )

    return retmsg

传输函数:

# -*- coding: utf8 -*-
import os
import json
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))

sendbackHost = os.environ.get("url")


def Get_ConnectionID_List():
    response = cosClient.list_objects(
        Bucket=bucket,
    )
    return [eve['Key'] for eve in response['Contents']]


def send(connectionID, data):
    retmsg = {}
    retmsg['websocket'] = {}
    retmsg['websocket']['action'] = "data send"
    retmsg['websocket']['secConnectionID'] = connectionID
    retmsg['websocket']['dataType'] = 'text'
    retmsg['websocket']['data'] = data
    requests.post(sendbackHost, json=retmsg)


def main_handler(event, context):
    print("event is %s" % event)

    connectionID_List = Get_ConnectionID_List()
    connectionID = event['websocket']['secConnectionID']
    count = len(connectionID_List)
    data = event['websocket']['data'] + "(===Online people:" + str(count) + "===)"
    for ID in connectionID_List:
        if ID != connectionID:
            send(ID, data)

    return "send success"

清理函数:

# -*- coding: utf8 -*-
import os
import requests
from qcloud_cos_v5 import CosConfig
from qcloud_cos_v5 import CosS3Client

bucket = os.environ.get('bucket')
region = os.environ.get('region')
secret_id = os.environ.get('secret_id')
secret_key = os.environ.get('secret_key')
cosClient = CosS3Client(CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key))

sendbackHost = os.environ.get("url")


def main_handler(event, context):
    print("event is %s" % event)

    connectionID = event['websocket']['secConnectionID']

    retmsg = {}
    retmsg['websocket'] = {}
    retmsg['websocket']['action'] = "closing"
    retmsg['websocket']['secConnectionID'] = connectionID
    requests.post(sendbackHost, json=retmsg)

    cosClient.delete_object(
        Bucket=bucket,
        Key=str(connectionID),
    )

    return event

Yaml 文件以下:

Conf:
  component: "serverless-global"
  inputs:
    region: ap-guangzhou
    bucket: chat-cos-1256773370
    secret_id: 
    secret_key: 

myBucket:
  component: '@serverless/tencent-cos'
  inputs:
    bucket: ${Conf.bucket}
    region: ${Conf.region}

restApi:
  component: '@serverless/tencent-apigateway'
  inputs:
    region: ${Conf.region}
    protocols:
      - http
      - https
    serviceName: ChatDemo
    environment: release
    endpoints:
      - path: /
        method: GET
        protocol: WEBSOCKET
        serviceTimeout: 800
        function:
          transportFunctionName: ChatTrans
          registerFunctionName: ChatReg
          cleanupFunctionName: ChatClean


ChatReg:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatReg
    codeUri: ./code
    handler: reg.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

ChatTrans:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatTrans
    codeUri: ./code
    handler: trans.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

ChatClean:
  component: "@serverless/tencent-scf"
  inputs:
    name: ChatClean
    codeUri: ./code
    handler: clean.main_handler
    runtime: Python3.6
    region:  ${Conf.region}
    environment:
      variables:
        region: ${Conf.region}
        bucket: ${Conf.bucket}
        secret_id: ${Conf.secret_id}
        secret_key: ${Conf.secret_key}
        url: http://set-gwm9thyc.cb-guangzhou.apigateway.tencentyun.com/api-etj7lhtw

注意,这里须要先部署 API 网关。当部署完成,得到回推地址,将回推地址以 url 的形式写入到对应函数的环境变量中:

理论上应该是能够经过 ${restApi.url[0].internalDomain} 自动得到到 url 的,可是我并无成功得到到这个 url,只能先部署 API 网关,得到到这个地址以后,再从新部署。

部署完成以后,咱们能够编写 HTML 代码,实现可视化的 Websocket Client,其核心的 JavaScript 代码为:

window.onload = function () {
    var conn;
    var msg = document.getElementById("msg");
    var log = document.getElementById("log");

    function appendLog(item) {
        var doScroll = log.scrollTop === log.scrollHeight - log.clientHeight;
        log.appendChild(item);
        if (doScroll) {
            log.scrollTop = log.scrollHeight - log.clientHeight;
        }
    }

    document.getElementById("form").onsubmit = function () {
        if (!conn) {
            return false;
        }
        if (!msg.value) {
            return false;
        }
        conn.send(msg.value);
        //msg.value = "";
		
		var item = document.createElement("div");
		item.innerText = "发送↑:";
		appendLog(item);
		
		var item = document.createElement("div");
		item.innerText = msg.value;
		appendLog(item);
		
        return false;
    };

    if (window["WebSocket"]) {
        //替换为websocket链接地址
        conn = new WebSocket("ws://service-01era6ni-1256773370.gz.apigw.tencentcs.com/release/");
        conn.onclose = function (evt) {
            var item = document.createElement("div");
            item.innerHTML = "<b>Connection closed.</b>";
            appendLog(item);
        };
        conn.onmessage = function (evt) {
			var item = document.createElement("div");
			item.innerText = "接收↓:";
			appendLog(item);
		
            var messages = evt.data.split('\n');
            for (var i = 0; i < messages.length; i++) {
                var item = document.createElement("div");
                item.innerText = messages[i];
                appendLog(item);
            }
        };
    } else {
        var item = document.createElement("div");
        item.innerHTML = "<b>Your browser does not support WebSockets.</b>";
        appendLog(item);
    }
};

完成以后,咱们打开两个页面,进行测试:

总结

经过云函数 + API 网关进行 Websocket 的实践,绝对不单单是一个聊天工具这么简单,它能够用在不少方面,例如经过 Websocket 进行实时日志系统的制做等。

单独的函数计算,仅仅是一个计算平台,只有和周边的 BaaS 结合,才能展现出 Serverless 架构的价值和真正的能力。这也是为何不少人说 Serverless=FaaS+BaaS 的一个缘由。

期待更多小伙伴,能够经过 Serverless 架构,创造出更多有趣的应用。

Serverless Framework 30 天试用计划

咱们诚邀您来体验最便捷的 Serverless 开发和部署方式。在试用期内,相关联的产品及服务均提供免费资源和专业的技术支持,帮助您的业务快速、便捷地实现 Serverless!

详情可查阅:Serverless Framework 试用计划

One More Thing

3 秒你能作什么?喝一口水,看一封邮件,仍是 —— 部署一个完整的 Serverless 应用?

复制连接至 PC 浏览器访问:https://serverless.cloud.tencent.com/deploy/express

3 秒极速部署,当即体验史上最快的 Serverless HTTP 实战开发!

传送门:

欢迎访问:Serverless 中文网,您能够在 最佳实践 里体验更多关于 Serverless 应用的开发!


推荐阅读:《Serverless 架构:从原理、设计到项目实战》

相关文章
相关标签/搜索