撮合引擎开发:流程的代码实现

欢迎关注「Keegan小钢」公众号获取更多文章

撮合引擎开发:开篇git

撮合引擎开发:MVP版本github

撮合引擎开发:数据结构设计web

撮合引擎开发:对接黑箱redis

撮合引擎开发:解密黑箱流程spring

撮合引擎开发:流程的代码实现json


程序入口

咱们要开始聊代码实现逻辑了,若是不记得以前讲的目录结构,请回去翻看前文。聊代码实现的第一步天然从程序入口开始,核心就两个函数:init()main(),其代码以下:缓存

package main

... //other codes

func init() {
    initViper()
    initLog()

    engine.Init()
    middleware.Init()
    process.Init()
}

func main() {
    mux := http.NewServeMux()
    mux.HandleFunc("/openMatching", handler.OpenMatching)
    mux.HandleFunc("/closeMatching", handler.CloseMatching)
    mux.HandleFunc("/handleOrder", handler.HandleOrder)

    log.Printf("HTTP ListenAndServe at port %s", viper.GetString("server.port"))
    if err := http.ListenAndServe(viper.GetString("server.port"), mux); err != nil {
        panic(err)
    }
}

init() 函数作了一些初始化的操做,我来简单介绍这几个初始化函数:数据结构

  • initViper():配置文件初始化,使用了第三方配置库 viper,这是一个被普遍使用的配置库,其 github 地址为 https://github.com/spf13/viper
  • initLog():日志初始化,程序主要使用本身定义的日志包用来输出日志文件,该日志包的实现后续文章再单独讲。
  • engine.Init():引擎包的初始化,只是初始化了一个 map,用来保存不一样交易标的的订单 channel,做为各交易标的的定序队列来用。
  • middleware.Init():中间件的初始化,咱们用到的中间件就只有 Redis,因此这里其实就是初始化 Redis 链接。Redis 客户端库方面我选择的是 go-redis/redis
  • process.Init():这一步主要是从缓存加载和恢复各交易标的引擎的启动和全部订单数据。

viper 和 redis 的初始化都是参照官方 demo 写的,这里就不展开说明了。log 后续再单独讲。engine 包和 process 包的初始化就须要好好讲讲。app

其中,引擎包的初始化虽然很是简单,但很关键,其代码写在 engine/init.go 文件中,完整代码以下:框架

package engine

var ChanMap map[string]chan Order

func Init() {
    ChanMap = make(map[string]chan Order)
}

这个保存通道的 map,其 Key 是各交易标的的 symbol,便是说每一个交易标的各有一个订单通道,这些订单通道将做为每一个交易标的的定序队列。

process 包的初始化则以下:

func Init() {
    symbols := cache.GetSymbols()
    for _, symbol := range symbols {
        price := cache.GetPrice(symbol)
        NewEngine(symbol, price)

        orderIds := cache.GetOrderIdsWithAction(symbol)
        for _, orderId := range orderIds {
            mapOrder := cache.GetOrder(symbol, orderId)
            order := engine.Order{}
            order.FromMap(mapOrder)
            engine.ChanMap[order.Symbol] <- order
        }
    }
}

简单讲解下实现逻辑:

  1. 从缓存读取全部 symbol,即程序重启以前,已经开启了撮合的全部交易标的的 symbol;
  2. 从缓存读取每一个 symbol 对应的价格,这是程序重启前的最新成交价格;
  3. 启动每一个 symbol 的撮合引擎;
  4. 从缓存读取每一个 symbol 的全部订单,这些订单都是按时间顺序排列的;
  5. 按顺序将这些订单添加到对应 symbol 的订单通道里去。

若是对这里面有些设计逻辑还不太明白的话,也不要紧,后面讲到对应模块时会再详细说明。

main() 函数里,定义了咱们以前所说的三个接口,分别交由对应的 handler 去处理具体的请求,以后就启动 http 服务了。

handler

由于只有几个接口,并且也很简单,所以,并无引入第三方 web 框架,handler 都是用原生实现的。先来看看 OpenMatching 的完整实现:

package handler

import (
    "encoding/json"
    "io/ioutil"
    "net/http"
    "strings"

    "matching/errcode"
    "matching/process"

    "github.com/shopspring/decimal"
)

type openMatchingParams struct {
    Symbol string          `json:"symbol"`
    Price  decimal.Decimal `json:"price"`
}

func OpenMatching(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "application/json")
    if r.Method != http.MethodPost {
        w.WriteHeader(http.StatusMethodNotAllowed)
        return
    }

    body, err := ioutil.ReadAll(r.Body)
    if err != nil {
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    var params openMatchingParams
    if err := json.Unmarshal(body, &params); err != nil {
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    if strings.TrimSpace(params.Symbol) == "" {
        w.Write(errcode.BlankSymbol.ToJson())
        return
    }

    if params.Price.IsNegative() {
        w.Write(errcode.InvalidPrice.ToJson())
        return
    }

    if e := process.NewEngine(params.Symbol, params.Price); !e.IsOK() {
        w.Write(e.ToJson())
        return
    }

    w.Write(errcode.OK.ToJson())
}

逻辑很是简单,先判断是否为 POST 请求,再读取 body 里的数据并转为结构体对象,接着对参数作个简单的检查,最后就调用 process.NewEngine(symbol, price) 进入下一步的业务逻辑,若是结果返回是 OK,也返回 OK 做为请求的响应。

另外,用到了第三方的 decimal.Decimal 类型用来表示价格,整个程序都统一用 decimal 来表示浮点数和作精确计算。

CloseMatchingHandleOrder 的实现逻辑也是同理,CloseMatching 最后会调用 process.CloseEngine(symbol) 函数进入下一步的处理,HandleOrder 最后则调用 process.Dispatch(order) 进入下一步。不过,Order 结构体是定义在 engine 包的,其结构以下:

type Order struct {
    Action    enum.OrderAction `json:"action"`
    Symbol    string           `json:"symbol"`
    OrderId   string           `json:"orderId"`
    Side      enum.OrderSide   `json:"side"`
    Type      enum.OrderType   `json:"type"`
    Amount    decimal.Decimal  `json:"amount"`
    Price     decimal.Decimal  `json:"price"`
    Timestamp int64            `json:"timestamp"`
}

能够看到,其中的字段,除了有 Decimal 类型,还有 enum 包的几个类型,这几个实际上是咱们程序中本身定义的枚举类型。Golang 语言自己并无提供和其余语言同样的 enum 关键字来定义枚举类型,因此通常采用类型定义+常量来模拟枚举类型,以 enum.OrderAction 为例:

type OrderAction string

const (
    ActionCreate OrderAction = "create"
    ActionCancel OrderAction = "cancel"
)

其余几个枚举类型也是这样定义的。

另外,为了方便转为字符串和检验参数是否有效,程序中还为每一个枚举类型分别提供了两个函数,仍是以 OrderAction 为例:

func (o OrderAction) String() string {
    switch o {
    case ActionCreate:
        return "create"
    case ActionCancel:
        return "cancel"
    default:
        return "unknown"
    }
}

func (o OrderAction) Valid() bool {
    if o.String() == "unknown" {
        return false
    }
    return true
}

其余几个枚举类型也都定义了相似的两个函数,就再也不贴代码了。

process 包

来回顾下 process 包有哪些文件:

└── process                  #
    ├── close_engine.go      # 关闭引擎
    ├── dispatch.go          # 分发订单
    ├── init.go              # 初始化
    └── new_engine.go        # 启动新引擎

init.go 就一个初始化函数,上文已经讲了。其余三个文件分别定义了上文三个 handler 对应的下一步逻辑实现。

启动新引擎

先来看看 new_engine.go

package process

import (
    "matching/engine"
    "matching/errcode"
    "matching/middleware/cache"

    "github.com/shopspring/decimal"
)

func NewEngine(symbol string, price decimal.Decimal) *errcode.Errcode {
    if engine.ChanMap[symbol] != nil {
        return errcode.EngineExist
    }

    engine.ChanMap[symbol] = make(chan engine.Order, 100)
    go engine.Run(symbol, price)

    cache.SaveSymbol(symbol)
    cache.SavePrice(symbol, price)

    return errcode.OK
}

逻辑也是比较简单的,第一步先判断 ChanMap[symbol] 是否为空,该 ChanMap 就是上文所说的引擎包初始化时用来保存订单通道的 map。若是 ChanMap[symbol] 不为空,说明该 symbol 的撮合引擎已经启动过了,那就返回错误。若是为空,那就初始化这个 symbol 的通道,从代码可知,ChanMap[symbol] 初始化为一个缓冲大小为 100 的订单通道。

接着,就调用 engine.Run() 启动一个 goroutine 了,这行代码即表示用 goroutine 的方式启动指定 symbol 的撮合引擎了。

而后,就将 symbol 和 price 都缓存起来了。

最后,返回 OK,搞定。

2. 分发订单

接着,来看看 Dispatch 的实现又是怎样的:

func Dispatch(order engine.Order) *errcode.Errcode {
    if engine.ChanMap[order.Symbol] == nil {
        return errcode.EngineNotFound
    }

    if order.Action == enum.ActionCreate {
        if cache.OrderExist(order.Symbol, order.OrderId, order.Action.String()) {
            return errcode.OrderExist
        }
    } else {
        if !cache.OrderExist(order.Symbol, order.OrderId, enum.ActionCreate.String()) {
            return errcode.OrderNotFound
        }
    }

    order.Timestamp = time.Now().UnixNano() / 1e3
    cache.SaveOrder(order.ToMap())
    engine.ChanMap[order.Symbol] <- order

    return errcode.OK
}

第一步,判断 ChanMap[order.Symbol] 是否为空,若是为空,表示引擎没开启,那就没法处理订单。

第二步,判断订单是否存在。若是是 create 订单,那缓存中就不该该查到订单,不然说明是重复请求。若是是 cancel 订单,那缓存中若是也查不到订单,那说明该订单已经所有成交或已经成功撤单过了。

第三步,将订单时间设为当前时间,时间单位是 100 纳秒,这能够保证时间戳长度恰好为 16 位,保存到 Redis 里就不会有精度失真的问题。这点后续文章讲到 Redis 详细设计时再说。

第四步,将订单缓存。

第五步,将订单传入对应的订单通道,对应引擎会从该通道中获取该订单进行处理。这一步就实现了订单的分发。

第六步,返回 OK。

3. 关闭引擎

关闭引擎的实现就很是简单了,请看代码:

func CloseEngine(symbol string) *errcode.Errcode {
    if engine.ChanMap[symbol] == nil {
        return errcode.EngineNotFound
    }

    close(engine.ChanMap[symbol])

    return errcode.OK
}

核心代码就一行,将对应 symbol 的订单通道关闭。后续的处理实际上是在引擎里完成的,待会咱们再结合引擎里的代码来说解这个设计。

引擎入口的实现

交易引擎 goroutine 的启动入口就是 engine.Run() 函数,来看看其代码实现:

func Run(symbol string, price decimal.Decimal) {
    lastTradePrice := price

    book := &orderBook{}
    book.init()

    log.Info("engine %s is running", symbol)
    for {
        order, ok := <-ChanMap[symbol]
        if !ok {
            log.Info("engine %s is closed", symbol)
            delete(ChanMap, symbol)
            cache.Clear(symbol)
            return
        }
        log.Info("engine %s receive an order: %s", symbol, order.ToJson())
        switch order.Action {
        case enum.ActionCreate:
            dealCreate(&order, book, &lastTradePrice)
        case enum.ActionCancel:
            dealCancel(&order, book)
        }
    }
}

第一步,先定义和初始化了一个 book 变量,该变量就是用来保存整个交易委托帐本

接着,就是一个 for 循环了,for 循环里的第一行就是从对应 symbol 的订单通道里读取出一个订单,读取到订单时,order 变量就会有值,且 ok 变量为 true。若是通道里暂时没有订单,那就会阻塞在这行代码,直到从通道中获取到订单或通道已关闭的消息。

当通道被关闭以后,最后,从通道中读取到的 ok 变量则为 false,固然,在这以前,会先依序读取完通道里剩下的订单。当 ok 为 false 时,引擎里会执行两步操做:一是从 ChanMap 中删除该 symbol 对应的记录,二是清空该 symbol 对应的缓存数据。最后用 return 来退出 for 循环,这样,整个 Run() 函数就结束退出了,意味着该引擎也真正关闭了。

当每读取到一个订单,就会判断是下单仍是撤单,而后进行相应的逻辑处理了。

咱们先来看看撤单的逻辑,这个比较简单:

func dealCancel(order *Order, book *orderBook) {
    var ok bool
    switch order.Side {
    case enum.SideBuy:
        ok = book.removeBuyOrder(order)
    case enum.SideSell:
        ok = book.removeSellOrder(order)
    }

    cache.RemoveOrder(order.ToMap())
    mq.SendCancelResult(order.Symbol, order.OrderId, ok)
    log.Info("engine %s, order %s cancel result is %s", order.Symbol, order.OrderId, ok)
}

核心就三个步骤:

  1. 从委托帐本中移除该订单;
  2. 从缓存中移除该订单;
  3. 发送撤单结果到 MQ。

下单逻辑就比较复杂了,须要根据不一样的订单类型作不一样的逻辑处理,请看代码:

func dealCreate(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
    switch order.Type {
    case enum.TypeLimit:
        dealLimit(order, book, lastTradePrice)
    case enum.TypeLimitIoc:
        dealLimitIoc(order, book, lastTradePrice)
    case enum.TypeMarket:
        dealMarket(order, book, lastTradePrice)
    case enum.TypeMarketTop5:
        dealMarketTop5(order, book, lastTradePrice)
    case enum.TypeMarketTop10:
        dealMarketTop10(order, book, lastTradePrice)
    case enum.TypeMarketOpponent:
        dealMarketOpponent(order, book, lastTradePrice)
    }
}

每一个类型再分买卖方向处理,以 dealLimit() 为例:

func dealLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
    switch order.Side {
    case enum.SideBuy:
        dealBuyLimit(order, book, lastTradePrice)
    case enum.SideSell:
        dealSellLimit(order, book, lastTradePrice)
    }
}

而后,再来看看 dealBuyLimit() 的处理逻辑:

func dealBuyLimit(order *Order, book *orderBook, lastTradePrice *decimal.Decimal) {
LOOP:
    headOrder := book.getHeadSellOrder()
    if headOrder == nil || order.Price.LessThan(headOrder.Price) {
        book.addBuyOrder(order)
        log.Info("engine %s, a order has added to the orderbook: %s", order.Symbol, order.ToJson())
    } else {
        matchTrade(headOrder, order, book, lastTradePrice)
        if order.Amount.IsPositive() {
            goto LOOP
        }
    }
}

我来解析下这个处理流程:

  1. 从委托帐本中读取出卖单队列的头部订单;
  2. 若是头部订单为空,或新订单(买单)价格小于头部订单(卖单),则没法匹配成交,那就将新订单添加到委托帐本的买单队列中去;
  3. 若是头部订单不为空,且新订单(买单)价格大于等于头部订单(卖单),则两个订单能够匹配成交,那就对这两个订单进行成交处理;
  4. 若是上一步的成交处理完以后,新订单的剩余数量还不为零,那就继续重复第一步。

其中,匹配成交的记录会做为一条输出记录发送到 MQ。

对其余类型的处理也是相似的,就再也不一一讲解了。

那引擎包的实现就先讲到这里,后续文章再聊其余部分的实现。

小结

本小节主要仍是经过代码梳理清楚整个数据流程,包括一些细节上的设计。理解了本文所列举的这些代码,也就对整个撮合服务的实现理解一大半了。

此次的思考题:ChanMap 保存的订单通道是否能够改用无缓冲的通道?用无缓冲的通道和用有缓冲的通道处理逻辑有哪些不一样?两种方案各自的优缺点是什么?


扫描如下二维码便可关注公众号(公众号名称:Keegan小钢)

做者的我的博客
相关文章
相关标签/搜索