这是系列最后一篇文章了,最后咱们来为咱们的rpc框架实现一个http gateway。这个功能实际上受到了rpcx的启发,基于这种方式实现一个简单的相似service mesh中的sidecar。代码地址:githubhtml
http gateway能够接收来自客户端的http请求并将其转换为rpc请求而后交给服务端处理,再将服务端处理事后的结果经过http响应返回给客户端。git
http gateway的大体原理就是将咱们的RPC协议中header部分放到http header中,而后RPC协议中的body部分放到http body便可。github
首先咱们须要定义http header中各个字段的名称:json
const (
HEADER_SEQ = "rpc-header-seq" //序号, 用来惟一标识请求或响应
HEADER_MESSAGE_TYPE = "rpc-header-message_type" //消息类型,用来标识一个消息是请求仍是响应
HEADER_COMPRESS_TYPE = "rpc-header-compress_type" //压缩类型,用来标识一个消息的压缩方式
HEADER_SERIALIZE_TYPE = "rpc-header-serialize_type" //序列化类型,用来标识消息体采用的编码方式
HEADER_STATUS_CODE = "rpc-header-status_code" //状态类型,用来标识一个请求是正常仍是异常
HEADER_SERVICE_NAME = "rpc-header-service_name" //服务名
HEADER_METHOD_NAME = "rpc-header-method_name" //方法名
HEADER_ERROR = "rpc-header-error" //方法调用发生的异常
HEADER_META_DATA = "rpc-header-meta_data" //其余元数据
)
复制代码
而后咱们须要启动一个http server,用来接收http请求。这里咱们使用go自带的api,默认使用5080端口,若是发现端口已经被占用了,就递增端口。api
func (s *SGServer) startGateway() {
port := 5080
ln, err := net.Listen("tcp", ":" + strconv.Itoa(port))
for err != nil && strings.Contains(err.Error(), "address already in use") {
port++
ln, err = net.Listen("tcp", ":" + strconv.Itoa(port))
}
if err != nil {
log.Printf("error listening gateway: %s", err.Error())
}
log.Printf("gateway listenning on " + strconv.Itoa(port))
//避免阻塞,使用新的goroutine来执行http server
go func() {
err := http.Serve(ln, s)
if err != nil {
log.Printf("error serving http %s", err.Error())
}
}()
}
复制代码
接下来咱们须要实现ServeHTTP函数:app
func (s *SGServer) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
//若是url不对则直接返回
if r.URL.Path != "/invoke" {
rw.WriteHeader(404)
return
}
//若是method不对则直接返回
if r.Method != "POST" {
rw.WriteHeader(405)
return
}
//构造新的请求
request := protocol.NewMessage(s.Option.ProtocolType)
//根据http header填充request的header
request, err := parseHeader(request, r)
if err != nil {
rw.WriteHeader(400)
}
//根据http body填充request的data
request, err = parseBody(request, r)
if err != nil {
rw.WriteHeader(400)
}
//构造context
ctx := metadata.WithMeta(context.Background(), request.MetaData)
response := request.Clone()
response.MessageType = protocol.MessageTypeResponse
//处理请求
response = s.process(ctx, request, response)
//返回相应
s.writeHttpResponse(response, rw, r)
}
func parseBody(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
data, err := ioutil.ReadAll(request.Body)
if err != nil {
return nil, err
}
message.Data = data
return message, nil
}
func parseHeader(message *protocol.Message, request *http.Request) (*protocol.Message, error) {
headerSeq := request.Header.Get(HEADER_SEQ)
seq, err := strconv.ParseUint(headerSeq, 10, 64)
if err != nil {
return nil, err
}
message.Seq = seq
headerMsgType := request.Header.Get(HEADER_MESSAGE_TYPE)
msgType, err := protocol.ParseMessageType(headerMsgType)
if err != nil {
return nil, err
}
message.MessageType = msgType
headerCompressType := request.Header.Get(HEADER_COMPRESS_TYPE)
compressType, err := protocol.ParseCompressType(headerCompressType)
if err != nil {
return nil, err
}
message.CompressType = compressType
headerSerializeType := request.Header.Get(HEADER_SERIALIZE_TYPE)
serializeType, err := codec.ParseSerializeType(headerSerializeType)
if err != nil {
return nil, err
}
message.SerializeType = serializeType
headerStatusCode := request.Header.Get(HEADER_STATUS_CODE)
statusCode, err := protocol.ParseStatusCode(headerStatusCode)
if err != nil {
return nil, err
}
message.StatusCode = statusCode
serviceName := request.Header.Get(HEADER_SERVICE_NAME)
message.ServiceName = serviceName
methodName := request.Header.Get(HEADER_METHOD_NAME)
message.MethodName = methodName
errorMsg := request.Header.Get(HEADER_ERROR)
message.Error = errorMsg
headerMeta := request.Header.Get(HEADER_META_DATA)
meta := make(map[string]interface{})
err = json.Unmarshal([]byte(headerMeta), &meta)
if err != nil {
return nil, err
}
message.MetaData = meta
return message, nil
}
func (s *SGServer) writeHttpResponse(message *protocol.Message, rw http.ResponseWriter, r *http.Request) {
header := rw.Header()
header.Set(HEADER_SEQ, string(message.Seq))
header.Set(HEADER_MESSAGE_TYPE, message.MessageType.String())
header.Set(HEADER_COMPRESS_TYPE, message.CompressType.String())
header.Set(HEADER_SERIALIZE_TYPE, message.SerializeType.String())
header.Set(HEADER_STATUS_CODE, message.StatusCode.String())
header.Set(HEADER_SERVICE_NAME, message.ServiceName)
header.Set(HEADER_METHOD_NAME, message.MethodName)
header.Set(HEADER_ERROR, message.Error)
metaDataJson, _ := json.Marshal(message.MetaData)
header.Set(HEADER_META_DATA, string(metaDataJson))
_, _ = rw.Write(message.Data)
}
复制代码
最后咱们只须要在wrapper中启动http server便可。框架
func (w *DefaultServerWrapper) WrapServe(s *SGServer, serveFunc ServeFunc) ServeFunc {
return func(network string, addr string, meta map[string]interface{}) error {
//省略前面的部分
...
//启动gateway
s.startGateway()
return serveFunc(network, addr, meta)
}
}
复制代码
客户端测试代码:tcp
func MakeHttpCall() {
//声明参数并序列化,放到http请求的body中
arg := service.Args{A: rand.Intn(200), B: rand.Intn(100)}
data, _ := msgpack.Marshal(arg)
body := bytes.NewBuffer(data)
req, err := http.NewRequest("POST", "http://localhost:5080/invoke", body)
if err != nil {
log.Println(err)
return
}
req.Header.Set(server.HEADER_SEQ, "1")
req.Header.Set(server.HEADER_MESSAGE_TYPE, protocol.MessageTypeRequest.String())
req.Header.Set(server.HEADER_COMPRESS_TYPE,protocol.CompressTypeNone.String())
req.Header.Set(server.HEADER_SERIALIZE_TYPE,codec.MessagePack.String())
req.Header.Set(server.HEADER_STATUS_CODE,protocol.StatusOK.String())
req.Header.Set(server.HEADER_SERVICE_NAME,"Arith")
req.Header.Set(server.HEADER_METHOD_NAME,"Add")
req.Header.Set(server.HEADER_ERROR,"")
meta := map[string]interface{}{"key":"value"}
metaJson, _ := json.Marshal(meta)
req.Header.Set(server.HEADER_META_DATA,string(metaJson))
response, err := http.DefaultClient.Do(req)
if err != nil {
log.Println(err)
return
}
if response.StatusCode != 200 {
log.Println(response)
} else if response.Header.Get(server.HEADER_ERROR) != "" {
log.Println(response.Header.Get(server.HEADER_ERROR))
} else {
data, err = ioutil.ReadAll(response.Body)
result := service.Reply{}
msgpack.Unmarshal(data, &result)
fmt.Println(result.C)
}
}
复制代码
这个系列到此就告一段落了,可是还有不少须要改进和丰富的地方甚至是错误,后续再以单独文章的形式更新。ide