所谓网关,其实就是维持玩家客户端的链接,将玩家发的游戏请求转发到具体后端服务的服务器,具备如下几个功能点:git
对于http请求来讲,micro框架自己已经实现了api网关,能够参阅以前的博客github
牌类游戏使用微服务重构笔记(二): micro框架简介:micro toolkitgolang
可是对于游戏服务器,通常都是须要长连接的,须要咱们本身实现web
网关自己应该是支持多协议的,这里就以websocket举例说明我重构过程当中的思路,其余协议相似。首先选择提供websocket链接的库 推荐使用melody,基于websocket库,语法很是简单,数行代码便可实现websocket服务器。咱们的游戏须要websocket网关的缘由在于客户端不支持HTTP2,不能和grpc服务器直连后端
package main
import (
"github.com/micro/go-web"
"gopkg.in/olahol/melody.v1"
"log"
"net/http"
)
func main() {
// New web service
service := web.NewService(web.Name("go.micro.api.gateway"))
// parse command line
service.Init()
// new melody
m := melody.New()
// Handle websocket connection
service.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
_ = m.HandleRequest(w, r)
})
// handle connection with new session
m.HandleConnect(func(session *melody.Session) {
})
// handle disconnection
m.HandleDisconnect(func(session *melody.Session) {
})
// handle message
m.HandleMessage(func(session *melody.Session, bytes []byte) {
})
// run service
if err := service.Run(); err != nil {
log.Fatal("Run: ", err)
}
}
复制代码
网关能够收取或发送数据,而且数据结构比较统一都是[]byte
,这一点是否是很像grpc stream
,所以就可使用protobuf
的oneof
特性来定义请求和响应,可参照上期博客api
牌类游戏使用微服务重构笔记(六): protobuf爬坑bash
定义gateway.basic.proto
,对网关收/发的消息进行归类服务器
message Message {
oneof message {
Req req = 1; // 客户端请求 要求响应
Rsp rsp = 2; // 服务端响应
Notify notify = 3; // 客户端推送 不要求响应
Event event = 4; // 服务端推送
Stream stream = 5; // 双向流请求
Ping ping = 6; // ping
Pong pong = 7;// pong
}
}
复制代码
对于req
、notify
都是客户端的无状态请求,对应后端的无状态服务器,这里仅须要实现本身的路由规则便可,好比微信
message Req {
string serviceName = 1; // 服务名
string method = 2; // 方法名
bytes args = 3; // 参数
google.protobuf.Timestamp timestamp = 4; // 时间戳
...
}
复制代码
req-rsp
思路与micro toolkit
的api网关相似(rpc 处理器),比较简单,可参照以前的博客。websocket
咱们的项目对于此类请求都走http了,并无经过这个网关, 仅有一些基本的req
,好比authReq
处理session
认证。主要考虑是http简单、无状态、好维护,再加上此类业务对实时性要求也不高。
游戏服务器通常都是有状态的、双向的、实时性要求较高,req-rsp
模式并不适合,就须要网关进行转发。每添加一种grpc后端服务器,仅须要在oneof
中添加一个stream来拓展
message Stream {
oneof stream {
room.basic.Message roomMessage = 1; // 房间服务器
game.basic.Message gameMessage = 2; // 游戏服务器
mate.basic.Message mateMessage = 3; // 匹配服务器
}
}
复制代码
而且须要定义一个对应的路由请求,来处理转发到哪一台后端服务器上(实现不一样也能够不须要),这里会涉及到一点业务,例如
message JoinRoomStreamReq {
room.basic.RoomType roomType = 1;
string roomNo = 2;
}
复制代码
这里根据客户端的路由请求的房间号和房间类型,网关来选择正确的房间服务器(甚至可能连接到旧版本的房间服务器上)
选择正确的服务器后,创建stream 双向流
address := "xxxxxxx" // 选择后的服务器地址
ctx := context.Background() // 顶层context
m := make(map[string]string) // some metadata
streamCtx, cancelFunc := context.WithCancel(ctx) // 复制一个子context
// 创建stream 双向流
stream, err := xxxClient.Stream(metadata.NewContext(streamCtx, m), client.WithAddress(address))
// 存储在session上
session.Set("stream", stream)
session.Set("cancelFunc", cancelFunc)
// 启动一个goroutine 收取stream消息并转发
go func(c context.Context, s pb.xxxxxStream) {
// 退出时关闭 stream
defer func() {
session.Set("stream", nil)
session.Set("cancelFunc", nil)
if err := s.Close(); err != nil {
// do something with close err
}
}()
for {
select {
case <-c.Done():
// do something with ctx cancel
return
default:
res, err := s.Recv()
if err != nil {
// do something with recv err
return
}
// send to session 这里能够经过oneof包装告知客户端是哪一个stream发来的消息
...
}
}
}(streamCtx, stream)
复制代码
转发就比较简单了,直接上代码
对于某个stream的请求
message Stream {
oneof stream {
room.basic.Message roomMessage = 1; // 房间服务器
game.basic.Message gameMessage = 2; // 游戏服务器
mate.basic.Message mateMessage = 3; // 匹配服务器
}
}
复制代码
添加转发代码
s, exits := session.Get("stream")
if !exits {
return
}
if stream, ok := s.(pb.xxxxStream); ok {
err := stream.Send(message)
if err != nil {
log.Println("send err:", err)
return
}
}
复制代码
当须要关闭某个stream时, 只须要调用对应的cancelFunc
便可
if v, e := session.Get("cancelFunc"); e {
if c, ok := v.(context.CancelFunc); ok {
c()
}
}
复制代码
因为接收请求的入口统一,使用oneof
就能够一路switch case
,每添加一个req
或者一种stream
只须要添加一个case, 代码看起来仍是比较简单、清爽的
func HandleMessageBinary(session *melody.Session, bytes []byte) {
var msg pb.Message
if err := proto.Unmarshal(bytes, &msg); err != nil {
// do something
return
}
defer func() {
err := recover()
if err != nil {
// do something with panic
}
}()
switch x := msg.Message.(type) {
case *pb.Message_Req:
handleReq(session, x.Req)
case *pb.Message_Stream:
handleStream(session, x.Stream)
case *pb.Message_Ping:
handlePing(session, x.Ping)
default:
log.Println("unknown req type")
}
}
func handleStream(session *melody.Session, message *pb.Stream) {
switch x := message.Stream.(type) {
case *pb.Stream_RoomMessage:
handleRoomStream(session, x.RoomMessage)
case *pb.Stream_GameMessage:
handleGameStream(session, x.GameMessage)
case *pb.Stream_MateMessage:
handleMateStream(session, x.MateMessage)
}
}
复制代码
对于游戏热更新不停服仍是挺重要的,个人思路将会在以后的博客里介绍,能够关注一波 嘿嘿
pprof
观测会发现goroutine
和内存都在缓慢增加,也就是存在goroutine leak!
,缘由在于 micro源码在包装grpc时,没有对关闭stream完善,只有收到io.EOF
的错误时才会关闭grpc的conn链接func (g *grpcStream) Recv(msg interface{}) (err error) {
defer g.setError(err)
if err = g.stream.RecvMsg(msg); err != nil {
if err == io.EOF {
// #202 - inconsistent gRPC stream behavior
// the only way to tell if the stream is done is when we get a EOF on the Recv
// here we should close the underlying gRPC ClientConn
closeErr := g.conn.Close()
if closeErr != nil {
err = closeErr
}
}
}
return
}
复制代码
而且有一个TODO
// Close the gRPC send stream
// #202 - inconsistent gRPC stream behavior
// The underlying gRPC stream should not be closed here since the
// stream should still be able to receive after this function call
// TODO: should the conn be closed in another way?
func (g *grpcStream) Close() error {
return g.stream.CloseSend()
}
复制代码
解决方法也比较简单,本身fork一份源码改一下关闭stream的时候同时关闭conn(咱们的业务是能够的由于在grpc stream客户端和服务端均实现收到err后关闭stream),或者等做者更新用更科学的方式关闭
get
和set
数据时会发生map的读写竞争而panic,能够查看issue,解决方法也比较简单本人学习golang、micro、k8s、grpc、protobuf等知识的时间较短,若是有理解错误的地方,欢迎批评指正,能够加我微信一块儿探讨学习