李腾飞,腾讯容器技术研发工程师,腾讯云TKE后台研发,SuperEdge核心开发成员。node
杜杨浩,腾讯云高级工程师,热衷于开源、容器和Kubernetes。目前主要从事镜像仓库,Kubernetes集群高可用&备份还原,以及边缘计算相关研发工做。git
SuperEdge 是 Kubernetes 原生的边缘容器方案,它将 Kubernetes 强大的容器管理能力扩展到边缘计算场景中,针对边缘计算场景中常见的技术挑战提供了解决方案,如:单集群节点跨地域、云边网络不可靠、边缘节点位于 NAT 网络等。这些能力可让应用很容易地部署到边缘计算节点上,而且可靠地运行,能够帮助您很方便地把分布在各处的计算资源放到一个 Kubernetes 集群中管理,包括但不限于:边缘云计算资源、私有云资源、现场设备,打造属于您的边缘 PaaS 平台。SuperEdge 支持全部 Kubernetes 资源类型、API 接口、使用方式、运维工具,无额外的学习成本,也兼容其余云原生项目,如:Promethues,使用者能够结合其余所需的云原生项目一块儿使用。项目由如下公司共同发起:腾讯、Intel、VMware、虎牙直播、寒武纪、首都在线和美团。github
在边缘场景中,不少时候都是单向网络,即只有边缘节点能主动访问云端。云边隧道主要用于代理云端访问边缘节点组件的请求,解决云端没法直接访问边缘节点的问题。api
架构图以下所示:网络
实现原理为:架构
而整个请求的代理转发流程以下:并发
在介绍完 Tunnel 的配置后,下面介绍 Tunnel 的内部数据流转:app
上图标记出了 HTTPS 代理的数据流转,TCP 代理数据流转和 HTTPS 的相似,其中的关键步骤:负载均衡
nodeContext 和 connContext 都是作链接的管理,可是 nodeContext 管理 gRPC 长链接的和 connContext 管理的上层转发请求的链接(TCP 和 HTTPS)的生命周期是不相同的,所以须要分开管理运维
Tunnel 管理的链接能够分为底层链接(云端隧道的 gRPC 链接)和上层应用链接(HTTPS 链接和 TCP 链接),链接异常的管理的能够分为如下几种场景:
以 HTTPS 链接为例,tunnel-edge 的 HTTPS Client 与边缘节点 Server 链接异常断开,会发送 StreamMsg (StreamMsg.Type=CLOSE) 消息,tunnel-cloud 在接收到 StreamMsg 消息以后会主动关闭 HTTPS Server与HTTPS Client 的链接。
gRPC 链接异常,Stream 模块会根据与 gPRC 链接绑定的 node.connContext,向 HTTPS 和 TCP 模块发送 StreamMsg(StreamMsg.Type=CLOSE),HTTPS 或 TCP 模块接收消息以后主动断开链接。
func (stream *Stream) Start(mode string) { context.GetContext().RegisterHandler(util.STREAM_HEART_BEAT, util.STREAM, streammsg.HeartbeatHandler) if mode == util.CLOUD { ... //启动gRPC server go connect.StartServer() ... //同步coredns的hosts插件的配置文件 go connect.SynCorefile() } else { //启动gRPC client go connect.StartSendClient() ... } ... }
tunnel-cloud 首先调用 RegisterHandler 注册心跳消息处理函数 HeartbeatHandler
SynCorefile 执行同步 tunnel-coredns 的 hosts 插件的配置文件,每隔一分钟(考虑到 configmap 同步 tunnel-cloud 的 pod 挂载文件的时间)执行一次 checkHosts,以下:
func SynCorefile() { for { ... err := coreDns.checkHosts() ... time.Sleep(60 * time.Second) } }
而 checkHosts 负责 configmap 具体的刷新操做:
func (dns *CoreDns) checkHosts() error { nodes, flag := parseHosts() if !flag { return nil } ... _, err = dns.ClientSet.CoreV1().ConfigMaps(dns.Namespace).Update(cctx.TODO(), cm, metav1.UpdateOptions{}) ... }
checkHosts 首先调用 parseHosts 获取本地 hosts 文件中边缘节点名称以及对应 tunnel-cloud podIp 映射列表,对比 podIp 的对应节点名和内存中节点名,若是有变化则将这个内容覆盖写入 configmap 并更新:
另外,这里 tunnel-cloud 引入 configmap 本地挂载文件的目的是:优化托管模式下众多集群同时同步 tunnel-coredns 时的性能
tunnel-edge 首先调用 StartClient 与 tunnel-edge 创建 gRPC 链接,返回 grpc.ClientConn
func StartClient() (*grpc.ClientConn, ctx.Context, ctx.CancelFunc, error) { ... opts := []grpc.DialOption{grpc.WithKeepaliveParams(kacp), grpc.WithStreamInterceptor(ClientStreamInterceptor), grpc.WithTransportCredentials(creds)} conn, err := grpc.Dial(conf.TunnelConf.TunnlMode.EDGE.StreamEdge.Client.ServerName, opts...) ... }
在调用 grpc.Dial 时会传递grpc.WithStreamInterceptor(ClientStreamInterceptor)
DialOption,将 ClientStreamInterceptor 做为 StreamClientInterceptor 传递给 grpc.ClientConn,等待 gRPC 链接状态变为 Ready,而后执行 Send 函数。streamClient.TunnelStreaming 调用StreamClientInterceptor 返回 wrappedClientStream 对象
func ClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { ... opts = append(opts, grpc.PerRPCCredentials(oauth.NewOauthAccess(&oauth2.Token{AccessToken: clientToken}))) ... return newClientWrappedStream(s), nil }
ClientStreamInterceptor 会将边缘节点名称以及 token 构形成 oauth2.Token.AccessToken 进行认证传递,并构建 wrappedClientStream
stream.Send 会并发调用 wrappedClientStream.SendMsg 以及 wrappedClientStream.RecvMsg 分别用于 tunnel-edge 发送以及接受,并阻塞等待
注意:tunnel-edge 向 tunnel-cloud 注册节点信息是在创建 gRPC Stream 时,而不是建立 grpc.connClient 的时候
整个过程以下图所示:
相应的,在初始化 tunnel-cloud 时,会将grpc.StreamInterceptor(ServerStreamInterceptor)
构建成 gRPC ServerOption,并将 ServerStreamInterceptor 做为 StreamServerInterceptor 传递给 grpc.Server:
func StartServer() { ... opts := []grpc.ServerOption{grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp), grpc.StreamInterceptor(ServerStreamInterceptor), grpc.Creds(creds)} s := grpc.NewServer(opts...) proto.RegisterStreamServer(s, &stream.Server{}) ... }
云端 gRPC 服务在接受到 tunnel-edge 请求(创建 Stream 流)时,会调用 ServerStreamInterceptor,而 ServerStreamInterceptor 会从gRPC metadata 中解析出此 gRPC 链接对应的边缘节点名和token,并对该 token 进行校验,而后根据节点名构建 wrappedServerStream 做为与该边缘节点通讯的处理对象(每一个边缘节点对应一个处理对象),handler 函数会调用 stream.TunnelStreaming,并将 wrappedServerStream 传递给它(wrappedServerStream 实现了proto.Stream_TunnelStreamingServer 接口)
func ServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { md, ok := metadata.FromIncomingContext(ss.Context()) ... tk := strings.TrimPrefix(md["authorization"][0], "Bearer ") auth, err := token.ParseToken(tk) ... if auth.Token != token.GetTokenFromCache(auth.NodeName) { klog.Errorf("invalid token node = %s", auth.NodeName) return ErrInvalidToken } err = handler(srv, newServerWrappedStream(ss, auth.NodeName)) if err != nil { ctx.GetContext().RemoveNode(auth.NodeName) klog.Errorf("node disconnected node = %s err = %v", auth.NodeName, err) } return err }
而当 TunnelStreaming 方法退出时,就会执 ServerStreamInterceptor 移除节点的逻辑ctx.GetContext().RemoveNode
TunnelStreaming 会并发调用 wrappedServerStream.SendMsg 以及 wrappedServerStream.RecvMsg 分别用于 tunnel-cloud 发送以及接受,并阻塞等待:
func (s *Server) TunnelStreaming(stream proto.Stream_TunnelStreamingServer) error { errChan := make(chan error, 2) go func(sendStream proto.Stream_TunnelStreamingServer, sendChan chan error) { sendErr := sendStream.SendMsg(nil) ... sendChan <- sendErr }(stream, errChan) go func(recvStream proto.Stream_TunnelStreamingServer, recvChan chan error) { recvErr := stream.RecvMsg(nil) ... recvChan <- recvErr }(stream, errChan) e := <-errChan return e }
SendMsg 会从 wrappedServerStream 对应边缘节点 node 中接受 StreamMsg,并调用 ServerStream.SendMsg 发送该消息给 tunnel-edge
func (w *wrappedServerStream) SendMsg(m interface{}) error { if m != nil { return w.ServerStream.SendMsg(m) } node := ctx.GetContext().AddNode(w.node) ... for { msg := <-node.NodeRecv() ... err := w.ServerStream.SendMsg(msg) ... }}
而 RecvMsg 会不断接受来自 tunnel-edge 的 StreamMsg,并调用 StreamMsg.对应的处理函数进行操做
HTTPS 模块负责创建云边的 HTTPS 代理,将云端组件(例如:kube-apiserver)的 https 请求转发给边端服务(例如:kubelet)
func (https *Https) Start(mode string) { context.GetContext().RegisterHandler(util.CONNECTING, util.HTTPS, httpsmsg.ConnectingHandler) context.GetContext().RegisterHandler(util.CONNECTED, util.HTTPS, httpsmsg.ConnectedAndTransmission) context.GetContext().RegisterHandler(util.CLOSED, util.HTTPS, httpsmsg.ConnectedAndTransmission) context.GetContext().RegisterHandler(util.TRANSNMISSION, util.HTTPS, httpsmsg.ConnectedAndTransmission) if mode == util.CLOUD { go httpsmng.StartServer() }}
Start 函数首先注册了 StreamMsg 的处理函数,其中 CLOSED 处理函数主要处理关闭链接的消息,并启动 HTTPS Server。
当云端组件向 tunnel-cloud 发送 HTTPS 请求时,serverHandler 会首先从 request.Host 字段解析节点名,若先创建 TLS 链接,而后在链接中写入 HTTP 的 request 对象,此时的 request.Host 能够不设置,则须要从 request.TLS.ServerName 解析节点名。HTTPS Server 读取 request.Body 以及 request.Header 构建 HttpsMsg 结构体,并序列化后封装成 StreamMsg,经过 Send2Node 发送 StreamMsg 放入 StreamMsg.node 对应的 node 的 Channel 中,由 Stream 模块发送到 tunnel-edge
func (serverHandler *ServerHandler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { var nodeName string nodeinfo := strings.Split(request.Host, ":") if context.GetContext().NodeIsExist(nodeinfo[0]) { nodeName = nodeinfo[0] } else { nodeName = request.TLS.ServerName } ... node.Send2Node(StreamMsg)}
tunnel-edge 接受到 StreamMsg,并调用 ConnectingHandler 函数进行处理:
func ConnectingHandler(msg *proto.StreamMsg) error { go httpsmng.Request(msg) return nil}func Request(msg *proto.StreamMsg) { httpConn, err := getHttpConn(msg) ... rawResponse := bytes.NewBuffer(make([]byte, 0, util.MaxResponseSize)) rawResponse.Reset() respReader := bufio.NewReader(io.TeeReader(httpConn, rawResponse)) resp, err := http.ReadResponse(respReader, nil) ... node.BindNode(msg.Topic) ... if resp.StatusCode != http.StatusSwitchingProtocols { handleClientHttp(resp, rawResponse, httpConn, msg, node, conn) } else { handleClientSwitchingProtocols(httpConn, rawResponse, msg, node, conn) }}
ConnectingHandler 会调用 Request 对该 StreamMsg 进行处理。Reqeust 首先经过 getHttpConn 与边缘节点 Server 创建的 TLS 链接。解析 TLS 链接中返回的数据获取 HTTP Response,Status Code 为200,将 Response 的内容发送到 tunnel-cloud,Status Code 为101,将从TLS 链接读取 Response 的二进制数据发送到 tunnel-cloud,其中 StreamMsg.Type为CONNECTED。
tunnel-cloud 在接受到该 StreamMsg 后,会调用 ConnectedAndTransmission 进行处理:
func ConnectedAndTransmission(msg *proto.StreamMsg) error { conn := context.GetContext().GetConn(msg.Topic) ... conn.Send2Conn(msg) return nil}
经过 msg.Topic(conn uid) 获取 conn,并经过 Send2Conn 将消息塞到该 conn 对应的管道中
云端 HTTPS Server 在接受到云端的 CONNECTED 消息以后,认为HTTPS 代理成功创建。并继续执行 handleClientHttp or handleClientSwitchingProtocols 进行数据传输,这里只分析 handleClientHttp 非协议提高下的数据传输过程,HTTPS Client 端的处理逻辑以下:
func handleClientHttp(resp *http.Response, rawResponse *bytes.Buffer, httpConn net.Conn, msg *proto.StreamMsg, node context.Node, conn context.Conn) { ... go func(read chan *proto.StreamMsg, response *http.Response, buf *bytes.Buffer, stopRead chan struct{}) { rrunning := true for rrunning { bbody := make([]byte, util.MaxResponseSize) n, err := response.Body.Read(bbody) respMsg := &proto.StreamMsg{ Node: msg.Node, Category: msg.Category, Type: util.CONNECTED, Topic: msg.Topic, Data: bbody[:n], } ... read <- respMsg } ... }(readCh, resp, rawResponse, stop) running := true for running { select { case cloudMsg := <-conn.ConnRecv(): ... case respMsg := <-readCh: ... node.Send2Node(respMsg) ... } } ...}
这里 handleClientHttp 会一直尝试读取来自边端组件的数据包,并构建成 TRANSNMISSION 类型的 StreamMsg 发送给 tunnel-cloud,tunnel-cloud 在接受到StreamMsg 后调用 ConnectedAndTransmission 函数,将 StreamMsg 放入 StreamMsg.Type 对应的 HTTPS 模块的 conn.Channel 中
func handleServerHttp(rmsg *HttpsMsg, writer http.ResponseWriter, request *http.Request, node context.Node, conn context.Conn) { for k, v := range rmsg.Header { writer.Header().Add(k, v) } flusher, ok := writer.(http.Flusher) if ok { running := true for running { select { case <-request.Context().Done(): ... case msg := <-conn.ConnRecv(): ... _, err := writer.Write(msg.Data) flusher.Flush() ... } } ...}
handleServerHttp 在接受到 StreamMsg 后,会将 msg.Data,也即边端组件的数据包,发送给云端组件。整个数据流是单向的由边端向云端传送,以下所示:
而对于相似kubectl exec
的请求,数据流是双向的,此时边端组件 (kubelet) 会返回 StatusCode 为101的回包,标示协议提高,以后 tunnel-cloud 以及 tunnel-edge 会分别切到 handleServerSwitchingProtocols 以及 handleClientSwitchingProtocols 对 HTTPS 底层链接进行读取和写入,完成数据流的双向传输。
架构以下所示:
总结HTTPS模块以下:
TCP 模块负责在多集群管理中创建云端管控集群与边缘独立集群的一条 TCP 代理隧道:
func (tcp *TcpProxy) Start(mode string) { context.GetContext().RegisterHandler(util.TCP_BACKEND, tcp.Name(), tcpmsg.BackendHandler) context.GetContext().RegisterHandler(util.TCP_FRONTEND, tcp.Name(), tcpmsg.FrontendHandler) context.GetContext().RegisterHandler(util.CLOSED, tcp.Name(), tcpmsg.ControlHandler) if mode == util.CLOUD { ... for front, backend := range Tcp.Addr { go func(front, backend string) { ln, err := net.Listen("tcp", front) ... for { rawConn, err := ln.Accept() .... fp := tcpmng.NewTcpConn(uuid, backend, node) fp.Conn = rawConn fp.Type = util.TCP_FRONTEND go fp.Write() go fp.Read() } }(front, backend) } }
Start 函数首先注册了 StreamMsg 的处理函数,其中 CLOSED 处理函数主要处理关闭链接的消息,以后在云端启动 TCP Server。
在接受到云端组件的请求后,TCP Server 会将请求封装成 StremMsg 发送给 StreamServer,由 StreamServer 发送到 tunnel-edge,其中 StreamMsg.Type=FrontendHandler,StreamMsg.Node 从已创建的云边隧道的节点中随机选择一个。
tunnel-edge 在接受到该StreamMsg 后,会调用 FrontendHandler 函数处理
func FrontendHandler(msg *proto.StreamMsg) error { c := context.GetContext().GetConn(msg.Topic) if c != nil { c.Send2Conn(msg) return nil } tp := tcpmng.NewTcpConn(msg.Topic, msg.Addr, msg.Node) tp.Type = util.TCP_BACKEND tp.C.Send2Conn(msg) tcpAddr, err := net.ResolveTCPAddr("tcp", tp.Addr) if err != nil { ... conn, err := net.DialTCP("tcp", nil, tcpAddr) ... tp.Conn = conn go tp.Read() go tp.Write() return nil}
FrontendHandler 首先使用 StreamMsg.Addr 与 Edge Server 创建 TCP 链接,启动协程异步对 TCP 链接 Read 和 Write,同时新建 conn 对象(conn.uid=StreamMsg.Topic),并 eamMsg.Data 写入 TCP 链接。tunnel-edge 在接收到 Edge Server 的返回数据将其封装为 StreamMsg(StreamMsg.Topic=BackendHandler) 发送到 tunnel-cloud
整个过程如图所示:
【腾讯云原生】云说新品、云研新术、云游新活、云赏资讯,扫码关注同名公众号,及时获取更多干货!!
![]()