最近看Tornado源码给了我很多启发,心血来潮决定本身试着只用python标准库来实现一个异步非阻塞web框架。花了点时间感受还能够,一百多行的代码已经能够撑起一个极简框架了。html
须要的相关知识点:python
掌握上面三个点的知识就彻底没有问题,不是很清楚的同窗我也推荐几篇参考文章git
HTTP协议详细介绍(http://www.javashuo.com/article/p-ggogpsqq-ha.html)github
Python篇-IO多路复用详解(https://www.jianshu.com/p/818f27379a5e)web
Python异步IO之协程(一):从yield from到async的使用(https://blog.csdn.net/SL_World/article/details/86597738)数据库
实验环境:浏览器
python 3.7.3
因为在框架中会使用到async/await关键字,因此只要确保python版本在3.5以上便可。服务器
咱们的框架要实现最基本的几个功能:并发
固然一个完善的web框架须要实现的远远不止这些,这里咱们如今只须要它能跑起来就足够了。app
HTTP是基于TCP/IP通讯协议来实现数据传输,与通常的C/S相比,它的特色在于当客户端(浏览器)向服务端发起HTTP请求,服务端响应数据后双方立马断开链接,服务端没法主动向客户端发送数据。HTTP协议数据传输内容分为请求头和请求体,请求头和请求体之间使用"\r\n\r\n"进行分隔。在请求头中,第一行包含了请求方式,请求路径和HTTP协议,此后每一行以key: value的形式传输数据。
对于咱们的web服务端来讲,须要的就是解析http请求和处理http响应。
咱们经过写两个类,HttpRequest和HttpResponse来实现。
HttpRequest设计目标是解析从socket接收request数据
1 class HttpRequest(object): 2 def __init__(self, content: bytes): 3 self.content = content.decode('utf-8') 4 self.headers = {} 5 self.GET = {} 6 self.url = '' 7 self.full_path = '' 8 self.body = '' 9 try: 10 header, self.body = self.content.split('\r\n\r\n') 11 temp = header.split('\r\n') 12 first_line = temp.pop(0) 13 self.method, self.url, self.protocol = first_line.split(' ') 14 self.full_path = self.url 15 for t in temp: 16 k, v = t.split(': ', 1) 17 self.headers[k] = v 18 except Exception as e: 19 print(e) 20 if len(self.url.split('?')) > 1: # 解析GET参数 21 self.url = self.full_path.split('?')[0] # 把url中携带的参数去掉 22 parms = self.full_path.split('?')[1].split('&') 23 for p in parms: # 将GET参数添加到self.GET字典 24 k, v = p.split('=') 25 self.GET[k] = v
在类中,咱们实现解析http请求的headers、method、url和GET参数,其实还有不少事情没有作,好比使用POST传输数据时,数据是在请求体中,针对这部份内容我并无开始写,缘由在于本文主要目的仍是异步非阻塞框架,目前的功能已经足以支持咱们进行下一步实验了。
HTTP响应也能够分为响应头和响应体,咱们能够很简单的实现一个response:
1 class HttpResponse(object): 2 def __init__(self, data: str): 3 self.status_code = 200 # 默认响应状态 200 4 self.headers = 'HTTP/1.1 %s OK\r\n' 5 self.headers += 'Server:AsyncWeb' 6 self.headers += '\r\n\r\n' 7 self.data = data 8 9 @property 10 def content(self): 11 return bytes((self.headers + self.data) % self.status_code, encoding='utf8')
HttpResponse中并无作太多的事情,接受一个字符串,并使用content返回一个知足HTTP响应格式的bytes。
从用户调用角度,可使用return HttpResponse("欢迎来到AsynicWeb")来返回数据。
咱们也能够简单的定义一个404页面:
Http404 = HttpResponse('<html><h1>404</h1></html>') Http404.status_code = 404
路由映射简单理解就是从一个URL地址找到对应的逻辑函数。举个例子,咱们访问http://127.0.0.1:8000这个页面,在http请求中它的url是"/",在web服务器中有一个函数index,web服务器可以由url地址"/"找到函数index,这就是一个路由映射。
其实路由映射实现起来很是简单。咱们只要定义一个映射列表,列表中的每一个元素包含url和逻辑处理(视图函数)两部分,当一个http请求到达的时候,遍历映射列表,使用正则匹配每个url,若是请求的url和映射表中的相同,咱们就能够取出对应的视图函数。
路由映射表是彻底由用户来定义映射关系的,它应该使用一个咱们定义的标准结构,好比:
routers = [ ('/$', IndexView), ('/home', asy) ]
视图是指可以根据一个请求,执行某些逻辑运算,最终返回响应的模块。说到这里,一个web框架的运行流程就出来了:
http请求——路由映射表——视图——执行视图获取返回值——http响应
在咱们的框架中,借鉴Django的设计,咱们让它支持类视图(CBV)和函数视图(FBV)两种模式。
对于函数视图,彻底由用户本身定义,只要至少可以接受一个request参数便可
对于类视图,咱们须要作一些预处理,确保用户按咱们的规则来实现类视图。
定义一个View类:
1 class View(object): 2 # CBV应继承View类 3 def dispatch(self, request): 4 method = request.method.lower() 5 if hasattr(self, method): 6 return getattr(self, method)(request) 7 else: 8 return Http404
在View类中,咱们只写了一个dispatch方法,其实就作了一件事:反射。当咱们在路由映射表中找对应的视图时,若是判断视图属于类,咱们就调用dispatch方法。
从用户角度来看,实现一个CBV只须要继承View类,而后经过定义get、post、delete等方法来实现不一样的处理。
上面几个小节实现了web框架的大致执行路径,从这节开始咱们实现web服务器的核心。
经过IO多路复用能够达到单线程实现高并发的效果,一个标准的IO多路复用写法:
1 server = socket(AF_INET, SOCK_STREAM) 2 server.bind(("127.0.0.1", 8000)) 3 server.setblocking(False) # 设置非阻塞 4 server.listen(128) 5 Future_Task_Wait = {} 6 rlist = [server, ] 7 while True: 8 r, w, x = select.select(rlist, [], [], 0.1) 9 for o in r: 10 if o == server: 11 '''判断o是server仍是conn''' 12 conn, addr = o.accept() 13 conn.setblocking(False) # 设置非阻塞 14 rlist.append(conn) # 客户链接 加入轮询列表 15 else: 16 data = b"" 17 while True: # 接收客户传输数据 18 try: 19 chunk = o.recv(1024) 20 data = data + chunk 21 except Exception as e: 22 chunk = None 23 if not chunk: 24 break 25 dosomething(o, data, routers) # 拿到数据干点啥
经过这段代码咱们能够得到全部的请求了,下一步就是处理这些请求。
咱们就定义一个dosomething函数
1 import re 2 import time 3 from types import FunctionType 4 5 def dosomething(o, data, routers): 6 '''解析http请求,寻找映射函数并执行获得结果
7 :param o: socket链接对象 8 :param data: socket接收数据 9 :return: 响应结果 10 ''' 11 request = HttpRequest(data) 12 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 13 request.method, request.url) 14 flag = False 15 for router in routers: 16 if re.match(router[0], request.url): 17 target = router[1] 18 flag = True 19 break 20 if flag: 21 # 判断targe是函数仍是类 22 if isinstance(target, FunctionType): 23 result = target(request) 24 elif issubclass(target, View): 25 result = target().dispatch(request) 26 else: 27 result = Http404 28 else: 29 result = Http404 30 return result
这段代码作了这么几件事。1.实例化HttpRequest;2.使用正则遍历路由映射表;3.将request传入视图函数或类视图的dispatch方法;4.拿到result结果
咱们经过result = dosomething(o, data, routers)能够拿到结果,接下来咱们只须要把结果发回给客户端并断开链接就能够了
o.sendall(result.content) # 因为result是一个HttpResponse对象 咱们使用content属性
rlist.remove(o) # 从轮询中删除链接
o.close() # 关闭链接
至此,咱们的web框架已经搭建好了。
但它仍是一个同步的框架,在咱们的服务端中,其实一直经过while循环在监听select是否变化,假如咱们在视图函数中添加IO操做,其余链接依然会阻塞等待,接下来让咱们的框架实现对协程的支持。
在实现协程以前,咱们先聊聊Tornado的Future对象。能够说Tornado异步非阻塞的实现核心就是Future。
Future对象内部维护了一个重要属性_result,这是一个标记位,一个刚实例化的Future内部的_result=None,咱们能够经过其余操做来更改_result的状态。另外一方面,咱们能够一直监听每一个Future对象的_result状态,若是发生变化就执行某些特定的操做。
咱们在第六节定义的dosomething函数中拿到了一个result,它应当是一个HttpResponse对象,那么能不能返回一个Future对象呢。
假如result是一个Future对象,咱们的服务端不立马返回结果,而是把Future放进另外一个轮询列表中,当Future内的_result改变时再返回结果,就达到了异步的效果。
咱们也能够定义一个Future类,这个类维护只一个变量result:
1 class Future(object): 2 def __init__(self): 3 self.result = None
对于框架使用者来讲,在视图函数要么返回一个HttpResponse对象表明当即返回,要么返回一个Future对象说你先别管我,我把事情干完了再通知你返回结果。
既然视图函数返回的可能不仅是HttpResponse对象,那么咱们就须要对第六步的代码增长额外的处理:
Future_Task_Wait = {} # 定义一个异步Future字典 result = dosomething() # 拿到结果后执行下面判断 if isinstance(result, Future): Future_Task_Wait[o] = result # Futre对象则加入字典 else: o.sendall(result.content) # 非Future对象直接返回结果并断开链接 rlist.remove(o) o.close()
在while True轮询内再增长一段代码,遍历Future_Task_Wait字典:
rm_conn = [] # 须要移除列表的conn for conn, future in Future_Task_Wait.items(): if future.result: try: conn.sendall(HttpResponse(data=future.result).content) # 返回result finally: rlist.remove(conn) conn.close() rm_conn.append(conn) for conn in rm_conn: # 在字典中删除conn del Future_Task_Wait[conn]
这样,咱们就能够返回一个Future来告诉服务器这是未来才返回的对象。
那回归正题,咱们到底该如何使用协程?这里我用的方法是建立一个子线程来执行协程事件循环,主线程永远在监听socket。
from threading import Thread def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() coroutine_loop = asyncio.new_event_loop() # 建立协程事件循环 run_loop_thread = Thread(target=start_loop, args=(coroutine_loop,)) # 新起线程运行事件循环, 防止阻塞主线程 run_loop_thread.start() # 运行线程,即运行协程事件循环
当咱们要把asyncdo方法添加做为协程任务时
asyncio.run_coroutine_threadsafe(asyncdo(), coroutine_loop)
好了,异步非阻塞的核心代码分析的差很少了,将六七节的代码整合写成一个类
1 import re 2 import time 3 import select 4 import asyncio 5 from socket import * 6 from threading import Thread 7 from types import FunctionType 8 from http.response import Http404, HttpResponse 9 from http.request import HttpRequest 10 from views import View 11 from core.future import Future 12 13 class App(object): 14 # web应用程序 15 coroutine_loop = None 16 17 def __new__(cls, *args, **kwargs): 18 # 使用单例模式 19 if not hasattr(cls, '_instance'): 20 App._instance = super().__new__(cls) 21 return App._instance 22 23 def listen(self, host, port, routers): 24 # IO多路复用监听链接 25 server = socket(AF_INET, SOCK_STREAM) 26 server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1) 27 server.bind((host, port)) 28 server.setblocking(False) 29 server.listen(128) 30 Future_Task_Wait = {} 31 rlist = [server, ] 32 while True: 33 r, w, x = select.select(rlist, [], [], 0.01) 34 for o in r: 35 if o == server: 36 '''判断o是server仍是conn''' 37 conn, addr = o.accept() 38 conn.setblocking(False) 39 rlist.append(conn) 40 else: 41 data = b"" 42 while True: 43 try: 44 chunk = o.recv(1024) 45 data = data + chunk 46 except Exception as e: 47 chunk = None 48 if not chunk: 49 break 50 try: 51 request = HttpRequest(data, o) 52 print(time.strftime("【%Y-%m-%d %X】",time.localtime()), o.getpeername()[0], 53 request.method, request.url) 54 flag = False 55 for router in routers: 56 if re.match(router[0], request.url): 57 target = router[1] 58 flag = True 59 break 60 if flag: 61 # 判断targe是函数仍是类 62 if isinstance(target, FunctionType): 63 result = target(request) 64 elif issubclass(target, View): 65 result = target().dispatch(request) 66 else: 67 result = Http404 68 else: 69 result = Http404 70 # 判断result是否是future 71 if isinstance(result, Future): 72 Future_Task_Wait[o] = result 73 else: 74 o.sendall(result.content) 75 rlist.remove(o) 76 o.close() 77 except Exception as e: 78 print(e) 79 rm_conn = [] 80 for conn, future in Future_Task_Wait.items(): 81 if future.result: 82 try: 83 conn.sendall(HttpResponse(data=future.result).content) 84 finally: 85 rlist.remove(conn) 86 conn.close() 87 rm_conn.append(conn) 88 for conn in rm_conn: 89 del Future_Task_Wait[conn] 90 91 def run(self, host='127.0.0.1', port=8000, routers=()): 92 # 主线程select多路复用,处理http请求和响应 93 # 给协程单首创建一个子线程,负责处理View函数提交的协程 94 def start_loop(loop): 95 asyncio.set_event_loop(loop) 96 loop.run_forever() 97 self.coroutine_loop = asyncio.new_event_loop() # 建立协程事件循环 98 run_loop_thread = Thread(target=start_loop, args=(self.coroutine_loop,)) # 新起线程运行事件循环, 防止阻塞主线程 99 run_loop_thread.start() # 运行线程,即运行协程事件循环 100 self.listen(host, port, routers)
如今,能够测试咱们的web框架了。
1 import asyncio 2 from core.server import App 3 from views import View 4 from http.response import * 5 from core.future import Future 6 7 8 class IndexView(View): 9 def get(self, request): 10 return HttpResponse('欢迎来到首页') 11 12 def post(self, request): 13 return HttpResponse('post') 14 15 def asy(request): 16 future = Future() 17 print('异步调用') 18 wait = request.url.split('/')[-1] 19 try: 20 wait = int(wait) 21 except: 22 wait = 5 23 asyncio.run_coroutine_threadsafe(dosomething(future, wait), app.coroutine_loop) 24 print('返回Future') 25 return future 26 27 async def dosomething(future, wait): 28 # 异步函数 29 await asyncio.sleep(wait)# 模拟异步操做 30 future.result = '等待了%s秒' % wait 31 32 routers = [ 33 ('/$', IndexView), 34 ('/home', asy) 35 ] 36 37 # 从用户角度只需使用run() 38 app = App() 39 app.run('127.0.0.1', 8080, routers=routers)
浏览器访问http://127.0.0.1:8080,返回没有问题,若是有同窗使用Chrome可能会乱码,那是由于咱们的HttpResponse没有返回指定编码,添加一个响应头便可。
浏览器访问http://127.0.0.1:8080/home,这时候会执行协程,默认等待5s后返回结果,你能够在多个标签页访问这个地址,经过等待时间来验证咱们的异步框架是否正常工做。
至此,咱们要实现的异步非阻塞web框架已经完成了。固然这个框架说到底仍是太简陋,后续彻底能够优化HttpRequest和HttpResponse、增长对数据库、模板语言等等组件的扩展。
完整源码已经上传至https://github.com/sswest/AsyncWeb