frp几乎全部的链接处理都是构建在mux模块之上的,重要性没必要多说,来看一下这是个啥吧git
ps: 安装方法github
go get "github.com/fatedier/golib/net/mux"
该模块很小,不到300行,分为两个文件:mux.go
和rule.go
。
由于rule.go
文件相对简单一些,咱们先来看这个。golang
首先看其中所命名的函数类型MatchFunc
:算法
type MatchFunc func(data []byte) (match bool)
该类型的函数用来判断data
属于什么协议。编程
那么具体如何判断呢,这里也实现了三个例子:网络
var ( HttpsNeedBytesNum uint32 = 1 HttpNeedBytesNum uint32 = 3 YamuxNeedBytesNum uint32 = 2 ) var HttpsMatchFunc MatchFunc = func(data []byte) bool { if len(data) < int(HttpsNeedBytesNum) { return false } if data[0] == 0x16 { return true } else { return false } } // From https://developer.mozilla.org/en-US/docs/Web/HTTP/Methods var httpHeadBytes = map[string]struct{}{ "GET": struct{}{}, "HEA": struct{}{}, "POS": struct{}{}, "PUT": struct{}{}, "DEL": struct{}{}, "CON": struct{}{}, "OPT": struct{}{}, "TRA": struct{}{}, "PAT": struct{}{}, } var HttpMatchFunc MatchFunc = func(data []byte) bool { if len(data) < int(HttpNeedBytesNum) { return false } _, ok := httpHeadBytes[string(data[:3])] return ok } // From https://github.com/hashicorp/yamux/blob/master/spec.md var YamuxMatchFunc MatchFunc = func(data []byte) bool { if len(data) < int(YamuxNeedBytesNum) { return false } if data[0] == 0 && data[1] >= 0x0 && data[1] <= 0x3 { return true } return false }
这三个函数分别实现了区分HTTPS
,HTTP
以及go中特有的yamux
(实际上这是一个库,能够参考Go中的I/O多路复用)。app
先来看其中的struct
,第一个是Mux
第二个是listener
,这里先来看一下较为简单的listener
。tcp
type listener struct { mux *Mux priority int needBytesNum uint32 matchFn MatchFunc c chan net.Conn mu sync.RWMutex } // Accept waits for and returns the next connection to the listener. func (ln *listener) Accept() (net.Conn, error) { ... } // Close removes this listener from the parent mux and closes the channel. func (ln *listener) Close() error { ... } func (ln *listener) Addr() net.Addr { ... }
刚看到这个结构体咱们可能很迷惑,不知道都是干啥的,并且网络编程中通常listener这种东西要绑定在一个套接字上,但很明显listener
没有,不过其惟一跟套接字相关的多是其c
字段,其是一个由net
包中的Conn
接口组成的chanel
;而后mu
字段就是读写锁了,这个很简单;而后mux
字段则是上面提到的两个结构体中的另外一个结构体Mux
的指针;接下来到了priority
字段上,顾名思义,这个彷佛跟优先级有关系,暂且存疑;needBytesNum
则更有些蒙了,不过感受其是跟读取byte的数量有关系,最后是matchFn
。函数
好,初步认识了这个结构体的结构后,咱们看看其方法。三个方法的listener
实现了net
模块中的Listener
接口:ui
// A Listener is a generic network listener for stream-oriented protocols. // // Multiple goroutines may invoke methods on a Listener simultaneously. type Listener interface { // Accept waits for and returns the next connection to the listener. Accept() (Conn, error) // Close closes the listener. // Any blocked Accept operations will be unblocked and return errors. Close() error // Addr returns the listener's network address. Addr() Addr }
而后先来分析其Accept
方法:
func (ln *listener) Accept() (net.Conn, error) { conn, ok := <-ln.c if !ok { return nil, fmt.Errorf("network connection closed") } return conn, nil }
该方法很简单,就是从c
这个由Conn
组成的channel
中,获取Conn
对象,好这里咱们就明白了,这个listener
和普通的不同,他很特别,普通的listener
监听的是套接字,而他监听的是channel
,另外,确定有某个地方在不停的往c
这个channel
中放Conn
。
接下来是Close
方法:
func (ln *listener) Close() error { if ok := ln.mux.release(ln); ok { // Close done to signal to any RLock holders to release their lock. close(ln.c) } return nil }
咱们暂且先把这个ln.mux.release(ln)
放到一边,由于还不知道这个东西干了啥,暂且只需关注close(ln.c)
,咱们知道这个函数是用来关闭channel
的,go推荐由发送端调用,但这里彷佛listener
是一个消费端,能够看一下如何优雅的关闭Go Channel,看来重点在于ln.mux.release(ln)
这里,咱们暂且存疑[1],留待下面解决。
最后是Addr
方法:
func (ln *listener) Addr() net.Addr { if ln.mux == nil { return nil } ln.mux.mu.RLock() defer ln.mux.mu.RUnlock() if ln.mux.ln == nil { return nil } return ln.mux.ln.Addr() }
在这里,mu
字段就用上了,加读锁,而后返回mux
字段中的ln
字段的Addr
方法。也就是这句return ln.mux.ln.Addr()
。
Mux结构体则相对来讲复杂不少,先来看一下他的字段定义:
type Mux struct { ln net.Listener defaultLn *listener // sorted by priority lns []*listener maxNeedBytesNum uint32 mu sync.RWMutex }
好,第一个字段ln
是一个Listener
接口;而后defaultLn
是一个listener
的指针;lns
则是由listener
的指针组成的切片,根据注释// sorted by priority
,咱们终于知道listener
的priority
字段是干啥的了;接下来是maxNeedBytesNum
字段,好奇怪,比起listener
的needBytesNum
多了个“Max”,因此咱们推测这个值取得是lns
以及defaultLn
字段中全部listener
中needBytesNum
值最大的;最后的mu
字段咱们就不说了。
须要注意的是:咱们可能会发现Mux
和listener
存在相互引用,但在Go
中咱们倒也不用太担忧,由于Go
采用“标记-回收”或者其变种的垃圾回收算法,感兴趣能够参考Golang 垃圾回收剖析
在mux.go
文件中定义了Mux
的生成函数NewMux
:
func NewMux(ln net.Listener) (mux *Mux) { mux = &Mux{ ln: ln, lns: make([]*listener, 0), } return }
很简单,须要注意的是ln
字段存储的通常不是listener
这样的很是规Listener,通常是TCPListener
这样具体的绑定了套接字的监听器。
接下来看Mux
结构体的方法,首先看Listen
和copyLns
// priority func (mux *Mux) Listen(priority int, needBytesNum uint32, fn MatchFunc) net.Listener { // 1 ln := &listener{ c: make(chan net.Conn), mux: mux, priority: priority, needBytesNum: needBytesNum, matchFn: fn, } mux.mu.Lock() defer mux.mu.Unlock() // 2 if needBytesNum > mux.maxNeedBytesNum { mux.maxNeedBytesNum = needBytesNum } // 3 newlns := append(mux.copyLns(), ln) sort.Slice(newlns, func(i, j int) bool { if newlns[i].priority == newlns[j].priority { return newlns[i].needBytesNum < newlns[j].needBytesNum } return newlns[i].priority < newlns[j].priority }) mux.lns = newlns return ln } func (mux *Mux) copyLns() []*listener { lns := make([]*listener, 0, len(mux.lns)) for _, l := range mux.lns { lns = append(lns, l) } return lns }
copyLns
方法很简单,就是跟名字的含义同样,生成一个lns
字段的副本并返回。
Listen
基本作了三步:
listener
结构体实例,并获取互斥锁needBytesNum
字段listener
实例按照优先级放入lns
字段对应的slice中接下来是ListenHttp
和ListenHttps
方法:
func (mux *Mux) ListenHttp(priority int) net.Listener { return mux.Listen(priority, HttpNeedBytesNum, HttpMatchFunc) } func (mux *Mux) ListenHttps(priority int) net.Listener { return mux.Listen(priority, HttpsNeedBytesNum, HttpsMatchFunc) }
这两个差很少,因此放到一块儿说,基本都是专门写了一个方法让咱们能方便的建立处理Http
或者Https
的listener
。
再来看DefaultListener
方法:
func (mux *Mux) DefaultListener() net.Listener { mux.mu.Lock() defer mux.mu.Unlock() if mux.defaultLn == nil { mux.defaultLn = &listener{ c: make(chan net.Conn), mux: mux, } } return mux.defaultLn }
这个方法很简单,基本就是有则返回没有则生成而后返回的套路。不过咱们要注意defaultLn
字段中的listener
是不放入lns
字段中的。
接下来是Server
方法:
// Serve handles connections from ln and multiplexes then across registered listeners. func (mux *Mux) Serve() error { for { // Wait for the next connection. // If it returns a temporary error then simply retry. // If it returns any other error then exit immediately. conn, err := mux.ln.Accept() if err, ok := err.(interface { Temporary() bool }); ok && err.Temporary() { continue } if err != nil { return err } go mux.handleConn(conn) } }
通常来讲,当咱们调用NewMux
函数之后,接下来就会调用Server
方法,该方法基本上就是阻塞监听某个套接字,当有链接创建成功后当即另起一个goroutine调用handleConn
方法;当链接创建失败根据err
是否含有Temporary
方法,若是有则执行并忽略错误,没有则返回错误。
如今咱们看看handleConn
方法干了些啥:
func (mux *Mux) handleConn(conn net.Conn) { // 1 mux.mu.RLock() maxNeedBytesNum := mux.maxNeedBytesNum lns := mux.lns defaultLn := mux.defaultLn mux.mu.RUnlock() // 2 sharedConn, rd := gnet.NewSharedConnSize(conn, int(maxNeedBytesNum)) data := make([]byte, maxNeedBytesNum) conn.SetReadDeadline(time.Now().Add(DefaultTimeout)) _, err := io.ReadFull(rd, data) if err != nil { conn.Close() return } conn.SetReadDeadline(time.Time{}) // 3 for _, ln := range lns { if match := ln.matchFn(data); match { err = errors.PanicToError(func() { ln.c <- sharedConn }) if err != nil { conn.Close() } return } } // No match listeners if defaultLn != nil { err = errors.PanicToError(func() { defaultLn.c <- sharedConn }) if err != nil { conn.Close() } return } // No listeners for this connection, close it. conn.Close() return }
handleConn
方法也不算复杂,大致能够分为三步:
conn
中读取数据,注意:shareConn
和rd
存在单向关系,若是从rd
中读取数据的话,数据也会复制一份放到shareConn
中,反过来就不成立了与matchFunc
匹配的最高优先级的listener
,并将shareConn
放入该listener
的c
字段中,若是没有匹配到则放到defaultLn
中的c
字段中,若是defaultLn
是nil
的话就不处理,直接关闭conn
。最后来到了release
方法了:
func (mux *Mux) release(ln *listener) bool { result := false mux.mu.Lock() defer mux.mu.Unlock() lns := mux.copyLns() for i, l := range lns { if l == ln { lns = append(lns[:i], lns[i+1:]...) result = true break } } mux.lns = lns return result }
release方法意思很明确:把对应的listener
从lns
中移除,并把结果返回,整个过程有互斥锁,咱们回到存疑1,尽管有互斥锁,但在这种状况下:当某个goroutine运行到handleConn
已经执行到了第三阶段的开始状态(也就是尚未找到匹配的listener
)时,且Go
运行在多核状态下,当另外一个goroutine运行完listener
的Close
方法时,这时就可能发生往一个已经关闭的channel
中send数据,但请注意handleConn
的第三步的这段代码:
err = errors.PanicToError(func() { // 就是这里了 ln.c <- sharedConn }) if err != nil { conn.Close() }
这个PanicToError
是这样的:
func PanicToError(fn func()) (err error) { defer func() { if r := recover(); r != nil { err = fmt.Errorf("Panic error: %v", r) } }() fn() return }
基本上就是执行了recover
而后将错误打印出来,结合下面的对err的判断,就会将send失败的conn关闭。
Mux
中包含了一个初始监听器,基本上全部的事件(好比说新的链接创建,之因此叫事件是由于我实在想不出更精确的词语了)都起源于此listener
实现了net.Listener
接口,能够做为二级监听器使用(好比传给net/http.Server
结构体的Server
方法进行处理)。Mux
包含了一个由listener
组成的有序slice,当有事件产生时就会遍历这个slice找出合适的listener
并将事件传给他。讲到这里基本上是完事了。整个mux
模块仍是比较简单的,起码是由一个个简单的东西组合而成。那么一块儿来意淫一下总体流程吧。
假如我要实现这么一个网络程序:
就这么两个很简单的要求,不难吧。
那么咱们一块儿来实现吧:
type HandleFunc func(c net.Conn) (n int, err error) type MyServer struct { l net.Listener hFunc HandleFunc } func (h *MyServer) Server() (err error) { for { conn, err := h.l.Accept() if err != nil { return } go h.hFunc(conn) } } func HandleHttp(c net.Conn)(n int, err error){ n, err = c.Write([]byte("Get Off! Don't you know that it is not safe?")) } func HandleHttps(c net.Conn)(n int, err error){ n, err = c.Write([]byte("Get Off! Don't you know that this is more complicated than http?")) } func main() (err error){ ln, err := net.Listen("tcp", "0.0.0.0:12345") if err != nil { err = fmt.Errorf("Create server listener error, %v", err) return } muxer = mux.NewMux(ln) var lHttp, lHttps net.Listener lHttp = muxer.ListenHttp(1) httpServer := *MyServer{lHttp, HandleHttp} lHttps = muxer.ListenHttps(2) httpsServer := *MyServer{lHttps, HandleHttps} go httpServer.Server() go httpsServer.Server() err = muxer.Serve() }