欢迎关注「Keegan小钢」公众号获取更多文章
撮合引擎开发:开篇git
撮合引擎开发:MVP版本github
撮合引擎开发:对接黑箱redis
撮合引擎开发:解密黑箱流程spring
撮合引擎开发:流程的代码实现json
咱们要开始聊代码实现逻辑了,若是不记得以前讲的目录结构,请回去翻看前文。聊代码实现的第一步天然从程序入口开始,核心就两个函数:init() 和 main(),其代码以下:缓存
package main ... //other codes func init() { initViper() initLog() engine.Init() middleware.Init() process.Init() } func main() { mux := http.NewServeMux() mux.HandleFunc("/openMatching", handler.OpenMatching) mux.HandleFunc("/closeMatching", handler.CloseMatching) mux.HandleFunc("/handleOrder", handler.HandleOrder) log.Printf("HTTP ListenAndServe at port %s", viper.GetString("server.port")) if err := http.ListenAndServe(viper.GetString("server.port"), mux); err != nil { panic(err) } }
init() 函数作了一些初始化的操做,我来简单介绍这几个初始化函数:数据结构
viper 和 redis 的初始化都是参照官方 demo 写的,这里就不展开说明了。log 后续再单独讲。engine 包和 process 包的初始化就须要好好讲讲。app
其中,引擎包的初始化虽然很是简单,但很关键,其代码写在 engine/init.go 文件中,完整代码以下:框架
package engine var ChanMap map[string]chan Order func Init() { ChanMap = make(map[string]chan Order) }
这个保存通道的 map,其 Key 是各交易标的的 symbol,便是说每一个交易标的各有一个订单通道,这些订单通道将做为每一个交易标的的定序队列。
process 包的初始化则以下:
func Init() { symbols := cache.GetSymbols() for _, symbol := range symbols { price := cache.GetPrice(symbol) NewEngine(symbol, price) orderIds := cache.GetOrderIdsWithAction(symbol) for _, orderId := range orderIds { mapOrder := cache.GetOrder(symbol, orderId) order := engine.Order{} order.FromMap(mapOrder) engine.ChanMap[order.Symbol] <- order } } }
简单讲解下实现逻辑:
若是对这里面有些设计逻辑还不太明白的话,也不要紧,后面讲到对应模块时会再详细说明。
main() 函数里,定义了咱们以前所说的三个接口,分别交由对应的 handler 去处理具体的请求,以后就启动 http 服务了。
由于只有几个接口,并且也很简单,所以,并无引入第三方 web 框架,handler 都是用原生实现的。先来看看 OpenMatching 的完整实现:
package handler import ( "encoding/json" "io/ioutil" "net/http" "strings" "matching/errcode" "matching/process" "github.com/shopspring/decimal" ) type openMatchingParams struct { Symbol string `json:"symbol"` Price decimal.Decimal `json:"price"` } func OpenMatching(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } body, err := ioutil.ReadAll(r.Body) if err != nil { w.WriteHeader(http.StatusBadRequest) return } var params openMatchingParams if err := json.Unmarshal(body, ¶ms); err != nil { w.WriteHeader(http.StatusBadRequest) return } if strings.TrimSpace(params.Symbol) == "" { w.Write(errcode.BlankSymbol.ToJson()) return } if params.Price.IsNegative() { w.Write(errcode.InvalidPrice.ToJson()) return } if e := process.NewEngine(params.Symbol, params.Price); !e.IsOK() { w.Write(e.ToJson()) return } w.Write(errcode.OK.ToJson()) }
逻辑很是简单,先判断是否为 POST 请求,再读取 body 里的数据并转为结构体对象,接着对参数作个简单的检查,最后就调用 process.NewEngine(symbol, price) 进入下一步的业务逻辑,若是结果返回是 OK,也返回 OK 做为请求的响应。
另外,用到了第三方的 decimal.Decimal 类型用来表示价格,整个程序都统一用 decimal 来表示浮点数和作精确计算。
CloseMatching 和 HandleOrder 的实现逻辑也是同理,CloseMatching 最后会调用 process.CloseEngine(symbol) 函数进入下一步的处理,HandleOrder 最后则调用 process.Dispatch(order) 进入下一步。不过,Order 结构体是定义在 engine 包的,其结构以下:
type Order struct { Action enum.OrderAction `json:"action"` Symbol string `json:"symbol"` OrderId string `json:"orderId"` Side enum.OrderSide `json:"side"` Type enum.OrderType `json:"type"` Amount decimal.Decimal `json:"amount"` Price decimal.Decimal `json:"price"` Timestamp int64 `json:"timestamp"` }
能够看到,其中的字段,除了有 Decimal 类型,还有 enum 包的几个类型,这几个实际上是咱们程序中本身定义的枚举类型。Golang 语言自己并无提供和其余语言同样的 enum 关键字来定义枚举类型,因此通常采用类型定义+常量来模拟枚举类型,以 enum.OrderAction 为例:
type OrderAction string const ( ActionCreate OrderAction = "create" ActionCancel OrderAction = "cancel" )
其余几个枚举类型也是这样定义的。
另外,为了方便转为字符串和检验参数是否有效,程序中还为每一个枚举类型分别提供了两个函数,仍是以 OrderAction 为例:
func (o OrderAction) String() string { switch o { case ActionCreate: return "create" case ActionCancel: return "cancel" default: return "unknown" } } func (o OrderAction) Valid() bool { if o.String() == "unknown" { return false } return true }
其余几个枚举类型也都定义了相似的两个函数,就再也不贴代码了。
来回顾下 process 包有哪些文件:
└── process # ├── close_engine.go # 关闭引擎 ├── dispatch.go # 分发订单 ├── init.go # 初始化 └── new_engine.go # 启动新引擎
init.go 就一个初始化函数,上文已经讲了。其余三个文件分别定义了上文三个 handler 对应的下一步逻辑实现。
先来看看 new_engine.go:
package process import ( "matching/engine" "matching/errcode" "matching/middleware/cache" "github.com/shopspring/decimal" ) func NewEngine(symbol string, price decimal.Decimal) *errcode.Errcode { if engine.ChanMap[symbol] != nil { return errcode.EngineExist } engine.ChanMap[symbol] = make(chan engine.Order, 100) go engine.Run(symbol, price) cache.SaveSymbol(symbol) cache.SavePrice(symbol, price) return errcode.OK }
逻辑也是比较简单的,第一步先判断 ChanMap[symbol] 是否为空,该 ChanMap 就是上文所说的引擎包初始化时用来保存订单通道的 map。若是 ChanMap[symbol] 不为空,说明该 symbol 的撮合引擎已经启动过了,那就返回错误。若是为空,那就初始化这个 symbol 的通道,从代码可知,ChanMap[symbol] 初始化为一个缓冲大小为 100 的订单通道。
接着,就调用 engine.Run() 启动一个 goroutine 了,这行代码即表示用 goroutine 的方式启动指定 symbol 的撮合引擎了。
而后,就将 symbol 和 price 都缓存起来了。
最后,返回 OK,搞定。
接着,来看看 Dispatch 的实现又是怎样的:
func Dispatch(order engine.Order) *errcode.Errcode { if engine.ChanMap[order.Symbol] == nil { return errcode.EngineNotFound } if order.Action == enum.ActionCreate { if cache.OrderExist(order.Symbol, order.OrderId, order.Action.String()) { return errcode.OrderExist } } else { if !cache.OrderExist(order.Symbol, order.OrderId, enum.ActionCreate.String()) { return errcode.OrderNotFound } } order.Timestamp = time.Now().UnixNano() / 1e3 cache.SaveOrder(order.ToMap()) engine.ChanMap[order.Symbol] <- order return errcode.OK }
第一步,判断 ChanMap[order.Symbol] 是否为空,若是为空,表示引擎没开启,那就没法处理订单。
第二步,判断订单是否存在。若是是 create 订单,那缓存中就不该该查到订单,不然说明是重复请求。若是是 cancel 订单,那缓存中若是也查不到订单,那说明该订单已经所有成交或已经成功撤单过了。
第三步,将订单时间设为当前时间,时间单位是 100 纳秒,这能够保证时间戳长度恰好为 16 位,保存到 Redis 里就不会有精度失真的问题。这点后续文章讲到 Redis 详细设计时再说。
第四步,将订单缓存。
第五步,将订单传入对应的订单通道,对应引擎会从该通道中获取该订单进行处理。这一步就实现了订单的分发。
第六步,返回 OK。
关闭引擎的实现就很是简单了,请看代码:
func CloseEngine(symbol string) *errcode.Errcode { if engine.ChanMap[symbol] == nil { return errcode.EngineNotFound } close(engine.ChanMap[symbol]) return errcode.OK }
核心代码就一行,将对应 symbol 的订单通道关闭。后续的处理实际上是在引擎里完成的,待会咱们再结合引擎里的代码来说解这个设计。
交易引擎 goroutine 的启动入口就是 engine.Run() 函数,来看看其代码实现:
func Run(symbol string, price decimal.Decimal) { lastTradePrice := price book := &orderBook{} book.init() log.Info("engine %s is running", symbol) for { order, ok := <-ChanMap[symbol] if !ok { log.Info("engine %s is closed", symbol) delete(ChanMap, symbol) cache.Clear(symbol) return } log.Info("engine %s receive an order: %s", symbol, order.ToJson()) switch order.Action { case enum.ActionCreate: dealCreate(&order, book, &lastTradePrice) case enum.ActionCancel: dealCancel(&order, book) } } }
第一步,先定义和初始化了一个 book 变量,该变量就是用来保存整个交易委托帐本。
接着,就是一个 for 循环了,for 循环里的第一行就是从对应 symbol 的订单通道里读取出一个订单,读取到订单时,order 变量就会有值,且 ok 变量为 true。若是通道里暂时没有订单,那就会阻塞在这行代码,直到从通道中获取到订单或通道已关闭的消息。
当通道被关闭以后,最后,从通道中读取到的 ok 变量则为 false,固然,在这以前,会先依序读取完通道里剩下的订单。当 ok 为 false 时,引擎里会执行两步操做:一是从 ChanMap 中删除该 symbol 对应的记录,二是清空该 symbol 对应的缓存数据。最后用 return 来退出 for 循环,这样,整个 Run() 函数就结束退出了,意味着该引擎也真正关闭了。
当每读取到一个订单,就会判断是下单仍是撤单,而后进行相应的逻辑处理了。
咱们先来看看撤单的逻辑,这个比较简单:
func dealCancel(order *Order, book *orderBook) { var ok bool switch order.Side { case enum.SideBuy: ok = book.removeBuyOrder(order) case enum.SideSell: ok = book.removeSellOrder(order) } cache.RemoveOrder(order.ToMap()) mq.SendCancelResult(order.Symbol, order.OrderId, ok) log.Info("engine %s, order %s cancel result is %s", order.Symbol, order.OrderId, ok) }
核心就三个步骤:
下单逻辑就比较复杂了,须要根据不一样的订单类型作不一样的逻辑处理,请看代码:
func dealCreate(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) { switch order.Type { case enum.TypeLimit: dealLimit(order, book, lastTradePrice) case enum.TypeLimitIoc: dealLimitIoc(order, book, lastTradePrice) case enum.TypeMarket: dealMarket(order, book, lastTradePrice) case enum.TypeMarketTop5: dealMarketTop5(order, book, lastTradePrice) case enum.TypeMarketTop10: dealMarketTop10(order, book, lastTradePrice) case enum.TypeMarketOpponent: dealMarketOpponent(order, book, lastTradePrice) } }
每一个类型再分买卖方向处理,以 dealLimit() 为例:
func dealLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) { switch order.Side { case enum.SideBuy: dealBuyLimit(order, book, lastTradePrice) case enum.SideSell: dealSellLimit(order, book, lastTradePrice) } }
而后,再来看看 dealBuyLimit() 的处理逻辑:
func dealBuyLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) { LOOP: headOrder := book.getHeadSellOrder() if headOrder == nil || order.Price.LessThan(headOrder.Price) { book.addBuyOrder(order) log.Info("engine %s, a order has added to the orderbook: %s", order.Symbol, order.ToJson()) } else { matchTrade(headOrder, order, book, lastTradePrice) if order.Amount.IsPositive() { goto LOOP } } }
我来解析下这个处理流程:
其中,匹配成交的记录会做为一条输出记录发送到 MQ。
对其余类型的处理也是相似的,就再也不一一讲解了。
那引擎包的实现就先讲到这里,后续文章再聊其余部分的实现。
本小节主要仍是经过代码梳理清楚整个数据流程,包括一些细节上的设计。理解了本文所列举的这些代码,也就对整个撮合服务的实现理解一大半了。
此次的思考题:ChanMap 保存的订单通道是否能够改用无缓冲的通道?用无缓冲的通道和用有缓冲的通道处理逻辑有哪些不一样?两种方案各自的优缺点是什么?
扫描如下二维码便可关注公众号(公众号名称:Keegan小钢)
做者的我的博客