本文是 《用 Golang 实现一个 Redis》系列文章第二篇,本文将分别介绍Redis 通讯协议 以及 协议解析器 的实现,若您对协议有所了解能够直接阅读协议解析器部分。html
Redis 自 2.0 版本起使用了统一的协议 RESP (REdis Serialization Protocol),该协议易于实现,计算机能够高效的进行解析且易于被人类读懂。git
RESP 是一个二进制安全的文本协议,工做于 TCP 协议上。客户端和服务器发送的命令或数据一概以 \r\n
(CRLF)结尾。github
RESP 定义了5种格式:golang
llen
、scard
等命令的返回值, 64位有符号整数get
等命令的返回值lrange
等命令响应的格式RESP 经过第一个字符来表示格式:redis
$
开始*
开始Bulk String有两行,第一行为 $
+正文长度,第二行为实际内容。如:数据库
$3\r\nSET\r\n
Bulk String 是二进制安全的能够包含任意字节,就是说能够在 Bulk String 内部包含 "\r\n" 字符(行尾的CRLF被隐藏):数组
$4 a\r\nb
$-1
表示 nil, 好比使用 get 命令查询一个不存在的key时,响应即为$-1
。缓存
Array 格式第一行为 "*"+数组长度,其后是相应数量的 Bulk String。如, ["foo", "bar"]
的报文:安全
*2 $3 foo $3 bar
客户端也使用 Array 格式向服务端发送指令。命令自己将做为第一个参数,如 SET key value
指令的RESP报文:服务器
*3 $3 SET $3 key $5 value
将换行符打印出来:
*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n
咱们在 实现TCP服务器 一文中已经介绍过TCP服务器的实现,协议解析器将实现其 Handler 接口充当应用层服务器。
协议解析器将接收 Socket 传来的数据,并将其数据还原为 [][]byte
格式,如 "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\value\r\n"
将被还原为 ['SET', 'key', 'value']
。
本文完整代码: Github: HDT3213/godis
来自客户端的请求均为数组格式,它在第一行中标记报文的总行数并使用CRLF
做为分行符。
bufio
标准库能够将从 reader 读到的数据缓存到 buffer 中,直至遇到分隔符或读取完毕后返回,因此咱们使用 reader.ReadBytes('\n')
来保证每次读取到完整的一行。
须要注意的是RESP是二进制安全
的协议,它容许在正文中使用CRLF
字符。举例来讲 Redis 能够正确接收并执行SET "a\r\nb" 1
指令, 这条指令的正确报文是这样的:
*3 $3 SET $4 a\r\nb $7 myvalue
当 ReadBytes
读取到第五行 "a\r\nb\r\n"时会将其误认为两行:
*3 $3 SET $4 a // 错误的分行 b // 错误的分行 $7 myvalue
所以当读取到第四行$4
后, 不该该继续使用 ReadBytes('\n')
读取下一行, 应使用 io.ReadFull(reader, msg)
方法来读取指定长度的内容。
msg = make([]byte, 4 + 2) // 正文长度4 + 换行符长度2 _, err = io.ReadFull(reader, msg)
定义 Client
结构体做为客户端抽象:
type Client struct { /* 与客户端的 Tcp 链接 */ conn net.Conn /* * 带有 timeout 功能的 WaitGroup, 用于优雅关闭 * 当响应被完整发送前保持 waiting 状态, 阻止连接被关闭 */ waitingReply wait.Wait /* 标记客户端是否正在发送指令 */ sending atomic.AtomicBool /* 客户端正在发送的参数数量, 即 Array 第一行指定的数组长度 */ expectedArgsCount uint32 /* 已经接收的参数数量, 即 len(args)*/ receivedCount uint32 /* * 已经接收到的命令参数,每一个参数由一个 []byte 表示 */ args [][]byte }
定义解析器:
type Handler struct { /* * 记录活跃的客户端连接 * 类型为 *Client -> placeholder */ activeConn sync.Map /* 数据库引擎,执行指令并返回结果 */ db db.DB /* 关闭状态标志位,关闭过程当中时拒绝新建链接和新请求 */ closing atomic.AtomicBool }
接下来能够编写主要部分了:
func (h *Handler)Handle(ctx context.Context, conn net.Conn) { if h.closing.Get() { // 关闭过程当中不接受新链接 _ = conn.Close() } /* 初始化客户端状态 */ client := &Client { conn: conn, } h.activeConn.Store(client, 1) reader := bufio.NewReader(conn) var fixedLen int64 = 0 // 将要读取的 BulkString 的正文长度 var err error var msg []byte for { /* 读取下一行数据 */ if fixedLen == 0 { // 正常模式下使用 CRLF 区分数据行 msg, err = reader.ReadBytes('\n') // 判断是否以 \r\n 结尾 if len(msg) == 0 || msg[len(msg) - 2] != '\r' { errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"} _, _ = client.conn.Write(errReply.ToBytes()) } } else { // 当读取到 BulkString 第二行时,根据给出的长度进行读取 msg = make([]byte, fixedLen + 2) _, err = io.ReadFull(reader, msg) // 判断是否以 \r\n 结尾 if len(msg) == 0 || msg[len(msg) - 2] != '\r' || msg[len(msg) - 1] != '\n'{ errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"} _, _ = client.conn.Write(errReply.ToBytes()) } // Bulk String 读取完毕,从新使用正常模式 fixedLen = 0 } // 处理 IO 异常 if err != nil { if err == io.EOF || err == io.ErrUnexpectedEOF { logger.Info("connection close") } else { logger.Warn(err) } _ = client.Close() h.activeConn.Delete(client) return // io error, disconnect with client } /* 解析收到的数据 */ if !client.sending.Get() { // sending == false 代表收到了一条新指令 if msg[0] == '*' { // 读取第一行获取参数个数 expectedLine, err := strconv.ParseUint(string(msg[1:len(msg)-2]), 10, 32) if err != nil { _, _ = client.conn.Write(UnknownErrReplyBytes) continue } // 初始化客户端状态 client.waitingReply.Add(1) // 有指令未处理完成,阻止服务器关闭 client.sending.Set(true) // 正在接收指令中 // 初始化计数器和缓冲区 client.expectedArgsCount = uint32(expectedLine) client.receivedCount = 0 client.args = make([][]byte, expectedLine) } else { // TODO: text protocol } } else { // 收到了指令的剩余部分(非首行) line := msg[0:len(msg)-2] // 移除换行符 if line[0] == '$' { // BulkString 的首行,读取String长度 fixedLen, err = strconv.ParseInt(string(line[1:]), 10, 64) if err != nil { errReply := &reply.ProtocolErrReply{Msg:err.Error()} _, _ = client.conn.Write(errReply.ToBytes()) } if fixedLen <= 0 { errReply := &reply.ProtocolErrReply{Msg:"invalid multibulk length"} _, _ = client.conn.Write(errReply.ToBytes()) } } else { // 收到参数 client.args[client.receivedCount] = line client.receivedCount++ } // 一条命令发送完毕 if client.receivedCount == client.expectedArgsCount { client.sending.Set(false) // 执行命令并响应 result := h.db.Exec(client.args) if result != nil { _, _ = conn.Write(result.ToBytes()) } else { _, _ = conn.Write(UnknownErrReplyBytes) } // 重置客户端状态,等待下一条指令 client.expectedArgsCount = 0 client.receivedCount = 0 client.args = nil client.waitingReply.Done() } } } }