对kcp-go的源码解析,有错误之处,请必定告之。
sheepbao 2017.0612html
ARQ:自动重传请求(Automatic Repeat-reQuest,ARQ)是OSI模型中数据链路层的错误纠正协议之一.
RTO:Retransmission TimeOut
FEC:Forward Error Correctionnode
kcp是一个基于udp实现快速、可靠、向前纠错的的协议,能以比TCP浪费10%-20%的带宽的代价,换取平均延迟下降30%-40%,且最大延迟下降三倍的传输效果。纯算法实现,并不负责底层协议(如UDP)的收发。查看官方文档kcpgit
kcp-go是用go实现了kcp协议的一个库,其实kcp相似tcp,协议的实现也不少参考tcp协议的实现,滑动窗口,快速重传,选择性重传,慢启动等。
kcp和tcp同样,也分客户端和监听端。github
+-+-+-+-+-+ +-+-+-+-+-+ | Client | | Server | +-+-+-+-+-+ +-+-+-+-+-+ |------ kcp data ------>| |<----- kcp data -------|
+----------------------+ | Session | +----------------------+ | KCP(ARQ) | +----------------------+ | FEC(OPTIONAL) | +----------------------+ | CRYPTO(OPTIONAL)| +----------------------+ | UDP(Packet) | +----------------------+
KCP Header Format算法
4 1 1 2 (Byte) +---+---+---+---+---+---+---+---+ | conv |cmd|frg| wnd | +---+---+---+---+---+---+---+---+ | ts | sn | +---+---+---+---+---+---+---+---+ | una | len | +---+---+---+---+---+---+---+---+ | | + DATA + | | +---+---+---+---+---+---+---+---+
src/vendor/github.com/xtaci/kcp-go/ ├── LICENSE ├── README.md ├── crypt.go 加解密实现 ├── crypt_test.go ├── donate.png ├── fec.go 向前纠错实现 ├── frame.png ├── kcp-go.png ├── kcp.go kcp协议实现 ├── kcp_test.go ├── sess.go 会话管理实现 ├── sess_test.go ├── snmp.go 数据统计实现 ├── updater.go 任务调度实现 ├── xor.go xor封装 └── xor_test.go
着重研究两个文件kcp.go
和sess.go
api
kcp是基于udp实现的,全部udp的实现这里不作介绍,kcp作的事情就是怎么封装udp的数据和怎么解析udp的数据,再加各类处理机制,为了重传,拥塞控制,纠错等。下面介绍kcp客户端和服务端总体实现的流程,只是大概介绍一下函数流,不作详细解析,详细解析看后面数据流的解析。session
和tcp同样,kcp要链接服务端须要先拨号,可是和tcp有个很大的不一样是,即便服务端没有启动,客户端同样能够拨号成功,由于实际上这里的拨号没有发送任何信息,而tcp在这里须要三次握手。app
DialWithOptions(raddr string, block BlockCrypt, dataShards, parityShards int) V net.DialUDP("udp", nil, udpaddr) V NewConn() V newUDPSession() {初始化UDPSession} V NewKCP() {初始化kcp} V updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务} V go sess.readLoop() V go s.receiver(chPacket) V s.kcpInput(data) V s.fecDecoder.decodeBytes(data) V s.kcp.Input(data, true, s.ackNoDelay) V kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲} V notifyReadEvent()
客户端大致的流程如上面所示,先Dial
,创建udp链接,将这个链接封装成一个会话,而后启动一个go程,接收udp的消息。tcp
ListenWithOptions() V net.ListenUDP() V ServerConn() V newFECDecoder() V go l.monitor() {从chPacket接收udp数据,写入kcp} V go l.receiver(chPacket) {从upd接收数据,并入队列} V newUDPSession() V updater.addSession(sess) {管理session会话,任务管理,根据用户设置的internal参数间隔来轮流唤醒任务} V s.kcpInput(data)` V s.fecDecoder.decodeBytes(data) V s.kcp.Input(data, true, s.ackNoDelay) V kcp.parse_data(seg) {将分段好的数据插入kcp.rcv_buf缓冲} V notifyReadEvent()
服务端的大致流程如上图所示,先Listen
,启动udp监听,接着用一个go程监控udp的数据包,负责将不一样session的数据写入不一样的udp链接,而后解析封装将数据交给上层。函数
无论是kcp的客户端仍是服务端,他们都有io行为,就是读与写,咱们只分析一个就行了,由于它们读写的实现是同样的,这里分析客户端的读与写。
s.Write(b []byte) V s.kcp.WaitSnd() {} V s.kcp.Send(b) {将数据根据mss分段,并存在kcp.snd_queue} V s.kcp.flush(false) [flush data to output] { if writeDelay==true { flush }else{ 每隔`interval`时间flush一次 } } V kcp.output(buffer, size) V s.output(buf) V s.conn.WriteTo(ext, s.remote) V s.conn..Conn.WriteTo(buf)
读写都是在sess.go
文件中实现的,Write方法:
// Write implements net.Conn func (s *UDPSession) Write(b []byte) (n int, err error) { for { ... // api flow control if s.kcp.WaitSnd() < int(s.kcp.snd_wnd) { n = len(b) for { if len(b) <= int(s.kcp.mss) { s.kcp.Send(b) break } else { s.kcp.Send(b[:s.kcp.mss]) b = b[s.kcp.mss:] } } if !s.writeDelay { s.kcp.flush(false) } s.mu.Unlock() atomic.AddUint64(&DefaultSnmp.BytesSent, uint64(n)) return n, nil } ... // wait for write event or timeout select { case <-s.chWriteEvent: case <-c: case <-s.die: } if timeout != nil { timeout.Stop() } } }
假设发送一个hello消息,Write方法会先判断发送窗口是否已满,满的话该函数阻塞,不满则kcp.Send("hello"),而Send函数实现根据mss的值对数据分段,固然这里的发送的hello,长度过短,只分了一个段,并把它们插入发送的队列里。
func (kcp *KCP) Send(buffer []byte) int { ... for i := 0; i < count; i++ { var size int if len(buffer) > int(kcp.mss) { size = int(kcp.mss) } else { size = len(buffer) } seg := kcp.newSegment(size) copy(seg.data, buffer[:size]) if kcp.stream == 0 { // message mode seg.frg = uint8(count - i - 1) } else { // stream mode seg.frg = 0 } kcp.snd_queue = append(kcp.snd_queue, seg) buffer = buffer[size:] } return 0 }
接着判断参数writeDelay
,若是参数设置为false,则立马发送消息,不然须要任务调度后才会触发发送,发送消息是由flush函数实现的。
// flush pending data func (kcp *KCP) flush(ackOnly bool) { var seg Segment seg.conv = kcp.conv seg.cmd = IKCP_CMD_ACK seg.wnd = kcp.wnd_unused() seg.una = kcp.rcv_nxt buffer := kcp.buffer // flush acknowledges ptr := buffer for i, ack := range kcp.acklist { size := len(buffer) - len(ptr) if size+IKCP_OVERHEAD > int(kcp.mtu) { kcp.output(buffer, size) ptr = buffer } // filter jitters caused by bufferbloat if ack.sn >= kcp.rcv_nxt || len(kcp.acklist)-1 == i { seg.sn, seg.ts = ack.sn, ack.ts ptr = seg.encode(ptr) } } kcp.acklist = kcp.acklist[0:0] if ackOnly { // flash remain ack segments size := len(buffer) - len(ptr) if size > 0 { kcp.output(buffer, size) } return } // probe window size (if remote window size equals zero) if kcp.rmt_wnd == 0 { current := currentMs() if kcp.probe_wait == 0 { kcp.probe_wait = IKCP_PROBE_INIT kcp.ts_probe = current + kcp.probe_wait } else { if _itimediff(current, kcp.ts_probe) >= 0 { if kcp.probe_wait < IKCP_PROBE_INIT { kcp.probe_wait = IKCP_PROBE_INIT } kcp.probe_wait += kcp.probe_wait / 2 if kcp.probe_wait > IKCP_PROBE_LIMIT { kcp.probe_wait = IKCP_PROBE_LIMIT } kcp.ts_probe = current + kcp.probe_wait kcp.probe |= IKCP_ASK_SEND } } } else { kcp.ts_probe = 0 kcp.probe_wait = 0 } // flush window probing commands if (kcp.probe & IKCP_ASK_SEND) != 0 { seg.cmd = IKCP_CMD_WASK size := len(buffer) - len(ptr) if size+IKCP_OVERHEAD > int(kcp.mtu) { kcp.output(buffer, size) ptr = buffer } ptr = seg.encode(ptr) } // flush window probing commands if (kcp.probe & IKCP_ASK_TELL) != 0 { seg.cmd = IKCP_CMD_WINS size := len(buffer) - len(ptr) if size+IKCP_OVERHEAD > int(kcp.mtu) { kcp.output(buffer, size) ptr = buffer } ptr = seg.encode(ptr) } kcp.probe = 0 // calculate window size cwnd := _imin_(kcp.snd_wnd, kcp.rmt_wnd) if kcp.nocwnd == 0 { cwnd = _imin_(kcp.cwnd, cwnd) } // sliding window, controlled by snd_nxt && sna_una+cwnd newSegsCount := 0 for k := range kcp.snd_queue { if _itimediff(kcp.snd_nxt, kcp.snd_una+cwnd) >= 0 { break } newseg := kcp.snd_queue[k] newseg.conv = kcp.conv newseg.cmd = IKCP_CMD_PUSH newseg.sn = kcp.snd_nxt kcp.snd_buf = append(kcp.snd_buf, newseg) kcp.snd_nxt++ newSegsCount++ kcp.snd_queue[k].data = nil } if newSegsCount > 0 { kcp.snd_queue = kcp.remove_front(kcp.snd_queue, newSegsCount) } // calculate resent resent := uint32(kcp.fastresend) if kcp.fastresend <= 0 { resent = 0xffffffff } // check for retransmissions current := currentMs() var change, lost, lostSegs, fastRetransSegs, earlyRetransSegs uint64 for k := range kcp.snd_buf { segment := &kcp.snd_buf[k] needsend := false if segment.xmit == 0 { // initial transmit needsend = true segment.rto = kcp.rx_rto segment.resendts = current + segment.rto } else if _itimediff(current, segment.resendts) >= 0 { // RTO needsend = true if kcp.nodelay == 0 { segment.rto += kcp.rx_rto } else { segment.rto += kcp.rx_rto / 2 } segment.resendts = current + segment.rto lost++ lostSegs++ } else if segment.fastack >= resent { // fast retransmit needsend = true segment.fastack = 0 segment.rto = kcp.rx_rto segment.resendts = current + segment.rto change++ fastRetransSegs++ } else if segment.fastack > 0 && newSegsCount == 0 { // early retransmit needsend = true segment.fastack = 0 segment.rto = kcp.rx_rto segment.resendts = current + segment.rto change++ earlyRetransSegs++ } if needsend { segment.xmit++ segment.ts = current segment.wnd = seg.wnd segment.una = seg.una size := len(buffer) - len(ptr) need := IKCP_OVERHEAD + len(segment.data) if size+need > int(kcp.mtu) { kcp.output(buffer, size) current = currentMs() // time update for a blocking call ptr = buffer } ptr = segment.encode(ptr) copy(ptr, segment.data) ptr = ptr[len(segment.data):] if segment.xmit >= kcp.dead_link { kcp.state = 0xFFFFFFFF } } } // flash remain segments size := len(buffer) - len(ptr) if size > 0 { kcp.output(buffer, size) } // counter updates sum := lostSegs if lostSegs > 0 { atomic.AddUint64(&DefaultSnmp.LostSegs, lostSegs) } if fastRetransSegs > 0 { atomic.AddUint64(&DefaultSnmp.FastRetransSegs, fastRetransSegs) sum += fastRetransSegs } if earlyRetransSegs > 0 { atomic.AddUint64(&DefaultSnmp.EarlyRetransSegs, earlyRetransSegs) sum += earlyRetransSegs } if sum > 0 { atomic.AddUint64(&DefaultSnmp.RetransSegs, sum) } // update ssthresh // rate halving, https://tools.ietf.org/html/rfc6937 if change > 0 { inflight := kcp.snd_nxt - kcp.snd_una kcp.ssthresh = inflight / 2 if kcp.ssthresh < IKCP_THRESH_MIN { kcp.ssthresh = IKCP_THRESH_MIN } kcp.cwnd = kcp.ssthresh + resent kcp.incr = kcp.cwnd * kcp.mss } // congestion control, https://tools.ietf.org/html/rfc5681 if lost > 0 { kcp.ssthresh = cwnd / 2 if kcp.ssthresh < IKCP_THRESH_MIN { kcp.ssthresh = IKCP_THRESH_MIN } kcp.cwnd = 1 kcp.incr = kcp.mss } if kcp.cwnd < 1 { kcp.cwnd = 1 kcp.incr = kcp.mss } }
flush函数很是的重要,kcp的重要参数都是在调节这个函数的行为,这个函数只有一个参数ackOnly
,意思就是只发送ack,若是ackOnly
为true的话,该函数只遍历ack列表,而后发送,就完事了。 若是不是,也会发送真实数据。 在发送数据前先进行windSize探测,若是开启了拥塞控制nc=0
,则每次发送前检测服务端的winsize,若是服务端的winsize变小了,自身的winsize也要更着变小,来避免拥塞。若是没有开启拥塞控制,就按设置的winsize进行数据发送。
接着循环每一个段数据,并判断每一个段数据的是否该重发,还有何时重发:
resend
参数决定。最后经过kcp.output发送消息hello,output是个回调函数,函数的实体是sess.go
的:
func (s *UDPSession) output(buf []byte) { var ecc [][]byte // extend buf's header space ext := buf if s.headerSize > 0 { ext = s.ext[:s.headerSize+len(buf)] copy(ext[s.headerSize:], buf) } // FEC stage if s.fecEncoder != nil { ecc = s.fecEncoder.Encode(ext) } // encryption stage if s.block != nil { io.ReadFull(rand.Reader, ext[:nonceSize]) checksum := crc32.ChecksumIEEE(ext[cryptHeaderSize:]) binary.LittleEndian.PutUint32(ext[nonceSize:], checksum) s.block.Encrypt(ext, ext) if ecc != nil { for k := range ecc { io.ReadFull(rand.Reader, ecc[k][:nonceSize]) checksum := crc32.ChecksumIEEE(ecc[k][cryptHeaderSize:]) binary.LittleEndian.PutUint32(ecc[k][nonceSize:], checksum) s.block.Encrypt(ecc[k], ecc[k]) } } } // WriteTo kernel nbytes := 0 npkts := 0 // if mrand.Intn(100) < 50 { for i := 0; i < s.dup+1; i++ { if n, err := s.conn.WriteTo(ext, s.remote); err == nil { nbytes += n npkts++ } } // } if ecc != nil { for k := range ecc { if n, err := s.conn.WriteTo(ecc[k], s.remote); err == nil { nbytes += n npkts++ } } } atomic.AddUint64(&DefaultSnmp.OutPkts, uint64(npkts)) atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(nbytes)) }
output函数才是真正的将数据写入内核中,在写入以前先进行了fec编码,fec编码器的实现是用了一个开源库github.com/klauspost/reedsolomon,编码之后的hello就不是和原来的hello同样了,至少多了几个字节。 fec编码器有两个重要的参数reedsolomon.New(dataShards, parityShards, reedsolomon.WithMaxGoroutines(1)),dataShards
和parityShards
,这两个参数决定了fec的冗余度,冗余度越大抗丢包性就越强。
其实这里任务调度器是一个很简单的实现,用一个全局变量updater
来管理session,代码文件为updater.go
。其中最主要的函数
func (h *updateHeap) updateTask() { var timer <-chan time.Time for { select { case <-timer: case <-h.chWakeUp: } h.mu.Lock() hlen := h.Len() now := time.Now() if hlen > 0 && now.After(h.entries[0].ts) { for i := 0; i < hlen; i++ { entry := heap.Pop(h).(entry) if now.After(entry.ts) { entry.ts = now.Add(entry.s.update()) heap.Push(h, entry) } else { heap.Push(h, entry) break } } } if hlen > 0 { timer = time.After(h.entries[0].ts.Sub(now)) } h.mu.Unlock() } }
任务调度器实现了一个堆结构,每当有新的链接,session都会插入到这个堆里,接着for循环每隔interval时间,遍历这个堆,获得entry
而后执行entry.s.update()
。而entry.s.update()
会执行s.kcp.flush(false)
来发送数据。
这里简单介绍了kcp的总体流程,详细介绍了发送数据的流程,但未介绍kcp接收数据的流程,其实在客户端发送数据后,服务端是须要返回ack的,而客户端也须要根据返回的ack来判断数据段是否须要重传仍是在队列里清除该数据段。处理返回来的ack是在函数kcp.Input()函数实现的。具体详细流程下次再介绍。