做用:为每一个线程建立一个独立的空间,使得线程对本身的空间中的数据进行操做(数据隔离)。css
应用:html
""" import threading from threading import local import time obj = local() def task(i): obj.xxxxx = i time.sleep(2) print(obj.xxxxx,i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() """ """ import threading from threading import local def task(i): print(threading.get_ident(),i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() """ """ import time import threading import greenlet DIC = {} def task(i): # ident = threading.get_ident() ident = greenlet.getcurrent() if ident in DIC: DIC[ident]['xxxxx'] = i else: DIC[ident] = {'xxxxx':i } time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() """ import time import threading try: import greenlet get_ident = greenlet.getcurrent except Exception: get_ident = threading.get_ident class Local(object): DIC = {} def __getattr__(self, item): ident = get_ident() if ident in self.DIC: return self.DIC[ident].get(item) return None def __setattr__(self, key, value): ident = get_ident() if ident in self.DIC: self.DIC[ident][key] = value else: self.DIC[ident] = {key: value} obj = Local() def task(i): obj.xxxxx = i time.sleep(2) print(obj.xxxxx, i) for i in range(10): t = threading.Thread(target=task, args=(i, )) t.start()
- 如何获取一个线程的惟一标记? threading.get_ident() - 根据字典自定义一个相似于threading.local功能? import time import threading DIC = {} def task(i): ident = threading.get_ident() if ident in DIC: DIC[ident]['xxxxx'] = i else: DIC[ident] = {'xxxxx':i } time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() - 根据字典自定义一个为每一个协程开辟空间进行存取数据。 import time import threading import greenlet DIC = {} def task(i): # ident = threading.get_ident() ident = greenlet.getcurrent() if ident in DIC: DIC[ident]['xxxxx'] = i else: DIC[ident] = {'xxxxx':i } time.sleep(2) print(DIC[ident]['xxxxx'],i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start() - 经过getattr/setattr 构造出来 threading.local的增强版(协程) import time import threading try: import greenlet get_ident = greenlet.getcurrent except Exception as e: get_ident = threading.get_ident class Local(object): DIC = {} def __getattr__(self, item): ident = get_ident() if ident in self.DIC: return self.DIC[ident].get(item) return None def __setattr__(self, key, value): ident = get_ident() if ident in self.DIC: self.DIC[ident][key] = value else: self.DIC[ident] = {key:value} obj = Local() def task(i): obj.xxxxx = i time.sleep(2) print(obj.xxxxx,i) for i in range(10): t = threading.Thread(target=task,args=(i,)) t.start()
应用:DBUtils中为每一个线程建立一个数据库链接时使用java
两类请求:请求上下文,app上下文python
上下文管理的实现(重点)mysql
当请求到来的时候, flask会把请求相关和session相关信息封装到一个ctx = RequestContext对象中, 会把app和g封装到app_ctx = APPContext对象中去, 经过localstack对象将ctx、app_ctx对象封装到local对象中 问题: local是什么?做用? 为每一个线程建立一个独立的空间,使得线程对本身的空间中的数据进行操做(数据隔离) localstack是什么?做用 storage = { 1234:{stack:[ctx,]} } localstack将local中的stack中的维护成一个栈 获取数据(执行视图函数的时候) 经过导入一个模块,用模块.的方式获取咱们想要的数据 实现细节: 经过localproxy对象+偏函数,调用localstack去local中获取对应的响应ctx、app_ctx中封装值 问题:为何要把ctx = request/session app_ctx = app/g 由于离线脚本须要使用app_ctx 请求结束 调用localstk的pop方法,将ctx和app_ctx移除
基础知识web
- 偏函数:给函数取一个默认的参数,这样可使函数没必要每一次都传参。 应用:request = LocalProxy(partial(_lookup_req_object, 'request')) session = LocalProxy(partial(_lookup_req_object, 'session')) import functools def index(a1,a2): return a1 + a2 # 原来的调用方式 # ret = index(1,23) # print(ret) # 偏函数,帮助开发者自动传递参数 new_func = functools.partial(index,666) ret = new_func(1) print(ret)
- super和执行类的区别? """ class Base(object): def func(self): print('Base.func') class Foo(Base): def func(self): # 方式一:根据mro的顺序执行方法 # super(Foo,self).func() # 方式二:主动执行Base类的方法 # Base.func(self) print('Foo.func') obj = Foo() obj.func() """ #################################### class Base(object): def func(self): super(Base, self).func() print('Base.func') class Bar(object): def func(self): print('Bar.func') class Foo(Base,Bar): pass # 示例一 # obj = Foo() # obj.func() # print(Foo.__mro__) # 示例二 # obj = Base() # obj.func()
# -*- coding: utf-8 -*- ''' # @Datetime: 2018/12/23 # @author: Zhang Yafei ''' ''' class Base(object): def func(self): print('Base_func') class Foo(Base): def func(self): # 方式一:根据mro的顺序执行方法 # super(Foo, self).func() # 方式二:主动执行Base类的方法 # Base.func(self) print('Foo_func') obj = Foo() obj.func() ''' ############################### super和执行父类的方法的区别 ################### class Base(object): def func(self): super(Base, self).func() print('Base_func') class Bar(object): def func(self): print('Bar_func') class Foo(Base, Bar): pass obj = Foo() obj.func() print(Foo.__mro__) # obj = Base() print(Base.__mro__) # obj.func()
# -*- coding: utf-8 -*- ''' # @Datetime: 2018/12/23 # @author: Zhang Yafei ''' class Foo(object): def __init__(self): object.__setattr__(self, 'storage', {}) def __setattr__(self, key, value): print(key, value, self.storage) obj = Foo() obj.xx = 123
class Stack(object): """栈""" def __init__(self): self.__list = [] #私有变量,不容许外部调用者对其进行操做 def push(self,item): """添加一个新的元素item到栈顶""" self.__list.append(item) #顺序表尾部插入时间复杂度O(1),头部插入O(n),故尾部方便 #self.__list.insert(0,item) #链表表尾部插入时间复杂度O(n),头部插入O(1),故链表用头插方便 def pop(self): """弹出栈顶元素""" return self.__list.pop() if __name__ == '__main__': s = Stack() s.push(1) s.push(2) s.push(3) s.push(4) print(s.pop()) print(s.pop()) print(s.pop()) print(s.pop())
''' # @Datetime: 2018/12/25 # @author: Zhang Yafei ''' class Obj(object): def func(self): pass # 执行方法一 # obj = Obj() # obj.func() # 方法 # 执行方法二 # obj = Obj() # Obj.func(obj) # 函数 from types import FunctionType, MethodType obj = Obj() print(isinstance(obj.func, FunctionType)) # False print(isinstance(obj.func, MethodType)) # True print(isinstance(Obj.func, FunctionType)) # True print(isinstance(Obj.func, MethodType)) # Flase """ 总结:函数和方法的区别 1.被类调用的是函数,被对象调用的是方法 2.所有参数本身传的是函数,自动传的是方法(好比self) """
# -*- coding: utf-8 -*- """ @Datetime: 2018/12/29 @Author: Zhang Yafei """ """方式一""" # my_singleton.py # # class Singleton(object): # pass # # singleton = Singleton() # # from mysingleton import singleton """方式二:使用装饰器""" # def Singleton(cls): # _instance = {} # # def _singleton(*args, **kargs): # if cls not in _instance: # _instance[cls] = cls(*args, **kargs) # return _instance[cls] # # return _singleton # # # @Singleton # class A(object): # a = 1 # # def __init__(self, x=0): # self.x = x # # # a1 = A(2) # a2 = A(3) """方式三:使用类""" # import threading # import time # # # class Singleton(object): # # def __init__(self): # # time.sleep(1) # pass # @classmethod # def instance(cls, *args, **kwargs): # if not hasattr(Singleton, "_instance"): # Singleton._instance = Singleton(*args, **kwargs) # return Singleton._instance # # # def task(arg): # obj = Singleton.instance() # print(obj) # # # for i in range(10): # t = threading.Thread(target=task,args=[i,]) # t.start() """解决方法:加锁""" # import time # import threading # # # class Singleton(object): # _instance_lock = threading.Lock() # # def __init__(self): # time.sleep(1) # # @classmethod # def instance(cls, *args, **kwargs): # if not hasattr(Singleton, "_instance"): # with Singleton._instance_lock: # if not hasattr(Singleton, "_instance"): # Singleton._instance = Singleton(*args, **kwargs) # return Singleton._instance # # # # # def task(arg): # obj = Singleton.instance() # print(obj) # # for i in range(10): # t = threading.Thread(target=task,args=[i,]) # t.start() # time.sleep(20) # obj = Singleton.instance() # print(obj) """方法四:基于__new__方法""" # # import threading # # class Singleton(object): # _instance_lock = threading.Lock() # # def __init__(self): # pass # # def __new__(cls, *args, **kwargs): # if not hasattr(Singleton, "_instance"): # with Singleton._instance_lock: # if not hasattr(Singleton, "_instance"): # Singleton._instance = object.__new__(cls) # return Singleton._instance # # # obj1 = Singleton() # obj2 = Singleton() # print(obj1,obj2) # # # def task(arg): # obj = Singleton() # print(obj) # # # for i in range(10): # t = threading.Thread(target=task,args=[i,]) # t.start() # """方法五:元类实现单例模式""" import threading class SingletonType(type): _instance_lock = threading.Lock() def __call__(cls, *args, **kwargs): if not hasattr(cls, "_instance"): with SingletonType._instance_lock: if not hasattr(cls, "_instance"): cls._instance = super(SingletonType,cls).__call__(*args, **kwargs) return cls._instance class Foo(metaclass=SingletonType): def __init__(self,name): self.name = name obj1 = Foo('name') obj2 = Foo('name') print(obj1,obj2)
执行流程redis
请求到来时候:wsgi会触发__call__方法,由__call__方法再次调用wsgi_app方法 -在wsgi_app方法中: 首先将 请求相关+空session 封装到一个RequestContext对象中,即ctx 将ctx交给LocalStack中,再由localstack将ctx添加到local中,local结构: __storage__ = { 1231:{stack:[ctx,]} } -根据请求中的cookie中提取名称为sessionnid对应的值,对cookie进行加密+反序列化,再次赋值给ctx中的session -> 视图函数 - 把session中的数据再次写入到cookie中 -将ctx删除 -结果返回用户浏览器 -断开socket链接 # ctx = RequestContext(self, environ) # self是app对象,environ请求相关的原始数据 # ctx.request = Request(environ) # ctx.session = None # 将包含了request/session的ctx对象放到“空调” { 1232:{ctx:ctx对象} 1231:{ctx:ctx对象} 1211:{ctx:ctx对象} 1111:{ctx:ctx对象} 1261:{ctx:ctx对象} } 视图函数: from flask import reuqest,session request.method 请求结束: 根据当前线程的惟一标记,将“空调”上的数据移除。
主要内容sql
a. 温大爷:wsig, run_simple() 自动执行__call__方法,__call__方法调用wsgi_app()方法 b. 赵毅: ctx = ReuqestContext(session,request) # 将request和session封装成一个ctx对象 ctx.push() # 经过localstack添加到local(空调)中 c. 刘松:LocalStack,把ctx对象添加到local中 LocalStack的做用:线程标记做为字典的key,字典里面包含key为stack的字典,value为一个列表, localstack的做用就是将这个列表维护成一个stack. d. 空调:Local __storage__={ 1321:{stack:[ctx,]} }
a. 温大爷:wsig b. 赵毅: ctx = ReuqestContext(session=None,request) ctx.push() c. 刘松:LocalStack,把ctx对象添加到local中 d. 空调:Local __storage__={ 1321:{stack:[ctx,]} } e. 郭浩:经过刘松获取ctx中的session,给session赋值(从cookie中读取数据) => open_session
pip3 install flask-session 掌握: - 使用 import redis from flask import Flask,request,session from flask.sessions import SecureCookieSessionInterface from flask_session import Session app = Flask(__name__) # app.session_interface = SecureCookieSessionInterface() # app.session_interface = RedisSessionInterface() app.config['SESSION_TYPE'] = 'redis' app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='1234') Session(app) @app.route('/login') def login(): session['user'] = 'alex' return 'asdfasfd' @app.route('/home') def index(): print(session.get('user')) return '...' if __name__ == '__main__': app.run() - 原理: - session数据保存到redis session:随机字符串1:q23asifaksdfkajsdfasdf session:随机字符串2:q23asifaksdfkajsdfasdf session:随机字符串3:q23asifaksdfkajsdfasdf session:随机字符串4:q23asifaksdfkajsdfasdf session:随机字符串5:q23asifaksdfkajsdfasdf - 随机字符串返回给用户,浏览器。 随机字符串 源码: from flask_session import RedisSessionInterface
- 程序启动: 两个Local: local1 = { } local2 = { } 两个LocalStack: _request_ctx_stack _app_ctx_stack - 请求到来 对数据进行封装: ctx = RequestContext(request,session) app_ctx = AppContext(app,g) 保存数据: 将包含了(app,g)数据的app_ctx对象,利用 _app_ctx_stack(贝贝,LocalStack())将app_ctx添加到Local中 storage = { 1231:{stack:[app_ctx(app,g),]} } 将包含了request,session数据的ctx对象,利用_request_ctx_stack(刘淞,LocalStack()),将ctx添加到Local中 storage = { 1231:{stack:[ctx(request,session),]} - 视图函数处理: from flask import Flask,request,session,current_app,g app = Flask(__name__) @app.route('/index') def index(): # 去请求上下文中获取值 _request_ctx_stack request.method # 找小东北获取值 session['xxx'] # 找龙泰获取值 # 去app上下文中获取值:_app_ctx_stack print(current_app) print(g) return "Index" if __name__ == '__main__': app.run() app.wsgi_app - 结束 _app_ctx_stack.pop() _request_ctx_stack.pop() app上下文管理执行流程
请求上下文和应用上下文须要先放入Local中,才能获取到数据库
# -*- coding: utf-8 -*- """ @Datetime: 2019/1/4 @Author: Zhang Yafei """ from flask import Flask, current_app, request, session, g app = Flask(__name__) # 错误:程序解释的时候尚未执行__call__方法,尚未将上下文信息放入local中,因此取不到会报错 # print(current_app.config) @app.route('/index') def index(): # 正确 print(current_app.config) return 'index' if __name__ == '__main__': app.run()
# -*- coding: utf-8 -*- """ @Datetime: 2018/12/31 @Author: Zhang Yafei """ from web import db, create_app from flask import current_app # 错误,尚未放入local中 # print(current_app.config) # RuntimeError: Working outside of application context. app = create_app() # app_ctx = app/g app_ctx = app.app_context() with app_ctx: # __enter__,经过LocalStack放入Local中 # db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置 # 正确 print(current_app.config)
1. Flask中g的生命周期? (1) 当一个请求到来时,flask会把app和g封装成一个AppContext对象, (2) 经过_app_ctx_stack = LocalStack()(做用是将local维护成一个栈),将g存放到local中. app_ctx = _app_ctx_stack.top app_ctx.push() _app_ctx_stack.push(self) rv = getattr(self._local, 'stack', None) rv.append(obj) (3) 执行视图函数: g = LocalProxy(partial(_lookup_app_object, 'g')) g.x 经过localProxy对象去local中取g (4) _app_ctx_stack.pop() # 将g清空 - 一次请求声明周期结束 注:两次请求的g不同 2. g和session同样吗? g只能在同一次请求起做用,请求结束以后就销毁了,昙花一现 但session请求完以后不销毁,不一样请求均可以使用 3. g和全局变量同样吗? g是项目启动的时候建立的,且一次请求结束以后删除,能够经过线程标记实现多线程 全局变量屡次请求不删除,能够共用,但不能多线程
class UrlManager(object): @staticmethod def buildUrl(path): return path @staticmethod def buildStaticUrl(path): path = path + '?ver=' + '201901292105' return UrlManager.buildUrl(path) url = url_for('index') # /index url_1 = UrlManager.buildUrl('/api') # /api url_2 = UrlManager.buildStaticUrl('/css/bootstrap.css') # /css/bootstrap.css?ver=201901292105 msg = 'Hello World,url:{} url_1:{} url_2:{}'.format(url, url_1, url_2) # .....
A logging.Logger object for this application. The default configuration is to log to stderr if the application is in debug mode. This logger can be used to (surprise) log messages. Here some examples: app.logger.debug('A value for debugging') app.logger.warning('A warning occurred (%d apples)', 42) app.logger.error('An error occurred')
@app.errorhandler(404) def page_not_found(error): app.logger.error(error) return 'This page does not exist', 404
from DBUtils.PooledDB import PooledDB, SharedDBConnection POOL = PooledDB( creator=pymysql, # 使用连接数据库的模块 maxconnections=6, # 链接池容许的最大链接数,0和None表示不限制链接数 mincached=2, # 初始化时,连接池中至少建立的空闲的连接,0表示不建立 maxcached=5, # 连接池中最多闲置的连接,0和None不限制 maxshared=3, # 连接池中最多共享的连接数量,0和None表示所有共享。PS: 无用,由于pymysql和MySQLdb等模块的 threadsafety都为1,全部值不管设置为多少,_maxcached永远为0,因此永远是全部连接都共享。 blocking=True, # 链接池中若是没有可用链接后,是否阻塞等待。True,等待;False,不等待而后报错 maxusage=None, # 一个连接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always host='127.0.0.1', port=3306, user='root', password='0000', database='flask_code', charset='utf8' ) -注意:使用数据库链接池 from settings import POOL conn = POOL.connection() cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
-使用 生成HTML标签 form表单验证 -安装 pip install wtforms -使用 示例:用户登陆 用户注册 实时更新:从数据库获取字段,静态字段只在程序扫描第一遍时执行。 解决方法:1.重启 2. 重写form的构造方法,每一次实例化都动态更新须要更新的字段 问题: 1. form对象为何能够被for循环? 答:变为可迭代对象: class Obj(object): def __iter__(self): # return iter([1,2,3]) yield 1 yield 2 yield 3 obj = Obj() for item in obj: print(item) 2. new方法的返回值决定对象究竟是什么? class BAR(object): def __init__(self, cls): self.cls = cls class NEW_OBJ(object): def __new__(cls, *args, **kwargs): # return super(NEW_OBJ, cls).__new__(cls, *args, **kwargs) # <__main__.NEW_OBJ object at 0x000000D445061CF8> # return 123 # 123 # return BAR # <class '__main__.BAR'> # return BAR() # <__main__.BAR object at 0x000000AD77141C50> return BAR(cls) # <__main__.BAR object at 0x0000003BFFA31D68> obj = NEW_OBJ() print(obj) 3. metaclass 1. 建立类时,先执行metaclass(默认为type)的__init__方法 2. 类在实例化时, 执行metaclass(默认为type)的__call__方法,__call__方法的返回值就是实例化的对象 __call__内部调用: - 类.__new__方法:建立对象 _ 类.__init__方法:对象的初始化 class MyType(type): def __init__(self, *args, **kwargs): print('MyType的__init__') super(MyType, self).__init__(*args, **kwargs) def __call__(cls, *args, **kwargs): print('MyType的__call__') obj3 = cls.__new__(cls) cls.__init__(obj3) return obj class Obj3(object, metaclass=MyType): x = 123 def __init__(self): print('Obj3的__init__') def __new__(cls, *args, **kwargs): print('Obj3的__new__') return object.__new__(cls) def func(self): return 666 源码: -类的建立 type.__init__ -对象的建立: type.__call__ 类.__new__ 类.__init__ Meta - __mro__ - 应用:wtforms中meta使用(meta做用定制csrf token) 钩子函数 class LoginForm(Form): name = simple.StringField( validators=[ validators.DataRequired(message='用户名不能为空.'), ], widget=widgets.TextInput(), render_kw={'placeholder':'请输入用户名'} ) pwd = simple.PasswordField( validators=[ validators.DataRequired(message='密码不能为空.'), ], render_kw={'placeholder':'请输入密码'} ) def validate_name(self, field): """ 自定义name字段规则 :param field: :return: """ # 最开始初始化时,self.data中已经有全部的值 print('钩子函数获取的值',field.data) if not field.data.startswith('old'): raise validators.ValidationError("用户名必须以old开头") # 继续后续验证 # raise validators.StopValidation("用户名必须以old开头") # 再也不继续后续验证 总结: 记住: https://www.cnblogs.com/wupeiqi/articles/8202357.html 理解: - metaclass - __new__ - __mro__
pip install flask-sqlalchemy
导入并实例化SQLAlchemy from flask_sqlalchemy import SQLAlchemy db = SQLAlchemy() 注意事项: - 必须在导入蓝图以前 - 必须导入models.py
db.init_app(app)
# ##### SQLALchemy配置文件 ##### SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:0000@127.0.0.1:3306/flask_web?charset=utf8" SQLALCHEMY_POOL_SIZE = 10 SQLALCHEMY_MAX_OVERFLOW = 5
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column from sqlalchemy import Integer,String,Text,Date,DateTime from sqlalchemy import create_engine from chun import db class Users(db.Model): __tablename__ = 'users' id = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=False) depart_id = Column(Integer)
from web import db,create_app app = create_app() app_ctx = app.app_context() # app_ctx = app/g with app_ctx: # __enter__,经过LocalStack放入Local中 db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置
from flask import Blueprint from web import db from web import models us = Blueprint('us',__name__) @us.route('/index') def index(): # 使用SQLAlchemy在数据库中插入一条数据 # db.session.add(models.Users(name='高件套',depart_id=1)) # db.session.commit() # db.session.remove() result = db.session.query(models.Users).all() print(result) db.session.remove() return 'Index'
# python manage.py runserver -h 127.0.0.1 -p 8001 from web import create_app from flask_script import Manager app = create_app() manager = Manager(app) if __name__ == '__main__': # app.run() manager.run()
from web import create_app from flask_script import Manager app = create_app() manager = Manager(app) @manager.command def custom(arg): """ 自定义命令 python manage.py custom 123 :param arg: :return: """ print(arg) if __name__ == '__main__': # app.run() manager.run()
from web import create_app from flask_script import Manager app = create_app() manager = Manager(app) @manager.option('-n', '--name', dest='name') @manager.option('-u', '--url', dest='url') def cmd(name, url): """ 自定义命令 执行: python manage.py cmd -n wupeiqi -u http://www.oldboyedu.com :param name: :param url: :return: """ print(name, url) if __name__ == '__main__': # app.run() manager.run()
(1) 安装编程
pip install flask-sqlacodegen
(2)使用
flask-sqlacodegen "mysql://root:0000@127.0.0.1/food_db" --outfile "common/models/model.py" --flask flask-sqlacodegen "mysql://root:0000@127.0.0.1/food_db" --tables user --outfile "common/models/user.py" --flask
依赖:flask-script
from flask_script import Manager from flask_migrate import Migrate, MigrateCommand from web import create_app from web import db app = create_app() manager = Manager(app) Migrate(app, db) """ # 数据库迁移命名 python manage.py db init python manage.py db migrate python manage.py db upgrade """ manager.add_command('db', MigrateCommand) if __name__ == '__main__': manager.run() # app.run()
第一步:pip install pipreqs 第二步: 进入项目目录:pipreqs ./ --encoding=utf-8 从文件中安装包:pip install -r requirements.txt 补充: 虚拟环境导出包:pip freeze>requeriments.txt
方式一:pip3 install virtualenv virtualenv env1 --no-site-packages 方式二::python -m venv mysite_venv 进入script目录: 激活虚拟环境:activate 退出虚拟环境:deactivate
信号即就是框架给咱们预留出一些位置,让咱们能经过几行简单的代码在生命周期的某个位置执行一些操做
request_started = _signals.signal('request-started') # 请求到来前执行 request_finished = _signals.signal('request-finished') # 请求结束后执行 before_render_template = _signals.signal('before-render-template') # 模板渲染前执行 template_rendered = _signals.signal('template-rendered') # 模板渲染后执行 got_request_exception = _signals.signal('got-request-exception') # 请求执行出现异常时执行 request_tearing_down = _signals.signal('request-tearing-down') # 请求执行完毕后自动执行(不管成功与否) appcontext_tearing_down = _signals.signal('appcontext-tearing-down')# 应用上下文执行完毕后自动执行(不管成功与否) appcontext_pushed = _signals.signal('appcontext-pushed') # 应用上下文push时执行 appcontext_popped = _signals.signal('appcontext-popped') # 应用上下文pop时执行 message_flashed = _signals.signal('message-flashed') # 调用flask在其中添加数据时,自动触发
源码示例
class Flask(_PackageBoundObject): def full_dispatch_request(self): self.try_trigger_before_first_request_functions() try: # ############### 触发request_started 信号 ############### request_started.send(self) rv = self.preprocess_request() if rv is None: rv = self.dispatch_request() except Exception as e: rv = self.handle_user_exception(e) response = self.make_response(rv) response = self.process_response(response) # ############### request_finished 信号 ############### request_finished.send(self, response=response) return response def wsgi_app(self, environ, start_response): ctx = self.request_context(environ) ctx.push() error = None try: try: response = self.full_dispatch_request() except Exception as e: error = e response = self.make_response(self.handle_exception(e)) return response(environ, start_response) finally: if self.should_ignore_error(error): error = None ctx.auto_pop(error)
def render_template(template_name_or_list, **context): """Renders a template from the template folder with the given context. :param template_name_or_list: the name of the template to be rendered, or an iterable with template names the first one existing will be rendered :param context: the variables that should be available in the context of the template. """ ctx = _app_ctx_stack.top ctx.app.update_template_context(context) return _render(ctx.app.jinja_env.get_or_select_template(template_name_or_list), context, ctx.app) def _render(template, context, app): """Renders the template and fires the signal""" # ############### before_render_template 信号 ############### before_render_template.send(app, template=template, context=context) rv = template.render(context) # ############### template_rendered 信号 ############### template_rendered.send(app, template=template, context=context) return rv before_render_template
class Flask(_PackageBoundObject): def handle_exception(self, e): exc_type, exc_value, tb = sys.exc_info() # ############### got_request_exception 信号 ############### got_request_exception.send(self, exception=e) handler = self._find_error_handler(InternalServerError()) if self.propagate_exceptions: # if we want to repropagate the exception, we can attempt to # raise it with the whole traceback in case we can do that # (the function was actually called from the except part) # otherwise, we just raise the error again if exc_value is e: reraise(exc_type, exc_value, tb) else: raise e self.log_exception((exc_type, exc_value, tb)) if handler is None: return InternalServerError() return handler(e) def wsgi_app(self, environ, start_response): ctx = self.request_context(environ) ctx.push() error = None try: try: response = self.full_dispatch_request() except Exception as e: error = e # 这里这里这里这里这里这里这里这里这里这里这里这里 # response = self.make_response(self.handle_exception(e)) return response(environ, start_response) finally: if self.should_ignore_error(error): error = None ctx.auto_pop(error) got_request_exception
class AppContext(object): def push(self): """Binds the app context to the current context.""" self._refcnt += 1 if hasattr(sys, 'exc_clear'): sys.exc_clear() _app_ctx_stack.push(self) # ############## 触发 appcontext_pushed 信号 ############## appcontext_pushed.send(self.app) def pop(self, exc=_sentinel): """Pops the app context.""" try: self._refcnt -= 1 if self._refcnt <= 0: if exc is _sentinel: exc = sys.exc_info()[1] # ############## 触发 appcontext_tearing_down 信号 ############## self.app.do_teardown_appcontext(exc) finally: rv = _app_ctx_stack.pop() assert rv is self, 'Popped wrong app context. (%r instead of %r)' \ % (rv, self) # ############## 触发 appcontext_popped 信号 ############## appcontext_popped.send(self.app) class RequestContext(object): def push(self): top = _request_ctx_stack.top if top is not None and top.preserved: top.pop(top._preserved_exc) app_ctx = _app_ctx_stack.top if app_ctx is None or app_ctx.app != self.app: # #################################################### app_ctx = self.app.app_context() app_ctx.push() self._implicit_app_ctx_stack.append(app_ctx) else: self._implicit_app_ctx_stack.append(None) if hasattr(sys, 'exc_clear'): sys.exc_clear() _request_ctx_stack.push(self) # Open the session at the moment that the request context is # available. This allows a custom open_session method to use the # request context (e.g. code that access database information # stored on `g` instead of the appcontext). self.session = self.app.open_session(self.request) if self.session is None: self.session = self.app.make_null_session() class Flask(_PackageBoundObject): def wsgi_app(self, environ, start_response): ctx = self.request_context(environ) ctx.push() error = None try: try: response = self.full_dispatch_request() except Exception as e: error = e response = self.make_response(self.handle_exception(e)) return response(environ, start_response) finally: if self.should_ignore_error(error): error = None ctx.auto_pop(error) def pop(self, exc=_sentinel): app_ctx = self._implicit_app_ctx_stack.pop() try: clear_request = False if not self._implicit_app_ctx_stack: self.preserved = False self._preserved_exc = None if exc is _sentinel: exc = sys.exc_info()[1] # ################## 触发 request_tearing_down 信号 ################## self.app.do_teardown_request(exc) # If this interpreter supports clearing the exception information # we do that now. This will only go into effect on Python 2.x, # on 3.x it disappears automatically at the end of the exception # stack. if hasattr(sys, 'exc_clear'): sys.exc_clear() request_close = getattr(self.request, 'close', None) if request_close is not None: request_close() clear_request = True finally: rv = _request_ctx_stack.pop() # get rid of circular dependencies at the end of the request # so that we don't require the GC to be active. if clear_request: rv.request.environ['werkzeug.request'] = None # Get rid of the app as well if necessary. if app_ctx is not None: # #################################################### app_ctx.pop(exc) assert rv is self, 'Popped wrong request context. ' \ '(%r instead of %r)' % (rv, self) def auto_pop(self, exc): if self.request.environ.get('flask._preserve_context') or \ (exc is not None and self.app.preserve_context_on_exception): self.preserved = True self._preserved_exc = exc else: self.pop(exc)
def flash(message, category='message'): """Flashes a message to the next request. In order to remove the flashed message from the session and to display it to the user, the template has to call :func:`get_flashed_messages`. .. versionchanged:: 0.3 `category` parameter added. :param message: the message to be flashed. :param category: the category for the message. The following values are recommended: ``'message'`` for any kind of message, ``'error'`` for errors, ``'info'`` for information messages and ``'warning'`` for warnings. However any kind of string can be used as category. """ # Original implementation: # # session.setdefault('_flashes', []).append((category, message)) # # This assumed that changes made to mutable structures in the session are # are always in sync with the session object, which is not true for session # implementations that use external storage for keeping their keys/values. flashes = session.get('_flashes', []) flashes.append((category, message)) session['_flashes'] = flashes # ############### 触发 message_flashed 信号 ############### message_flashed.send(current_app._get_current_object(), message=message, category=category)
# -*- coding: utf-8 -*- """ @Datetime: 2019/1/7 @Author: Zhang Yafei """ from flask import Flask, current_app, flash, render_template from flask.signals import _signals app = Flask(import_name=__name__) # 自定义信号 xxxxx = _signals.signal('xxxxx') def func(sender, *args, **kwargs): print(sender) # 自定义信号中注册函数 xxxxx.connect(func) @app.route("/x") def index(): # 触发信号 xxxxx.send('123123', k1='v1') return 'Index' if __name__ == '__main__': app.run()
重点:
1. 上下文管理 当请求到来的时候, flask会把请求相关和session相关信息封装到一个ctx = RequestContext对象中, 会把app和g封装到app_ctx = APPContext对象中去, 经过localstack对象将ctx、app_ctx对象封装到local对象中 问题: local是什么?做用? 为每一个线程建立一个独立的空间,使得线程对本身的空间中的数据进行操做(数据隔离) localstack是什么?做用 storage = { 1234:{stack:[ctx,]} } localstack将local中的stack中的维护成一个栈 获取数据 经过localproxy对象+偏函数,调用localstack去local中获取响应ctx、app_ctx中封装值 问题:为何要把ctx = request/session app_ctx = app/g 由于离线脚本须要使用app_ctx 请求结束 调用localstk的pop方法,将ctx和app_ctx移除 2.程序中有多少个local和localstack? 请求localstack,帮助开发在对stack对应列表操做 请求local = { 1212:{ stack:[ctx,] } } 应用localstack,帮助开发在对stack对应列表操做 应用local = { 1212:{ stack:[app_ctx,] } } 3.为何要建立两个local和两个localstack? - 编写离线脚本时,须要配置文件,而配置文件存放在app中。 - 编写离线脚本时,不须要请求相关的数据 因此,将app和请求相关的数据分开: 应用上下文(app,g) 请求上下文(request,session) 4. Local中做用? 为每一个协程建立一个独立的空间,使得协程对本身的空间中的数据进行操做(数据隔离) 相似于threading.local的做用,可是是他升级版(greentlet.get_currt()) __storage__ = { 1231:{}, 1232:{}, } 5.LocalStack做用? 将local中存储数据的列表stack维护成一个栈。协程标记做为字典的key,字典里面包含key为stack的字典, value为一个列表,localstack的做用就是将这个列表维护成一个stack. __storage__ = { 1231:{stack:[],}, 1232:{stack:[],}, } 6.为何要维护成一个栈? web运行时(web runtime): 请求Local = { 1111:{ stack:[ctx1,] }, 1112:{ stack:[ctx2,] }, 1113:{ stack:[ctx3,] } } 应用Local = { 1111:{ stack:[app_ctx1,] } 1112:{ stack:[app_ctx2,] }, 1113:{ stack:[app_ctx3,] } } 多app离线脚本: from flask import current_app app1 = create_app() app_ctx1 = app1.app_context() # app_ctx = app/g app2 = create_app() app_ctx2 = app2.app_context() # app_ctx = app/g with app_ctx1: # __enter__,经过LocalStack放入Local中 print(current_app) # app1 with app_ctx2: print(current_app) # app2 print(current_app) # app1 """ 请求Local = { } 应用Local = { 1111:{ stack:[app_ctx1,app_ctx2] } } """ 7. threading.local做用? 为每一个线程建立一个独立的空间,使得线程对本身的空间中的数据进行操做(数据隔离) 8. LocalProxy做用? 执行视图函数的时候,LocalProxy经过Localstack从local中取数据 9. g的做用? 相似每次请求中的全局变量,但要注意的是g只存在于一次请求中,请求结束以后就销毁了。 10.为何要导入request,就可使用? 每次执行request.xx方法时,会触发LocalProxy对象的__getattr__等方法,由方法每次动态的使用LocalStack去Local中获取数据。 10. before_request、after_request 实现原理? 将视图函数添加到一个列表中,而后循环执行其中的函数,after_request将列表reverse一下。 11. 对面向对象的理解? python支持函数式编程和面向对象编程。java和c#只支持面向对象编程。 基础:谈面向对象就要从他的三大特性开始提及,如:封装、继承、多态。 封装: - 方法封装到来类中:某一类功能类似的方法 class File: def file_add():pass def file_update():pass def file_del():pass def file_fetch():pass - 数据封装到对象中 class File: def __init__(self,name,age,email): self.name = name self.age = age self.email = email def file_add():pass def file_update():pass def file_del():pass def file_fetch():pass obj1 = File('oldboy',19,"asdf@live.com") obj2 = File('oldboy1',119,"asdf12@live.com") 应用: - session/request封装到了RequestContext对象中 - app/g封装到了AppContext中 继承:若是多个类中有相同的方法,为了不重复编写,能够将其放在父类(基类)中。 python支持多继承,继承顺序按__mro__的顺序执行,新式类按广度优先,旧式类类广度优先,Python3都是新式类。先执行左边,在执行右边 class Base(object): def xxxx():pass class File(Base): def __init__(self,name,age,email): self.name = name self.age = age self.email = email def file_add():pass def file_update():pass def file_del():pass def file_fetch():pass class DB(Base): def db_add():pass def db_update():pass def db_del():pass def xxxx():pass def db_fetch():pass 应用: rest framework中的视图类的继承 多态(鸭子模型):天生支持多态,对于参数来讲能够传入任何类型的对象,只要保证有想要的send方法便可。 class Msg(object): def send(): pass class WX(object): def send(): pass def func(arg): arg.send() 进阶: __init__,初始化 __new__,建立对象 __call__,对象() __getattr__,对象.xx 且xx不存在 __getattribute__, 对象.xx xx为任何属性 __setattr__. 对象.xx = yy __delattr__, del 对象.xx __setitem__,对象['xx'] = yy __getitem__, 对象['xx'] __delitem__, del 对象['xx'] __mro__,查找成员顺序 __str__, 对象返回值 __repr__, __iter__, 实现此方法,且返回一个迭代器,此对象可迭代 __dict__, 类的成员 __add__, 类 + xx __del__, 对象的生命周期结束以后 高级:metaclass 1. 类建立 class Foo(object):pass Foo = type('Foo',(object,),{}) 2. 如何指定类由自定义type建立? class MyType(type): pass class Foo(object,metaclass=MyType): # __metaclass__ = MyType # py2 pass Foo = MyType('Foo',(object,),{}) 3. 默认执行顺序 class Foo(object,metaclass=MyType): pass obj = Foo() class MyType(type): def __init__(self,*args,**kwargs): print('111') super(MyType,self).__init__(*args,**kwargs) class Base(object, metaclass=MyType): pass class Foo(Base): pass 若是一类本身或基类中指定了metaclass,那么该类就是由metaclass指定的type或mytype建立。 同: class MyType(type): def __init__(self,*args,**kwargs): print('111') super(MyType,self).__init__(*args,**kwargs) # class Base(object, metaclass=MyType): # pass Base = MyType('Base',(object,),{}) class Foo(Base): pass 同: class MyType(type): def __init__(self,*args,**kwargs): print('111') super(MyType,self).__init__(*args,**kwargs) # class Base(object, metaclass=MyType): # pass def with_metaclass(arg): Base = MyType('Base',(arg,),{}) return Base class Foo(with_metaclass(object)): pass