做者 | 刘晓敏 于雨java
Java 的世界里,你们普遍使用的一个高性能网络通讯框架 netty,不少 RPC 框架都是基于 netty 来实现的。在 golang 的世界里,getty 也是一个相似 netty 的高性能网络通讯库。getty 最初由 dubbogo 项目负责人于雨开发,做为底层通讯库在 dubbo-go 中使用。随着 dubbo-go 捐献给 apache 基金会,在社区小伙伴的共同努力下,getty 也最终进入到 apache 这个你们庭,并更名 dubbo-getty 。mysql
18 年的时候,我在公司里实践微服务,当时遇到最大的问题就是分布式事务问题。同年,阿里在社区开源他们的分布式事务解决方案,我也很快关注到这个项目,起初还叫 fescar,后来改名 seata。因为我对开源技术很感兴趣,加了不少社区群,当时也很关注 dubbo-go 这个项目,在里面默默潜水。随着对 seata 的了解,逐渐萌生了作一个 go 版本的分布式事务框架的想法。git
要作一个 golang 版的分布式事务框架,首要的一个问题就是如何实现 RPC 通讯。dubbo-go 就是很好的一个例子摆在眼前,遂开始研究 dubbo-go 的底层 getty。程序员
getty 框架的总体模型图以下:github
下面结合相关代码,详述 seata-golang 的 RPC 通讯过程。golang
实现 RPC 通讯,首先要创建网络链接吧,咱们从 client.go 开始看起。web
func (c *client) connect() { var ( err error ss Session ) for { // 创建一个 session 链接 ss = c.dial() if ss == nil { // client has been closed break } err = c.newSession(ss) if err == nil { // 收发报文 ss.(*session).run() // 此处省略部分代码 break } // don't distinguish between tcp connection and websocket connection. Because // gorilla/websocket/conn.go:(Conn)Close also invoke net.Conn.Close() ss.Conn().Close() } }
connect()
方法经过 dial()
方法获得了一个 session 链接,进入 dial() 方法:sql
func (c *client) dial() Session { switch c.endPointType { case TCP_CLIENT: return c.dialTCP() case UDP_CLIENT: return c.dialUDP() case WS_CLIENT: return c.dialWS() case WSS_CLIENT: return c.dialWSS() } return nil }
咱们关注的是 TCP 链接,因此继续进入 c.dialTCP()
方法:数据库
func (c *client) dialTCP() Session { var ( err error conn net.Conn ) for { if c.IsClosed() { return nil } if c.sslEnabled { if sslConfig, err := c.tlsConfigBuilder.BuildTlsConfig(); err == nil && sslConfig != nil { d := &net.Dialer{Timeout: connectTimeout} // 创建加密链接 conn, err = tls.DialWithDialer(d, "tcp", c.addr, sslConfig) } } else { // 创建 tcp 链接 conn, err = net.DialTimeout("tcp", c.addr, connectTimeout) } if err == nil && gxnet.IsSameAddr(conn.RemoteAddr(), conn.LocalAddr()) { conn.Close() err = errSelfConnect } if err == nil { // 返回一个 TCPSession return newTCPSession(conn, c) } log.Infof("net.DialTimeout(addr:%s, timeout:%v) = error:%+v", c.addr, connectTimeout, perrors.WithStack(err)) <-wheel.After(connectInterval) } }
至此,咱们知道了 getty 如何创建 TCP 链接,并返回 TCPSession。apache
那它是怎么收发报文的呢,咱们回到 connection 方法接着往下看,有这样一行 ss.(*session).run()
,在这行代码以后代码都是很简单的操做,咱们猜想这行代码运行的逻辑里面必定包含收发报文的逻辑,接着进入 run()
方法:
func (s *session) run() { // 省略部分代码 go s.handleLoop() go s.handlePackage() }
<br />这里起了两个 goroutine,handleLoop
和 handlePackage
,看字面意思符合咱们的猜测,进入 handleLoop()
方法:<br />
func (s *session) handleLoop() { // 省略部分代码 for { // A select blocks until one of its cases is ready to run. // It choose one at random if multiple are ready. Otherwise it choose default branch if none is ready. select { // 省略部分代码 case outPkg, ok = <-s.wQ: // 省略部分代码 iovec = iovec[:0] for idx := 0; idx < maxIovecNum; idx++ { // 经过 s.writer 将 interface{} 类型的 outPkg 编码成二进制的比特 pkgBytes, err = s.writer.Write(s, outPkg) // 省略部分代码 iovec = append(iovec, pkgBytes) //省略部分代码 } // 将这些二进制比特发送出去 err = s.WriteBytesArray(iovec[:]...) if err != nil { log.Errorf("%s, [session.handleLoop]s.WriteBytesArray(iovec len:%d) = error:%+v", s.sessionToken(), len(iovec), perrors.WithStack(err)) s.stop() // break LOOP flag = false } case <-wheel.After(s.period): if flag { if wsFlag { err := wsConn.writePing() if err != nil { log.Warnf("wsConn.writePing() = error:%+v", perrors.WithStack(err)) } } // 定时执行的逻辑,心跳等 s.listener.OnCron(s) } } } }
经过上面的代码,咱们不难发现,handleLoop()
方法处理的是发送报文的逻辑,RPC 须要发送的消息首先由 s.writer
编码成二进制比特,而后经过创建的 TCP 链接发送出去。这个 s.writer
对应的 Writer 接口是 RPC 框架必需要实现的一个接口。
继续看 handlePackage()
方法:
func (s *session) handlePackage() { // 省略部分代码 if _, ok := s.Connection.(*gettyTCPConn); ok { if s.reader == nil { errStr := fmt.Sprintf("session{name:%s, conn:%#v, reader:%#v}", s.name, s.Connection, s.reader) log.Error(errStr) panic(errStr) } err = s.handleTCPPackage() } else if _, ok := s.Connection.(*gettyWSConn); ok { err = s.handleWSPackage() } else if _, ok := s.Connection.(*gettyUDPConn); ok { err = s.handleUDPPackage() } else { panic(fmt.Sprintf("unknown type session{%#v}", s)) } }
进入 handleTCPPackage()
方法:
func (s *session) handleTCPPackage() error { // 省略部分代码 conn = s.Connection.(*gettyTCPConn) for { // 省略部分代码 bufLen = 0 for { // for clause for the network timeout condition check // s.conn.SetReadTimeout(time.Now().Add(s.rTimeout)) // 从 TCP 链接中收到报文 bufLen, err = conn.recv(buf) // 省略部分代码 break } // 省略部分代码 // 将收到的报文二进制比特写入 pkgBuf pktBuf.Write(buf[:bufLen]) for { if pktBuf.Len() <= 0 { break } // 经过 s.reader 将收到的报文解码成 RPC 消息 pkg, pkgLen, err = s.reader.Read(s, pktBuf.Bytes()) // 省略部分代码 s.UpdateActive() // 将收到的消息放入 TaskQueue 供 RPC 消费端消费 s.addTask(pkg) pktBuf.Next(pkgLen) // continue to handle case 5 } if exit { break } } return perrors.WithStack(err) }
从上面的代码逻辑咱们分析出,RPC 消费端须要将从 TCP 链接收到的二进制比特报文解码成 RPC 能消费的消息,这个工做由 s.reader 实现,因此,咱们要构建 RPC 通讯层也须要实现 s.reader 对应的 Reader 接口。
咱们都知道,netty 经过 boss 线程和 worker 线程实现了底层网络逻辑和业务逻辑的解耦。那么,getty 是如何实现的呢?
在 handlePackage()
方法最后,咱们看到,收到的消息被放入了 s.addTask(pkg)
这个方法,接着往下分析:
func (s *session) addTask(pkg interface{}) { f := func() { s.listener.OnMessage(s, pkg) s.incReadPkgNum() } if taskPool := s.EndPoint().GetTaskPool(); taskPool != nil { taskPool.AddTaskAlways(f) return } f() }
pkg
参数传递到了一个匿名方法,这个方法最终放入了 taskPool
。这个方法很关键,在我后来写 seata-golang 代码的时候,就遇到了一个坑,这个坑后面分析。
接着咱们看一下 taskPool 的定义:
// NewTaskPoolSimple build a simple task pool func NewTaskPoolSimple(size int) GenericTaskPool { if size < 1 { size = runtime.NumCPU() * 100 } return &taskPoolSimple{ work: make(chan task), sem: make(chan struct{}, size), done: make(chan struct{}), } }
构建了一个缓冲大小为 size (默认为 runtime.NumCPU() * 100
) 的 channel sem
。再看方法 AddTaskAlways(t task)
:
func (p *taskPoolSimple) AddTaskAlways(t task) { select { case <-p.done: return default: } select { case p.work <- t: return default: } select { case p.work <- t: case p.sem <- struct{}{}: p.wg.Add(1) go p.worker(t) default: goSafely(t) } }
加入的任务,会先由 len(p.sem) 个 goroutine 去消费,若是没有 goroutine 空闲,则会启动一个临时的 goroutine 去运行 t()。至关于有 len(p.sem) 个 goroutine 组成了 goroutine pool,pool 中的 goroutine 去处理业务逻辑,而不是由处理网络报文的 goroutine 去运行业务逻辑,从而实现了解耦。写 seata-golang 时遇到的一个坑,就是忘记设置 taskPool 形成了处理业务逻辑和处理底层网络报文逻辑的 goroutine 是同一个,我在业务逻辑中阻塞等待一个任务完成时,阻塞了整个 goroutine,使得阻塞期间收不到任何报文。
下面的代码见 getty.go:
// Reader is used to unmarshal a complete pkg from buffer type Reader interface { Read(Session, []byte) (interface{}, int, error) } // Writer is used to marshal pkg and write to session type Writer interface { // if @Session is udpGettySession, the second parameter is UDPContext. Write(Session, interface{}) ([]byte, error) } // ReadWriter interface use for handle application packages type ReadWriter interface { Reader Writer }
// EventListener is used to process pkg that received from remote session type EventListener interface { // invoked when session opened // If the return error is not nil, @Session will be closed. OnOpen(Session) error // invoked when session closed. OnClose(Session) // invoked when got error. OnError(Session, error) // invoked periodically, its period can be set by (Session)SetCronPeriod OnCron(Session) // invoked when getty received a package. Pls attention that do not handle long time // logic processing in this func. You'd better set the package's maximum length. // If the message's length is greater than it, u should should return err in // Reader{Read} and getty will close this connection soon. // // If ur logic processing in this func will take a long time, u should start a goroutine // pool(like working thread pool in cpp) to handle the processing asynchronously. Or u // can do the logic processing in other asynchronous way. // !!!In short, ur OnMessage callback func should return asap. // // If this is a udp event listener, the second parameter type is UDPContext. OnMessage(Session, interface{}) }
经过对整个 getty 代码的分析,咱们只要实现 ReadWriter
来对 RPC 消息编解码,再实现 EventListener
来处理 RPC 消息的对应的具体逻辑,将 ReadWriter
实现和 EventLister
实现注入到 RPC 的 Client 和 Server 端,则可实现 RPC 通讯。
下面是 seata 协议的定义:
在 ReadWriter 接口的实现 RpcPackageHandler
中,调用 Codec 方法对消息体按照上面的格式编解码:
// 消息编码为二进制比特 func MessageEncoder(codecType byte, in interface{}) []byte { switch codecType { case SEATA: return SeataEncoder(in) default: log.Errorf("not support codecType, %s", codecType) return nil } } // 二进制比特解码为消息体 func MessageDecoder(codecType byte, in []byte) (interface{}, int) { switch codecType { case SEATA: return SeataDecoder(in) default: log.Errorf("not support codecType, %s", codecType) return nil, 0 } }
再来看 client 端 EventListener
的实现 RpcRemotingClient
:
func (client *RpcRemoteClient) OnOpen(session getty.Session) error { go func() request := protocal.RegisterTMRequest{AbstractIdentifyRequest: protocal.AbstractIdentifyRequest{ ApplicationId: client.conf.ApplicationId, TransactionServiceGroup: client.conf.TransactionServiceGroup, }} // 创建链接后向 Transaction Coordinator 发起注册 TransactionManager 的请求 _, err := client.sendAsyncRequestWithResponse(session, request, RPC_REQUEST_TIMEOUT) if err == nil { // 将与 Transaction Coordinator 创建的链接保存在链接池供后续使用 clientSessionManager.RegisterGettySession(session) client.GettySessionOnOpenChannel <- session.RemoteAddr() } }() return nil } // OnError ... func (client *RpcRemoteClient) OnError(session getty.Session, err error) { clientSessionManager.ReleaseGettySession(session) } // OnClose ... func (client *RpcRemoteClient) OnClose(session getty.Session) { clientSessionManager.ReleaseGettySession(session) } // OnMessage ... func (client *RpcRemoteClient) OnMessage(session getty.Session, pkg interface{}) { log.Info("received message:{%v}", pkg) rpcMessage, ok := pkg.(protocal.RpcMessage) if ok { heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage) if isHeartBeat && heartBeat == protocal.HeartBeatMessagePong { log.Debugf("received PONG from %s", session.RemoteAddr()) } } if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST || rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY { log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body) // 处理事务消息,提交 or 回滚 client.onMessage(rpcMessage, session.RemoteAddr()) } else { resp, loaded := client.futures.Load(rpcMessage.Id) if loaded { response := resp.(*getty2.MessageFuture) response.Response = rpcMessage.Body response.Done <- true client.futures.Delete(rpcMessage.Id) } } } // OnCron ... func (client *RpcRemoteClient) OnCron(session getty.Session) { // 发送心跳 client.defaultSendRequest(session, protocal.HeartBeatMessagePing) }
clientSessionManager.RegisterGettySession(session)
的逻辑将在下文中分析。
代码见 DefaultCoordinator
:
func (coordinator *DefaultCoordinator) OnOpen(session getty.Session) error { log.Infof("got getty_session:%s", session.Stat()) return nil } func (coordinator *DefaultCoordinator) OnError(session getty.Session, err error) { // 释放 TCP 链接 SessionManager.ReleaseGettySession(session) session.Close() log.Errorf("getty_session{%s} got error{%v}, will be closed.", session.Stat(), err) } func (coordinator *DefaultCoordinator) OnClose(session getty.Session) { log.Info("getty_session{%s} is closing......", session.Stat()) } func (coordinator *DefaultCoordinator) OnMessage(session getty.Session, pkg interface{}) { log.Debugf("received message:{%v}", pkg) rpcMessage, ok := pkg.(protocal.RpcMessage) if ok { _, isRegTM := rpcMessage.Body.(protocal.RegisterTMRequest) if isRegTM { // 将 TransactionManager 信息和 TCP 链接创建映射关系 coordinator.OnRegTmMessage(rpcMessage, session) return } heartBeat, isHeartBeat := rpcMessage.Body.(protocal.HeartBeatMessage) if isHeartBeat && heartBeat == protocal.HeartBeatMessagePing { coordinator.OnCheckMessage(rpcMessage, session) return } if rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST || rpcMessage.MessageType == protocal.MSGTYPE_RESQUEST_ONEWAY { log.Debugf("msgId:%s, body:%v", rpcMessage.Id, rpcMessage.Body) _, isRegRM := rpcMessage.Body.(protocal.RegisterRMRequest) if isRegRM { // 将 ResourceManager 信息和 TCP 链接创建映射关系 coordinator.OnRegRmMessage(rpcMessage, session) } else { if SessionManager.IsRegistered(session) { defer func() { if err := recover(); err != nil { log.Errorf("Catch Exception while do RPC, request: %v,err: %w", rpcMessage, err) } }() // 处理事务消息,全局事务注册、分支事务注册、分支事务提交、全局事务回滚等 coordinator.OnTrxMessage(rpcMessage, session) } else { session.Close() log.Infof("close a unhandled connection! [%v]", session) } } } else { resp, loaded := coordinator.futures.Load(rpcMessage.Id) if loaded { response := resp.(*getty2.MessageFuture) response.Response = rpcMessage.Body response.Done <- true coordinator.futures.Delete(rpcMessage.Id) } } } } func (coordinator *DefaultCoordinator) OnCron(session getty.Session) { }
coordinator.OnRegTmMessage(rpcMessage, session)
注册 Transaction Manager,coordinator.OnRegRmMessage(rpcMessage, session)
注册 Resource Manager。具体逻辑分析见下文。
消息进入 coordinator.OnTrxMessage(rpcMessage, session)
方法,将按照消息的类型码路由到具体的逻辑当中:
switch msg.GetTypeCode() { case protocal.TypeGlobalBegin: req := msg.(protocal.GlobalBeginRequest) resp := coordinator.doGlobalBegin(req, ctx) return resp case protocal.TypeGlobalStatus: req := msg.(protocal.GlobalStatusRequest) resp := coordinator.doGlobalStatus(req, ctx) return resp case protocal.TypeGlobalReport: req := msg.(protocal.GlobalReportRequest) resp := coordinator.doGlobalReport(req, ctx) return resp case protocal.TypeGlobalCommit: req := msg.(protocal.GlobalCommitRequest) resp := coordinator.doGlobalCommit(req, ctx) return resp case protocal.TypeGlobalRollback: req := msg.(protocal.GlobalRollbackRequest) resp := coordinator.doGlobalRollback(req, ctx) return resp case protocal.TypeBranchRegister: req := msg.(protocal.BranchRegisterRequest) resp := coordinator.doBranchRegister(req, ctx) return resp case protocal.TypeBranchStatusReport: req := msg.(protocal.BranchReportRequest) resp := coordinator.doBranchReport(req, ctx) return resp default: return nil }
Client 端同 Transaction Coordinator 创建链接起链接后,经过 clientSessionManager.RegisterGettySession(session)
将链接保存在 serverSessions = sync.Map{}
这个 map 中。map 的 key 为从 session 中获取的 RemoteAddress 即 Transaction Coordinator 的地址,value 为 session。这样,Client 端就能够经过 map 中的一个 session 来向 Transaction Coordinator 注册 Transaction Manager 和 Resource Manager 了。具体代码见 getty_client_session_manager.go
。
Transaction Manager 和 Resource Manager 注册到 Transaction Coordinator 后,一个链接既有可能用来发送 TM 消息也有可能用来发送 RM 消息。咱们经过 RpcContext 来标识一个链接信息:
type RpcContext struct { Version string TransactionServiceGroup string ClientRole meta.TransactionRole ApplicationId string ClientId string ResourceSets *model.Set Session getty.Session }
当收到事务消息时,咱们须要构造这样一个 RpcContext 供后续事务处理逻辑使用。因此,咱们会构造下列 map 来缓存映射关系:
var ( // session -> transactionRole // TM will register before RM, if a session is not the TM registered, // it will be the RM registered session_transactionroles = sync.Map{} // session -> applicationId identified_sessions = sync.Map{} // applicationId -> ip -> port -> session client_sessions = sync.Map{} // applicationId -> resourceIds client_resources = sync.Map{} )
这样,Transaction Manager 和 Resource Manager 分别经过 coordinator.OnRegTmMessage(rpcMessage, session)
和 coordinator.OnRegRmMessage(rpcMessage, session)
注册到 Transaction Coordinator 时,会在上述 client_sessions map 中缓存 applicationId、ip、port 与 session 的关系,在 client_resources map 中缓存 applicationId 与 resourceIds(一个应用可能存在多个 Resource Manager) 的关系。在须要时,咱们就能够经过上述映射关系构造一个 RpcContext。这部分的实现和 java 版 seata 有很大的不一样,感兴趣的能够深刻了解一下。具体代码见 getty_session_manager.go
。
至此,咱们就分析完了 seata-golang 整个 RPC 通讯模型的机制。
seata-golang 从今年 4 月份开始开发,到 8 月份基本实现和 java 版 seata 1.2 协议的互通,对 mysql 数据库实现了 AT 模式(自动协调分布式事务的提交回滚),实现了 TCC 模式,TC 端使用 mysql 存储数据,使 TC 变成一个无状态应用支持高可用部署。下图展现了 AT 模式的原理:
后续,还有许多工做能够作,好比:对注册中心的支持、对配置中心的支持、和 java 版 seata 1.4 的协议互通、其余数据库的支持、raft transaction coordinator 的实现等,但愿对分布式事务问题感兴趣的开发者能够加入进来一块儿来打造一个完善的 golang 的分布式事务框架。
刘晓敏 (GitHubID dk-lockdown),目前就任于 h3c 成都分公司,擅长使用 Java/Go 语言,在云原生和微服务相关技术方向均有涉猎,目前专攻分布式事务。
于雨(github @AlexStocks),dubbo-go 项目和社区负责人,一个有十多年服务端基础架构研发一线工做经验的程序员,陆续参与改进过 Muduo/Pika/Dubbo/Sentinel-go 等知名项目,目前在蚂蚁金服可信原生部从事容器编排和 service mesh 工做。