欢迎关注「Keegan小钢」公众号获取更多文章git
价值超5万的撮合引擎:开篇github
价值超5万的撮合引擎:MVP版本redis
撮合引擎开发:对接黑箱服务器
撮合引擎开发:流程的代码实现数据结构
中间件
先来回顾下咱们撮合程序项目中关于中间件的目录结构:分布式
├── middleware # 中间件的包│ ├── cache # 缓存包│ │ └── cache.go # 缓存操做│ ├── mq # 消息队列包│ │ └── mq.go # MQ操做│ └── redis.go # 主要作Redis初始化操做
虽然如今只用到了 Redis 一个中间件,但设计个 middleware 包,会方便之后扩展添加其余中间件,如 Kafka 或 RocketMQ 等。函数
再将缓存和消息队列分包,职责上就很分明,应用时也很明确。flex
redis.go 就只是作初始化的链接,咱们来看看代码:
package middleware
import ( "matching/log"
"github.com/go-redis/redis" "github.com/spf13/viper")
var RedisClient *redis.Client
func Init() { addr := viper.GetString("redis.addr") RedisClient = redis.NewClient(&redis.Options{ Addr: addr, Password: "", // no password set DB: 0, // use default DB })
_, err := RedisClient.Ping().Result() if err != nil { panic(err) } else { log.Printf("Connected to redis: %s", addr) }}
其中,viper 是前文说过的第三方配置库,经过 viper.GetString("redis.addr") 从配置文件读取出要链接的 Redis 的地址,以后就新建一个 Redis 客户端并链接上 Redis 服务器了。
缓存的设计
讲数据结构设计时,咱们已经说过,使用缓存的目的主要有两个:
1.请求去重,避免重复提交相同订单;2.恢复数据,即程序重启后能恢复全部数据。
还记得上一篇文章讲 Dispatch 的实现时,有个判断订单是否存在的逻辑吗?就是读取缓存中是否已经存在该订单,从而判别是否为重复请求或无效请求。以及,还记得 process 包的初始化?就是从缓存中恢复数据的过程。
先了解下,咱们总共缓存了哪些数据:
•
开启撮合的交易标的 symbol;
•
这些交易标的的最新价格;
•全部有效的订单请求,包括下单和撤单请求。
1. 缓存symbol
开启撮合的交易标的 symbol 会有多个,且不能重复,那其实就能够保存为集合 set 类型。我将该 set 的 key 设计为 matching:symbols,以后,每有一个 symbol 开启撮合时,就能够用 Redis 的 sadd 命令将该 symbol 添加进这个集合里去了。而关闭撮合时,则需用 srem 命令将关闭撮合的 symbol 从集合中移除。读取全部 symbol 则可用 smembers 命令操做。
程序里对 symbol 的操做提供了三个函数,分别用来保存 symbol、移除 symbol 和获取全部 symbol,如下是实现的代码:
func SaveSymbol(symbol string) { key := "matching:symbols" RedisClient.SAdd(key, symbol)}
func RemoveSymbol(symbol string) { key := "matching:symbols" RedisClient.SRem(key, symbol)}
func GetSymbols() []string { key := "matching:symbols" return RedisClient.SMembers(key).Val()}
2. 缓存价格
交易标的的最新价格则是每一个 symbol 会有一个价格,且无需缓存历史价格,那我就直接用字符串类型来保存价格,而每一个价格的 key 则包含有各自的 symbol,key 的格式设计为 matching:price:{symbol},假如要保存的 symbol = “BTCUSD”,那对应的 key 值就是 matching:price:BTCUSD,保存的 value 值就是 BTCUSD 的最新价格。
咱们也一样提供了保存价格、获取价格和删除价格的三个函数,代码以下:
func SavePrice(symbol string, price decimal.Decimal) { key := "matching:price:" + symbol RedisClient.Set(key, price.String(), 0)}
func GetPrice(symbol string) decimal.Decimal { key := "matching:price:" + symbol priceStr := RedisClient.Get(key).Val() result, err := decimal.NewFromString(priceStr) if err != nil { result = decimal.Zero } return result}
func RemovePrice(symbol string) { key := "matching:price:" + symbol RedisClient.Del(key)}
3. 缓存订单
对订单的缓存设计则没那么简单了,须要知足两点要求:
1.既能缓存下单请求,也能缓存撤单请求;2.订单要符合定序要求。
先说下第一点,为何须要缓存订单?且为何下单和撤单请求都须要缓存?
先来解答第一个问题,咱们是在内存中撮合的,每一个交易标的引擎里各自维护了一个交易委托帐本,程序运行时,这些帐本是直接保存在程序内存里的。那若是程序退出了,这些帐本都被清空了。若是没有缓存,那程序重启后就没法恢复帐本数据。要知足该需求,就须要缓存帐本里的全部委托单。
关于第二个问题,咱们来考虑这样一个场景:假如订单通道里有撤单请求在排队,而程序并无对撤单请求作缓存,这时程序重启了,那么订单通道里的全部订单还没被引擎接收处理以前就被清空了,撤单请求也就没法恢复了。
所以,程序须要缓存好订单,且下单和撤单都须要缓存。
再来看第二个要求,为何要符合定序?咱们知道,订单通道里的订单是定序的,交易委托帐本里同价格的订单也是按时间排序的,那缓存时若是不定序,程序重启后就难以保证按原有的顺序恢复订单。
那具体要怎么来设计这个订单的缓存呢?个人方案是分两类缓存,第一类保存每一个独立的订单请求,包括下单和撤单;第二类分交易标的保存对应 symbol 全部订单请求的订单 ID 和 action。
第一类,我设计的 Key 格式为 matching:order:{symbol}:{orderId}:{action},symbol、orderId 和 action 则是对应订单的三个变量值。好比,某订单 symbol = “BTCUSD”,orderId = “12345”,action = “cancel”,那该订单保存到 Redis 的 Key 值就是 matching:order:BTCUSD:12345:cancel。该 Key 对应的 Value 则是保存整个订单对象,能够用 hash 类型存储。
第二类,我设计的 Key 格式为 matching:orderids:{symbol},Value 保存的是 sorted set 类型的数据,保存对应 symbol 的全部订单请求,每条记录保存的值为 {orderId}:{action},而 score 值设为对应订单的 {timestamp}。用订单时间做为 score 就能够保证定序了。还记得以前文章咱们将订单时间的单位设为 100 纳秒,保证时间戳长度恰好为 16 位吗?这是由于,若是超过 16 位,那 score 将转为科学计数法表示,那将会致使数字失真。
根据这样的设计,那保存订单时的实现逻辑就如如下代码所示:
func SaveOrder(order map[string]interface{}) { symbol := order["symbol"].(string) orderId := order["orderId"].(string) timestamp := order["timestamp"].(float64) action := order["action"].(string)
key := "matching:order:" + symbol + ":" + orderId + ":" + action RedisClient.HMSet(key, order)
key = "matching:orderids:" + symbol z := &redis.Z{ Score: timestamp, Member: orderId + ":" + action, } RedisClient.ZAdd(key, z)}
另外,还提供了 GetOrder()、UpdateOrder()、RemoveOrder()、OrderExist()、GetOrderIdsWithAction() 等函数。再给大伙看看 GetOrderIdsWithAction() 函数的实现:
func GetOrderIdsWithAction(symbol string) []string { key := "matching:orderids:" + symbol return RedisClient.ZRange(key, 0, -1).Val()}
该函数获得的结果是根据 score 值排好序的,这就是咱们想要的结果。理解了这个设计以后,再翻回去看看 process 包的初始化,你就会明白那些代码的逻辑了。
MQ的设计
咱们选择了使用 Redis 的 Stream 数据结构来做为 MQ 输出,Stream 数据结构采用了相似 Kafka 的设计,应用起来很方便。但因为 Redis 运行于内存的特性,相比 Kafka 快速不少,这也是我选择它来做为撮合程序的输出 MQ 的主要缘由。
咱们只有两类 MQ,撤单结果和成交记录,发送消息的实现以下:
func SendCancelResult(symbol, orderId string, ok bool) { values := map[string]interface{}{"orderId": orderId, "ok": ok} a := &redis.XAddArgs{ Stream: "matching:cancelresults:" + symbol, MaxLenApprox: 1000, Values: values, } RedisClient.XAdd(a)}
func SendTrade(symbol string, trade map[string]interface{}) { a := &redis.XAddArgs{ Stream: "matching:trades:" + symbol, MaxLenApprox: 1000, Values: trade, } RedisClient.XAdd(a)}
其中,matching:cancelresults:{symbol} 就是撤单结果的 MQ 所属的 Key,matching:trades:{symbol} 则是成交记录的 MQ 所属的 Key。能够看到,咱们还根据不一样 symbol 分不一样 MQ,这样还方便下游服务能够根据须要实现分布式订阅不一样 symbol 的 MQ。
小结
本小节讲解了缓存和 MQ 的设计与实现,理解了这部分的设计以后,对整个撮合引擎的核心设计也基本能理解了。
最后,依然留几个思考题:是否能够不用缓存?若是不用缓存能够如何解决去重和数据恢复的问题?
本文分享自微信公众号 - Keegan小钢(keeganlee_me)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。