接触到GO以后,GO的网络支持很是使人喜欢。GO实现了在语法层面上能够保持同步语义,可是却又没有牺牲太多性能,底层同样使用了IO路径复用,好比在LINUX下用了EPOLL,在WINDOWS下用了IOCP。git
可是在开发服务端程序的时候,不少都是被动触发的,都是客户端发送来的请求须要处理。天生就是一个event-based的程序。而在GO下,由于并发是做为语言的一部分,goroutine, channel等特性则很容易的使程序员在实现功能时从容的在同步与异步之间进行转换。程序员
由于本身的须要,我针对event-based场景的服务端作了简易的封装。具体代码见这里.github
由于GO的IO机制和并发原语的原生支持,再加上对网络API的封装,程序员能够简单的实现一个高效的服务端或者客户端程序。通常的实现就是调用net.Listen(“tcp4”, address)获得一个net.Listener,而后无限循环调用net.Listener.Accept,以后就能够获得一个net.Conn,能够调用net.Conn的接口设置发送和接收缓冲区大小,能够设置KEEPALIVE等。由于TCP的双工特性,因此能够针对一个net.Conn能够专门启动一个goroutine去无限循环接收对端发来的数据,而后解包等。网络
个人想法是在这个简单实现的基础上作一层薄薄的封装,使其尽可能的精简,可是又不失灵活。但愿可以适应不一样的协议,对使用者形成尽可能小的约束。session
该对象就是对net.Conn的一个简易封装,能够经过swnet.Server.AcceptLoop获得,也能够经过swnet.NewSession建立新的对象,这种通常是客户端情境下使用。获得Session对象后,能够调用Start方法开始工做。之因此还暴露出一个方法叫Start是由于在服务端下,可能会有某些需求,好比针对IP设置了ACL,那么,把Start行为交给使用者决定如何调用。可是这里须要注意的是,若是使用者不想Start,使用者有责任本身Close掉,不然会形成资源泄露。并发
Start后,会启动两个goroutine,一个用于专门接收对端发来的数据,一个专门用来发送数据到对端。想发送数据到对端,能够用AsyncSend方法,该方法会把要发送的数据排队到发送通道。这里使用通道的缘由是由于在服务端情境下,有必要对发送的数据进行排队,防止发送很快,可是对端接收很慢,或者过多的调用AsyncSend方法,致使堆积了太多的数据,增长了内存的压力。经过channel来控制发送速率我认为是比较合理的。同时,还提供了方法能够用来修改channel的长度,一是调用NewSession时传入指定大小,二是调用Session.SetSendChannelSize设置大小,可是要注意的是,调用此方法时必须在Start以前完成,不然会产生错误。这样作的缘由也是由于不必动态更改发送通道大小。框架
若是发送channel满了,AsyncSend方法会返回ErrSendChanBlocking。增长这个错误类型也是由于上面的设计致使的。不返回这个错误,就没有办法让使用者获得处理该问题的机会。使用者若是拿到该错误,能够本身试着分析问题的缘由,或者能够尝试循环发送,或者直接丢弃该次的发送数据。总之可以让使用者获得本身处理的机会。异步
若是Session对象已经Close了,那么调用AsyncSend会返回ErrStoped错误。除此以外,由于AsyncSend是把数据排队到发送channel中,那么使用者有责任确保发送的数据在发送完成前不会修改。tcp
若是数据发送失败,或者其余缘由,个人实现是直接粗暴的Close掉该Session。函数
还有就是,可能有些用例情景下,会发送比较大的数据包,好比64K大小,或者32K大小的数据等,未了避免反复申请内存,特此为Session增长了SetSendCallback方法。能够设置一个回调函数,用于在发送完成后能够调用该回调,给予使用者回收数据对象的机会,好比能够配合sync.Pool使用。虽然我本身测试时并无太大的效果。
为了方便使用者设置一些net.Conn参数,增长了一个RawConn方法,能够获取到net.Conn 的实例。这里实际上是挺纠结的。由于暴露出这个内部资源后,会给予使用者一个很是大的灵活度。它能够直接绕过Session的发送channel,本身玩本身的。不过出于方便使用者使用的目的,我仍是这么作了。使用者本身承担相应的责任。其实这里还能够像net.HTTP那样增长一个Hijack方法,让使用者本身接管net.Conn,本身玩本身的。
Session中的不少SET/GET方法都是没有加锁的。一方面是由于不少操做在Start前一次完成,或者是GET的数据不是那么紧密的。
有些时候,若是一个Session被关闭了,可能须要知道这个行为。因此提供了SetCloseCallback方法,能够设置该方法。不设置也没有关系。调用closeCallback时会确保只调用一次。
由于目标之一就是可以隔离具体协议格式。因此对协议作了抽象。只须要实现PacketProtocol接口便可:
// PacketReader is used to unmarshal a complete packet from buff type PacketReader interface { // Read data from conn and build a complete packet. // How to read from conn is up to you. You can set read timeout or other option. // If buff's capacity is small, you can make a new buff, then return it, // so can reuse to reduce memory overhead. ReadPacket(conn net.Conn, buff []byte) (interface{}, []byte, error) } // PacketWriter is used to marshal packet into buff type PacketWriter interface { // Build a complete packet. If buff's capacity is too small, you can make a new one // and return it to reuse. BuildPacket(packet interface{}, buff []byte) ([]byte, error) // How to write data to conn is up to you. So you can set write timeout or other option. WritePacket(conn net.Conn, buff []byte) error } // PacketProtocol just a composite interface type PacketProtocol interface { PacketReader PacketWriter }
也就是实现PacketReader/PacketWriter两个接口。为了让内存尽可能的复用,减小内存压力,因此在ReadPacket方法和BuildPacket方法的返回值中须要返回一个切片。框架会在第一次调用时传入一个默认大小的切片到这两个方法中,若是容量不够,使用者能够本身从新创建切片,而后写入数据后返回该切片。下一次再实用时就使用这个返回出来的切片。
其中ReadPacket方法是在一个专门用于接收数据的goroutine中调用。实现者能够本身根据本身的策略进行读取,由于传入了net.Conn,因此使用者能够本身设置I/O Timeout。实现者有责任返回一个完整的请求包。若是中间出了错误,有必要返回一个error。当发现有error后,会关闭该Session。这样作的缘由是当读取或者构建一个请求包失败时,多是数据错误,多是链路错误,或者其余缘由,总之,我的认为这种状况下没有必要继续处理,直接关闭连接。并且这里还有一个须要注意的事项,返回出来的请求包中的数据若是有包含切片类型的数据,建议从新分配一个切片,而后从buff中拷贝进去,尽可能不要对buff切片作复用,不然可能会产生额外的BUG。
BuildPacket方法是在一个专门处理发送的goroutine中调用。当发送goroutine收到数据包后,会调用BuildPacket,实现者就能够按照本身的私有格式进行序列化。一样的,buff不够,就本身从新构造一个buff,而后填充数据,并返回这个buff。
WritePacket是给予实现者本身个性化发送的需求。可能实现者须要设置I/O Timeout.
基于event-based的实现,老是少不了要作的事情就是把一个请求包转发到对应的处理函数中。可是具体怎么转,怎么作是取决于具体的用例情景和实现的。因此我这里作的很是简单,就是定义了一个PacketHandler接口:
// PacketHandler is used to process packet that recved from remote session type PacketHandler interface { // When got a valid packet from PacketReader, you can dispatch it. Handle(s *Session, packet interface{}) }
使用者本身实现对应的Handle方法便可。当接收数据的goroutine收到对端发来的数据并调用PacketReader.ReadPacket后,会调用Handle方法 ,传入该Session实例与请求包。传入Session的目的是方便使用者不用去维护一个Session的实例。由于有的程序员要实现的逻辑可能比较简单,他仅仅用Session就知足了他的需求,他只须要实现对应的处理函数就行了。处理完成后,就调用Session.AsyncSend发送回应包。
这里其实能够提供一个简单的默认版本的实现的。可是考虑到协议的不一样,那么就致使调度的key的不一样,因此仍是让使用者本身发挥吧。
使用者其实在这里有很大的自由度,他能够作基于map关系的回调分发逻辑,也能够作一个简单的实现逻辑,而后经过type assert作相应的实现。具体也是看各自的口味而定。我是比较喜欢后者,能够减小不少的Register,实现出Actor Model + Pattern Match味道的东西。
这里还要说一下对服务端的一个简易封装。Server的实现很是简单,就是反复的去Accept,而后构造一个Session,以后就是调用用户传入的回调函数,就完活了。使用者能够本身传入net.Listener,能够传入PacketProtocol, PacketHandler以及SendChanSize。这些参数会在构造Session时传入进去,能够减小重复的代码实现。Server.AcceptLoop不会关闭构造出来的Session,使用者负责完成这件事情!
总体很是简陋,只是搭了一个模制。在我本身未公开的代码里,实际上是实现了我所在公司的协议,实现了PacketProtocol。为此还专门写了个代码生成器。
还有就是NewServer须要传入一个net.Listener,比较蛋疼。后面再决定是否干掉。NewSession须要传入net.Conn,实际上是妥协的产物,由于net.Listener返回的就是net.Conn,这个实例须要交给Session使用,不得已而为之,可是这里囧的是,客户端使用的时候,须要本身去net.Dial,获得一个net.Conn,也许该提供一个swnet.Dial方法。
我这个发布的代码是在原有的代码基础上进行了修改,从达达的https://github.com/funny/link中获得了一些启发,可是又有不少的不一样。再次感谢达达的贡献。