【前言】在搞定交易接口后,咱们开发交易系统的第一步就是要弄清楚系统的工做原理。本文是我看的公司的中频平台文档的总结,公司是基于开源的vn.py修改而来,可是驱动引擎是同样的。git
会参考vn.py官方文档,公司是参考其修改的。但在正在研发的高频交易平台上是根据CTP接口接发数据的格式,合并成单线程,配合solarflare的网卡,速度不吃亏。本文以及下面的三篇文章主要是现有的中频平台的,固然高频相似于次,增长了共享内存等。github
全部的计算机程序均可以大体分为两类:脚本型(单次运行)和连续运行型(直到用户主动退出)。算法
脚本型的程序包括最先的批处理文件以及使用Python作交易策略回测等等,这类程序的特色是在用户启动后会按照编程时设计好的步骤一步步运行,全部步骤运行完后自动退出。数据库
连续运行型的程序包含了操做系统和绝大部分咱们平常使用的软件等等,这类程序启动后会处于一个无限循环中连续运行,直到用户主动退出时才会结束。编程
咱们要开发的交易系统就是属于连续运行型程序,而这种程序根据其计算逻辑的运行机制不一样,又能够粗略的分为时间驱动和事件驱动两种。并发
时间驱动的程序逻辑相对容易设计,简单来讲就是让电脑每隔一段时间自动作一些事情。这个事情自己能够很复杂、包括不少步骤,但这些步骤都是线性的,按照顺序一步步执行下来。app
如下代码展现了一个很是简单的时间驱动的Python程序。框架
from time import sleep def demo(): print u'时间驱动的程序每隔1秒运行demo函数' while 1: demo() sleep(1.0)
时间驱动的程序本质上就是每隔一段时间固定运行一次脚本(上面代码中的demo函数)。尽管脚本自身能够很长、包含很是多的步骤,可是咱们能够看出这种程序的运行机制相对比较简单、容易理解。异步
举一些量化交易相关的例子:函数
每隔5分钟,经过新浪财经网页的公开API读取一次沪深300成分股的价格,根据当日涨幅进行排序后输出到电脑屏幕上。
每隔1秒钟,检查一次最新收到的股指期货TICK数据,更新K线和其余技术指标,检查是否知足趋势策略的下单条件,若知足则执行下单。
对速度要求较高的量化交易方面(日内CTA策略、高频策略等等),时间驱动的程序会存在一个很是大的缺点:对数据信息在反应操做上的处理延时。例子中,在每次逻辑脚本运行完等待的那1秒钟里,程序对于接收到的新数据信息(行情、成交推送等等)是不会作出任何反应的,只有在等待时间结束后脚本再次运行时才会进行相关的计算处理。而处理延时在量化交易中的直接后果就是:市价单滑点、限价单错过本可成交的价格。
时间驱动的程序在量化交易方面还存在一些其余的缺点:如浪费CPU的计算资源、实现异步逻辑复杂度高等等。
与时间驱动对应的就是事件驱动的程序:当某个新的事件被推送到程序中时(如API推送新的行情、成交),程序当即调用和这个事件相对应的处理函数进行相关的操做。
上面例子的事件驱动版:交易程序对股指TICK数据进行监听,当没有新的行情过来时,程序保持监听状态不进行任何操做;当收到新的数据时,数据处理函数当即更新K线和其余技术指标,并检查是否知足趋势策略的下单条件执行下单。
对于简单的程序,咱们能够采用上面测试代码中的方案,直接在API的回调函数中写入相应的逻辑。但随着程序复杂度的增长,这种方案会变得愈来愈不可行。假设咱们有一个带有图形界面的量化交易系统,系统在某一时刻接收到了API推送的股指期货行情数据,针对这个数据系统须要进行以下处理:
更新图表上显示的K线图形(绘图)
更新行情监控表中股指期货的行情数据(表格更新)
策略1须要运行一次内部算法,检查该数据是否会触发策略进行下单(运算、下单)
策略2一样须要运行一次内部算法,检查该数据是否会触发策略进行下单(运算、下单)
风控系统须要检查最新行情价格是否会致使帐户的总体风险超限,若超限须要进行报警(运算、报警)
此时将上面全部的操做都写到一个回调函数中无疑变成了很是差的方案,代码过长容易出错不说,可扩展性也差,每添加一个策略或者功能则又须要修改以前的源代码(有经验的读者会知道,常常修改生产代码是一种很是危险的运营管理方法)。
小结:虽然咱们的交易平台上没有图形界面,由于这只是一种辅助功能,不是生产的核心功能。可是,也有可能有其余的信号或者事件须要咱们处理,那么什么时候处理?分配多少资源处理?因此应该下降耦合,为了解决这种状况,咱们须要用到事件驱动引擎来管理不一样事件的事件监听函数并执行全部和事件驱动相关的操做。
vn.py框架中的vn.event模块包含了一个可扩展的事件驱动引擎。整个引擎的实现并不复杂,除去注释、空行后大概也就100行左右的代码:
# encoding: UTF-8 # 系统模块 from Queue import Queue, Empty from threading import Thread # 第三方模块 from PyQt4.QtCore import QTimer # 本身开发的模块 from eventType import * ######################################################################## class EventEngine: """ 事件驱动引擎 事件驱动引擎中全部的变量都设置为了私有,这是为了防止不当心 从外部修改了这些变量的值或状态,致使bug。 变量说明 __queue:私有变量,事件队列 __active:私有变量,事件引擎开关 __thread:私有变量,事件处理线程 __timer:私有变量,计时器 __handlers:私有变量,事件处理函数字典 方法说明 __run: 私有方法,事件处理线程连续运行用 __process: 私有方法,处理事件,调用注册在引擎中的监听函数 __onTimer:私有方法,计时器固定事件间隔触发后,向事件队列中存入计时器事件 start: 公共方法,启动引擎 stop:公共方法,中止引擎 register:公共方法,向引擎中注册监听函数 unregister:公共方法,向引擎中注销监听函数 put:公共方法,向事件队列中存入新的事件 事件监听函数必须定义为输入参数仅为一个event对象,即: 函数 def func(event) ... 对象方法 def method(self, event) ... """ #---------------------------------------------------------------------- def __init__(self): """初始化事件引擎""" # 事件队列 self.__queue = Queue() # 事件引擎开关 self.__active = False # 事件处理线程 self.__thread = Thread(target = self.__run) # 计时器,用于触发计时器事件 self.__timer = QTimer() self.__timer.timeout.connect(self.__onTimer) # 这里的__handlers是一个字典,用来保存对应的事件调用关系 # 其中每一个键对应的值是一个列表,列表中保存了对该事件进行监听的函数功能 self.__handlers = {} #---------------------------------------------------------------------- def __run(self): """引擎运行""" while self.__active == True: try: event = self.__queue.get(block = True, timeout = 1) # 获取事件的阻塞时间设为1秒 self.__process(event) except Empty: pass #---------------------------------------------------------------------- def __process(self, event): """处理事件""" # 检查是否存在对该事件进行监听的处理函数 if event.type_ in self.__handlers: # 若存在,则按顺序将事件传递给处理函数执行 [handler(event) for handler in self.__handlers[event.type_]] # 以上语句为Python列表解析方式的写法,对应的常规循环写法为: #for handler in self.__handlers[event.type_]: #handler(event) #---------------------------------------------------------------------- def __onTimer(self): """向事件队列中存入计时器事件""" # 建立计时器事件 event = Event(type_=EVENT_TIMER) # 向队列中存入计时器事件 self.put(event) #---------------------------------------------------------------------- def start(self): """引擎启动""" # 将引擎设为启动 self.__active = True # 启动事件处理线程 self.__thread.start() # 启动计时器,计时器事件间隔默认设定为1秒 self.__timer.start(1000) #---------------------------------------------------------------------- def stop(self): """中止引擎""" # 将引擎设为中止 self.__active = False # 中止计时器 self.__timer.stop() # 等待事件处理线程退出 self.__thread.join() #---------------------------------------------------------------------- def register(self, type_, handler): """注册事件处理函数监听""" # 尝试获取该事件类型对应的处理函数列表,若无则建立 try: handlerList = self.__handlers[type_] except KeyError: handlerList = [] self.__handlers[type_] = handlerList # 若要注册的处理器不在该事件的处理器列表中,则注册该事件 if handler not in handlerList: handlerList.append(handler) #---------------------------------------------------------------------- def unregister(self, type_, handler): """注销事件处理函数监听""" # 尝试获取该事件类型对应的处理函数列表,若无则忽略该次注销请求 try: handlerList = self.handlers[type_] # 若是该函数存在于列表中,则移除 if handler in handlerList: handlerList.remove(handler) # 若是函数列表为空,则从引擎中移除该事件类型 if not handlerList: del self.handlers[type_] except KeyError: pass #---------------------------------------------------------------------- def put(self, event): """向事件队列中存入事件""" self.__queue.put(event)
当事件驱动引擎对象被建立时,初始化函数__init__会建立如下私有变量:
__queue:用来保存事件的队列
__active:用来控制引擎启动、中止的开关
__thread:负责处理事件、执行具体操做的线程
__timer:用来每隔一段时间触发定时事件的计时器
__handlers:用来保存不一样类型事件所对应的事件处理函数的字典
引擎提供了register方法,用来向引擎注册事件处理函数的监听,传入参数为
type_:表示事件类型的常量字符串,由用户自行定义,注意不一样事件类型间不能重复
handler:当该类型的事件被触发时,用户但愿进行相应操做的事件处理函数,函数的定义方法参考代码中的注释
当用户调用register方法注册事件处理函数时,引擎会尝试获取__handlers字典中该事件类型所对应的处理函数列表(若无则建立一个空列表),并向这个列表中添加该事件处理函数。使用了Python的列表对象,用户能够很容易的控制同一个事件类型下多个事件处理函数的工做顺序,所以对某些涉及多步操做的复杂算法能够保证按照正确的顺序执行,这点是相比于某些系统0消息机制(如Qt的Signal/Slot)最大的优点。
如当标的物行情发生变化时,期权高频套利算法须要执行如下操做:
使用订价引擎先计算新的期权理论价、希腊值
使用风控引擎对当前持仓的风险度汇总,并计算报价的中间价
使用套利引擎基于预先设定的价差、下单手数等参数,计算具体价格并发单
以上三步操做,只需在交易系统启动时按顺序注册监听到标的物行情事件上,就能够保证操做顺序的正确。
和register对应的是unregister方法,用于注销事件处理函数的监听,传入参数相同,具体原理请参照源代码。在实际应用中,用户能够动态的组合使用register和unregister方法,只在须要监听某些事件的时候监听,完成后取消监听,从而节省CPU资源。
这里让笔者吐槽一下某些国内的C++平台(固然不是指全部的),每一个策略对系统里全部的订单回报进行监听,若是是自身相关的就处理,不相关的就PASS。这种写法,光是判断是否和自身相关就得多作多少无谓的判断、浪费多少CPU资源,随着策略数量的增长,浪费呈线性增长的趋势,这种平台还叫嚣作高频,唉......
用户能够经过引擎的put方法向事件队列__queue中存入事件,等待事件处理线程来进行处理,事件类的实现以下:
######################################################################## class Event: """事件对象""" #---------------------------------------------------------------------- def __init__(self, type_=None): """Constructor""" self.type_ = type_ # 事件类型 self.dict_ = {} # 字典用于保存具体的事件数据
对象建立时用户能够选择传入事件类型字符串type_做为参数。dict_字典用于保存具体事件相关的数据信息,以供事件处理函数进行操做。
事件引擎的事件处理线程__thread中执行连续运行工做的函数为__run:当事件引擎的开关__active没有被关闭时,引擎尝试从事件队列中读取最新的事件,若读取成功则当即调用__process函数处理该事件,若没法读取(队列为空)则进入阻塞状态节省CPU资源,当阻塞时间(默认为1秒)结束时再次进入以上循环。
__process函数工做时,首先检查事件对象的事件类型在__handlers字典中是否存在,若存在(说明有事件处理函数在监听该事件)则按照注册顺序调用监听函数列表中的事件处理函数进行相关操做。
事件引擎中的__timer是一个PyQt中的QTimer对象,提供的功能很是简单:每隔一段时间(由用户设定)自动运行函数__onTimer。__onTimer函数会建立一个类型为EVENT_TIMER(在eventType.py文件中定义)的事件对象,并调用引擎的put方法存入到事件队列中。
敏感的读者可能已经意识到了,这个计时器本质上是一个由时间驱动的功能。尽管咱们在前文中提到了事件驱动在量化交易平台开发中的重要性,但不能否认某些交易功能的实现必须基于时间驱动,例如:下单后若2秒不成交则当即撤单、每隔5分钟将当日的成交记录保存到数据库中等。这类功能在实现时就能够选择使用事件处理函数对EVENT_TIMER类型的计时器事件进行监听(参考下一章节“事件驱动引擎使用”中的示例)。
用户能够经过start和stop两个方法来启动和中止事件驱动引擎,原理很简单读者能够直接参考源代码。
当启动计时器时,事件间隔默认设定为了1秒(1000毫秒),这个参数用户能够视乎本身的需求进行调整。假设用户使用时间驱动的函数工做间隔为分钟级,则能够选择将参数设置为60秒(600000毫秒),以此类推。
一样在eventEngine.py中,包含了一段测试代码test函数,用来展现事件驱动引擎的使用方法:
#---------------------------------------------------------------------- def test(): """测试函数""" import sys from datetime import datetime from PyQt4.QtCore import QCoreApplication def simpletest(event): print u'处理每秒触发的计时器事件:%s' % str(datetime.now()) app = QCoreApplication(sys.argv) ee = EventEngine() ee.register(EVENT_TIMER, simpletest) ee.start() app.exec_() # 直接运行脚本能够进行测试 if __name__ == '__main__': test()
test函数总体上包含了这几步:
导入相关的包(sys、datetime、PyQt4),注意因为EventEngine的实现中使用了PyQt4的QTimer类,所以整个程序的运行必须包含在Qt事件循环中,即便用QCoreApplication(或者PyQt4.QtGui中的QApplication)的exec_()方法在程序主线程中启动事件循环。
定义一个简单的函数simpletest,该函数包含一个输入参数event对象,函数被调用后会打印一段字符以及当前的时间
建立QCoreApplication对象app
建立事件驱动引擎EventEngine对象ee
向引擎中注册simpletest函数对定时器事件EVENT_TIMER的监听
启动事件驱动引擎
启动Qt事件循环
总体上看,当用户开发本身的程序时,须要修改的只是第2步和第5步:建立本身的事件处理函数并将这些函数注册到相应的事件类型上进行监听。
有了API接口和事件驱动引擎,接下来咱们能够开始开发本身的平台了,后面的几篇文章将会一步步展现一个简单的LTS交易平台的开发过程。